from Utilities import folder_processor, configure_functions
from Utilities.Database import database_loader, database_record_logs
from Utilities.CSV import csv_utils
from shared_types import ConfigType
import os
import pymongo

import logging
from datetime import date

# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Path to dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Path to dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."
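# These prefixes feed the dynamic __import__ calls below and resolve to
# DatasetCrawler/<dataset_name>_crawler.py and
# DatasetProcessing/<dataset_name>_processor.py.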

# logger
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
                    date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s')


def check_last_update(config: ConfigType) -> bool:
    """
    Loads an integer from updated.txt in CrawlerLogs/"dataset_name"
    representing the number of days since the last update. If that number
    reaches the update period given in the config, the counter is reset to
    zero and the dataset is updated; otherwise the counter is incremented.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset should be updated now,
        False if only the days-since-last-update counter was incremented
    """
    dataset_name = config["dataset-name"]

    last_update = database_record_logs.load_updated(dataset_name)

    if config["update-period"] <= last_update:
        logging.info("Dataset " + dataset_name + " is being updated today")
        database_record_logs.update_updated(dataset_name, 0)
        return True
    else:
        last_update_days = last_update + 1
        logging.info("Dataset " + dataset_name + " will be updated in " +
                     str(int(config["update-period"]) - last_update_days) +
                     " days")
        database_record_logs.update_updated(dataset_name, last_update + 1)
        return False


def crawl_data(config: ConfigType) -> None:
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"_crawler.py
    and runs it.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler",
                            globals(), locals(), ['crawl']).crawl
    crawl_func(config)


def process_data(config: ConfigType) -> None:
    """
    Goes through every file that has not been processed yet (the list of
    processed files is kept in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing, the database list of processed files is updated.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
        ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
                 " unprocessed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        print("Vytvářím: " + not_processed_file)
        database_record_logs.update_ignore_set_processed(
            dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " +
                 str(len(not_processed_files)) + " newly crawled files")


def process_data_crone(config: ConfigType) -> None:
    """
    Goes through every file that has not been processed yet (the list of
    processed files is kept in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing, the database list of processed files is updated.

    Lightweight version for cron and production use only:
    - excludes checks for changes of the config file, coordinates and names;
      after such changes force_update_datasets.py should be called

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
        ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
                 " unprocessed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(
            dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " +
                 str(len(not_processed_files)) + " newly crawled files")


def validate_process_data(config: ConfigType) -> bool:
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True  - data processed correctly
        False - wrong format or new unknown devices
    """
    dataset_name = config["dataset-name"]

    processed_devices_set = folder_processor.get_devices_set(
        dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
    unknown_devices_set = folder_processor.get_unknown_devices_set(
        config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) +
                     " unknown devices")
        logging.info("Adding devices to " + dataset_name + " config file")
        configure_functions.update_configuration(dataset_name,
                                                 unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(
                dataset_name +
                " config file contains devices with UNKNOWN! values, please update them!"
            )
            # return False

    return True


def load_data_to_database(config: ConfigType) -> None:
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is kept in the database), loads the data, appends coordinates from
    the configuration and exports it into the database.
    After successful loading, the database list of loaded files is updated.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    database_loader.check_or_update_datasets_collection(
        database_connection, config)

    changes_in_devices = database_loader.update_devices_collection(config)

    if changes_in_devices:
        logg_string = dataset_name + " contains changes in devices configuration. Deleting old data and preparing new"
        logg_string_cs = dataset_name + " obsahuje změny v konfiguračním souboru. Probíhá odstraňování starých dat a připravení nových."
        logging.info(logg_string)
        print(logg_string_cs)
        database_loader.reset_dataset_database(dataset_name)

    # get all unloaded files from the dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(
        ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(
            not_loaded_file, config)
        # load processed data to the database
        database_loader.load_data_to_database(database_connection,
                                              dataset_name, processed_data,
                                              not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name,
                                                      not_loaded_file)

    logg_string = dataset_name + " has loaded to database " + str(
        len(not_loaded_files)) + " newly processed files."
    logg_string_cs = dataset_name + " načetl " + str(
        len(not_loaded_files)) + " nových zpracovaných souborů \n"

    logging.info(logg_string)
    print(logg_string_cs)

    client = pymongo.MongoClient()
    client.close()


def load_data_to_database_crone(config: ConfigType) -> None:
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is kept in the database), loads the data, appends coordinates from
    the configuration and exports it into the database.
    After successful loading, the database list of loaded files is updated.

    Lightweight version for cron and production use only:
    - excludes checks for changes of the config file, coordinates and names;
      after such changes force_update_datasets.py should be called

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    # get all unloaded files from the dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(
        ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(
            not_loaded_file, config)
        # load processed data to the database
        database_loader.load_data_to_database(database_connection,
                                              dataset_name, processed_data,
                                              not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name,
                                                      not_loaded_file)

    logging.info(dataset_name + " has loaded to database " +
                 str(len(not_loaded_files)) + " newly processed files.")

    client = pymongo.MongoClient()
    client.close()


def run_full_pipeline(dataset_name: str) -> None:
    """
    Loads the config file and runs the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)
    print("Zpracovávám dataset " + dataset_name +
          ", průběh lze sledovat v logu umístěném v adresáři CrawlerLogs")

    config = configure_functions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config)

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name: str) -> None:
    """
    Loads the config file and, if the dataset is due for an update, runs the
    full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = configure_functions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data_crone(config)

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database_crone(config)
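

# A minimal entry-point sketch: "example_dataset" is only an illustrative
# placeholder for any dataset that has an existing configuration file.
if __name__ == "__main__":
    run_full_pipeline("example_dataset")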