Revize 91b91053
Přidáno uživatelem Martin Forejt před téměř 4 roky(ů)
aswi2021vochomurka/service/message_parser.py | ||
---|---|---|
1 |
from paho.mqtt.client import MQTTMessage |
|
2 |
|
|
3 |
from aswi2021vochomurka.model.Message import Message |
|
4 |
|
|
5 |
|
|
6 |
class ParseException(Exception): |
|
7 |
pass |
|
8 |
|
|
9 |
|
|
10 |
def parse_mqtt_message(message: MQTTMessage) -> Message: |
|
11 |
data = message.payload.decode("utf-8") |
|
12 |
parts = data.split(";") |
|
13 |
if len(parts) != 4: |
|
14 |
raise ParseException |
|
15 |
|
|
16 |
try: |
|
17 |
return Message( |
|
18 |
message.topic, |
|
19 |
int(parts[2]), |
|
20 |
parts[0], |
|
21 |
parts[1], |
|
22 |
float(parts[3]) |
|
23 |
) |
|
24 |
except Exception as error: |
|
25 |
raise ParseException from error |
aswi2021vochomurka/service/mqtt/mqtt_subscriber.py | ||
---|---|---|
1 | 1 |
import paho.mqtt.client as mqtt |
2 | 2 |
|
3 |
from aswi2021vochomurka.service.message_parser import parse_mqtt_message, ParseException |
|
3 | 4 |
from aswi2021vochomurka.service.subscriber import Subscriber |
4 |
from aswi2021vochomurka.model.Message import Message |
|
5 | 5 |
|
6 | 6 |
|
7 | 7 |
class MQTTSubscriber(Subscriber): |
... | ... | |
12 | 12 |
self.callback.onConnected() |
13 | 13 |
# Subscribing in on_connect() means that if we lose the connection and |
14 | 14 |
# reconnect then subscriptions will be renewed. |
15 |
client.subscribe("#") |
|
15 |
for topic in self.params.topics: |
|
16 |
print("Subscribed to topic: " + topic) |
|
17 |
client.subscribe(topic) |
|
16 | 18 |
|
17 | 19 |
# The callback for when a PUBLISH message is received from the server. |
18 |
def on_message(self, client, userdata, message): |
|
19 |
print(message.topic + " " + str(message.payload.decode("utf-8"))) |
|
20 |
m = Message(message.topic, 0, '', '', 1) |
|
21 |
self.callback.onMessage(m) |
|
20 |
def on_message(self, client, userdata, message: mqtt.MQTTMessage): |
|
21 |
try: |
|
22 |
m = parse_mqtt_message(message) |
|
23 |
print(m) |
|
24 |
self.callback.onMessage(m) |
|
25 |
except ParseException as error: |
|
26 |
print('invalid message data format') |
|
27 |
# TODO better reaction on bad format |
|
28 |
pass |
|
22 | 29 |
|
23 | 30 |
def on_disconnect(self, client, userdata, rc): |
24 | 31 |
self.callback.onDisconnected() |
... | ... | |
29 | 36 |
client.on_message = self.on_message |
30 | 37 |
client.on_disconnect = self.on_disconnect |
31 | 38 |
|
39 |
if not self.params.anonymous: |
|
40 |
client.tls_set() |
|
41 |
client.username_pw_set(self.params.username, self.params.password) |
|
42 |
|
|
32 | 43 |
try: |
33 |
client.connect("localhost", 1883, 60) |
|
44 |
client.connect( |
|
45 |
self.params.connection.host, |
|
46 |
self.params.connection.port, |
|
47 |
self.params.connection.timeout |
|
48 |
) |
|
34 | 49 |
except Exception as error: |
35 |
self.error.emit(error)
|
|
50 |
self.callback.onError()
|
|
36 | 51 |
return |
37 | 52 |
|
38 | 53 |
client.loop_forever() |
aswi2021vochomurka/service/subscriber.py | ||
---|---|---|
1 |
from PyQt5.QtCore import pyqtSignal, QObject |
|
2 |
|
|
3 | 1 |
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback |
2 |
from aswi2021vochomurka.service.subscriber_params import SubscriberParams |
|
4 | 3 |
|
5 | 4 |
|
6 | 5 |
class Subscriber: |
7 | 6 |
callback: SubscriberCallback |
7 |
params: SubscriberParams |
|
8 | 8 |
|
9 |
def __init__(self, callback: SubscriberCallback): |
|
9 |
def __init__(self, callback: SubscriberCallback, params: SubscriberParams):
|
|
10 | 10 |
self.callback = callback |
11 |
self.params = params |
|
11 | 12 |
|
12 | 13 |
def start(self): raise NotImplementedError |
aswi2021vochomurka/service/subscriber_callback.py | ||
---|---|---|
3 | 3 |
|
4 | 4 |
class SubscriberCallback: |
5 | 5 |
def onConnected(self): raise NotImplementedError |
6 |
""" called when subscriber successfully connect to broker """ |
|
6 | 7 |
|
7 | 8 |
def onDisconnected(self): raise NotImplementedError |
9 |
""" called when subscriber disconnect from broker """ |
|
8 | 10 |
|
9 | 11 |
def onError(self): raise NotImplementedError |
12 |
""" called when some error occurred """ |
|
10 | 13 |
|
11 | 14 |
def onMessage(self, message: Message): raise NotImplementedError |
15 |
""" called when new message received """ |
|
16 |
|
|
17 |
def onCloseTopic(self, topic: str): raise NotImplementedError |
|
18 |
""" called if no message has been received for a long time for topic """ |
aswi2021vochomurka/service/subscriber_params.py | ||
---|---|---|
1 |
from recordclass import RecordClass |
|
2 |
from typing import List |
|
3 |
|
|
4 |
|
|
5 |
class ConnectionParams(RecordClass): |
|
6 |
host: str |
|
7 |
port: int |
|
8 |
timeout: int |
|
9 |
|
|
10 |
|
|
11 |
class SubscriberParams(RecordClass): |
|
12 |
# list of topics to subscribe |
|
13 |
topics: List[str] |
|
14 |
# close limit in seconds |
|
15 |
closeLimit: int |
|
16 |
# connection params |
|
17 |
connection: ConnectionParams |
|
18 |
# connect anonymously to broker, otherwise provide username and password |
|
19 |
anonymous: bool |
|
20 |
username: str = "" |
|
21 |
password: str = "" |
aswi2021vochomurka/view/main_view.py | ||
---|---|---|
5 | 5 |
from aswi2021vochomurka.service.mqtt.mqtt_subscriber import MQTTSubscriber |
6 | 6 |
from aswi2021vochomurka.service.subscriber import Subscriber |
7 | 7 |
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback |
8 |
from aswi2021vochomurka.service.subscriber_params import SubscriberParams, ConnectionParams |
|
8 | 9 |
|
9 | 10 |
|
10 | 11 |
class Worker(QObject, SubscriberCallback): |
... | ... | |
14 | 15 |
newMessage = pyqtSignal(str) |
15 | 16 |
subscriber: Subscriber = None |
16 | 17 |
|
18 |
params = SubscriberParams( |
|
19 |
["/home/1", "/home/2"], |
|
20 |
120, |
|
21 |
ConnectionParams("localhost", 1883, 60), |
|
22 |
True |
|
23 |
) |
|
24 |
|
|
17 | 25 |
def start(self): |
18 |
self.subscriber = MQTTSubscriber(self) |
|
26 |
self.subscriber = MQTTSubscriber(self, self.params)
|
|
19 | 27 |
self.subscriber.start() |
20 | 28 |
|
21 | 29 |
def onConnected(self): |
... | ... | |
30 | 38 |
def onMessage(self, message: Message): |
31 | 39 |
self.newMessage.emit(message.topic) |
32 | 40 |
|
41 |
def onCloseTopic(self, topic: str): |
|
42 |
pass |
|
43 |
|
|
33 | 44 |
|
34 | 45 |
class MainView(QMainWindow): |
35 | 46 |
worker: Worker = None |
Také k dispozici: Unified diff
Feature/8566 be fe interface