Projekt

Obecné

Profil

Stáhnout (11.8 KB) Statistiky
| Větev: | Revize:
1
from Utilities import folder_processor, configure_functions
2
from Utilities.Database import database_loader, database_record_logs
3
from Utilities.CSV import csv_utils
4
import os
5
import pymongo
6

    
7
import logging
8
from datetime import date
9

    
10
# Path to crawled data
11
CRAWLED_DATA_PATH = "CrawledData/"
12
# Path to processed data
13
PROCESSED_DATA_PATH = "ProcessedData/"
14
# Path to crawler logs
15
CRAWLER_LOGS_PATH = "CrawlerLogs/"
16
# Path to dataset crawler implementations
17
CRAWLER_LIB_PATH = "DatasetCrawler."
18
# Path to dataset processor implementations
19
PROCESSOR_LIB_PATH = "DatasetProcessing."
20

    
21
#logger
22
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
23
                               date.today().strftime("%b-%Y") + '.log',
24
                               level=logging.INFO,
25
                               format='%(asctime)s %(message)s')
26

    
27

    
28
def check_last_update(config):
29
    """
30
    Loads integer from updated.txt in CrawlerLogs/"dataset_name"
31
    representing number of days from last update if number equals
32
    number in confing update period updates it and reset number of
33
    days to zero else increment the number
34

    
35
    Arguments:
36
        config loaded configuration file of dataset
37

    
38
    Returns:
39
       True if updating
40
       Else if incementing days from last update
41
    """
42
    dataset_name = config["dataset-name"]
43

    
44
    last_update = database_record_logs.load_updated(dataset_name)
45

    
46
    if config["update-period"] <= last_update:
47
        logging.info("Dataset " + dataset_name + " is being updated today")
48
        database_record_logs.update_updated(dataset_name, 0)
49
        return True
50
    else:
51
        last_update_days = last_update + 1
52
        logging.info("Dataset " + dataset_name + " will be updated in " +
53
                     str(int(config["update-period"]) - last_update_days) +
54
                     "days")
55
        database_record_logs.update_updated(dataset_name, last_update + 1)
56
        return False
57

    
58

    
59
def crawl_data(config):
60
    """
61
      Imports dataset crawler in DatasetCrawler/"dataset_name"_crawler.py
62
      runs crawler.
63

    
64
    Args:
65
        config: loaded configuration file of dataset
66
    """
67
    dataset_name = config["dataset-name"]
68

    
69
    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler",
70
                            globals(), locals(), ['crawl']).crawl
71
    crawl_func(config)
72

    
73
    dataset_name += '/'
74

    
75

    
76
def process_data(config):
77
    """
78
    Goes trough every not processed file(list of processed files is saved in databse)
79
    Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
80
    Runs processor on every file
81
    After successful processing updates database list of processed files
82

    
83
    Args:
84
        dataset_name: name of dataset that has existing configuration file
85
    """
86
    dataset_name = config["dataset-name"]
87
    dataset_path = dataset_name + '/'
88

    
89
    process_file_func = __import__(
90
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
91
        ['process_file']).process_file
92

    
93
    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
94
    not_processed_files = folder_processor.list_of_all_new_files(
95
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
96
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
97
                 " not processed files")
98

    
99
    for not_processed_file in not_processed_files:
100
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
101
        date_dic = process_file_func(path)
102
        csv_utils.export_data_to_csv(path, date_dic)
103
        database_record_logs.update_ignore_set_processed(
104
            dataset_name, not_processed_file)
105

    
106
    logging.info(dataset_name + " has processed " +
107
                 str(len(not_processed_files)) + " newly crawled files")
108

    
109

    
110
def process_data_crone(config):
111
    """
112
    Goes trough every not processed file(list of processed files is saved in database)
113
    Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
114
    Runs processor on every file
115
    After successful processing updates database list of processed files
116

    
117
    Lightweight version for crone and production only
118
    - excludes checks for changes of config file and coordinates and names
119
    after these changes force_update_datasets.py should be called
120

    
121
    Args:
122
        dataset_name: name of dataset that has existing configuration file
123
    """
124

    
125
    dataset_name = config["dataset-name"]
126
    dataset_path = dataset_name + '/'
127

    
128
    process_file_func = __import__(
129
        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
130
        ['process_file']).process_file
131

    
132
    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
133
    not_processed_files = folder_processor.list_of_all_new_files(
134
        ignore_set, CRAWLED_DATA_PATH + dataset_path)
135
    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
136
                 " not processed files")
137

    
138
    for not_processed_file in not_processed_files:
139
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
140
        date_dic = process_file_func(path)
141
        csv_utils.export_data_to_csv(path, date_dic)
142
        database_record_logs.update_ignore_set_processed(
143
            dataset_name, not_processed_file)
144

    
145
    logging.info(dataset_name + " has processed " +
146
                 str(len(not_processed_files)) + " newly crawled files")
147

    
148

    
149
def validate_process_data(config):
150
    """
151
    Function goes through newly processed data and checks theirs status
152

    
153
    Args:
154
        config: loaded configuration file of dataset
155

    
156
    Returns:
157
        boolean variable TRUE/FALSE.
158
        Data processed correctly - TRUE
159
        Wrong format or NEW unknown devices - FALSE
160
    """
161
    dataset_name = config["dataset-name"]
162

    
163
    processed_devices_set = folder_processor.get_devices_set(
164
        dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
165
    unknown_devices_set = folder_processor.get_unknown_devices_set(
166
        config, processed_devices_set)
167
    unknown_devices_size = len(unknown_devices_set)
168

    
169
    if unknown_devices_size != 0:
170
        logging.info("There is " + str(unknown_devices_size) +
171
                     " unknown devices")
172
        logging.info("Adding devices to " + dataset_name + " config file")
173
        configure_functions.update_configuration(dataset_name,
174
                                                 unknown_devices_set)
175
        return False
176

    
177
    for device in config["devices"]:
178
        device = config["devices"][device]
179
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
180
            logging.info(
181
                dataset_name +
182
                " config file contains devices with UNKOWN! values please update them!!"
183
            )
184
            #return False
185

    
186
    return True
187

    
188

    
189
def load_data_to_database(config):
190
    """
191
    Goes trough every not loaded file(list of loaded files is saved in database)
192
    loads data appends coordination from configurations
193
    and exports it into the database
194
    After successful processing updates database list of loaded files
195

    
196
    Args:
197
        config: loaded configuration file of dataset
198
    """
199
    dataset_name = config["dataset-name"]
200
    dataset_path = dataset_name + '/'
201

    
202
    database_connection = database_loader.create_database_connection()
203

    
204
    database_loader.check_or_update_datasets_collection(
205
        database_connection, config)
206

    
207
    changes_in_devices = database_loader.update_devices_collection(config)
208

    
209
    if changes_in_devices == True:
210
        logging.info(
211
            dataset_name +
212
            " contains changes in devices configuration. Deleting old data and preparing new"
213
        )
214
        database_loader.reset_dataset_database(dataset_name)
215

    
216
    # get all unprocessed files from dataset
217
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
218
    not_loaded_files = folder_processor.list_of_all_new_files(
219
        ignore_set, PROCESSED_DATA_PATH + dataset_path)
220

    
221
    # load every file
222
    for not_loaded_file in not_loaded_files:
223
        # load processed data
224
        processed_data = database_loader.get_data_from_file(
225
            not_loaded_file, config)
226
        # load processed data to database
227
        database_loader.load_data_to_database(database_connection,
228
                                              dataset_name, processed_data,
229
                                              not_loaded_file)
230
        database_record_logs.update_ignore_set_loaded(dataset_name,
231
                                                      not_loaded_file)
232

    
233
    logging.info(dataset_name + " has loaded to database " +
234
                 str(len(not_loaded_files)) + " newly processed files.")
235

    
236
    client = pymongo.MongoClient()
237
    client.close()
238

    
239

    
240
def load_data_to_database_crone(config):
241
    """
242
    Goes trough every not loaded file(list of loaded files is saved in database)
243
    loads data appends coordination from configurations
244
    and exports it into the database
245
    After successful processing updates database list of loaded files
246
    
247
    Lightweight version for crone and production only
248
    - excludes checks for changes of config file and coordinates and names
249
    after these changes force_update_datasets.py should be called
250

    
251
    Args:
252
        config: loaded configuration file of dataset
253
    """
254
    dataset_name = config["dataset-name"]
255
    dataset_path = dataset_name + '/'
256

    
257
    database_connection = database_loader.create_database_connection()
258

    
259
    # get all unprocessed files from dataset
260
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
261
    not_loaded_files = folder_processor.list_of_all_new_files(
262
        ignore_set, PROCESSED_DATA_PATH + dataset_path)
263

    
264
    # load every file
265
    for not_loaded_file in not_loaded_files:
266
        # load processed data
267
        processed_data = database_loader.get_data_from_file(
268
            not_loaded_file, config)
269
        # load processed data to database
270
        database_loader.load_data_to_database(database_connection,
271
                                              dataset_name, processed_data,
272
                                              not_loaded_file)
273
        database_record_logs.update_ignore_set_loaded(dataset_name,
274
                                                      not_loaded_file)
275

    
276
    logging.info(dataset_name + " has loaded to database " +
277
                 str(len(not_loaded_files)) + " newly processed files.")
278

    
279
    client = pymongo.MongoClient()
280
    client.close()
281

    
282

    
283
def run_full_pipeline(dataset_name):
284
    """
285
    Loads config file and starts full pipeline
286
    -crawl data
287
    -process data
288
    -load data to database
289

    
290
    Args:
291
        dataset_name: name of dataset that has existing configuration file
292
    """
293
    logging.info("Starting pipeline for dataset " + dataset_name)
294
    print("Zpracovávám dataset " + dataset_name +
295
          " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
296

    
297
    config = configure_functions.load_configuration(dataset_name)
298
    crawl_data(config)
299
    process_data(config)
300

    
301
    validation_test = validate_process_data(config)
302

    
303
    if validation_test:
304
        load_data_to_database(config)
305

    
306

    
307
def run_full_pipeline_crone(dataset_name):
308
    """
309
    Loads config file and starts full pipeline
310
    -crawl data
311
    -process data
312
    -load data to database
313

    
314
    Args:
315
        dataset_name: name of dataset that has existing configuration file
316
    """
317
    logging.info("Starting pipeline for dataset " + dataset_name)
318

    
319
    config = configure_functions.load_configuration(dataset_name)
320
    update_test = check_last_update(config)
321
    if update_test:
322
        crawl_data(config)
323
        process_data_crone(config["dataset-name"])
324

    
325
        validation_test = validate_process_data(config)
326

    
327
        if validation_test:
328
            load_data_to_database_crone(config)
(7-7/12)