from Utilities import FolderProcessor, ConfigureFunctions
from Utilities.Database import DatabaseLoader
from Utilities.CSV import CSVutils

import logging
from datetime import date

# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Dotted module prefix of dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Dotted module prefix of dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."

# Logger: monthly application log in CrawlerLogs/CommonRecords/
logging.basicConfig(filename=CRAWLER_LOGS_PATH + "CommonRecords/" + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s')


def check_last_update(config):
    """
    Loads an integer from updated.txt in CrawlerLogs/"dataset_name"
    representing the number of days since the last update. If that number
    has reached the update period set in the config, resets it to zero and
    reports that an update is due; otherwise increments it.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset should be updated today,
        False if only the days-since-last-update counter was incremented
    """
    dataset_name = config["dataset-name"]

    with open(CRAWLER_LOGS_PATH + dataset_name + "/" + "updated.txt", "r+") as file:
        last_update = int(file.read())
        file.seek(0)

        config_update_period = int(config["update-period"])

        if config_update_period <= last_update:
            logging.info("Dataset " + dataset_name + " is being updated today")
            file.write("0")
            file.truncate()
            return True
        else:
            last_update_days = last_update + 1
            logging.info("Dataset " + dataset_name + " will be updated in " +
                         str(config_update_period - last_update_days) + " days")
            file.write(str(last_update_days))
            file.truncate()
            return False
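
# check_last_update assumes CrawlerLogs/"dataset_name"/updated.txt already
# exists and contains a single integer. A minimal sketch of how the counter
# file could be created when a new dataset is registered (init_update_counter
# is a hypothetical helper, not part of this module):
#
#     import os
#
#     def init_update_counter(dataset_name):
#         os.makedirs(CRAWLER_LOGS_PATH + dataset_name, exist_ok=True)
#         with open(CRAWLER_LOGS_PATH + dataset_name + "/updated.txt", "w") as file:
#             file.write("0")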


def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"Crawler.py
    and runs its crawl function.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "Crawler", globals(), locals(), ['crawl']).crawl
    crawl_func(config)
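
# The dynamic import in crawl_data implies a simple contract: every module
# DatasetCrawler/"dataset_name"Crawler.py must expose a crawl(config)
# function. A minimal sketch of such a module, for a hypothetical dataset
# named "EXAMPLE":
#
#     # DatasetCrawler/EXAMPLECrawler.py
#     def crawl(config):
#         # download new files into CrawledData/EXAMPLE/ using the URLs
#         # and settings found in config
#         ...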


def process_data(dataset_name):
    """
    Goes through every file not yet processed (i.e. not listed in
    CrawledData/"dataset_name"/ignore.txt), imports the dataset processor
    from DatasetProcessing/"dataset_name"Processor.py, and runs it on each
    file. After successful processing, updates ignore.txt.

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "Processor", globals(), locals(),
                                   ['process_file']).process_file

    not_processed_files = FolderProcessor.list_of_all_files(CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " has downloaded " + str(len(not_processed_files)) + " new files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        CSVutils.export_data_to_csv(path, date_dic)
        FolderProcessor.update_ignore_set(CRAWLED_DATA_PATH + dataset_path, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")
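
# process_data implies a matching contract for processors: every module
# DatasetProcessing/"dataset_name"Processor.py must expose a
# process_file(path) function whose return value is handed to
# CSVutils.export_data_to_csv. A minimal sketch for the hypothetical dataset
# "EXAMPLE" (the records-grouped-by-date structure is an assumption drawn
# from the variable name date_dic above):
#
#     # DatasetProcessing/EXAMPLEProcessor.py
#     def process_file(path):
#         date_dic = {}
#         # parse the crawled file at path and group its records by date
#         ...
#         return date_dic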


def validate_process_data(config):
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data was processed correctly,
        False on wrong format or new unknown devices
    """
    processed_devices_set = FolderProcessor.get_devices_set(PROCESSED_DATA_PATH + config["dataset-name"] + '/')
    unknown_devices_set = FolderProcessor.get_unknown_devices_set(config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) + " unknown devices")
        logging.info("Adding devices to " + config["dataset-name"] + " config file")
        ConfigureFunctions.update_configuration(config["dataset-name"], unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(config["dataset-name"] + " config file contains devices with UNKNOWN! values, please update them!")
            return False

    return True
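
# validate_process_data assumes every entry in config["devices"] carries
# "x" and "y" coordinates, with the placeholder string "UNKNOWN!" marking
# values that still have to be filled in by hand. A sketch of the assumed
# shape (device names and coordinates are hypothetical; the concrete
# configuration format is not shown in this module):
#
#     config["devices"] == {
#         "device-1": {"x": "50.0755", "y": "14.4378"},
#         "device-2": {"x": "UNKNOWN!", "y": "UNKNOWN!"},
#     }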


def load_data_to_database(config):
    """
    Goes through every file not yet loaded (i.e. not listed in
    ProcessedData/"dataset_name"/ignore.txt), loads the data, appends
    coordinates from the configuration, and exports it into the database.
    After successful exporting, updates ignore.txt.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    # get all not yet loaded files of the dataset
    not_loaded_files = FolderProcessor.list_of_all_files(PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = DatabaseLoader.get_data_from_file(not_loaded_file, config)
        # export processed data to the database
        DatabaseLoader.load_data_to_database(dataset_name, processed_data)
        FolderProcessor.update_ignore_set(PROCESSED_DATA_PATH + dataset_path, not_loaded_file)

    logging.info(dataset_name + " has loaded " + str(len(not_loaded_files)) + " newly processed files to the database.")
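
# Note: process_data and load_data_to_database rely on the same bookkeeping
# convention described in their docstrings: FolderProcessor.list_of_all_files
# returns only files not yet listed in the folder's ignore.txt, and
# FolderProcessor.update_ignore_set records each successfully handled file
# there, so re-running the pipeline never touches the same file twice.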


def run_full_pipeline(dataset_name):
    """
    Loads the config file and runs the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config["dataset-name"])

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Variant of run_full_pipeline intended for periodic (cron) execution:
    loads the config file and, only if the dataset's update period has
    elapsed, runs the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has an existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data(config["dataset-name"])

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database(config)
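

if __name__ == "__main__":
    # Minimal usage sketch: run the pipeline once for a single dataset.
    # "EXAMPLE" is a hypothetical dataset name; any dataset with an existing
    # configuration file works. A cron job would call
    # run_full_pipeline_crone(dataset_name) instead, so that the update
    # period from the configuration is respected.
    run_full_pipeline("EXAMPLE")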