Revision 81980e82

Added by Tomáš Ballák about 4 years ago

Re #8160 new dataset


modules/crawler/pipeline.py
@@ -7,7 +7,6 @@
 import logging
 from datetime import date
 
-
 # Path to crawled data
 CRAWLED_DATA_PATH = "CrawledData/"
 # Path to processed data
@@ -19,12 +18,11 @@
 # Path to dataset processor implementations
 PROCESSOR_LIB_PATH = "DatasetProcessing."
 
-
 #logger
-logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
-                    level=logging.INFO,
-                    format='%(asctime)s %(message)s'
-                    )
+logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
+                               date.today().strftime("%b-%Y") + '.log',
+                               level=logging.INFO,
+                               format='%(asctime)s %(message)s')
 
 
 def check_last_update(config):
@@ -47,16 +45,17 @@
 
     if config["update-period"] <= last_update:
         logging.info("Dataset " + dataset_name + " is being updated today")
-        database_record_logs.update_updated(dataset_name,0)
+        database_record_logs.update_updated(dataset_name, 0)
         return True
     else:
         last_update_days = last_update + 1
-        logging.info("Dataset " + dataset_name + " will be updated in " + str(int(config["update-period"]) - last_update_days) + "days")
-        database_record_logs.update_updated(dataset_name,last_update + 1)
+        logging.info("Dataset " + dataset_name + " will be updated in " +
+                     str(int(config["update-period"]) - last_update_days) +
+                     "days")
+        database_record_logs.update_updated(dataset_name, last_update + 1)
         return False
 
 
-
 def crawl_data(config):
     """
       Imports dataset crawler in DatasetCrawler/"dataset_name"_crawler.py
@@ -67,7 +66,8 @@
     """
     dataset_name = config["dataset-name"]
 
-    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler", globals(), locals(), ['crawl']).crawl
+    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler",
+                            globals(), locals(), ['crawl']).crawl
     crawl_func(config)
 
     dataset_name += '/'
@@ -86,20 +86,25 @@
     dataset_name = config["dataset-name"]
     dataset_path = dataset_name + '/'
 
-    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
-                                   ['process_file']).process_file
+    process_file_func = __import__(
+        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
+        ['process_file']).process_file
 
     ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
-    not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
-    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
+    not_processed_files = folder_processor.list_of_all_new_files(
+        ignore_set, CRAWLED_DATA_PATH + dataset_path)
+    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
+                 " not processed files")
 
     for not_processed_file in not_processed_files:
         path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
         date_dic = process_file_func(path)
         csv_utils.export_data_to_csv(path, date_dic)
-        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
+        database_record_logs.update_ignore_set_processed(
+            dataset_name, not_processed_file)
 
-    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
+    logging.info(dataset_name + " has processed " +
+                 str(len(not_processed_files)) + " newly crawled files")
 
 
 def process_data_crone(config):
@@ -120,20 +125,25 @@
     dataset_name = config["dataset-name"]
     dataset_path = dataset_name + '/'
 
-    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
-                                   ['process_file']).process_file
+    process_file_func = __import__(
+        PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
+        ['process_file']).process_file
 
     ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
-    not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
-    logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
+    not_processed_files = folder_processor.list_of_all_new_files(
+        ignore_set, CRAWLED_DATA_PATH + dataset_path)
+    logging.info(dataset_name + " found " + str(len(not_processed_files)) +
+                 " not processed files")
 
     for not_processed_file in not_processed_files:
         path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
         date_dic = process_file_func(path)
         csv_utils.export_data_to_csv(path, date_dic)
-        database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
+        database_record_logs.update_ignore_set_processed(
+            dataset_name, not_processed_file)
 
-    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
+    logging.info(dataset_name + " has processed " +
+                 str(len(not_processed_files)) + " newly crawled files")
 
 
 def validate_process_data(config):
@@ -150,21 +160,28 @@
     """
     dataset_name = config["dataset-name"]
 
-    processed_devices_set = folder_processor.get_devices_set(dataset_name,PROCESSED_DATA_PATH +dataset_name + '/')
-    unknown_devices_set = folder_processor.get_unknown_devices_set(config, processed_devices_set)
+    processed_devices_set = folder_processor.get_devices_set(
+        dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
+    unknown_devices_set = folder_processor.get_unknown_devices_set(
+        config, processed_devices_set)
     unknown_devices_size = len(unknown_devices_set)
 
     if unknown_devices_size != 0:
-        logging.info("There is " + str(unknown_devices_size) + " unknown devices")
+        logging.info("There is " + str(unknown_devices_size) +
+                     " unknown devices")
         logging.info("Adding devices to " + dataset_name + " config file")
-        configure_functions.update_configuration(dataset_name, unknown_devices_set)
+        configure_functions.update_configuration(dataset_name,
+                                                 unknown_devices_set)
        return False
 
     for device in config["devices"]:
         device = config["devices"][device]
         if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
-            logging.info(dataset_name + " config file contains devices with UNKOWN! values please update them!!")
-            return False
+            logging.info(
+                dataset_name +
+                " config file contains devices with UNKOWN! values please update them!!"
+            )
+            #return False
 
     return True
 
@@ -184,27 +201,37 @@
 
     database_connection = database_loader.create_database_connection()
 
-    database_loader.check_or_update_datasets_collection(database_connection,config)
+    database_loader.check_or_update_datasets_collection(
+        database_connection, config)
 
     changes_in_devices = database_loader.update_devices_collection(config)
 
     if changes_in_devices == True:
-        logging.info(dataset_name + " contains changes in devices configuration. Deleting old data and preparing new")
+        logging.info(
+            dataset_name +
+            " contains changes in devices configuration. Deleting old data and preparing new"
+        )
         database_loader.reset_dataset_database(dataset_name)
 
     # get all unprocessed files from dataset
     ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
-    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
+    not_loaded_files = folder_processor.list_of_all_new_files(
+        ignore_set, PROCESSED_DATA_PATH + dataset_path)
 
     # load every file
     for not_loaded_file in not_loaded_files:
         # load processed data
-        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
+        processed_data = database_loader.get_data_from_file(
+            not_loaded_file, config)
         # load processed data to database
-        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
-        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
+        database_loader.load_data_to_database(database_connection,
+                                              dataset_name, processed_data,
+                                              not_loaded_file)
+        database_record_logs.update_ignore_set_loaded(dataset_name,
+                                                      not_loaded_file)
 
-    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
+    logging.info(dataset_name + " has loaded to database " +
+                 str(len(not_loaded_files)) + " newly processed files.")
 
     client = pymongo.MongoClient()
     client.close()
@@ -231,23 +258,28 @@
 
     # get all unprocessed files from dataset
     ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
-    not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
+    not_loaded_files = folder_processor.list_of_all_new_files(
+        ignore_set, PROCESSED_DATA_PATH + dataset_path)
 
     # load every file
     for not_loaded_file in not_loaded_files:
         # load processed data
-        processed_data = database_loader.get_data_from_file(not_loaded_file, config)
+        processed_data = database_loader.get_data_from_file(
+            not_loaded_file, config)
        # load processed data to database
-        database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
-        database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
+        database_loader.load_data_to_database(database_connection,
+                                              dataset_name, processed_data,
+                                              not_loaded_file)
+        database_record_logs.update_ignore_set_loaded(dataset_name,
+                                                      not_loaded_file)
 
-    logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
+    logging.info(dataset_name + " has loaded to database " +
+                 str(len(not_loaded_files)) + " newly processed files.")
 
     client = pymongo.MongoClient()
     client.close()
 
 
-
 def run_full_pipeline(dataset_name):
     """
     Loads config file and starts full pipeline
@@ -259,8 +291,9 @@
         dataset_name: name of dataset that has existing configuration file
     """
     logging.info("Starting pipeline for dataset " + dataset_name)
-    print("Zpracovávám dataset " + dataset_name + " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
-    
+    print("Zpracovávám dataset " + dataset_name +
+          " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
+
     config = configure_functions.load_configuration(dataset_name)
     crawl_data(config)
     process_data(config)
@@ -293,4 +326,3 @@
 
         if validation_test:
             load_data_to_database_crone(config)
-            

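For context, the reformatted __import__ calls in the diff implement a small plugin mechanism: each dataset name is resolved to a module such as DatasetCrawler/<dataset_name>_crawler.py or DatasetProcessing/<dataset_name>_processor.py, and its crawl or process_file function is looked up at runtime. The sketch below only illustrates that pattern; the helper name get_crawl_function and the dataset name "example_dataset" are placeholders and do not exist in the repository.

# Illustrative sketch only -- get_crawl_function and "example_dataset"
# are placeholders, not part of the repository.
CRAWLER_LIB_PATH = "DatasetCrawler."

def get_crawl_function(dataset_name):
    # Equivalent to: from DatasetCrawler.<dataset_name>_crawler import crawl
    module = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler",
                        globals(), locals(), ['crawl'])
    return module.crawl

# Usage, assuming DatasetCrawler/example_dataset_crawler.py defines crawl(config):
# crawl = get_crawl_function("example_dataset")
# crawl({"dataset-name": "example_dataset"})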