Revize f3873472
Přidáno uživatelem Martin Forejt před téměř 4 roky(ů)
aswi2021vochomurka/service/file_manager.py | ||
---|---|---|
1 |
import time |
|
2 |
|
|
3 |
from aswi2021vochomurka.model.Message import Message |
|
4 |
from typing import TextIO |
|
5 |
|
|
6 |
|
|
7 |
class FileManager: |
|
8 |
topic: str |
|
9 |
lastUpdate: float |
|
10 |
file: TextIO |
|
11 |
|
|
12 |
def __init__(self, topic: str, message: Message): |
|
13 |
self.topic = topic |
|
14 |
try: |
|
15 |
self.file = open(message.time + ".csv", "w+") |
|
16 |
except Exception as error: |
|
17 |
print(error) |
|
18 |
self.file.write("init file\n") |
|
19 |
self.lastUpdate = time.time() |
|
20 |
|
|
21 |
def write(self, message: Message): |
|
22 |
self.file.write("test append\n") |
|
23 |
self.lastUpdate = time.time() |
|
24 |
|
|
25 |
def close(self): |
|
26 |
self.file.flush() |
|
27 |
self.file.close() |
aswi2021vochomurka/service/mqtt/mqtt_subscriber.py | ||
---|---|---|
2 | 2 |
|
3 | 3 |
from aswi2021vochomurka.service.message_parser import parse_mqtt_message, ParseException |
4 | 4 |
from aswi2021vochomurka.service.subscriber import Subscriber |
5 |
from apscheduler.schedulers.background import BackgroundScheduler |
|
6 | 5 |
|
7 | 6 |
|
8 | 7 |
class MQTTSubscriber(Subscriber): |
9 |
sched = BackgroundScheduler() |
|
10 | 8 |
|
11 | 9 |
# The callback for when the client receives a CONNACK response from the server. |
12 | 10 |
def on_connect(self, client, userdata, flags, rc, properties=None): |
13 | 11 |
print("Connected with result code " + str(rc)) |
14 | 12 |
self.callback.onConnected() |
15 | 13 |
|
16 |
# start scheduler to check closed topics |
|
17 |
self.sched.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit) |
|
18 |
self.sched.start() |
|
19 |
|
|
20 | 14 |
# Subscribing in on_connect() means that if we lose the connection and |
21 | 15 |
# reconnect then subscriptions will be renewed. |
22 | 16 |
for topic in self.params.topics: |
... | ... | |
29 | 23 |
m = parse_mqtt_message(message) |
30 | 24 |
print(m) |
31 | 25 |
self.callback.onMessage(m) |
26 |
self.write_to_file(m) |
|
32 | 27 |
except ParseException as error: |
33 | 28 |
print('invalid message data format') |
34 | 29 |
# TODO better reaction on bad format |
... | ... | |
36 | 31 |
|
37 | 32 |
def on_disconnect(self, client, userdata, rc): |
38 | 33 |
self.callback.onDisconnected() |
39 |
self.sched.shutdown() |
|
40 |
|
|
41 |
def check_closed_topics(self): |
|
42 |
print('checking closed topics') |
|
43 |
pass |
|
34 |
self.stop() |
|
44 | 35 |
|
45 | 36 |
def start(self): |
37 |
super().start() |
|
46 | 38 |
client = mqtt.Client() |
47 | 39 |
client.on_connect = self.on_connect |
48 | 40 |
client.on_message = self.on_message |
aswi2021vochomurka/service/subscriber.py | ||
---|---|---|
1 |
import time |
|
2 |
|
|
3 |
from apscheduler.schedulers.background import BackgroundScheduler |
|
4 |
|
|
5 |
from aswi2021vochomurka.model.Message import Message |
|
6 |
from aswi2021vochomurka.service.file_manager import FileManager |
|
1 | 7 |
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback |
2 | 8 |
from aswi2021vochomurka.service.subscriber_params import SubscriberParams |
9 |
from typing import Dict |
|
3 | 10 |
|
4 | 11 |
|
5 | 12 |
class Subscriber: |
6 | 13 |
callback: SubscriberCallback |
7 | 14 |
params: SubscriberParams |
8 | 15 |
|
16 |
scheduler = BackgroundScheduler() |
|
17 |
files: Dict[str, FileManager] = {} |
|
18 |
|
|
9 | 19 |
def __init__(self, callback: SubscriberCallback, params: SubscriberParams): |
10 | 20 |
self.callback = callback |
11 | 21 |
self.params = params |
12 | 22 |
|
13 |
def start(self): raise NotImplementedError |
|
23 |
def start(self): |
|
24 |
# start scheduler to check closed topics |
|
25 |
self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit) |
|
26 |
self.scheduler.start() |
|
27 |
|
|
28 |
def stop(self): |
|
29 |
self.scheduler.shutdown() |
|
30 |
self.close_files() |
|
31 |
|
|
32 |
def close_files(self): |
|
33 |
for topic in self.files: |
|
34 |
self.files.get(topic).close() |
|
35 |
self.files = {} |
|
36 |
|
|
37 |
def check_closed_topics(self): |
|
38 |
t = time.time() |
|
39 |
for topic in list(self.files): |
|
40 |
file = self.files.get(topic) |
|
41 |
if t - file.lastUpdate > self.params.closeLimit: |
|
42 |
file.close() |
|
43 |
self.files.pop(topic) |
|
44 |
|
|
45 |
def write_to_file(self, message: Message): |
|
46 |
if message.topic in self.files: |
|
47 |
self.files.get(message.topic).write(message) |
|
48 |
else: |
|
49 |
fm = FileManager(message.topic, message) |
|
50 |
self.files[message.topic] = fm |
Také k dispozici: Unified diff
Re #8731 - file manager