1
|
import paho.mqtt.client as mqtt
|
2
|
|
3
|
from aswi2021vochomurka.service.message_parser import parse_mqtt_message, ParseException
|
4
|
from aswi2021vochomurka.service.subscriber import Subscriber
|
5
|
|
6
|
|
7
|
class MQTTSubscriber(Subscriber):
|
8
|
|
9
|
# The callback for when the client receives a CONNACK response from the server.
|
10
|
def on_connect(self, client, userdata, flags, rc, properties=None):
|
11
|
print("Connected with result code " + str(rc))
|
12
|
self.callback.onConnected()
|
13
|
# Subscribing in on_connect() means that if we lose the connection and
|
14
|
# reconnect then subscriptions will be renewed.
|
15
|
for topic in self.params.topics:
|
16
|
print("Subscribed to topic: " + topic)
|
17
|
client.subscribe(topic)
|
18
|
|
19
|
# The callback for when a PUBLISH message is received from the server.
|
20
|
def on_message(self, client, userdata, message: mqtt.MQTTMessage):
|
21
|
try:
|
22
|
m = parse_mqtt_message(message)
|
23
|
print(m)
|
24
|
self.callback.onMessage(m)
|
25
|
except ParseException as error:
|
26
|
print('invalid message data format')
|
27
|
# TODO better reaction on bad format
|
28
|
pass
|
29
|
|
30
|
def on_disconnect(self, client, userdata, rc):
|
31
|
self.callback.onDisconnected()
|
32
|
|
33
|
def start(self):
|
34
|
client = mqtt.Client()
|
35
|
client.on_connect = self.on_connect
|
36
|
client.on_message = self.on_message
|
37
|
client.on_disconnect = self.on_disconnect
|
38
|
|
39
|
if not self.params.anonymous:
|
40
|
client.tls_set()
|
41
|
client.username_pw_set(self.params.username, self.params.password)
|
42
|
|
43
|
try:
|
44
|
client.connect(
|
45
|
self.params.connection.host,
|
46
|
self.params.connection.port,
|
47
|
self.params.connection.timeout
|
48
|
)
|
49
|
except Exception as error:
|
50
|
self.callback.onError()
|
51
|
return
|
52
|
|
53
|
client.loop_forever()
|