Revize 523382c1
Přidáno uživatelem Martin Forejt před téměř 4 roky(ů)
aswi2021vochomurka/service/subscriber.py | ||
---|---|---|
11 | 11 | |
12 | 12 | |
13 | 13 |
class Subscriber: |
14 |
""" |
|
15 |
Subscriber is responsible for establishing communication with broker and notifying |
|
16 |
about new message via callback |
|
17 |
Subscriber must be started via 'start' method and stopped via 'stop' method |
|
18 |
""" |
|
14 | 19 |
callback: SubscriberCallback |
15 | 20 |
params: SubscriberParams |
16 | 21 | |
... | ... | |
18 | 23 |
files: Dict[str, FileManager] = {} |
19 | 24 | |
20 | 25 |
def __init__(self, callback: SubscriberCallback, params: SubscriberParams): |
26 |
""" |
|
27 |
Constructor |
|
28 |
:param callback: callback |
|
29 |
:param params: params |
|
30 |
""" |
|
21 | 31 |
self.callback = callback |
22 | 32 |
self.params = params |
23 | 33 | |
24 | 34 |
def start(self): |
35 |
""" |
|
36 |
Start subscriber |
|
37 |
""" |
|
25 | 38 |
# start scheduler to check closed topics |
26 | 39 |
self.scheduler = BackgroundScheduler() |
27 | 40 |
self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit) |
28 | 41 |
self.scheduler.start() |
29 | 42 | |
30 | 43 |
def stop(self): |
44 |
""" |
|
45 |
Stop subscriber |
|
46 |
""" |
|
31 | 47 |
if self.scheduler.state != STATE_STOPPED: |
32 | 48 |
self.scheduler.shutdown() |
33 | 49 |
self.close_files() |
34 | 50 | |
35 | 51 |
def close_files(self): |
52 |
""" |
|
53 |
Close all open files |
|
54 |
""" |
|
36 | 55 |
for topic in self.files: |
37 | 56 |
self.files.get(topic).close() |
38 | 57 |
self.files.clear() |
39 | 58 | |
40 | 59 |
def check_closed_topics(self): |
60 |
""" |
|
61 |
May be called periodically for checking for expired timeout for closing topic |
|
62 |
""" |
|
41 | 63 |
t = time.time() |
42 | 64 |
for topic in list(self.files): |
43 | 65 |
file = self.files.get(topic) |
... | ... | |
47 | 69 |
self.files.pop(topic) |
48 | 70 | |
49 | 71 |
def write_to_file(self, message: Message): |
72 |
""" |
|
73 |
Write message to file |
|
74 |
:param message: message |
|
75 |
""" |
|
50 | 76 |
if message.topic in self.files: |
77 |
# file exist, just append message |
|
51 | 78 |
self.files.get(message.topic).write(message) |
52 | 79 |
else: |
80 |
# new message for this topic, create new file |
|
53 | 81 |
fm = FileManager(message.topic, message) |
54 | 82 |
self.files[message.topic] = fm |
Také k dispozici: Unified diff
Re: #8997 - refactoring, comments