from Utilities import FolderProcessor, ConfigureFunctions
from Utilities.Database import DatabaseLoader
from Utilities.CSV import CSVutils

import logging
from datetime import date

# Path to crawled data
CRAWLED_DATA_PATH = "CrawledData/"
# Path to processed data
PROCESSED_DATA_PATH = "ProcessedData/"
# Path to crawler logs
CRAWLER_LOGS_PATH = "CrawlerLogs/"
# Path to dataset crawler implementations
CRAWLER_LIB_PATH = "DatasetCrawler."
# Path to dataset processor implementations
PROCESSOR_LIB_PATH = "DatasetProcessing."

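# Expected shape of a dataset configuration (an assumption inferred from the keys
# this module accesses; loading and the concrete file format are handled by
# ConfigureFunctions.load_configuration):
#   "dataset-name"  - name of the dataset, also used for folder and module names
#   "update-period" - number of days between two updates (see check_last_update)
#   "devices"       - mapping of device names to records with "x"/"y" coordinates,
#                     where "UNKNOWN!" marks values that still have to be filled in
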
# Logger: monthly application log written to CrawlerLogs/CommonRecords/
logging.basicConfig(filename=CRAWLER_LOGS_PATH + "CommonRecords/" + 'Applicationlog-' + date.today().strftime("%b-%Y") + '.log',
                    level=logging.INFO,
                    format='%(asctime)s %(message)s'
                    )


def check_last_update(config):
    """
    Loads an integer from updated.txt in CrawlerLogs/"dataset_name" representing
    the number of days since the last update. If that number has reached the
    update period defined in the config, it is reset to zero and the dataset is
    updated; otherwise the counter is incremented.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the dataset is being updated today,
        False if only the days-since-last-update counter was incremented
    """
    dataset_name = config["dataset-name"]

    with open(CRAWLER_LOGS_PATH + dataset_name + "/" + "updated.txt", "r+") as file:
        last_update = int(file.read())
        file.seek(0)

        config_update_period = int(config["update-period"])

        if config_update_period <= last_update:
            logging.info("Dataset " + dataset_name + " is being updated today")
            file.write("0")
            file.truncate()
            return True
        else:
            last_update_days = last_update + 1
            logging.info("Dataset " + dataset_name + " will be updated in " + str(config_update_period - last_update_days) + " days")
            file.write(str(last_update_days))
            file.truncate()
            return False


def crawl_data(config):
    """
    Imports the dataset crawler from DatasetCrawler/"dataset_name"Crawler.py
    and runs its crawl function.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]

    crawl_func = __import__(CRAWLER_LIB_PATH + dataset_name + "Crawler", globals(), locals(), ['crawl']).crawl
    crawl_func(config)


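# Plugin contract (an assumption inferred from the dynamic imports in this module):
# every dataset "<dataset-name>" is expected to provide
#   DatasetCrawler/<dataset-name>Crawler.py      exposing crawl(config)
#   DatasetProcessing/<dataset-name>Processor.py exposing process_file(path),
# where process_file() returns a dict that is handed to CSVutils.export_data_to_csv.
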
def process_data(dataset_name):
    """
    Goes through every file that has not been processed yet
    (i.e. is not listed in CrawledData/"dataset_name"/ignore.txt),
    imports the dataset processor from DatasetProcessing/"dataset_name"Processor.py
    and runs it on every such file.
    After successful processing the file is added to ignore.txt.

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    dataset_path = dataset_name + '/'

    process_file_func = __import__(PROCESSOR_LIB_PATH + dataset_name + "Processor", globals(), locals(),
                                   ['process_file']).process_file

    not_processed_files = FolderProcessor.list_of_all_files(CRAWLED_DATA_PATH + dataset_path)
    logging.info(dataset_name + " has downloaded " + str(len(not_processed_files)) + " new files")

    for not_processed_file in not_processed_files:
        path = CRAWLED_DATA_PATH + dataset_path + not_processed_file
        date_dic = process_file_func(path)
        CSVutils.export_data_to_csv(path, date_dic)
        FolderProcessor.update_ignore_set(CRAWLED_DATA_PATH + dataset_path, not_processed_file)

    logging.info(dataset_name + " has processed " + str(len(not_processed_files)) + " newly crawled files")


def validate_process_data(config):
    """
    Goes through the newly processed data and checks its status.

    Args:
        config: loaded configuration file of dataset

    Returns:
        True if the data was processed correctly,
        False on wrong format or new unknown devices
    """
    processed_devices_set = FolderProcessor.get_devices_set(PROCESSED_DATA_PATH + config["dataset-name"] + '/')
    unknown_devices_set = FolderProcessor.get_unknown_devices_set(config, processed_devices_set)
    unknown_devices_size = len(unknown_devices_set)

    if unknown_devices_size != 0:
        logging.info("There are " + str(unknown_devices_size) + " unknown devices")
        logging.info("Adding devices to " + config["dataset-name"] + " config file")
        ConfigureFunctions.update_configuration(config["dataset-name"], unknown_devices_set)
        return False

    for device in config["devices"]:
        device = config["devices"][device]
        if device["x"] == "UNKNOWN!" or device["y"] == "UNKNOWN!":
            logging.info(config["dataset-name"] + " config file contains devices with UNKNOWN! values, please update them!")
            return False

    return True


def load_data_to_database(config):
    """
    Goes through every file that has not been loaded yet
    (i.e. is not listed in ProcessedData/ignore.txt), loads its data,
    appends coordinates from the configuration and exports it into the database.
    After successful exporting the file is added to ignore.txt.

    Args:
        config: loaded configuration file of dataset
    """
    dataset_name = config["dataset-name"]
    dataset_path = dataset_name + '/'

    # get all not yet loaded files from the dataset
    not_loaded_files = FolderProcessor.list_of_all_files(PROCESSED_DATA_PATH + dataset_path)

    # load every file
    for not_loaded_file in not_loaded_files:
        # load processed data
        processed_data = DatabaseLoader.get_data_from_file(not_loaded_file, config)
        # load processed data to database
        DatabaseLoader.load_data_to_database(dataset_name, processed_data)
        FolderProcessor.update_ignore_set(PROCESSED_DATA_PATH + dataset_path, not_loaded_file)

    logging.info(dataset_name + " has loaded " + str(len(not_loaded_files)) + " newly processed files to the database.")


def run_full_pipeline(dataset_name):
    """
    Loads the config file and starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    crawl_data(config)
    process_data(config["dataset-name"])

    validation_test = validate_process_data(config)

    if validation_test:
        load_data_to_database(config)


def run_full_pipeline_crone(dataset_name):
    """
    Loads the config file and, if the dataset's update period has elapsed
    (see check_last_update), starts the full pipeline:
    - crawl data
    - process data
    - load data to database

    Args:
        dataset_name: name of dataset that has existing configuration file
    """
    logging.info("Starting pipeline for dataset " + dataset_name)

    config = ConfigureFunctions.load_configuration(dataset_name)
    update_test = check_last_update(config)
    if update_test:
        crawl_data(config)
        process_data(config["dataset-name"])

        validation_test = validate_process_data(config)

        if validation_test:
            load_data_to_database(config)
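
# Minimal usage sketch (assumption: this module is normally driven by an external
# script or scheduler; "example_dataset" is only a placeholder for a dataset that
# has a configuration file):
#
#     if __name__ == "__main__":
#         run_full_pipeline("example_dataset")         # one-off run of the whole pipeline
#         run_full_pipeline_crone("example_dataset")   # cron-style run honouring update-period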