Revize 523382c1
Přidáno uživatelem Martin Forejt před téměř 4 roky(ů)
aswi2021vochomurka/model/Message.py | ||
---|---|---|
2 | 2 |
|
3 | 3 |
|
4 | 4 |
class Message(RecordClass): |
5 |
""" |
|
6 |
Message wrapper |
|
7 |
""" |
|
5 | 8 |
topic: str |
6 | 9 |
index: int |
7 | 10 |
date: str |
aswi2021vochomurka/service/file_manager.py | ||
---|---|---|
15 | 15 |
".": "_"}) |
16 | 16 |
|
17 | 17 |
|
18 |
def create_filename(message: Message): |
|
18 |
def create_filename(message: Message) -> str: |
|
19 |
""" |
|
20 |
Create file name based on message data |
|
21 |
:param message: message |
|
22 |
:return: filename |
|
23 |
""" |
|
19 | 24 |
name = "data/" + message.topic.translate(trans) + "/" + message.date + "_" + message.time + ".csv" |
20 | 25 |
return name |
21 | 26 |
|
22 | 27 |
|
23 | 28 |
class FileManager: |
29 |
""" |
|
30 |
Helper class for writing incoming message to files |
|
31 |
Each topic has created own instance of this class |
|
32 |
""" |
|
24 | 33 |
topic: str |
25 | 34 |
lastUpdate: float |
26 | 35 |
file: TextIO |
27 | 36 |
|
28 | 37 |
def __init__(self, topic: str, message: Message): |
38 |
""" |
|
39 |
Constructing new FileManager will create new file and write first message |
|
40 |
:param topic: topic |
|
41 |
:param message: message |
|
42 |
:except when creating new file fails |
|
43 |
""" |
|
29 | 44 |
self.topic = topic |
30 | 45 |
logging.debug('opening file ' + self.topic) |
31 | 46 |
|
... | ... | |
39 | 54 |
self.write(message) |
40 | 55 |
|
41 | 56 |
def write(self, message: Message): |
57 |
""" |
|
58 |
Append message to file |
|
59 |
:param message: message |
|
60 |
""" |
|
42 | 61 |
self.file.write(message.date + ";" + message.time + ";" + str(message.index) + ";" + str(message.value) + "\n") |
43 | 62 |
self.lastUpdate = time.time() |
44 | 63 |
|
45 | 64 |
def close(self): |
65 |
""" |
|
66 |
Close file |
|
67 |
""" |
|
46 | 68 |
logging.debug('closing file ' + self.topic) |
47 | 69 |
self.file.flush() |
48 | 70 |
self.file.close() |
aswi2021vochomurka/service/message_parser.py | ||
---|---|---|
6 | 6 |
|
7 | 7 |
|
8 | 8 |
class ParseException(Exception): |
9 |
""" |
|
10 |
May be throw when message has incorrect format |
|
11 |
""" |
|
9 | 12 |
pass |
10 | 13 |
|
11 | 14 |
|
12 | 15 |
def parse_mqtt_message(message: MQTTMessage) -> Message: |
16 |
""" |
|
17 |
Parse MQTTMessage to Message |
|
18 |
:param message: messsage |
|
19 |
:return: message |
|
20 |
:except: when message has incorrect format |
|
21 |
""" |
|
13 | 22 |
data = message.payload.decode("utf-8") |
14 | 23 |
parts = data.split(";") |
15 | 24 |
logging.debug('Parsing message: ' + data + ', parts: ' + str(len(parts))) |
aswi2021vochomurka/service/mqtt/mqtt_subscriber.py | ||
---|---|---|
7 | 7 |
|
8 | 8 |
|
9 | 9 |
class MQTTSubscriber(Subscriber): |
10 |
""" |
|
11 |
MQTT subscriber, implementation of Subscriber over MQTT protocol |
|
12 |
""" |
|
10 | 13 |
client: mqtt.Client = None |
11 | 14 |
|
12 |
# The callback for when the client receives a CONNACK response from the server. |
|
13 | 15 |
def on_connect(self, client, userdata, flags, rc, properties=None): |
16 |
""" |
|
17 |
The callback for when the client receives a CONNACK response from the server. |
|
18 |
See: mqtt.Client.on_connect for info about params |
|
19 |
""" |
|
14 | 20 |
logging.info('Connected with result code ' + str(rc)) |
15 | 21 |
self.callback.onConnected() |
16 | 22 |
|
... | ... | |
20 | 26 |
logging.info('Subscribed to topic: ' + topic) |
21 | 27 |
client.subscribe(topic) |
22 | 28 |
|
23 |
# The callback for when a PUBLISH message is received from the server. |
|
24 | 29 |
def on_message(self, client, userdata, message: mqtt.MQTTMessage): |
30 |
""" |
|
31 |
The callback for when a PUBLISH message is received from the server. |
|
32 |
See: mqtt.Client.on_message for info about params |
|
33 |
""" |
|
25 | 34 |
try: |
26 | 35 |
m = parse_mqtt_message(message) |
27 | 36 |
logging.info('Message: ' + str(m)) |
... | ... | |
34 | 43 |
pass |
35 | 44 |
|
36 | 45 |
def on_disconnect(self, client, userdata, rc): |
46 |
""" |
|
47 |
The callback for when the client disconnects from the server. |
|
48 |
See: mqtt.Client.on_disconnect for info about params |
|
49 |
""" |
|
37 | 50 |
logging.info('Disconnected') |
38 | 51 |
self.callback.onDisconnected() |
39 | 52 |
self.stop() |
40 | 53 |
|
41 | 54 |
def start(self): |
55 |
""" |
|
56 |
Start mqtt client |
|
57 |
""" |
|
42 | 58 |
super().start() |
43 | 59 |
client = mqtt.Client() |
44 | 60 |
self.client = client |
... | ... | |
66 | 82 |
client.loop_forever() |
67 | 83 |
|
68 | 84 |
def stop(self): |
85 |
""" |
|
86 |
Stop mqtt client |
|
87 |
""" |
|
69 | 88 |
super().stop() |
70 | 89 |
if self.client is not None: |
71 | 90 |
logging.info("Disconnecting from broker") |
aswi2021vochomurka/service/subscriber.py | ||
---|---|---|
11 | 11 |
|
12 | 12 |
|
13 | 13 |
class Subscriber: |
14 |
""" |
|
15 |
Subscriber is responsible for establishing communication with broker and notifying |
|
16 |
about new message via callback |
|
17 |
Subscriber must be started via 'start' method and stopped via 'stop' method |
|
18 |
""" |
|
14 | 19 |
callback: SubscriberCallback |
15 | 20 |
params: SubscriberParams |
16 | 21 |
|
... | ... | |
18 | 23 |
files: Dict[str, FileManager] = {} |
19 | 24 |
|
20 | 25 |
def __init__(self, callback: SubscriberCallback, params: SubscriberParams): |
26 |
""" |
|
27 |
Constructor |
|
28 |
:param callback: callback |
|
29 |
:param params: params |
|
30 |
""" |
|
21 | 31 |
self.callback = callback |
22 | 32 |
self.params = params |
23 | 33 |
|
24 | 34 |
def start(self): |
35 |
""" |
|
36 |
Start subscriber |
|
37 |
""" |
|
25 | 38 |
# start scheduler to check closed topics |
26 | 39 |
self.scheduler = BackgroundScheduler() |
27 | 40 |
self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit) |
28 | 41 |
self.scheduler.start() |
29 | 42 |
|
30 | 43 |
def stop(self): |
44 |
""" |
|
45 |
Stop subscriber |
|
46 |
""" |
|
31 | 47 |
if self.scheduler.state != STATE_STOPPED: |
32 | 48 |
self.scheduler.shutdown() |
33 | 49 |
self.close_files() |
34 | 50 |
|
35 | 51 |
def close_files(self): |
52 |
""" |
|
53 |
Close all open files |
|
54 |
""" |
|
36 | 55 |
for topic in self.files: |
37 | 56 |
self.files.get(topic).close() |
38 | 57 |
self.files.clear() |
39 | 58 |
|
40 | 59 |
def check_closed_topics(self): |
60 |
""" |
|
61 |
May be called periodically for checking for expired timeout for closing topic |
|
62 |
""" |
|
41 | 63 |
t = time.time() |
42 | 64 |
for topic in list(self.files): |
43 | 65 |
file = self.files.get(topic) |
... | ... | |
47 | 69 |
self.files.pop(topic) |
48 | 70 |
|
49 | 71 |
def write_to_file(self, message: Message): |
72 |
""" |
|
73 |
Write message to file |
|
74 |
:param message: message |
|
75 |
""" |
|
50 | 76 |
if message.topic in self.files: |
77 |
# file exist, just append message |
|
51 | 78 |
self.files.get(message.topic).write(message) |
52 | 79 |
else: |
80 |
# new message for this topic, create new file |
|
53 | 81 |
fm = FileManager(message.topic, message) |
54 | 82 |
self.files[message.topic] = fm |
aswi2021vochomurka/service/subscriber_params.py | ||
---|---|---|
3 | 3 |
|
4 | 4 |
|
5 | 5 |
class ConnectionParams(RecordClass): |
6 |
""" |
|
7 |
Connection params to connect to broker |
|
8 |
""" |
|
6 | 9 |
host: str |
7 | 10 |
port: int |
8 | 11 |
timeout: int |
9 | 12 |
|
10 | 13 |
|
11 | 14 |
class SubscriberParams(RecordClass): |
15 |
""" |
|
16 |
Params for Subscriber |
|
17 |
""" |
|
12 | 18 |
# list of topics to subscribe |
13 | 19 |
topics: List[str] |
14 | 20 |
# close limit in seconds |
Také k dispozici: Unified diff
Re: #8997 - refactoring, comments