from Utilities import folder_processor, configure_functions
from Utilities.Database import database_loader, database_record_logs
from Utilities.CSV import csv_utils

import pymongo
import logging
from datetime import date

# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Module path prefix for dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Module path prefix for dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."

# Logger: writes one application log file per month, e.g.
# CrawlerLogs/Applicationlog-Jun-2020.log
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
                    date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s')


def check_last_update(config):
    """
    Loads the number of days since the last update of the given dataset
    from the database log. If it has reached the update period defined in
    the config, resets the counter to zero; otherwise increments it.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset should be updated today,
        False if only the days-since-last-update counter was incremented
    """
    dataset_name = config["dataset-name"]

    last_update = database_record_logs.load_updated(dataset_name)

    if config["update-period"] <= last_update:
        logging.info("Dataset " + dataset_name + " is being updated today")
        database_record_logs.update_updated(dataset_name, 0)
        return True
    else:
        last_update_days = last_update + 1
        logging.info("Dataset " + dataset_name + " will be updated in " +
                     str(int(config["update-period"]) - last_update_days) +
                     " days")
        database_record_logs.update_updated(dataset_name, last_update_days)
        return False
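
# Example (hypothetical values): with "update-period": 7 in the dataset
# config, check_last_update returns False and bumps the stored counter for
# six days, then returns True and resets it on the seventh:
#
#     config = {"dataset-name": "JIS", "update-period": 7}
#     if check_last_update(config):
#         crawl_data(config)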


def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"_crawler.py
    and runs its crawl function.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler",
                            globals(), locals(), ['crawl']).crawl
    crawl_func(config)
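
# The crawler module is resolved by name at run time: for a (hypothetical)
# dataset "JIS", DatasetCrawler/JIS_crawler.py must expose a crawl(config)
# function, e.g.:
#
#     def crawl(config):
#         # download raw files into CRAWLED_DATA_PATH + config["dataset-name"]
#         ...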


def process_data(config):
    """
    Goes through every file that has not been processed yet (the list of
    processed files is kept in the database).
    Imports the dataset processor from
    DatasetProcessing/"dataset_name"_processor.py and runs it on every file.
    After successful processing, updates the database list of processed files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
        ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
                 " not yet processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(
            dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " +
                 str(len(not_processed_files)) + " newly crawled files")
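
# The processor module follows the same convention: for a (hypothetical)
# dataset "JIS", DatasetProcessing/JIS_processor.py must expose
# process_file(path), returning a dictionary keyed by date that
# csv_utils.export_data_to_csv then writes out as CSV.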


def process_data_crone(config):
    """
    Goes through every file that has not been processed yet (the list of
    processed files is kept in the database).
    Imports the dataset processor from
    DatasetProcessing/"dataset_name"_processor.py and runs it on every file.
    After successful processing, updates the database list of processed files.

    Lightweight version for cron and production use only:
    it skips the checks for changes in the config file, coordinates and
    names; after such changes force_update_datasets.py should be called.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
        ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
                 " not yet processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(
            dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " +
                 str(len(not_processed_files)) + " newly crawled files")


def validate_process_data(config):
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data was processed correctly,
        False on wrong format or new unknown devices
    """
    dataset_name = config["dataset-name"]

    processed_devices_set = folder_processor.get_devices_set(
        dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
    unknown_devices_set = folder_processor.get_unknown_devices_set(
        config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) +
                     " unknown devices")
        logging.info("Adding devices to " + dataset_name + " config file")
        configure_functions.update_configuration(dataset_name,
                                                 unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(
                dataset_name +
                " config file contains devices with UNKNOWN! values, please update them!"
            )
            # return False

    return True
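
# A (hypothetical) "devices" section in a dataset config might look like:
#
#     "devices": {
#         "sensor-01": {"x": "49.2107", "y": "16.5994"},
#         "sensor-02": {"x": "UNKNOWN!", "y": "UNKNOWN!"}
#     }
#
# Devices left with "UNKNOWN!" coordinates are only logged; the return is
# commented out above, so they do not currently fail validation.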


def load_data_to_database(config):
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is kept in the database), loads the data, appends coordinates from
    the configuration and exports everything into the database.
    After successful loading, updates the database list of loaded files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    database_loader.check_or_update_datasets_collection(
        database_connection, config)

    changes_in_devices = database_loader.update_devices_collection(config)

    if changes_in_devices:
        logging.info(
            dataset_name +
            " contains changes in devices configuration. Deleting old data and preparing new"
        )
        database_loader.reset_dataset_database(dataset_name)

    # get all files of the dataset that have not been loaded yet
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(
        ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(
            not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection,
                                              dataset_name, processed_data,
                                              not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name,
                                                      not_loaded_file)

    logging.info(dataset_name + " has loaded to database " +
                 str(len(not_loaded_files)) + " newly processed files.")

    # NOTE: this opens and immediately closes a fresh client;
    # database_connection itself is not closed here
    client = pymongo.MongoClient()
    client.close()
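
# Sketch of a typical (hypothetical) call; only files missing from the
# loaded-files ignore set kept by database_record_logs are touched:
#
#     config = configure_functions.load_configuration("JIS")
#     load_data_to_database(config)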


def load_data_to_database_crone(config):
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is kept in the database), loads the data, appends coordinates from
    the configuration and exports everything into the database.
    After successful loading, updates the database list of loaded files.

    Lightweight version for cron and production use only:
    it skips the checks for changes in the config file, coordinates and
    names; after such changes force_update_datasets.py should be called.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    # get all files of the dataset that have not been loaded yet
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(
        ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(
            not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection,
                                              dataset_name, processed_data,
                                              not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name,
                                                      not_loaded_file)

    logging.info(dataset_name + " has loaded to database " +
                 str(len(not_loaded_files)) + " newly processed files.")

    # NOTE: this opens and immediately closes a fresh client;
    # database_connection itself is not closed here
    client = pymongo.MongoClient()
    client.close()


def run_full_pipeline(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)
    print("Processing dataset " + dataset_name +
          "; progress can be followed in the log in the CrawlerLogs folder")

    config = configure_functions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config)

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Lightweight version for cron and production use only; runs only when
    check_last_update reports that the dataset's update period has elapsed.

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = configure_functions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        # pass the whole config, not just the dataset name
        process_data_crone(config)

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database_crone(config)
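
# Example invocations (hypothetical dataset name "JIS"); a daily cron job
# would typically call the lightweight variant:
#
#     run_full_pipeline("JIS")        # interactive / one-off run
#     run_full_pipeline_crone("JIS")  # scheduled run, respects update-period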