Projekt

Obecné

Profil

Stáhnout (11.1 KB) Statistiky
| Větev: | Revize:
1 d6ca840d petrh
from Utilities import folder_processor, configure_functions
2
from Utilities.Database import database_loader, database_record_logs
3
from Utilities.CSV import csv_utils
4
import os
5
import pymongo
6
7
import logging
8
from datetime import date
9
10
11
# Path to crawled data
12
CRAWLED_DATA_PATH = "CrawledData/"
13
# Path to processed data
14
PROCESSED_DATA_PATH = "ProcessedData/"
15
# Path to crawler logs
16
CRAWLER_LOGS_PATH = "CrawlerLogs/"
17
# Path to dataset crawler implementations
18
CRAWLER_LIB_PATH = "DatasetCrawler."
19
# Path to dataset processor implementations
20
PROCESSOR_LIB_PATH = "DatasetProcessing."
21
22
23
#logger
24
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
25
                    level=logging.INFO,
26
                    format='%(asctime)s %(message)s'
27
                    )
28
29
30
def check_last_update(config):
31
    """
32
    Loads integer from updated.txt in CrawlerLogs/"dataset_name"
33
    representing number of days from last update if number equals
34
    number in confing update period updates it and reset number of
35
    days to zero else increment the number
36
37
    Arguments:
38
        config loaded configuration file of dataset
39
40
    Returns:
41
       True if updating
42
       Else if incementing days from last update
43
    """
44
    dataset_name = config["dataset-name"]
45
46
    last_update = database_record_logs.load_updated(dataset_name)
47
48
    if config["update-period"] <= last_update:
49
        logging.info("Dataset " + dataset_name + " is being updated today")
50
        database_record_logs.update_updated(dataset_name,0)
51
        return True
52
    else:
53
        last_update_days = last_update + 1
54
        logging.info("Dataset " + dataset_name + " will be updated in " + str(int(config["update-period"]) - last_update_days) + "days")
55
        database_record_logs.update_updated(dataset_name,last_update + 1)
56
        return False
57
58
59
60
def crawl_data(config):
61
    """
62
      Imports dataset crawler in DatasetCrawler/"dataset_name"_crawler.py
63
      runs crawler.
64
65
    Args:
66
        config: loaded configuration file of dataset
67
    """
68
    dataset_name = config["dataset-name"]
69
70
    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler", globals(), locals(), ['crawl']).crawl
71
    crawl_func(config)
72
73
    dataset_name += '/'
74
75
76
def process_data(config):
77
    """
78
    Goes trough every not processed file(list of processed files is saved in databse)
79
    Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
80
    Runs processor on every file
81
    After successful processing updates database list of processed files
82
83
    Args:
84
        dataset_name: name of dataset that has existing configuration file
85
    """
86
    dataset_name = config["dataset-name"]
87
    dataset_path = dataset_name + '/'
88
89
    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
90
                                   ['process_file']).process_file
91
92
    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
93
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
94
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
95
96
    for not_processed_file in not_processed_files:
97
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
98
        date_dic = process_file_func(path)
99
        csv_utils.export_data_to_csv(path, date_dic)
100
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
101
102
    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
103
104
105
def process_data_crone(config):
106
    """
107
    Goes trough every not processed file(list of processed files is saved in database)
108
    Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
109
    Runs processor on every file
110
    After successful processing updates database list of processed files
111
112
    Lightweight version for crone and production only
113
    - excludes checks for changes of config file and coordinates and names
114
    after these changes force_update_datasets.py should be called
115
116
    Args:
117
        dataset_name: name of dataset that has existing configuration file
118
    """
119
120
    dataset_name = config["dataset-name"]
121
    dataset_path = dataset_name + '/'
122
123
    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
124
                                   ['process_file']).process_file
125
126
    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
127
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
128
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
129
130
    for not_processed_file in not_processed_files:
131
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
132
        date_dic = process_file_func(path)
133
        csv_utils.export_data_to_csv(path, date_dic)
134
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
135
136
    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
137
138
139
def validate_process_data(config):
140
    """
141
    Function goes through newly processed data and checks theirs status
142
143
    Args:
144
        config: loaded configuration file of dataset
145
146
    Returns:
147
        boolean variable TRUE/FALSE.
148
        Data processed correctly - TRUE
149
        Wrong format or NEW unknown devices - FALSE
150
    """
151
    dataset_name = config["dataset-name"]
152
153
    processed_devices_set = folder_processor.get_devices_set(dataset_name,PROCESSED_DATA_PATH +dataset_name + '/')
154
    unknown_devices_set = folder_processor.get_unknown_devices_set(config, processed_devices_set)
155
    unknown_devices_size = len(unknown_devices_set)
156
157
    if unknown_devices_size != 0:
158
        logging.info("There is " + str(unknown_devices_size) + " unknown devices")
159
        logging.info("Adding devices to " + dataset_name + " config file")
160
        configure_functions.update_configuration(dataset_name, unknown_devices_set)
161
        return False
162
163
    for device in config["devices"]:
164
        device = config["devices"][device]
165
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
166
            logging.info(dataset_name + " config file contains devices with UNKOWN! values please update them!!")
167
            return False
168
169
    return True
170
171
172
def load_data_to_database(config):
173
    """
174
    Goes trough every not loaded file(list of loaded files is saved in database)
175
    loads data appends coordination from configurations
176
    and exports it into the database
177
    After successful processing updates database list of loaded files
178
179
    Args:
180
        config: loaded configuration file of dataset
181
    """
182
    dataset_name = config["dataset-name"]
183
    dataset_path = dataset_name + '/'
184
185
    database_connection = database_loader.create_database_connection()
186
187
    database_loader.check_or_update_datasets_collection(database_connection,config)
188
189 753d424e petrh
    changes_in_devices = database_loader.update_devices_collection(config)
190
191
    if changes_in_devices == True:
192
        logging.info(dataset_name + " contains changes in devices configuration. Deleting old data and preparing new")
193
        database_loader.reset_dataset_database(dataset_name)
194
195 d6ca840d petrh
    # get all unprocessed files from dataset
196
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
197
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
198
199 fc552acd petrh
    print(ignore_set)
200
    print(not_loaded_files)
201
202 d6ca840d petrh
    # load every file
203
    for not_loaded_file in not_loaded_files:
204
        # load processed data
205
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
206
        # load processed data to database
207
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
208
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
209
210
    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
211
212
    client = pymongo.MongoClient()
213
    client.close()
214
215
216
def load_data_to_database_crone(config):
217
    """
218
    Goes trough every not loaded file(list of loaded files is saved in database)
219
    loads data appends coordination from configurations
220
    and exports it into the database
221
    After successful processing updates database list of loaded files
222
    
223
    Lightweight version for crone and production only
224
    - excludes checks for changes of config file and coordinates and names
225
    after these changes force_update_datasets.py should be called
226
227
    Args:
228
        config: loaded configuration file of dataset
229
    """
230
    dataset_name = config["dataset-name"]
231
    dataset_path = dataset_name + '/'
232
233
    database_connection = database_loader.create_database_connection()
234
235
    # get all unprocessed files from dataset
236
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
237
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
238
239
    # load every file
240
    for not_loaded_file in not_loaded_files:
241
        # load processed data
242
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
243
        # load processed data to database
244
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
245
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
246
247
    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
248
249
    client = pymongo.MongoClient()
250
    client.close()
251
252
253
254
def run_full_pipeline(dataset_name):
255
    """
256
    Loads config file and starts full pipeline
257
    -crawl data
258
    -process data
259
    -load data to database
260
261
    Args:
262
        dataset_name: name of dataset that has existing configuration file
263
    """
264
    logging.info("Starting pipeline for dataset " + dataset_name)
265 753d424e petrh
    print("Zpracovávám dataset " + dataset_name + " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
266 d6ca840d petrh
    
267
    config = configure_functions.load_configuration(dataset_name)
268
    crawl_data(config)
269
    process_data(config)
270
271
    validation_test = validate_process_data(config)
272
273
    if validation_test:
274
        load_data_to_database(config)
275
276
277
def run_full_pipeline_crone(dataset_name):
278
    """
279
    Loads config file and starts full pipeline
280
    -crawl data
281
    -process data
282
    -load data to database
283
284
    Args:
285
        dataset_name: name of dataset that has existing configuration file
286
    """
287
    logging.info("Starting pipeline for dataset " + dataset_name)
288
289
    config = configure_functions.load_configuration(dataset_name)
290
    update_test = check_last_update(config)
291
    if update_test:
292
        crawl_data(config)
293
        process_data_crone(config["dataset-name"])
294
295
        validation_test = validate_process_data(config)
296
297
        if validation_test:
298
            load_data_to_database_crone(config)
299