from Utilities import FolderProcessor, ConfigureFunctions
from Utilities.Database import DatabaseLoader

import logging
from datetime import date

# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Path to dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Path to dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."

# Application-wide logger; one log file per month, e.g. Applicationlog-Jun-2020.log
logging.basicConfig(filename=CRAWLER_LOGS_PATH + "CommonRecords/" + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s')


def check_last_update(config):
    """
    Loads an integer from updated.txt in CrawlerLogs/"dataset_name"
    representing the number of days since the last update. If that number
    has reached the update period given in the config, the counter is
    reset to zero; otherwise it is incremented.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset should be updated today,
        False if only the day counter was incremented
    """
    dataset_name = config["dataset-name"]

    with open(CRAWLER_LOGS_PATH + dataset_name + "/" + "updated.txt", "r+") as file:
        last_update = int(file.read())
        file.seek(0)

        config_update_period = int(config["update-period"])

        if config_update_period <= last_update:
            logging.info("Dataset " + dataset_name + " is being updated today")
            file.write("0")
            file.truncate()
            return True
        else:
            last_update_days = last_update + 1
            logging.info("Dataset " + dataset_name + " will be updated in " + str(config_update_period - last_update_days) + " days")
            file.write(str(last_update_days))
            file.truncate()
            return False


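# Illustrative example of the counter protocol above (values are hypothetical):
# with "update-period" set to 7 and updated.txt containing "7", the file is
# reset to "0" and True is returned; had it contained "3", it would be
# rewritten to "4" and False returned.

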
def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"Crawler.py
    and runs it.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "Crawler", globals(), locals(), ['crawl']).crawl
    crawl_func(config)


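# The dynamic import above relies on a naming convention: every dataset ships
# a module DatasetCrawler/<dataset-name>Crawler.py exposing crawl(config).
# A minimal sketch of such a module (the dataset name is hypothetical):
#
#     # DatasetCrawler/ExampleDatasetCrawler.py
#     def crawl(config):
#         dataset_name = config["dataset-name"]
#         # download new raw files into CrawledData/<dataset_name>/ here

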
def process_data(dataset_name):
    """
    Goes through every file not yet processed (i.e. not listed in
    CrawledData/"dataset_name"/ignore.txt), imports the dataset processor
    from DatasetProcessing/"dataset_name"Processor.py and runs it on each
    file. After successful processing, ignore.txt is updated.

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "Processor", globals(), locals(),
                                   ['process_file']).process_file

    not_processed_files = FolderProcessor.list_of_all_files(CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " has downloaded " + str(len(not_processed_files)) + " new files")

    for not_processed_file in not_processed_files:
        process_file_func(CRAWLED_DATA_PATH + dataset_path + not_processed_file)
        FolderProcessor.update_ignore_set(CRAWLED_DATA_PATH + dataset_path, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")


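# process_data() follows the same convention as crawl_data(): a module
# DatasetProcessing/<dataset-name>Processor.py exposing process_file(file_path),
# called once per crawled file. Already-processed files are recorded in
# CrawledData/<dataset-name>/ignore.txt, so a rerun skips them.

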
def validate_process_data(config):
    """
    Goes through newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data were processed correctly,
        False on wrong format or new unknown devices
    """
    processed_devices_set = FolderProcessor.get_devices_set(PROCESSED_DATA_PATH + config["dataset-name"] + '/')
    unknown_devices_set = FolderProcessor.get_unknown_devices_set(config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) + " unknown devices")
        logging.info("Adding devices to " + config["dataset-name"] + " config file")
        ConfigureFunctions.update_configuration(config["dataset-name"], unknown_devices_set)
        return False

    for device_id in config["devices"]:
        device = config["devices"][device_id]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(config["dataset-name"] + " config file contains devices with UNKNOWN! values, please update them!")
            return False

    return True


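# Shape of the validated configuration, as implied by the loop above
# (device names and coordinates are hypothetical):
#
#     config["devices"] == {
#         "device-1": {"x": "49.2099", "y": "16.5990"},
#         "device-2": {"x": "UNKNOWN!", "y": "UNKNOWN!"},  # fails validation
#     }

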
def load_data_to_database(config):
    """
    Goes through every file not yet loaded (i.e. not listed in
    ProcessedData/"dataset_name"/ignore.txt), loads its data, appends
    coordinates from the configuration and exports everything into the
    database. After successful exporting, ignore.txt is updated.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    # get all not yet loaded files from the dataset
    not_loaded_files = FolderProcessor.list_of_all_files(PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = DatabaseLoader.get_data_from_file(not_loaded_file, config)
        # export processed data to the database
        DatabaseLoader.load_data_to_database(dataset_name, processed_data)
        FolderProcessor.update_ignore_set(PROCESSED_DATA_PATH + dataset_path, not_loaded_file)

    logging.info(dataset_name + " has loaded " + str(len(not_loaded_files)) + " newly processed files to the database.")


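# Note: a file is appended to ProcessedData/<dataset-name>/ignore.txt only
# after its database load ran, so an interrupted run re-loads just the files
# that had not yet reached that point.

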
def run_full_pipeline(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config["dataset-name"])

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Cron-friendly variant: loads the config file and, if the dataset's
    update period has elapsed (see check_last_update), starts the full
    pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data(config["dataset-name"])

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database(config)
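

# Example entry point: a minimal sketch, assuming a dataset named
# "ExampleDataset" whose configuration file ConfigureFunctions can load
# (the name is hypothetical; substitute a real dataset).
if __name__ == '__main__':
    run_full_pipeline("ExampleDataset")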