Projekt

Obecné

Profil

« Předchozí | Další » 

Revize f3873472

Přidáno uživatelem Martin Forejt před téměř 4 roky(ů)

Re #8731 - file manager

Zobrazit rozdíly:

aswi2021vochomurka/service/file_manager.py
1
import time
2

  
3
from aswi2021vochomurka.model.Message import Message
4
from typing import TextIO
5

  
6

  
7
class FileManager:
8
    topic: str
9
    lastUpdate: float
10
    file: TextIO
11

  
12
    def __init__(self, topic: str, message: Message):
13
        self.topic = topic
14
        try:
15
            self.file = open(message.time + ".csv", "w+")
16
        except Exception as error:
17
            print(error)
18
        self.file.write("init file\n")
19
        self.lastUpdate = time.time()
20

  
21
    def write(self, message: Message):
22
        self.file.write("test append\n")
23
        self.lastUpdate = time.time()
24

  
25
    def close(self):
26
        self.file.flush()
27
        self.file.close()
aswi2021vochomurka/service/mqtt/mqtt_subscriber.py
2 2

  
3 3
from aswi2021vochomurka.service.message_parser import parse_mqtt_message, ParseException
4 4
from aswi2021vochomurka.service.subscriber import Subscriber
5
from apscheduler.schedulers.background import BackgroundScheduler
6 5

  
7 6

  
8 7
class MQTTSubscriber(Subscriber):
9
    sched = BackgroundScheduler()
10 8

  
11 9
    # The callback for when the client receives a CONNACK response from the server.
12 10
    def on_connect(self, client, userdata, flags, rc, properties=None):
13 11
        print("Connected with result code " + str(rc))
14 12
        self.callback.onConnected()
15 13

  
16
        # start scheduler to check closed topics
17
        self.sched.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit)
18
        self.sched.start()
19

  
20 14
        # Subscribing in on_connect() means that if we lose the connection and
21 15
        # reconnect then subscriptions will be renewed.
22 16
        for topic in self.params.topics:
......
29 23
            m = parse_mqtt_message(message)
30 24
            print(m)
31 25
            self.callback.onMessage(m)
26
            self.write_to_file(m)
32 27
        except ParseException as error:
33 28
            print('invalid message data format')
34 29
            # TODO better reaction on bad format
......
36 31

  
37 32
    def on_disconnect(self, client, userdata, rc):
38 33
        self.callback.onDisconnected()
39
        self.sched.shutdown()
40

  
41
    def check_closed_topics(self):
42
        print('checking closed topics')
43
        pass
34
        self.stop()
44 35

  
45 36
    def start(self):
37
        super().start()
46 38
        client = mqtt.Client()
47 39
        client.on_connect = self.on_connect
48 40
        client.on_message = self.on_message
aswi2021vochomurka/service/subscriber.py
1
import time
2

  
3
from apscheduler.schedulers.background import BackgroundScheduler
4

  
5
from aswi2021vochomurka.model.Message import Message
6
from aswi2021vochomurka.service.file_manager import FileManager
1 7
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback
2 8
from aswi2021vochomurka.service.subscriber_params import SubscriberParams
9
from typing import Dict
3 10

  
4 11

  
5 12
class Subscriber:
6 13
    callback: SubscriberCallback
7 14
    params: SubscriberParams
8 15

  
16
    scheduler = BackgroundScheduler()
17
    files: Dict[str, FileManager] = {}
18

  
9 19
    def __init__(self, callback: SubscriberCallback, params: SubscriberParams):
10 20
        self.callback = callback
11 21
        self.params = params
12 22

  
13
    def start(self): raise NotImplementedError
23
    def start(self):
24
        # start scheduler to check closed topics
25
        self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit)
26
        self.scheduler.start()
27

  
28
    def stop(self):
29
        self.scheduler.shutdown()
30
        self.close_files()
31

  
32
    def close_files(self):
33
        for topic in self.files:
34
            self.files.get(topic).close()
35
        self.files = {}
36

  
37
    def check_closed_topics(self):
38
        t = time.time()
39
        for topic in list(self.files):
40
            file = self.files.get(topic)
41
            if t - file.lastUpdate > self.params.closeLimit:
42
                file.close()
43
                self.files.pop(topic)
44

  
45
    def write_to_file(self, message: Message):
46
        if message.topic in self.files:
47
            self.files.get(message.topic).write(message)
48
        else:
49
            fm = FileManager(message.topic, message)
50
            self.files[message.topic] = fm

Také k dispozici: Unified diff