1
|
import paho.mqtt.client as mqtt
|
2
|
|
3
|
from aswi2021vochomurka.service.message_parser import parse_mqtt_message, ParseException
|
4
|
from aswi2021vochomurka.service.subscriber import Subscriber
|
5
|
|
6
|
|
7
|
class MQTTSubscriber(Subscriber):
|
8
|
|
9
|
# The callback for when the client receives a CONNACK response from the server.
|
10
|
def on_connect(self, client, userdata, flags, rc, properties=None):
|
11
|
print("Connected with result code " + str(rc))
|
12
|
self.callback.onConnected()
|
13
|
|
14
|
# Subscribing in on_connect() means that if we lose the connection and
|
15
|
# reconnect then subscriptions will be renewed.
|
16
|
for topic in self.params.topics:
|
17
|
print("Subscribed to topic: " + topic)
|
18
|
client.subscribe(topic)
|
19
|
|
20
|
# The callback for when a PUBLISH message is received from the server.
|
21
|
def on_message(self, client, userdata, message: mqtt.MQTTMessage):
|
22
|
try:
|
23
|
m = parse_mqtt_message(message)
|
24
|
print(m)
|
25
|
self.callback.onMessage(m)
|
26
|
self.write_to_file(m)
|
27
|
except ParseException as error:
|
28
|
print('invalid message data format')
|
29
|
# TODO better reaction on bad format
|
30
|
pass
|
31
|
|
32
|
def on_disconnect(self, client, userdata, rc):
|
33
|
self.callback.onDisconnected()
|
34
|
self.stop()
|
35
|
|
36
|
def start(self):
|
37
|
super().start()
|
38
|
client = mqtt.Client()
|
39
|
client.on_connect = self.on_connect
|
40
|
client.on_message = self.on_message
|
41
|
client.on_disconnect = self.on_disconnect
|
42
|
|
43
|
if not self.params.anonymous:
|
44
|
client.tls_set()
|
45
|
client.username_pw_set(self.params.username, self.params.password)
|
46
|
|
47
|
try:
|
48
|
client.connect(
|
49
|
self.params.connection.host,
|
50
|
self.params.connection.port,
|
51
|
self.params.connection.timeout
|
52
|
)
|
53
|
except Exception as error:
|
54
|
self.callback.onError()
|
55
|
return
|
56
|
|
57
|
client.loop_forever()
|