Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 523382c1

Přidáno uživatelem Martin Forejt před téměř 4 roky(ů)

Re: #8997 - refactoring, comments

Zobrazit rozdíly:

aswi2021vochomurka/model/Message.py
2 2

  
3 3

  
4 4
class Message(RecordClass):
5
    """
6
    Message wrapper
7
    """
5 8
    topic: str
6 9
    index: int
7 10
    date: str
aswi2021vochomurka/service/file_manager.py
15 15
    ".": "_"})
16 16

  
17 17

  
18
def create_filename(message: Message):
18
def create_filename(message: Message) -> str:
19
    """
20
    Create file name based on message data
21
    :param message: message
22
    :return: filename
23
    """
19 24
    name = "data/" + message.topic.translate(trans) + "/" + message.date + "_" + message.time + ".csv"
20 25
    return name
21 26

  
22 27

  
23 28
class FileManager:
29
    """
30
    Helper class for writing incoming message to files
31
    Each topic has created own instance of this class
32
    """
24 33
    topic: str
25 34
    lastUpdate: float
26 35
    file: TextIO
27 36

  
28 37
    def __init__(self, topic: str, message: Message):
38
        """
39
        Constructing new FileManager will create new file and write first message
40
        :param topic: topic
41
        :param message: message
42
        :except when creating new file fails
43
        """
29 44
        self.topic = topic
30 45
        logging.debug('opening file ' + self.topic)
31 46

  
......
39 54
        self.write(message)
40 55

  
41 56
    def write(self, message: Message):
57
        """
58
        Append message to file
59
        :param message: message
60
        """
42 61
        self.file.write(message.date + ";" + message.time + ";" + str(message.index) + ";" + str(message.value) + "\n")
43 62
        self.lastUpdate = time.time()
44 63

  
45 64
    def close(self):
65
        """
66
        Close file
67
        """
46 68
        logging.debug('closing file ' + self.topic)
47 69
        self.file.flush()
48 70
        self.file.close()
aswi2021vochomurka/service/message_parser.py
6 6

  
7 7

  
8 8
class ParseException(Exception):
9
    """
10
    May be throw when message has incorrect format
11
    """
9 12
    pass
10 13

  
11 14

  
12 15
def parse_mqtt_message(message: MQTTMessage) -> Message:
16
    """
17
    Parse MQTTMessage to Message
18
    :param message: messsage
19
    :return: message
20
    :except: when message has incorrect format
21
    """
13 22
    data = message.payload.decode("utf-8")
14 23
    parts = data.split(";")
15 24
    logging.debug('Parsing message: ' + data + ', parts: ' + str(len(parts)))
aswi2021vochomurka/service/mqtt/mqtt_subscriber.py
7 7

  
8 8

  
9 9
class MQTTSubscriber(Subscriber):
10
    """
11
    MQTT subscriber, implementation of Subscriber over MQTT protocol
12
    """
10 13
    client: mqtt.Client = None
11 14

  
12
    # The callback for when the client receives a CONNACK response from the server.
13 15
    def on_connect(self, client, userdata, flags, rc, properties=None):
16
        """
17
        The callback for when the client receives a CONNACK response from the server.
18
        See: mqtt.Client.on_connect for info about params
19
        """
14 20
        logging.info('Connected with result code ' + str(rc))
15 21
        self.callback.onConnected()
16 22

  
......
20 26
            logging.info('Subscribed to topic: ' + topic)
21 27
            client.subscribe(topic)
22 28

  
23
    # The callback for when a PUBLISH message is received from the server.
24 29
    def on_message(self, client, userdata, message: mqtt.MQTTMessage):
30
        """
31
        The callback for when a PUBLISH message is received from the server.
32
        See: mqtt.Client.on_message for info about params
33
        """
25 34
        try:
26 35
            m = parse_mqtt_message(message)
27 36
            logging.info('Message: ' + str(m))
......
34 43
            pass
35 44

  
36 45
    def on_disconnect(self, client, userdata, rc):
46
        """
47
        The callback for when the client disconnects from the server.
48
        See: mqtt.Client.on_disconnect for info about params
49
        """
37 50
        logging.info('Disconnected')
38 51
        self.callback.onDisconnected()
39 52
        self.stop()
40 53

  
41 54
    def start(self):
55
        """
56
        Start mqtt client
57
        """
42 58
        super().start()
43 59
        client = mqtt.Client()
44 60
        self.client = client
......
66 82
        client.loop_forever()
67 83

  
68 84
    def stop(self):
85
        """
86
        Stop mqtt client
87
        """
69 88
        super().stop()
70 89
        if self.client is not None:
71 90
            logging.info("Disconnecting from broker")
aswi2021vochomurka/service/subscriber.py
11 11

  
12 12

  
13 13
class Subscriber:
14
    """
15
    Subscriber is responsible for establishing communication with broker and notifying
16
    about new message via callback
17
    Subscriber must be started via 'start' method and stopped via 'stop' method
18
    """
14 19
    callback: SubscriberCallback
15 20
    params: SubscriberParams
16 21

  
......
18 23
    files: Dict[str, FileManager] = {}
19 24

  
20 25
    def __init__(self, callback: SubscriberCallback, params: SubscriberParams):
26
        """
27
        Constructor
28
        :param callback: callback
29
        :param params: params
30
        """
21 31
        self.callback = callback
22 32
        self.params = params
23 33

  
24 34
    def start(self):
35
        """
36
        Start subscriber
37
        """
25 38
        # start scheduler to check closed topics
26 39
        self.scheduler = BackgroundScheduler()
27 40
        self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit)
28 41
        self.scheduler.start()
29 42

  
30 43
    def stop(self):
44
        """
45
        Stop subscriber
46
        """
31 47
        if self.scheduler.state != STATE_STOPPED:
32 48
            self.scheduler.shutdown()
33 49
        self.close_files()
34 50

  
35 51
    def close_files(self):
52
        """
53
        Close all open files
54
        """
36 55
        for topic in self.files:
37 56
            self.files.get(topic).close()
38 57
        self.files.clear()
39 58

  
40 59
    def check_closed_topics(self):
60
        """
61
        May be called periodically for checking for expired timeout for closing topic
62
        """
41 63
        t = time.time()
42 64
        for topic in list(self.files):
43 65
            file = self.files.get(topic)
......
47 69
                self.files.pop(topic)
48 70

  
49 71
    def write_to_file(self, message: Message):
72
        """
73
        Write message to file
74
        :param message: message
75
        """
50 76
        if message.topic in self.files:
77
            # file exist, just append message
51 78
            self.files.get(message.topic).write(message)
52 79
        else:
80
            # new message for this topic, create new file
53 81
            fm = FileManager(message.topic, message)
54 82
            self.files[message.topic] = fm
aswi2021vochomurka/service/subscriber_params.py
3 3

  
4 4

  
5 5
class ConnectionParams(RecordClass):
6
    """
7
    Connection params to connect to broker
8
    """
6 9
    host: str
7 10
    port: int
8 11
    timeout: int
9 12

  
10 13

  
11 14
class SubscriberParams(RecordClass):
15
    """
16
    Params for Subscriber
17
    """
12 18
    # list of topics to subscribe
13 19
    topics: List[str]
14 20
    # close limit in seconds

Také k dispozici: Unified diff