Projekt

Obecné

Profil

Stáhnout (1.92 KB) Statistiky
| Větev: | Tag: | Revize:
1
import paho.mqtt.client as mqtt
2

    
3
from aswi2021vochomurka.service.message_parser import parse_mqtt_message, ParseException
4
from aswi2021vochomurka.service.subscriber import Subscriber
5

    
6

    
7
class MQTTSubscriber(Subscriber):
8

    
9
    # The callback for when the client receives a CONNACK response from the server.
10
    def on_connect(self, client, userdata, flags, rc, properties=None):
11
        print("Connected with result code " + str(rc))
12
        self.callback.onConnected()
13

    
14
        # Subscribing in on_connect() means that if we lose the connection and
15
        # reconnect then subscriptions will be renewed.
16
        for topic in self.params.topics:
17
            print("Subscribed to topic: " + topic)
18
            client.subscribe(topic)
19

    
20
    # The callback for when a PUBLISH message is received from the server.
21
    def on_message(self, client, userdata, message: mqtt.MQTTMessage):
22
        try:
23
            m = parse_mqtt_message(message)
24
            print(m)
25
            self.callback.onMessage(m)
26
            self.write_to_file(m)
27
        except ParseException as error:
28
            print('invalid message data format')
29
            # TODO better reaction on bad format
30
            pass
31

    
32
    def on_disconnect(self, client, userdata, rc):
33
        self.callback.onDisconnected()
34
        self.stop()
35

    
36
    def start(self):
37
        super().start()
38
        client = mqtt.Client()
39
        client.on_connect = self.on_connect
40
        client.on_message = self.on_message
41
        client.on_disconnect = self.on_disconnect
42

    
43
        if not self.params.anonymous:
44
            client.tls_set()
45
            client.username_pw_set(self.params.username, self.params.password)
46

    
47
        try:
48
            client.connect(
49
                self.params.connection.host,
50
                self.params.connection.port,
51
                self.params.connection.timeout
52
            )
53
        except Exception as error:
54
            self.callback.onError()
55
            return
56

    
57
        client.loop_forever()
    (1-1/1)