Projekt

Obecné

Profil

Stáhnout (2.53 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from typing import Dict
3

    
4
from apscheduler.schedulers.background import BackgroundScheduler
5
from apscheduler.schedulers.base import STATE_STOPPED
6

    
7
from aswi2021vochomurka.model.Message import Message
8
from aswi2021vochomurka.service.file_manager import FileManager
9
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback
10
from aswi2021vochomurka.service.subscriber_params import SubscriberParams
11

    
12

    
13
class Subscriber:
14
    """
15
    Subscriber is responsible for establishing communication with broker and notifying
16
    about new message via callback
17
    Subscriber must be started via 'start' method and stopped via 'stop' method
18
    """
19
    callback: SubscriberCallback
20
    params: SubscriberParams
21

    
22
    scheduler: BackgroundScheduler
23
    files: Dict[str, FileManager] = {}
24

    
25
    def __init__(self, callback: SubscriberCallback, params: SubscriberParams):
26
        """
27
        Constructor
28
        :param callback: callback
29
        :param params: params
30
        """
31
        self.callback = callback
32
        self.params = params
33

    
34
    def start(self):
35
        """
36
        Start subscriber
37
        """
38
        # start scheduler to check closed topics
39
        self.scheduler = BackgroundScheduler()
40
        self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit)
41
        self.scheduler.start()
42

    
43
    def stop(self):
44
        """
45
        Stop subscriber
46
        """
47
        if self.scheduler.state != STATE_STOPPED:
48
            self.scheduler.shutdown()
49
        self.close_files()
50

    
51
    def close_files(self):
52
        """
53
        Close all open files
54
        """
55
        for topic in self.files:
56
            self.files.get(topic).close()
57
        self.files.clear()
58

    
59
    def check_closed_topics(self):
60
        """
61
        May be called periodically for checking for expired timeout for closing topic
62
        """
63
        t = time.time()
64
        for topic in list(self.files):
65
            file = self.files.get(topic)
66
            if t - file.lastUpdate > self.params.closeLimit:
67
                self.callback.onCloseTopic(topic)
68
                file.close()
69
                self.files.pop(topic)
70

    
71
    def write_to_file(self, message: Message):
72
        """
73
        Write message to file
74
        :param message: message
75
        """
76
        if message.topic in self.files:
77
            # file exist, just append message
78
            self.files.get(message.topic).write(message)
79
        else:
80
            # new message for this topic, create new file
81
            fm = FileManager(message.topic, message)
82
            self.files[message.topic] = fm
(4-4/6)