from Utilities import folder_processor, configure_functions
from Utilities.Database import database_loader, database_record_logs
from Utilities.CSV import csv_utils
from shared_types import ConfigType
import os
import pymongo

import logging
from datetime import date

# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Path to dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Path to dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."

# Logger configuration: one application log file per month,
# e.g. CrawlerLogs/Applicationlog-Jan-2021.log
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
                    date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s')
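
# Every function below receives a loaded dataset configuration. The sketch of
# its structure given here is only an illustration inferred from the code in
# this module; the authoritative schema is whatever
# configure_functions.load_configuration returns, and the values shown are
# hypothetical:
#
#   {
#       "dataset-name": "example_dataset",  # dataset name, also used for folder names
#       "update-period": 7,                 # update period in days, used by check_last_update
#       "devices": {
#           "device-1": {"x": "49.19", "y": "16.61"},  # coordinates, "UNKNOWN!" if not set
#       },
#   }
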
def check_last_update(config: ConfigType) -> bool:
    """
    Loads the integer stored in updated.txt in CrawlerLogs/"dataset_name"
    representing the number of days since the last update. If the number
    reaches the update period given in the config, the dataset is updated
    and the counter is reset to zero; otherwise the counter is incremented.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset is being updated
        False if only the days-since-last-update counter was incremented
    """
    dataset_name = config["dataset-name"]

    last_update = database_record_logs.load_updated(dataset_name)

    if config["update-period"] <= last_update:
        logging.info("Dataset " + dataset_name + " is being updated today")
        database_record_logs.update_updated(dataset_name, 0)
        return True
    else:
        last_update_days = last_update + 1
        logging.info("Dataset " + dataset_name + " will be updated in " +
                     str(int(config["update-period"]) - last_update_days) +
                     " days")
        database_record_logs.update_updated(dataset_name, last_update_days)
        return False

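
# A worked example of the counter logic in check_last_update (illustrative only;
# the starting value comes from database_record_logs): with "update-period": 3
# the stored value read on successive cron runs goes 0 -> 1 -> 2 -> 3; the run
# that reads 3 satisfies update-period <= last_update, triggers an update and
# resets the counter to 0, so the dataset is re-crawled on every fourth run.
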
def crawl_data(config: ConfigType) -> None:
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"_crawler.py
    and runs it.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler",
                            globals(), locals(), ['crawl']).crawl
    crawl_func(config)

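
# The crawler above is resolved dynamically, so every dataset needs a module
# DatasetCrawler/<dataset-name>_crawler.py exposing a crawl(config) function.
# A minimal sketch of such a module (file name and body are illustrative,
# not taken from the project):
#
#   # DatasetCrawler/example_dataset_crawler.py
#   from shared_types import ConfigType
#
#   def crawl(config: ConfigType) -> None:
#       # download the raw files for this dataset into CrawledData/<dataset-name>/
#       ...
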
def process_data(config: ConfigType) -> None:
    """
    Goes through every not yet processed file (the list of processed files
    is saved in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing, updates the database list of processed files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
        ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
                 " not processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        print("Vytvářím: " + not_processed_file)
        database_record_logs.update_ignore_set_processed(
            dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " +
                 str(len(not_processed_files)) + " newly crawled files")

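
# Processors are resolved the same way as crawlers: each dataset needs a module
# DatasetProcessing/<dataset-name>_processor.py exposing process_file(filename).
# A minimal sketch (illustrative only; the exact shape of the returned value is
# whatever csv_utils.export_data_to_csv expects):
#
#   # DatasetProcessing/example_dataset_processor.py
#   def process_file(filename: str):
#       # parse one crawled file and return its data grouped by date
#       ...
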
def process_data_crone(config: ConfigType) -> None:
    """
    Goes through every not yet processed file (the list of processed files
    is saved in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing, updates the database list of processed files.

    Lightweight version for cron and production only
    - excludes checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
        ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
                 " not processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(
            dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " +
                 str(len(not_processed_files)) + " newly crawled files")

def validate_process_data(config: ConfigType) -> bool:
    """
    Goes through the newly processed data and checks their status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data were processed correctly
        False on wrong format or new unknown devices
    """
    dataset_name = config["dataset-name"]

    processed_devices_set = folder_processor.get_devices_set(
        dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
    unknown_devices_set = folder_processor.get_unknown_devices_set(
        config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) +
                     " unknown devices")
        logging.info("Adding devices to " + dataset_name + " config file")
        configure_functions.update_configuration(dataset_name,
                                                 unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(
                dataset_name +
                " config file contains devices with UNKNOWN! values, please update them!"
            )
            #return False

    return True

def load_data_to_database(config: ConfigType) -> None:
    """
    Goes through every not yet loaded file (the list of loaded files is saved
    in the database), loads the data, appends coordinates from the configuration
    and exports it into the database.
    After successful processing, updates the database list of loaded files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    database_loader.check_or_update_datasets_collection(
        database_connection, config)

    changes_in_devices = database_loader.update_devices_collection(config)

    if changes_in_devices:
        logg_string = dataset_name + " contains changes in devices configuration. Deleting old data and preparing new"
        logg_string_cs = dataset_name + " obsahuje změny v konfiguračním souboru. Probíhá odstraňování starých dat a příprava nových."
        logging.info(logg_string)
        print(logg_string_cs)
        database_loader.reset_dataset_database(dataset_name)

    # get all unprocessed files from dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(
        ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(
            not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection,
                                              dataset_name, processed_data,
                                              not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name,
                                                      not_loaded_file)

    logg_string = dataset_name + " has loaded to database " + str(
        len(not_loaded_files)) + " newly processed files."
    logg_string_cs = dataset_name + " načetl " + str(
        len(not_loaded_files)) + " nových zpracovaných souborů \n"

    logging.info(logg_string)
    print(logg_string_cs)

    client = pymongo.MongoClient()
    client.close()

def load_data_to_database_crone(config: ConfigType) -> None:
    """
    Goes through every not yet loaded file (the list of loaded files is saved
    in the database), loads the data, appends coordinates from the configuration
    and exports it into the database.
    After successful processing, updates the database list of loaded files.

    Lightweight version for cron and production only
    - excludes checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    # get all unprocessed files from dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(
        ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(
            not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection,
                                              dataset_name, processed_data,
                                              not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name,
                                                      not_loaded_file)

    logging.info(dataset_name + " has loaded to database " +
                 str(len(not_loaded_files)) + " newly processed files.")

    client = pymongo.MongoClient()
    client.close()

def run_full_pipeline(dataset_name: str) -> None:
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)
    print("Zpracovávám dataset " + dataset_name +
          ", průběh lze sledovat v logu umístěném v adresáři CrawlerLogs")

    config = configure_functions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config)

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)

def run_full_pipeline_crone(dataset_name: str) -> None:
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = configure_functions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data_crone(config)

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database_crone(config)
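

# Minimal manual-run sketch (not part of the original workflow; the dataset
# name passed on the command line is assumed to have an existing configuration
# file):
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        # run the full pipeline once for the dataset named on the command line
        run_full_pipeline(sys.argv[1])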