Revision 81980e82
Added by Tomáš Ballák about 4 years ago
modules/crawler/pipeline.py

old | new | line
---|---|---
7 | 7 | import logging
8 | 8 | from datetime import date
9 | 9 |
10 | |
11 | 10 | # Path to crawled data
12 | 11 | CRAWLED_DATA_PATH = "CrawledData/"
13 | 12 | # Path to processed data
... | ... |
19 | 18 | # Path to dataset processor implementations
20 | 19 | PROCESSOR_LIB_PATH = "DatasetProcessing."
21 | 20 |
22 | |
23 | 21 | #logger
24 | | logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
25 | | level=logging.INFO,
26 | | format='%(asctime)s %(message)s'
27 | | )
 | 22 | logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
 | 23 | date.today().strftime("%b-%Y") + '.log',
 | 24 | level=logging.INFO,
 | 25 | format='%(asctime)s %(message)s')
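For context, the rewrapped call above keeps the original behaviour: one log file per month, named after the current month, with every record prefixed by a timestamp. A minimal stand-alone sketch of the same setup; the CRAWLER_LOGS_PATH value is an assumption here (the real constant is defined in a part of pipeline.py not shown in this hunk):

```python
import logging
import os
from datetime import date

CRAWLER_LOGS_PATH = "CrawlerLogs/"  # assumed value; the real constant lives earlier in pipeline.py

os.makedirs(CRAWLER_LOGS_PATH, exist_ok=True)  # only so this sketch runs on its own

# One file per month, e.g. CrawlerLogs/Applicationlog-Apr-2021.log
logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
                    date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s')

logging.info("pipeline started")
```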
28 | 26 |
29 | 27 |
30 | 28 | def check_last_update(config):
... | ... |
47 | 45 |
48 | 46 | if config["update-period"] <= last_update:
49 | 47 | logging.info("Dataset " + dataset_name + " is being updated today")
50 | | database_record_logs.update_updated(dataset_name,0)
 | 48 | database_record_logs.update_updated(dataset_name, 0)
51 | 49 | return True
52 | 50 | else:
53 | 51 | last_update_days = last_update + 1
54 | | logging.info("Dataset " + dataset_name + " will be updated in " + str(int(config["update-period"]) - last_update_days) + "days")
55 | | database_record_logs.update_updated(dataset_name,last_update + 1)
 | 52 | logging.info("Dataset " + dataset_name + " will be updated in " +
 | 53 | str(int(config["update-period"]) - last_update_days) +
 | 54 | "days")
 | 55 | database_record_logs.update_updated(dataset_name, last_update + 1)
56 | 56 | return False
57 | 57 |
58 | 58 |
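A small worked example of the counter logic in check_last_update; the numbers are made up, and last_update is assumed to be the per-dataset day counter kept by database_record_logs:

```python
# Hypothetical values; in the pipeline they come from the dataset config
# and from database_record_logs.
update_period = 7   # config["update-period"]
last_update = 3     # days counted since the dataset was last refreshed

if update_period <= last_update:
    print("updated today, counter reset to 0")
else:
    last_update_days = last_update + 1
    # 7 - 4 = 3 -> logged as "will be updated in 3 days"
    print("will be updated in " + str(update_period - last_update_days) + " days")
```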
59 | |
60 | 59 | def crawl_data(config):
61 | 60 | """
62 | 61 | Imports dataset crawler in DatasetCrawler/"dataset_name"_crawler.py
... | ... |
67 | 66 | """
68 | 67 | dataset_name = config["dataset-name"]
69 | 68 |
70 | | crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler", globals(), locals(), ['crawl']).crawl
 | 69 | crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "_crawler",
 | 70 | globals(), locals(), ['crawl']).crawl
71 | 71 | crawl_func(config)
72 | 72 |
73 | 73 | dataset_name += '/'
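crawl_data resolves the dataset-specific crawler module by name at run time. The sketch below shows the same lookup with importlib instead of the raw __import__ call; CRAWLER_LIB_PATH is assumed to mirror PROCESSOR_LIB_PATH above, and the crawler module is only expected to expose a crawl(config) function:

```python
import importlib

CRAWLER_LIB_PATH = "DatasetCrawler."  # assumed, by analogy with PROCESSOR_LIB_PATH


def get_crawl_func(dataset_name):
    # Load DatasetCrawler/<dataset_name>_crawler.py and return its crawl()
    # entry point - the same effect as the __import__(...) call above.
    module = importlib.import_module(CRAWLER_LIB_PATH + dataset_name + "_crawler")
    return module.crawl

# Usage:
# crawl_func = get_crawl_func(config["dataset-name"])
# crawl_func(config)
```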
... | ... |
86 | 86 | dataset_name = config["dataset-name"]
87 | 87 | dataset_path = dataset_name + '/'
88 | 88 |
89 | | process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
90 | | ['process_file']).process_file
 | 89 | process_file_func = __import__(
 | 90 | PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
 | 91 | ['process_file']).process_file
91 | 92 |
92 | 93 | ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
93 | | not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
94 | | logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
 | 94 | not_processed_files = folder_processor.list_of_all_new_files(
 | 95 | ignore_set, CRAWLED_DATA_PATH + dataset_path)
 | 96 | logging.info(dataset_name + " found " + str(len(not_processed_files)) +
 | 97 | " not processed files")
95 | 98 |
96 | 99 | for not_processed_file in not_processed_files:
97 | 100 | path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
98 | 101 | date_dic = process_file_func(path)
99 | 102 | csv_utils.export_data_to_csv(path, date_dic)
100 | | database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
 | 103 | database_record_logs.update_ignore_set_processed(
 | 104 | dataset_name, not_processed_file)
101 | 105 |
102 | | logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
 | 106 | logging.info(dataset_name + " has processed " +
 | 107 | str(len(not_processed_files)) + " newly crawled files")
103 | 108 |
104 | 109 |
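The processing step above (process_data) only touches files it has not seen before: it loads the persisted ignore set, asks folder_processor for everything new under CrawledData/<dataset>/, and records each file once it is processed. A simplified, self-contained sketch of that ignore-set pattern; the helper and folder name below are illustrative stand-ins, not the project's folder_processor:

```python
import os

def list_new_files(ignore_set, folder):
    # Simplified stand-in for folder_processor.list_of_all_new_files:
    # every file in the folder whose name is not in the ignore set yet.
    return [name for name in os.listdir(folder) if name not in ignore_set]

folder = "CrawledData/example-dataset/"   # hypothetical dataset folder
os.makedirs(folder, exist_ok=True)        # only so this sketch runs on its own

processed = {"2021-01-01.csv"}            # persisted ignore set, here kept in memory
for file_name in list_new_files(processed, folder):
    # ... process_file_func(folder + file_name) and the CSV export would run here ...
    processed.add(file_name)              # mirrors update_ignore_set_processed()
```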
105 | 110 | def process_data_crone(config):
... | ... |
120 | 125 | dataset_name = config["dataset-name"]
121 | 126 | dataset_path = dataset_name + '/'
122 | 127 |
123 | | process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
124 | | ['process_file']).process_file
 | 128 | process_file_func = __import__(
 | 129 | PROCESSOR_LIB_PATH + dataset_name + "_processor", globals(), locals(),
 | 130 | ['process_file']).process_file
125 | 131 |
126 | 132 | ignore_set = database_record_logs.load_ignore_set_processed(dataset_name)
127 | | not_processed_files = folder_processor.list_of_all_new_files(ignore_set,CRAWLED_DATA_PATH + dataset_path)
128 | | logging.info(dataset_name + " found " + str(len(not_processed_files)) + " not processed files")
 | 133 | not_processed_files = folder_processor.list_of_all_new_files(
 | 134 | ignore_set, CRAWLED_DATA_PATH + dataset_path)
 | 135 | logging.info(dataset_name + " found " + str(len(not_processed_files)) +
 | 136 | " not processed files")
129 | 137 |
130 | 138 | for not_processed_file in not_processed_files:
131 | 139 | path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
132 | 140 | date_dic = process_file_func(path)
133 | 141 | csv_utils.export_data_to_csv(path, date_dic)
134 | | database_record_logs.update_ignore_set_processed(dataset_name, not_processed_file)
 | 142 | database_record_logs.update_ignore_set_processed(
 | 143 | dataset_name, not_processed_file)
135 | 144 |
136 | | logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
 | 145 | logging.info(dataset_name + " has processed " +
 | 146 | str(len(not_processed_files)) + " newly crawled files")
137 | 147 |
138 | 148 |
139 | 149 | def validate_process_data(config):
... | ... |
150 | 160 | """
151 | 161 | dataset_name = config["dataset-name"]
152 | 162 |
153 | | processed_devices_set = folder_processor.get_devices_set(dataset_name,PROCESSED_DATA_PATH +dataset_name + '/')
154 | | unknown_devices_set = folder_processor.get_unknown_devices_set(config, processed_devices_set)
 | 163 | processed_devices_set = folder_processor.get_devices_set(
 | 164 | dataset_name, PROCESSED_DATA_PATH + dataset_name + '/')
 | 165 | unknown_devices_set = folder_processor.get_unknown_devices_set(
 | 166 | config, processed_devices_set)
155 | 167 | unknown_devices_size = len(unknown_devices_set)
156 | 168 |
157 | 169 | if unknown_devices_size != 0:
158 | | logging.info("There is " + str(unknown_devices_size) + " unknown devices")
 | 170 | logging.info("There is " + str(unknown_devices_size) +
 | 171 | " unknown devices")
159 | 172 | logging.info("Adding devices to " + dataset_name + " config file")
160 | | configure_functions.update_configuration(dataset_name, unknown_devices_set)
 | 173 | configure_functions.update_configuration(dataset_name,
 | 174 | unknown_devices_set)
161 | 175 | return False
162 | 176 |
163 | 177 | for device in config["devices"]:
164 | 178 | device = config["devices"][device]
165 | 179 | if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
166 | | logging.info(dataset_name + " config file contains devices with UNKOWN! values please update them!!")
167 | | return False
 | 180 | logging.info(
 | 181 | dataset_name +
 | 182 | " config file contains devices with UNKOWN! values please update them!!"
 | 183 | )
 | 184 | #return False
168 | 185 |
169 | 186 | return True
170 | 187 |
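validate_process_data refuses to continue while any device in config["devices"] still carries the "UNKNOWN!" placeholder for its coordinates. A sketch of what such a configuration fragment and the check look like; apart from the "devices", "x", "y" keys and the placeholder string, the config layout is an assumption:

```python
# Assumed config shape; only the keys used by the check above are taken from the diff.
config = {
    "dataset-name": "example-dataset",
    "devices": {
        "sensor-1": {"x": "49.2002", "y": "16.6068"},
        "sensor-2": {"x": "UNKNOWN!", "y": "UNKNOWN!"},  # coordinates not filled in yet
    },
}

for device in config["devices"]:
    device = config["devices"][device]
    if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
        print("a device is missing coordinates; update the dataset config file")
```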
... | ... |
184 | 201 |
185 | 202 | database_connection = database_loader.create_database_connection()
186 | 203 |
187 | | database_loader.check_or_update_datasets_collection(database_connection,config)
 | 204 | database_loader.check_or_update_datasets_collection(
 | 205 | database_connection, config)
188 | 206 |
189 | 207 | changes_in_devices = database_loader.update_devices_collection(config)
190 | 208 |
191 | 209 | if changes_in_devices == True:
192 | | logging.info(dataset_name + " contains changes in devices configuration. Deleting old data and preparing new")
 | 210 | logging.info(
 | 211 | dataset_name +
 | 212 | " contains changes in devices configuration. Deleting old data and preparing new"
 | 213 | )
193 | 214 | database_loader.reset_dataset_database(dataset_name)
194 | 215 |
195 | 216 | # get all unprocessed files from dataset
196 | 217 | ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
197 | | not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
 | 218 | not_loaded_files = folder_processor.list_of_all_new_files(
 | 219 | ignore_set, PROCESSED_DATA_PATH + dataset_path)
198 | 220 |
199 | 221 | # load every file
200 | 222 | for not_loaded_file in not_loaded_files:
201 | 223 | # load processed data
202 | | processed_data = database_loader.get_data_from_file(not_loaded_file, config)
 | 224 | processed_data = database_loader.get_data_from_file(
 | 225 | not_loaded_file, config)
203 | 226 | # load processed data to database
204 | | database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
205 | | database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
 | 227 | database_loader.load_data_to_database(database_connection,
 | 228 | dataset_name, processed_data,
 | 229 | not_loaded_file)
 | 230 | database_record_logs.update_ignore_set_loaded(dataset_name,
 | 231 | not_loaded_file)
206 | 232 |
207 | | logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
 | 233 | logging.info(dataset_name + " has loaded to database " +
 | 234 | str(len(not_loaded_files)) + " newly processed files.")
208 | 235 |
209 | 236 | client = pymongo.MongoClient()
210 | 237 | client.close()
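The actual inserts go through database_loader, whose internals are not part of this diff; the bare pymongo.MongoClient() opened and closed at the end is the only direct database access visible here. For orientation only, a generic pymongo insert with assumed database and collection names:

```python
import pymongo

# Generic pymongo usage for orientation; database_loader wraps the real connection,
# and the database/collection names below are assumptions.
client = pymongo.MongoClient()  # defaults to localhost:27017
collection = client["open-data-db"]["example-dataset"]
collection.insert_many([{"device": "sensor-1", "count": 3}])
client.close()
```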
... | ... |
231 | 258 |
232 | 259 | # get all unprocessed files from dataset
233 | 260 | ignore_set = database_record_logs.load_ignore_set_loaded(dataset_name)
234 | | not_loaded_files = folder_processor.list_of_all_new_files(ignore_set,PROCESSED_DATA_PATH + dataset_path)
 | 261 | not_loaded_files = folder_processor.list_of_all_new_files(
 | 262 | ignore_set, PROCESSED_DATA_PATH + dataset_path)
235 | 263 |
236 | 264 | # load every file
237 | 265 | for not_loaded_file in not_loaded_files:
238 | 266 | # load processed data
239 | | processed_data = database_loader.get_data_from_file(not_loaded_file, config)
 | 267 | processed_data = database_loader.get_data_from_file(
 | 268 | not_loaded_file, config)
240 | 269 | # load processed data to database
241 | | database_loader.load_data_to_database(database_connection, dataset_name, processed_data, not_loaded_file)
242 | | database_record_logs.update_ignore_set_loaded(dataset_name, not_loaded_file)
 | 270 | database_loader.load_data_to_database(database_connection,
 | 271 | dataset_name, processed_data,
 | 272 | not_loaded_file)
 | 273 | database_record_logs.update_ignore_set_loaded(dataset_name,
 | 274 | not_loaded_file)
243 | 275 |
244 | | logging.info(dataset_name + " has loaded to database " + str(len(not_loaded_files)) + " newly processed files.")
 | 276 | logging.info(dataset_name + " has loaded to database " +
 | 277 | str(len(not_loaded_files)) + " newly processed files.")
245 | 278 |
246 | 279 | client = pymongo.MongoClient()
247 | 280 | client.close()
248 | 281 |
249 | 282 |
250 | |
251 | 283 | def run_full_pipeline(dataset_name):
252 | 284 | """
253 | 285 | Loads config file and starts full pipeline
... | ... |
259 | 291 | dataset_name: name of dataset that has existing configuration file
260 | 292 | """
261 | 293 | logging.info("Starting pipeline for dataset " + dataset_name)
262 | | print("Zpracovávám dataset " + dataset_name + " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
263 | |
 | 294 | print("Zpracovávám dataset " + dataset_name +
 | 295 | " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
 | 296 |
264 | 297 | config = configure_functions.load_configuration(dataset_name)
265 | 298 | crawl_data(config)
266 | 299 | process_data(config)
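run_full_pipeline chains the steps shown above (crawling, processing, validation, loading into the database) for a single dataset. Assuming a configuration file for the dataset already exists, the whole sequence is one call:

```python
# Hypothetical dataset name; its configuration file must already exist.
run_full_pipeline("example-dataset")
```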
... | ... |
293 | 326 |
294 | 327 | if validation_test:
295 | 328 | load_data_to_database_crone(config)
296 | |
Also available: Unified diff
Re #8160 new dataset