from Utilities import folder_processor, configure_functions
from Utilities.Database import database_loader, database_record_logs
from Utilities.CSV import csv_utils
import os
import pymongo

import logging
from datetime import date


# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Path to dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Path to dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."


# Logger configuration
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s'
                    )
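# The "%b-%Y" date format yields a monthly log file name,
# e.g. CrawlerLogs/Applicationlog-Jun-2020.log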


def check_last_update(config):
    """
    Loads an integer from updated.txt in CrawlerLogs/"dataset_name"
    representing the number of days since the last update. If that number
    reaches the update period given in the config, the dataset is updated
    and the counter is reset to zero; otherwise the counter is incremented.

    Arguments:
        config: loaded configuration file of the dataset

    Returns:
        True if the dataset is being updated,
        False if only the days since the last update were incremented
    """
    dataset_name = config["dataset-name"]

    last_update = database_record_logs.load_updated(dataset_name)

    if config["update-period"] <= last_update:
        logging.info("Dataset " + dataset_name + " is being updated today")
        database_record_logs.update_updated(dataset_name, 0)
        return True
    else:
        last_update_days = last_update + 1
        logging.info("Dataset " + dataset_name + " will be updated in " +
                     str(int(config["update-period"]) - last_update_days) + " days")
        database_record_logs.update_updated(dataset_name, last_update + 1)
        return False
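# Worked example of the bookkeeping above (illustrative numbers only): with
# "update-period": 7 and a stored counter of 7, check_last_update resets the
# counter to 0 and returns True; with a stored counter of 3 it stores 4, logs
# that the update is 3 days away, and returns False.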


def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"_crawler.py
    and runs its crawl function.

    Args:
        config: loaded configuration file of the dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler", globals(), locals(), ['crawl']).crawl
    crawl_func(config)

    dataset_name += '/'
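# A minimal sketch of the dynamic import done in crawl_data(), written with
# importlib for readability (same module naming convention assumed):
#
#     import importlib
#     crawler = importlib.import_module(CRAWLER_LIB_PATH + dataset_name + "_crawler")
#     crawler.crawl(config)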


def process_data(config):
    """
    Goes through every not yet processed file (the list of processed files is
    stored in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing it updates the database list of processed files.

    Args:
        config: loaded configuration file of the dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    changes_in_devices = database_loader.update_devices_collection(config)

    if changes_in_devices:
        logging.info(dataset_name + " contains changes in devices configuration. Deleting old data and preparing new")
        database_loader.reset_dataset_database(dataset_name)
        folder_processor.clean_folder(PROCESSED_DATA_PATH + dataset_path)

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
                                   ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
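# Contract assumed above: each <dataset_name>_processor module exposes
# process_file(path), which returns a dictionary (date_dic) that
# csv_utils.export_data_to_csv writes out for the given crawled file;
# the structure of that dictionary is left to the individual processors.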


def process_data_crone(config):
    """
    Goes through every not yet processed file (the list of processed files is
    stored in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing it updates the database list of processed files.

    Lightweight version for cron and production use only:
    it skips the checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called.

    Args:
        config: loaded configuration file of the dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
                                   ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")


def validate_process_data(config):
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of the dataset

    Returns:
        True if the data was processed correctly,
        False if the format is wrong or new unknown devices were found
    """
    dataset_name = config["dataset-name"]

    processed_devices_set = folder_processor.get_devices_set(dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
    unknown_devices_set = folder_processor.get_unknown_devices_set(config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) + " unknown devices")
        logging.info("Adding devices to " + dataset_name + " config file")
        configure_functions.update_configuration(dataset_name, unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(dataset_name + " config file contains devices with UNKNOWN! values, please update them!")
            return False

    return True
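# Assumed shape of the loaded dataset configuration as used in this module,
# reconstructed from the keys accessed above (names and values are illustrative):
#
#     {
#         "dataset-name": "EXAMPLE_DATASET",
#         "update-period": 7,
#         "devices": {
#             "device_1": {"x": "49.19", "y": "16.60"},
#             "device_2": {"x": "UNKNOWN!", "y": "UNKNOWN!"},
#         },
#     }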


def load_data_to_database(config):
    """
    Goes through every not yet loaded file (the list of loaded files is stored
    in the database), loads the data, appends coordinates from the
    configuration and exports it into the database.
    After successful loading it updates the database list of loaded files.

    Args:
        config: loaded configuration file of the dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    database_loader.check_or_update_datasets_collection(database_connection, config)

    # get all not yet loaded files of the dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)

    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")

    client = pymongo.MongoClient()
    client.close()
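# Note: the two lines above open a fresh pymongo client and close it right away;
# database_connection obtained from database_loader is not closed here.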


def load_data_to_database_crone(config):
    """
    Goes through every not yet loaded file (the list of loaded files is stored
    in the database), loads the data, appends coordinates from the
    configuration and exports it into the database.
    After successful loading it updates the database list of loaded files.

    Lightweight version for cron and production use only:
    it skips the checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called.

    Args:
        config: loaded configuration file of the dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    # get all not yet loaded files of the dataset
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)

    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")

    client = pymongo.MongoClient()
    client.close()


def run_full_pipeline(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of a dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)
    print("Processing dataset " + dataset_name + ", you can watch the progress in the log inside the CrawlerLogs folder")

    config = configure_functions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config)

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Loads the config file and, if the update period has elapsed, starts the
    full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of a dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = configure_functions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        # process_data_crone expects the whole config, not just the dataset name
        process_data_crone(config)

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database_crone(config)
            