Revision af7609b5
Added by Tomáš Ballák more than 3 years ago
modules/crawler/pipeline.py
  1 |   1 | from Utilities import folder_processor, configure_functions
  2 |   2 | from Utilities.Database import database_loader, database_record_logs
  3 |   3 | from Utilities.CSV import csv_utils
    |   4 | + from shared_types import ConfigType
  4 |   5 | import os
  5 |   6 | import pymongo
  6 |   7 |
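
The new import pulls `ConfigType` from `shared_types`, a module that is not part of this diff. As a rough idea of what the annotation could stand for (purely an assumption, since the real definition is not shown here), a dataset configuration is most likely a plain dictionary:

```python
# Hypothetical sketch of shared_types.py; the actual definition is not
# visible in this revision and may differ.
from typing import Any, Dict

# Assumption: a crawler configuration maps option names
# (dataset name, update period, device coordinates, ...) to values.
ConfigType = Dict[str, Any]
```
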
... | ... | |
 20 |  21 |
 21 |  22 | #logger
 22 |  23 | logging.basicConfig(filename=CRAWLER_LOGS_PATH + 'Applicationlog-' +
 23 |     | - date.today().strftime("%b-%Y") + '.log',
 24 |     | - level=logging.INFO,
 25 |     | - format='%(asctime)s %(message)s')
    |  24 | + date.today().strftime("%b-%Y") + '.log',
    |  25 | + level=logging.INFO,
    |  26 | + format='%(asctime)s %(message)s')
 26 |  27 |
 27 |  28 |
 28 |     | - def check_last_update(config):
    |  29 | + def check_last_update(config: ConfigType) -> bool:
 29 |  30 | """
 30 |  31 | Loads integer from updated.txt in CrawlerLogs/"dataset_name"
 31 |  32 | representing number of days from last update if number equals
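
The `filename` argument embeds `date.today().strftime("%b-%Y")`, so the application log rolls over to a new file each month. A quick illustration of the resulting name (the value of `CRAWLER_LOGS_PATH` is defined earlier in pipeline.py and is only assumed below):

```python
from datetime import date

CRAWLER_LOGS_PATH = 'CrawlerLogs/'  # assumed value, defined elsewhere in pipeline.py

# For a run in March 2021 this evaluates to
# 'CrawlerLogs/Applicationlog-Mar-2021.log'; every run within the same
# month therefore appends to the same log file.
log_file = (CRAWLER_LOGS_PATH + 'Applicationlog-' +
            date.today().strftime('%b-%Y') + '.log')
print(log_file)
```
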
... | ... | |
 56 |  57 | return False
 57 |  58 |
 58 |  59 |
 59 |     | - def crawl_data(config):
    |  60 | + def crawl_data(config: ConfigType) -> None:
 60 |  61 | """
 61 |  62 | Imports dataset crawler in DatasetCrawler/"dataset_name"_crawler.py
 62 |  63 | runs crawler.
... | ... | |
 73 |  74 | dataset_name += '/'
 74 |  75 |
 75 |  76 |
 76 |     | - def process_data(config):
    |  77 | + def process_data(config: ConfigType) -> None:
 77 |  78 | """
 78 |  79 | Goes trough every not processed file(list of processed files is saved in databse)
 79 |  80 | Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
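
Both `crawl_data` and `process_data` state that they import a per-dataset module (`DatasetCrawler/"dataset_name"_crawler.py`, `DatasetProcessing/"dataset_name"_processor.py`). The import mechanism itself is outside the hunks shown here, so the sketch below only illustrates how such a lookup might be done with `importlib`; the helper name and the `process_file` attribute are assumptions, not code from this revision.

```python
import importlib

def load_dataset_module(package: str, dataset_name: str, suffix: str):
    # Hypothetical helper: turn e.g. ('DatasetProcessing', dataset_name,
    # 'processor') into an import of DatasetProcessing/<dataset_name>_processor.py.
    return importlib.import_module(package + '.' + dataset_name + '_' + suffix)

# Assumed usage inside process_data:
# process_file_func = load_dataset_module('DatasetProcessing', dataset_name,
#                                         'processor').process_file
```
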
... | ... | |
100 | 101 | path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
101 | 102 | date_dic = process_file_func(path)
102 | 103 | csv_utils.export_data_to_csv(path, date_dic)
    | 104 | + print("Vytvářím: " + not_processed_file)
103 | 105 | database_record_logs.update_ignore_set_processed(
104 | 106 | dataset_name, not_processed_file)
105 | 107 |
... | ... | |
107 | 109 | str(len(not_processed_files)) + " newly crawled files")
108 | 110 |
109 | 111 |
110 |     | - def process_data_crone(config):
    | 112 | + def process_data_crone(config: ConfigType) -> None:
111 | 113 | """
112 | 114 | Goes trough every not processed file(list of processed files is saved in database)
113 | 115 | Imports dataset processor in DatasetProcessing/"dataset_name"_processor.py
... | ... | |
146 | 148 | str(len(not_processed_files)) + " newly crawled files")
147 | 149 |
148 | 150 |
149 |     | - def validate_process_data(config):
    | 151 | + def validate_process_data(config: ConfigType) -> bool:
150 | 152 | """
151 | 153 | Function goes through newly processed data and checks theirs status
152 | 154 |
... | ... | |
186 | 188 | return True
187 | 189 |
188 | 190 |
189 |     | - def load_data_to_database(config):
    | 191 | + def load_data_to_database(config: ConfigType) -> None:
190 | 192 | """
191 | 193 | Goes trough every not loaded file(list of loaded files is saved in database)
192 | 194 | loads data appends coordination from configurations
... | ... | |
207 | 209 | changes_in_devices = database_loader.update_devices_collection(config)
208 | 210 |
209 | 211 | if changes_in_devices == True:
210 |     | - logging.info(
211 |     | - dataset_name +
212 |     | - " contains changes in devices configuration. Deleting old data and preparing new"
213 |     | - )
    | 212 | + logg_string = dataset_name + " contains changes in devices configuration. Deleting old data and preparing new"
    | 213 | + logg_string_cs = dataset_name + " obsahuje změny v konfiguračním souboru. Probíha odstraňování starých dat a připravení nových."
    | 214 | + logging.info(logg_string)
    | 215 | + print(logg_string_cs)
214 | 216 | database_loader.reset_dataset_database(dataset_name)
215 | 217 |
216 | 218 | # get all unprocessed files from dataset
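
The change above replaces a single `logging.info(...)` call with an English `logg_string` for the log file plus a Czech `logg_string_cs` for console output, and the same pattern appears again further down when reporting loaded files. If it keeps spreading, it could be wrapped in a small helper; the function below is only a sketch of that idea and is not part of this commit:

```python
import logging

def report(message_en: str, message_cs: str) -> None:
    # Hypothetical helper: English text goes into the monthly application
    # log, the Czech variant is printed for whoever watches the console.
    logging.info(message_en)
    print(message_cs)
```
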
... | ... | |
230 | 232 | database_record_logs.update_ignore_set_loaded(dataset_name,
231 | 233 | not_loaded_file)
232 | 234 |
233 |     | - logging.info(dataset_name + " has loaded to database " +
234 |     | - str(len(not_loaded_files)) + " newly processed files.")
    | 235 | + logg_string = dataset_name + " has loaded to database " + str(
    | 236 | + len(not_loaded_files)) + " newly processed files."
    | 237 | + logg_string_cs = dataset_name + " načetl " + str(
    | 238 | + len(not_loaded_files)) + " nových zpracovaných souborů \n"
    | 239 | +
    | 240 | + logging.info(logg_string)
    | 241 | + print(logg_string_cs)
235 | 242 |
236 | 243 | client = pymongo.MongoClient()
237 | 244 | client.close()
238 | 245 |
239 | 246 |
240 |     | - def load_data_to_database_crone(config):
    | 247 | + def load_data_to_database_crone(config: ConfigType) -> None:
241 | 248 | """
242 | 249 | Goes trough every not loaded file(list of loaded files is saved in database)
243 | 250 | loads data appends coordination from configurations
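
The messages added above are assembled with `+` and `str(...)`, matching the existing style of the file. A later cleanup could express them as f-strings; the snippet below is only an alternative formulation with placeholder values, not something this revision does:

```python
# Equivalent messages written as f-strings; the values are placeholders.
dataset_name = 'example_dataset'
not_loaded_files = ['example_file.csv']

logg_string = (f"{dataset_name} has loaded to database "
               f"{len(not_loaded_files)} newly processed files.")
logg_string_cs = (f"{dataset_name} načetl {len(not_loaded_files)} "
                  "nových zpracovaných souborů\n")
```
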
... | ... | |
280 | 287 | client.close()
281 | 288 |
282 | 289 |
283 |     | - def run_full_pipeline(dataset_name):
    | 290 | + def run_full_pipeline(dataset_name: str) -> None:
284 | 291 | """
285 | 292 | Loads config file and starts full pipeline
286 | 293 | -crawl data
... | ... | |
292 | 299 | """
293 | 300 | logging.info("Starting pipeline for dataset " + dataset_name)
294 | 301 | print("Zpracovávám dataset " + dataset_name +
295 |     | - " průběh lze sledovat v logu umístěném v in CrawlerLogs folder")
    | 302 | + ", průběh lze sledovat v logu umístěném v adresáři CrawlerLogs")
296 | 303 |
297 | 304 | config = configure_functions.load_configuration(dataset_name)
298 | 305 | crawl_data(config)
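
`run_full_pipeline` resolves the configuration from the dataset name and then runs the crawl, process, validate and load stages in order. A minimal invocation might look like this; the package path and the dataset name are placeholders, not values taken from this revision:

```python
# Assumed package layout (modules/crawler/pipeline.py importable as below)
# and a placeholder dataset name with an existing configuration file.
from modules.crawler import pipeline

pipeline.run_full_pipeline('example_dataset')
```
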
... | ... | |
304 | 311 | load_data_to_database(config)
305 | 312 |
306 | 313 |
307 |     | - def run_full_pipeline_crone(dataset_name):
    | 314 | + def run_full_pipeline_crone(dataset_name: str) -> None:
308 | 315 | """
309 | 316 | Loads config file and starts full pipeline
310 | 317 | -crawl data
Also available: Unified diff
Re #8193 - refactoring crawler