from Utilities import folder_processor, configure_functions
from Utilities.Database import database_loader, database_record_logs
from Utilities.CSV import csv_utils
import os
import pymongo

import logging
from datetime import date


# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Module path prefix of dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Module path prefix of dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."
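
# Per-dataset modules are imported dynamically using the prefixes above
# (see crawl_data and process_data below):
#   DatasetCrawler/<dataset-name>_crawler.py      -- must expose crawl(config)
#   DatasetProcessing/<dataset-name>_processor.py -- must expose process_file(path)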


# Logger: one application log file per month, stored in CrawlerLogs/
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s'
                    )


def check_last_update(config):
    """
    Loads the number of days since the last update of the dataset from the
    database record logs. If that number has reached the update period set
    in the config, resets the counter to zero and reports that the dataset
    should be updated; otherwise increments the counter.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset is being updated today,
        False if only the days-since-last-update counter was incremented
    """
    dataset_name = config["dataset-name"]

    last_update = database_record_logs.load_updated(dataset_name)

    if config["update-period"] <= last_update:
        logging.info("Dataset " + dataset_name + " is being updated today")
        database_record_logs.update_updated(dataset_name, 0)
        return True
    else:
        last_update_days = last_update + 1
        logging.info("Dataset " + dataset_name + " will be updated in " +
                     str(int(config["update-period"]) - last_update_days) + " days")
        database_record_logs.update_updated(dataset_name, last_update_days)
        return False


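# Illustration of check_last_update's cadence (assuming the pipeline is run daily
# and the dataset config contains "update-period": 7): after an update the stored
# counter is 0; each daily run increments it and returns False until it reaches 7,
# at which point check_last_update returns True, resets the counter to 0 and the
# dataset is re-crawled.
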
def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"_crawler.py
    and runs its crawl function.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler", globals(), locals(), ['crawl']).crawl
    crawl_func(config)

    dataset_name += '/'


def process_data(config):
    """
    Goes through every file that has not been processed yet (the list of
    processed files is stored in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing updates the database list of processed files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
                                   ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")


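# A dataset processor module (DatasetProcessing/<dataset-name>_processor.py) is
# expected to expose process_file(path) and return a dictionary that
# csv_utils.export_data_to_csv can write out. A minimal sketch (illustrative
# only; the exact structure of the returned dictionary is an assumption here):
#
#     def process_file(path):
#         date_dict = {}
#         # parse the crawled file at `path` and group its records, e.g. by date
#         return date_dict
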
def process_data_crone(config):
    """
    Goes through every file that has not been processed yet (the list of
    processed files is stored in the database).
    Imports the dataset processor from DatasetProcessing/"dataset_name"_processor.py
    and runs it on every file.
    After successful processing updates the database list of processed files.

    Lightweight version for cron and production use only:
    it skips the checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called.

    Args:
        config: loaded configuration file of dataset
    """

    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
                                   ['process_file']).process_file

    ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
    not_processed_files = folder_processor.list_of_all_new_files(ignore_set, CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        csv_utils.export_data_to_csv(path, date_dic)
        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")


def validate_process_data(config):
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data was processed correctly,
        False if the format is wrong or new unknown devices were found
    """
    dataset_name = config["dataset-name"]

    processed_devices_set = folder_processor.get_devices_set(dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
    unknown_devices_set = folder_processor.get_unknown_devices_set(config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) + " unknown devices")
        logging.info("Adding devices to " + dataset_name + " config file")
        configure_functions.update_configuration(dataset_name, unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(dataset_name + " config file contains devices with UNKNOWN! values, please update them!")
            return False

    return True


def load_data_to_database(config):
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is stored in the database), loads the data, appends coordinates from
    the configuration and exports it into the database.
    After successful loading updates the database list of loaded files.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    database_loader.check_or_update_datasets_collection(database_connection, config)

    changes_in_devices = database_loader.update_devices_collection(config)

    if changes_in_devices:
        logging.info(dataset_name + " contains changes in devices configuration. Deleting old data and preparing new")
        database_loader.reset_dataset_database(dataset_name)

    # get all files of the dataset that have not been loaded yet
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set, PROCESSED_DATA_PATH + dataset_path)

    print(ignore_set)
    print(not_loaded_files)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)

    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")

    client = pymongo.MongoClient()
    client.close()


def load_data_to_database_crone(config):
    """
    Goes through every file that has not been loaded yet (the list of loaded
    files is stored in the database), loads the data, appends coordinates from
    the configuration and exports it into the database.
    After successful loading updates the database list of loaded files.

    Lightweight version for cron and production use only:
    it skips the checks for changes of the config file, coordinates and names;
    after such changes force_update_datasets.py should be called.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    database_connection = database_loader.create_database_connection()

    # get all files of the dataset that have not been loaded yet
    ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set, PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
        # load processed data to database
        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)

    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")

    client = pymongo.MongoClient()
    client.close()


def run_full_pipeline(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)
    print("Processing dataset " + dataset_name + ", progress can be followed in the log located in the CrawlerLogs folder")

    config = configure_functions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config)

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = configure_functions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data_crone(config)

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database_crone(config)
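

# Usage sketch (illustrative only; the dataset name below is a placeholder and the
# real entry points that call these functions live outside this module):
#
#     run_full_pipeline("example_dataset")        # manual one-off run
#     run_full_pipeline_crone("example_dataset")  # cron run, guarded by check_last_update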