1
|
import time
|
2
|
from typing import Dict
|
3
|
|
4
|
from apscheduler.schedulers.background import BackgroundScheduler
|
5
|
from apscheduler.schedulers.base import STATE_STOPPED
|
6
|
|
7
|
from aswi2021vochomurka.model.Message import Message
|
8
|
from aswi2021vochomurka.service.file_manager import FileManager
|
9
|
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback
|
10
|
from aswi2021vochomurka.service.subscriber_params import SubscriberParams
|
11
|
|
12
|
|
13
|
class Subscriber:
|
14
|
"""
|
15
|
Subscriber is responsible for establishing communication with broker and notifying
|
16
|
about new message via callback
|
17
|
Subscriber must be started via 'start' method and stopped via 'stop' method
|
18
|
"""
|
19
|
callback: SubscriberCallback
|
20
|
params: SubscriberParams
|
21
|
|
22
|
scheduler: BackgroundScheduler
|
23
|
files: Dict[str, FileManager] = {}
|
24
|
|
25
|
def __init__(self, callback: SubscriberCallback, params: SubscriberParams):
|
26
|
"""
|
27
|
Constructor
|
28
|
:param callback: callback
|
29
|
:param params: params
|
30
|
"""
|
31
|
self.callback = callback
|
32
|
self.params = params
|
33
|
|
34
|
def start(self):
|
35
|
"""
|
36
|
Start subscriber
|
37
|
"""
|
38
|
# start scheduler to check closed topics
|
39
|
self.scheduler = BackgroundScheduler()
|
40
|
self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit)
|
41
|
self.scheduler.start()
|
42
|
|
43
|
def stop(self):
|
44
|
"""
|
45
|
Stop subscriber
|
46
|
"""
|
47
|
if self.scheduler.state != STATE_STOPPED:
|
48
|
self.scheduler.shutdown()
|
49
|
self.close_files()
|
50
|
|
51
|
def close_files(self):
|
52
|
"""
|
53
|
Close all open files
|
54
|
"""
|
55
|
for topic in self.files:
|
56
|
self.files.get(topic).close()
|
57
|
self.files.clear()
|
58
|
|
59
|
def check_closed_topics(self):
|
60
|
"""
|
61
|
May be called periodically for checking for expired timeout for closing topic
|
62
|
"""
|
63
|
t = time.time()
|
64
|
for topic in list(self.files):
|
65
|
file = self.files.get(topic)
|
66
|
if t - file.lastUpdate > self.params.closeLimit:
|
67
|
self.callback.onCloseTopic(topic)
|
68
|
file.close()
|
69
|
self.files.pop(topic)
|
70
|
|
71
|
def write_to_file(self, message: Message):
|
72
|
"""
|
73
|
Write message to file
|
74
|
:param message: message
|
75
|
"""
|
76
|
if message.topic in self.files:
|
77
|
# file exist, just append message
|
78
|
self.files.get(message.topic).write(message)
|
79
|
else:
|
80
|
# new message for this topic, create new file
|
81
|
fm = FileManager(message.topic, message)
|
82
|
self.files[message.topic] = fm
|