from Utilities import folder_processor, configure_functions
from Utilities.Database import database_loader, database_record_logs
from Utilities.CSV import csv_utils
import os
import pymongo

import logging
from datetime import date


# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Path to dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Path to dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."


# logger
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s'
                    )
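
# The exact configuration format is defined elsewhere in the project; the sketch
# below is only an illustration inferred from the keys this module actually reads
# ("dataset-name", "update-period" and the per-device "x"/"y" coordinates).
# All other values shown are hypothetical.
#
#     config = {
#         "dataset-name": "EXAMPLE_DATASET",   # used to locate crawler/processor modules and data folders
#         "update-period": 7,                  # update every 7 days (compared in check_last_update)
#         "devices": {
#             "device-1": {"x": "49.7275", "y": "13.3776"},
#             "device-2": {"x": "UNKNOWN!", "y": "UNKNOWN!"},  # flagged by validate_process_data
#         },
#     }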


def check_last_update(config):
    """
    Loads an integer from updated.txt in CrawlerLogs/"dataset_name"
    representing the number of days since the last update. If that number
    reaches the update period given in the config, the dataset is updated
    and the counter is reset to zero; otherwise the counter is incremented.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset is being updated
        False if only the days-since-last-update counter was incremented
    """
    dataset_name = config["dataset-name"]

    last_update = database_record_logs.load_updated(dataset_name)

    if config["update-period"] <= last_update:
        logging.info("Dataset " + dataset_name + " is being updated today")
        database_record_logs.update_updated(dataset_name, 0)
        return True
    else:
        last_update_days = last_update + 1
        logging.info("Dataset " + dataset_name + " will be updated in " + str(int(config["update-period"]) - last_update_days) + " days")
        database_record_logs.update_updated(dataset_name, last_update_days)
        return False


def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"_crawler.py
    and runs it.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler", globals(), locals(), ['crawl']).crawl
    crawl_func(config)
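
# crawl_data() above imports DatasetCrawler/<dataset-name>_crawler.py and calls its
# crawl(config) function. The snippet below is a minimal sketch of such a module,
# assuming only that contract; the "url" key, the use of requests and the output
# file name are hypothetical placeholders, not part of this project.
#
#     # DatasetCrawler/EXAMPLE_DATASET_crawler.py
#     import requests
#
#     def crawl(config):
#         dataset_name = config["dataset-name"]
#         url = config["url"]  # hypothetical config key
#         response = requests.get(url)
#         # store the raw snapshot into CrawledData/<dataset-name>/
#         with open("CrawledData/" + dataset_name + "/latest.csv", "wb") as file:
#             file.write(response.content)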


def process_data(config):
    """
    Goes through every file that has not been processed yet (the list of
    processed files is saved in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing updates the database list of processed files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    changes_in_devices = database_loader.update_devices_collection(config)

    if changes_in_devices:
        logging.info(dataset_name + " contains changes in devices configuration. Deleting old data and preparing new")
        database_loader.reset_dataset_database(dataset_name)
        folder_processor.clean_folder(PROCESSED_DATA_PATH + dataset_path)

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
                                   ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not yet processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
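
# process_data() above imports DatasetProcessing/<dataset-name>_processor.py and
# calls its process_file(path) function; the returned dictionary is then handed to
# csv_utils.export_data_to_csv(path, date_dic). Below is a minimal sketch of such a
# processor, assuming only that contract; the raw file format and the structure of
# the returned dictionary are illustrative assumptions.
#
#     # DatasetProcessing/EXAMPLE_DATASET_processor.py
#     def process_file(path):
#         date_dic = {}
#         with open(path, "r", encoding="utf-8") as file:
#             for line in file:
#                 date, device, value = line.strip().split(";")  # hypothetical raw format
#                 date_dic.setdefault(date, []).append((device, value))
#         return date_dic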


def process_data_crone(config):
    """
    Goes through every file that has not been processed yet (the list of
    processed files is saved in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing updates the database list of processed files.

    Lightweight version for cron and production use only:
    it skips the checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called instead.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
                                   ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not yet processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")


def validate_process_data(config):
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data was processed correctly
        False on wrong format or new unknown devices
    """
    dataset_name = config["dataset-name"]

    processed_devices_set = folder_processor.get_devices_set(dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
    unknown_devices_set = folder_processor.get_unknown_devices_set(config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) + " unknown devices")
        logging.info("Adding devices to " + dataset_name + " config file")
        configure_functions.update_configuration(dataset_name, unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(dataset_name + " config file contains devices with UNKNOWN! values, please update them!")
            return False

    return True


def load_data_to_database(config):
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is saved in the database), loads the data, appends coordinates from
    the configuration and exports it into the database.
    After successful loading updates the database list of loaded files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    database_loader.check_or_update_datasets_collection(database_connection, config)

    # get all not yet loaded files of the dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)

    logging.info(dataset_name + " has loaded " + str(len(not_loaded_files)) + " newly processed files into the database.")

    client = pymongo.MongoClient()
    client.close()


def load_data_to_database_crone(config):
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is saved in the database), loads the data, appends coordinates from
    the configuration and exports it into the database.
    After successful loading updates the database list of loaded files.

    Lightweight version for cron and production use only:
    it skips the checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called instead.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    # get all not yet loaded files of the dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)

    logging.info(dataset_name + " has loaded " + str(len(not_loaded_files)) + " newly processed files into the database.")

    client = pymongo.MongoClient()
    client.close()


def run_full_pipeline(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)
    print("Processing dataset " + dataset_name + "; you can watch the progress in the log in the CrawlerLogs folder")

    config = configure_functions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config)

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Runs only if the dataset's update period has elapsed (see check_last_update).

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = configure_functions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data_crone(config)

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database_crone(config)
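

# Usage sketch (kept in comments so importing this module stays side-effect free):
# run_full_pipeline() is intended for manual, one-off runs with all checks, while
# run_full_pipeline_crone() is the lightweight variant meant to be triggered
# periodically (e.g. from cron). "EXAMPLE_DATASET" is a hypothetical dataset name;
# a configuration file for it must already exist.
#
#     if __name__ == "__main__":
#         run_full_pipeline("EXAMPLE_DATASET")          # full run with all checks
#         # run_full_pipeline_crone("EXAMPLE_DATASET")  # scheduled production run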
298