Projekt

Obecné

Profil

Stáhnout (11.1 KB) Statistiky
| Větev: | Revize:
1
from Utilities import folder_processor, configure_functions
2
from Utilities.Database import database_loader, database_record_logs
3
from Utilities.CSV import csv_utils
4
import os
5
import pymongo
6

    
7
import logging
8
from datetime import date
9

    
10

    
11
# Path to crawled data
12
CRAWLED_DATA_PATH = "CrawledData/"
13
# Path to processed data
14
PROCESSED_DATA_PATH = "ProcessedData/"
15
# Path to crawler logs
16
CRAWLER_LOGS_PATH = "CrawlerLogs/"
17
# Path to dataset crawler implementations
18
CRAWLER_LIB_PATH = "DatasetCrawler."
19
# Path to dataset processor implementations
20
PROCESSOR_LIB_PATH = "DatasetProcessing."
21

    
22

    
23
#logger
24
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
25
                    level=logging.INFO,
26
                    format='%(asctime)s %(message)s'
27
                    )
28

    
29

    
30
def check_last_update(config):
31
    """
32
    Loads integer from updated.txt in CrawlerLogs/"dataset_name"
33
    representing number of days from last update if number equals
34
    number in confing update period updates it and reset number of
35
    days to zero else increment the number
36

    
37
    Arguments:
38
        config loaded configuration file of dataset
39

    
40
    Returns:
41
       True if updating
42
       Else if incementing days from last update
43
    """
44
    dataset_name = config["dataset-name"]
45

    
46
    last_update = database_record_logs.load_updated(dataset_name)
47

    
48
    if config["update-period"] <= last_update:
49
        logging.info("Dataset " + dataset_name + " is being updated today")
50
        database_record_logs.update_updated(dataset_name,0)
51
        return True
52
    else:
53
        last_update_days = last_update + 1
54
        logging.info("Dataset " + dataset_name + " will be updated in " + str(int(config["update-period"]) - last_update_days) + "days")
55
        database_record_logs.update_updated(dataset_name,last_update + 1)
56
        return False
57

    
58

    
59

    
60
def crawl_data(config):
61
    """
62
      Imports dataset crawler in DatasetCrawler/"dataset_name"_crawler.py
63
      runs crawler.
64

    
65
    Args:
66
        config: loaded configuration file of dataset
67
    """
68
    dataset_name = config["dataset-name"]
69

    
70
    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler", globals(), locals(), ['crawl']).crawl
71
    crawl_func(config)
72

    
73
    dataset_name += '/'
74

    
75

    
76
def process_data(config):
77
    """
78
    Goes trough every not processed file(list of processed files is saved in databse)
79
    Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
80
    Runs processor on every file
81
    After successful processing updates database list of processed files
82

    
83
    Args:
84
        dataset_name: name of dataset that has existing configuration file
85
    """
86
    dataset_name = config["dataset-name"]
87
    dataset_path = dataset_name + '/'
88

    
89
    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
90
                                   ['process_file']).process_file
91

    
92
    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
93
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
94
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
95

    
96
    for not_processed_file in not_processed_files:
97
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
98
        date_dic = process_file_func(path)
99
        csv_utils.export_data_to_csv(path, date_dic)
100
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
101

    
102
    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
103

    
104

    
105
def process_data_crone(config):
106
    """
107
    Goes trough every not processed file(list of processed files is saved in database)
108
    Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
109
    Runs processor on every file
110
    After successful processing updates database list of processed files
111

    
112
    Lightweight version for crone and production only
113
    - excludes checks for changes of config file and coordinates and names
114
    after these changes force_update_datasets.py should be called
115

    
116
    Args:
117
        dataset_name: name of dataset that has existing configuration file
118
    """
119

    
120
    dataset_name = config["dataset-name"]
121
    dataset_path = dataset_name + '/'
122

    
123
    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
124
                                   ['process_file']).process_file
125

    
126
    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
127
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
128
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
129

    
130
    for not_processed_file in not_processed_files:
131
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
132
        date_dic = process_file_func(path)
133
        csv_utils.export_data_to_csv(path, date_dic)
134
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
135

    
136
    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
137

    
138

    
139
def validate_process_data(config):
140
    """
141
    Function goes through newly processed data and checks theirs status
142

    
143
    Args:
144
        config: loaded configuration file of dataset
145

    
146
    Returns:
147
        boolean variable TRUE/FALSE.
148
        Data processed correctly - TRUE
149
        Wrong format or NEW unknown devices - FALSE
150
    """
151
    dataset_name = config["dataset-name"]
152

    
153
    processed_devices_set = folder_processor.get_devices_set(dataset_name,PROCESSED_DATA_PATH +dataset_name + '/')
154
    unknown_devices_set = folder_processor.get_unknown_devices_set(config, processed_devices_set)
155
    unknown_devices_size = len(unknown_devices_set)
156

    
157
    if unknown_devices_size != 0:
158
        logging.info("There is " + str(unknown_devices_size) + " unknown devices")
159
        logging.info("Adding devices to " + dataset_name + " config file")
160
        configure_functions.update_configuration(dataset_name, unknown_devices_set)
161
        return False
162

    
163
    for device in config["devices"]:
164
        device = config["devices"][device]
165
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
166
            logging.info(dataset_name + " config file contains devices with UNKOWN! values please update them!!")
167
            return False
168

    
169
    return True
170

    
171

    
172
def load_data_to_database(config):
173
    """
174
    Goes trough every not loaded file(list of loaded files is saved in database)
175
    loads data appends coordination from configurations
176
    and exports it into the database
177
    After successful processing updates database list of loaded files
178

    
179
    Args:
180
        config: loaded configuration file of dataset
181
    """
182
    dataset_name = config["dataset-name"]
183
    dataset_path = dataset_name + '/'
184

    
185
    database_connection = database_loader.create_database_connection()
186

    
187
    database_loader.check_or_update_datasets_collection(database_connection,config)
188

    
189
    changes_in_devices = database_loader.update_devices_collection(config)
190

    
191
    if changes_in_devices == True:
192
        logging.info(dataset_name + " contains changes in devices configuration. Deleting old data and preparing new")
193
        database_loader.reset_dataset_database(dataset_name)
194

    
195
    # get all unprocessed files from dataset
196
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
197
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
198

    
199
    # load every file
200
    for not_loaded_file in not_loaded_files:
201
        # load processed data
202
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
203
        # load processed data to database
204
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
205
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
206

    
207
    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
208

    
209
    client = pymongo.MongoClient()
210
    client.close()
211

    
212

    
213
def load_data_to_database_crone(config):
214
    """
215
    Goes trough every not loaded file(list of loaded files is saved in database)
216
    loads data appends coordination from configurations
217
    and exports it into the database
218
    After successful processing updates database list of loaded files
219
    
220
    Lightweight version for crone and production only
221
    - excludes checks for changes of config file and coordinates and names
222
    after these changes force_update_datasets.py should be called
223

    
224
    Args:
225
        config: loaded configuration file of dataset
226
    """
227
    dataset_name = config["dataset-name"]
228
    dataset_path = dataset_name + '/'
229

    
230
    database_connection = database_loader.create_database_connection()
231

    
232
    # get all unprocessed files from dataset
233
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
234
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
235

    
236
    # load every file
237
    for not_loaded_file in not_loaded_files:
238
        # load processed data
239
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
240
        # load processed data to database
241
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
242
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
243

    
244
    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
245

    
246
    client = pymongo.MongoClient()
247
    client.close()
248

    
249

    
250

    
251
def run_full_pipeline(dataset_name):
252
    """
253
    Loads config file and starts full pipeline
254
    -crawl data
255
    -process data
256
    -load data to database
257

    
258
    Args:
259
        dataset_name: name of dataset that has existing configuration file
260
    """
261
    logging.info("Starting pipeline for dataset " + dataset_name)
262
    print("Zpracovávám dataset " + dataset_name + " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
263
    
264
    config = configure_functions.load_configuration(dataset_name)
265
    crawl_data(config)
266
    process_data(config)
267

    
268
    validation_test = validate_process_data(config)
269

    
270
    if validation_test:
271
        load_data_to_database(config)
272

    
273

    
274
def run_full_pipeline_crone(dataset_name):
275
    """
276
    Loads config file and starts full pipeline
277
    -crawl data
278
    -process data
279
    -load data to database
280

    
281
    Args:
282
        dataset_name: name of dataset that has existing configuration file
283
    """
284
    logging.info("Starting pipeline for dataset " + dataset_name)
285

    
286
    config = configure_functions.load_configuration(dataset_name)
287
    update_test = check_last_update(config)
288
    if update_test:
289
        crawl_data(config)
290
        process_data_crone(config["dataset-name"])
291

    
292
        validation_test = validate_process_data(config)
293

    
294
        if validation_test:
295
            load_data_to_database_crone(config)
296
            
(7-7/12)