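"""
Dataset pipeline: crawls raw data, processes it, validates the processed
output against the dataset configuration and loads the result into the
database. Each dataset is driven by its configuration file, either on demand
(run_full_pipeline) or periodically (run_full_pipeline_crone).
"""
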
from Utilities import FolderProcessor, ConfigureFunctions
from Utilities.Database import DatabaseLoader

import logging
from datetime import date


# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Path to dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Path to dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."


# logger
logging.basicConfig(filename=CRAWLER_LOGS_PATH + "CommonRecords/" + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s'
                    )
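
# Expected on-disk layout, pieced together from the constants above and the
# functions below (illustrative, not authoritative):
#
#   CrawledData/<dataset-name>/              raw files downloaded by the crawler
#   ProcessedData/<dataset-name>/            files produced by the processor
#   CrawlerLogs/<dataset-name>/updated.txt   days since the last update
#   CrawlerLogs/CommonRecords/               application log configured above
#
# Per-dataset code lives in DatasetCrawler.<dataset-name>Crawler and
# DatasetProcessing.<dataset-name>Processor and is imported dynamically.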


def check_last_update(config):
    """
    Loads an integer from updated.txt in CrawlerLogs/"dataset_name"
    representing the number of days since the last update. If the number
    has reached the update period from the config, resets it to zero and
    reports that an update is due; otherwise increments the number.

    Arguments:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset should be updated now
        False if the days-since-last-update counter was only incremented
    """
    dataset_name = config["dataset-name"]

    with open(CRAWLER_LOGS_PATH + dataset_name + "/" + "updated.txt", "r+") as file:
        last_update = int(file.read())
        file.seek(0)

        config_update_period = int(config["update-period"])

        if config_update_period <= last_update:
            logging.info("Dataset " + dataset_name + " is being updated today")
            file.write("0")
            file.truncate()
            return True
        else:
            last_update_days = last_update + 1
            logging.info("Dataset " + dataset_name + " will be updated in " +
                         str(config_update_period - last_update_days) + " days")
            file.write(str(last_update_days))
            file.truncate()
            return False
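
# Worked example for check_last_update (values are assumed for illustration):
# with "update-period" set to 7 in the config and updated.txt containing "7",
# 7 <= 7 holds, the counter is reset to "0" and True is returned; if the file
# contained "3" instead, "4" would be written back and False returned.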


def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"Crawler.py
    and runs it.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "Crawler", globals(), locals(), ['crawl']).crawl
    crawl_func(config)
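
# How the dynamic import above resolves (the dataset name "ExampleDataset" is
# a hypothetical placeholder): for config["dataset-name"] == "ExampleDataset"
# the module "DatasetCrawler.ExampleDatasetCrawler" is imported and its
# crawl(config) is called; process_data() below resolves
# "DatasetProcessing.ExampleDatasetProcessor" the same way.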


def process_data(dataset_name):
    """
    Goes through every not yet processed file (i.e. not listed in
    CrawledData/"dataset_name"/ignore.txt), imports the dataset processor
    from DatasetProcessing/"dataset_name"Processor.py and runs it on every
    file. After successful processing updates ignore.txt.

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "Processor", globals(), locals(),
                                   ['process_file']).process_file

    not_processed_files = FolderProcessor.list_of_all_files(CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " has downloaded " + str(len(not_processed_files)) + " new files")

    for not_processed_file in not_processed_files:
        process_file_func(CRAWLED_DATA_PATH + dataset_path + not_processed_file)
        FolderProcessor.update_ignore_set(CRAWLED_DATA_PATH + dataset_path, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
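
# Both the crawled and the processed folders track already handled files in an
# ignore.txt (see the docstrings above and below); FolderProcessor.list_of_all_files()
# presumably skips files listed there, and update_ignore_set() adds a file once
# it has been handled, so repeated runs only touch new files.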


def validate_process_data(config):
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data was processed correctly
        False on wrong format or new unknown devices
    """
    processed_devices_set = FolderProcessor.get_devices_set(PROCESSED_DATA_PATH + config["dataset-name"] + '/')
    unknown_devices_set = FolderProcessor.get_unknown_devices_set(config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) + " unknown devices")
        logging.info("Adding devices to " + config["dataset-name"] + " config file")
        ConfigureFunctions.update_configuration(config["dataset-name"], unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(config["dataset-name"] + " config file contains devices with UNKNOWN! values, please update them!")
            return False

    return True
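
# Shape of the device entries this check assumes in the configuration (device
# names and coordinates below are hypothetical):
#
#   config["devices"] == {
#       "device-001": {"x": "UNKNOWN!", "y": "UNKNOWN!"},  # newly added, location not filled in yet
#       "device-002": {"x": "49.7384", "y": "13.3736"},    # located device
#   }
#
# update_configuration() above presumably inserts unknown devices with the
# "UNKNOWN!" placeholders, so validation keeps failing until they are replaced.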


def load_data_to_database(config):
    """
    Goes through every not yet loaded file (i.e. not listed in
    ProcessedData/ignore.txt), loads the data, appends coordinates from the
    configuration and exports it into the database. After successful
    exporting updates ignore.txt.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    # get all not yet loaded files from the dataset
    not_loaded_files = FolderProcessor.list_of_all_files(PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = DatabaseLoader.get_data_from_file(not_loaded_file, config)
        # load processed data to the database
        DatabaseLoader.load_data_to_database(dataset_name, processed_data)
        FolderProcessor.update_ignore_set(PROCESSED_DATA_PATH + dataset_path, not_loaded_file)

    logging.info(dataset_name + " has loaded " + str(len(not_loaded_files)) + " newly processed files to the database.")


def run_full_pipeline(dataset_name):
    """
    Loads the config file and runs the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config["dataset-name"])

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Loads the config file and, when the dataset's update period has elapsed,
    runs the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data(config["dataset-name"])

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database(config)
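

# Minimal usage sketch; "ExampleDataset" is a placeholder for any dataset that
# has an existing configuration file.
if __name__ == "__main__":
    run_full_pipeline("ExampleDataset")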