Revize be89639f
Přidáno uživatelem Martin Forejt před téměř 4 roky(ů)
aswi2021vochomurka/service/mqtt/mqtt_subscriber.py | ||
---|---|---|
7 | 7 |
|
8 | 8 |
|
9 | 9 |
class MQTTSubscriber(Subscriber): |
10 |
client: mqtt.Client = None |
|
10 | 11 |
|
11 | 12 |
# The callback for when the client receives a CONNACK response from the server. |
12 | 13 |
def on_connect(self, client, userdata, flags, rc, properties=None): |
... | ... | |
40 | 41 |
def start(self): |
41 | 42 |
super().start() |
42 | 43 |
client = mqtt.Client() |
44 |
self.client = client |
|
43 | 45 |
client.on_connect = self.on_connect |
44 | 46 |
client.on_message = self.on_message |
45 | 47 |
client.on_disconnect = self.on_disconnect |
... | ... | |
47 | 49 |
|
48 | 50 |
if not self.params.anonymous: |
49 | 51 |
logging.info('Using credentials, username=' + self.params.username + ', password=' + self.params.password) |
50 |
client.tls_set() |
|
51 | 52 |
client.username_pw_set(self.params.username, self.params.password) |
52 | 53 |
|
53 | 54 |
try: |
... | ... | |
63 | 64 |
return |
64 | 65 |
|
65 | 66 |
client.loop_forever() |
67 |
|
|
68 |
def stop(self): |
|
69 |
super().stop() |
|
70 |
if self.client is not None: |
|
71 |
logging.info("Disconnecting from broker") |
|
72 |
client = self.client |
|
73 |
self.client = None |
|
74 |
client.disconnect() |
aswi2021vochomurka/service/subscriber.py | ||
---|---|---|
1 | 1 |
import time |
2 |
from typing import Dict |
|
2 | 3 |
|
3 | 4 |
from apscheduler.schedulers.background import BackgroundScheduler |
5 |
from apscheduler.schedulers.base import STATE_STOPPED |
|
4 | 6 |
|
5 | 7 |
from aswi2021vochomurka.model.Message import Message |
6 | 8 |
from aswi2021vochomurka.service.file_manager import FileManager |
7 | 9 |
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback |
8 | 10 |
from aswi2021vochomurka.service.subscriber_params import SubscriberParams |
9 |
from typing import Dict |
|
10 | 11 |
|
11 | 12 |
|
12 | 13 |
class Subscriber: |
13 | 14 |
callback: SubscriberCallback |
14 | 15 |
params: SubscriberParams |
15 | 16 |
|
16 |
scheduler = BackgroundScheduler()
|
|
17 |
scheduler: BackgroundScheduler
|
|
17 | 18 |
files: Dict[str, FileManager] = {} |
18 | 19 |
|
19 | 20 |
def __init__(self, callback: SubscriberCallback, params: SubscriberParams): |
... | ... | |
22 | 23 |
|
23 | 24 |
def start(self): |
24 | 25 |
# start scheduler to check closed topics |
26 |
self.scheduler = BackgroundScheduler() |
|
25 | 27 |
self.scheduler.add_job(self.check_closed_topics, 'interval', seconds=self.params.closeLimit) |
26 | 28 |
self.scheduler.start() |
27 | 29 |
|
28 | 30 |
def stop(self): |
29 |
self.scheduler.shutdown() |
|
31 |
if self.scheduler.state != STATE_STOPPED: |
|
32 |
self.scheduler.shutdown() |
|
30 | 33 |
self.close_files() |
31 | 34 |
|
32 | 35 |
def close_files(self): |
33 | 36 |
for topic in self.files: |
34 | 37 |
self.files.get(topic).close() |
35 |
self.files = {}
|
|
38 |
self.files.clear()
|
|
36 | 39 |
|
37 | 40 |
def check_closed_topics(self): |
38 | 41 |
t = time.time() |
aswi2021vochomurka/view/main_view.py | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import matplotlib.pyplot as plt |
4 | 4 |
from PyQt5 import QtCore |
5 |
from PyQt5.QtCore import QSize, QThread, QObject, pyqtSignal |
|
6 |
from PyQt5.QtWidgets import QDialog, QPushButton, QVBoxLayout |
|
7 |
from PyQt5.QtWidgets import QHBoxLayout, QGridLayout, QScrollArea, QWidget |
|
5 |
from PyQt5 import QtGui |
|
6 |
from PyQt5.QtCore import QSize, QThread, QObject, pyqtSignal, QSettings |
|
7 |
from PyQt5.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QScrollArea, QGridLayout |
|
8 |
from PyQt5.QtWidgets import QMenuBar, QAction, QPushButton |
|
8 | 9 |
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas |
9 | 10 |
|
10 | 11 |
from aswi2021vochomurka.model.Message import Message |
... | ... | |
13 | 14 |
from aswi2021vochomurka.service.subscriber_callback import SubscriberCallback |
14 | 15 |
from aswi2021vochomurka.service.subscriber_params import SubscriberParams, ConnectionParams |
15 | 16 |
from aswi2021vochomurka.view.logger_view import LoggerView |
17 |
from aswi2021vochomurka.view.settings import SettingsDialog, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_KEEPALIVE, \ |
|
18 |
DEFAULT_ANONYMOUS, DEFAULT_USERNAME, DEFAULT_TIMEOUT, DEFAULT_TOPICS |
|
16 | 19 |
|
17 | 20 |
|
18 | 21 |
class Worker(QObject, SubscriberCallback): |
... | ... | |
21 | 24 |
error = pyqtSignal(Exception) |
22 | 25 |
newMessage = pyqtSignal(Message) |
23 | 26 |
subscriber: Subscriber = None |
27 |
params: SubscriberParams |
|
24 | 28 |
|
25 |
params = SubscriberParams( |
|
26 |
["/home/1", "/home/2", "/home/3", "/home/4", "/home/5", "/home/6", "/home/7", "/home/8"], |
|
27 |
10, |
|
28 |
ConnectionParams("localhost", 1883, 60), |
|
29 |
True |
|
30 |
) |
|
29 |
def __init__(self, params: SubscriberParams) -> None: |
|
30 |
super().__init__() |
|
31 |
self.params = params |
|
31 | 32 |
|
32 | 33 |
def start(self): |
33 | 34 |
self.subscriber = MQTTSubscriber(self, self.params) |
34 | 35 |
self.subscriber.start() |
35 | 36 |
|
37 |
def stop(self): |
|
38 |
self.subscriber.stop() |
|
39 |
|
|
36 | 40 |
def onConnected(self): |
37 | 41 |
self.connected.emit() |
38 | 42 |
|
... | ... | |
40 | 44 |
self.disconnected.emit() |
41 | 45 |
|
42 | 46 |
def onError(self): |
43 |
self.error.emit()
|
|
47 |
pass
|
|
44 | 48 |
|
45 | 49 |
def onMessage(self, message: Message): |
46 | 50 |
self.newMessage.emit(message) |
... | ... | |
50 | 54 |
pass |
51 | 55 |
|
52 | 56 |
|
53 |
class MainView(QDialog):
|
|
57 |
class MainView(QMainWindow):
|
|
54 | 58 |
worker: Worker = None |
55 | 59 |
workerThread: QThread = None |
56 | 60 |
|
... | ... | |
70 | 74 |
|
71 | 75 |
# self.setMinimumSize(QSize(440, 240)) |
72 | 76 |
self.setMinimumSize(QSize(1200, 800)) |
73 |
self.setWindowTitle("MQTT demo") |
|
74 |
|
|
75 |
# Add logger text field |
|
76 |
logger = LoggerView(self) |
|
77 |
formatter = logging.Formatter('%(asctime)s %(message)s', '%H:%M') |
|
78 |
logger.setFormatter(formatter) |
|
79 |
logger.setLevel(logging.INFO) |
|
80 |
logging.getLogger('').addHandler(logger) |
|
77 |
self.setWindowTitle("MQTT client") |
|
81 | 78 |
|
79 |
logger = self._createLoggerView() |
|
82 | 80 |
layout = QVBoxLayout() |
83 | 81 |
layout.addWidget(logger.widget) |
84 | 82 |
# layout.addWidget(self.toolbar) |
85 | 83 |
|
86 |
self.setLayout(layout) |
|
84 |
widget = QWidget() |
|
85 |
widget.setLayout(layout) |
|
86 |
self.setCentralWidget(widget) |
|
87 |
self._createMenuBar() |
|
87 | 88 |
|
88 | 89 |
scrollArea = QScrollArea(self) |
89 | 90 |
scrollArea.setWidgetResizable(True) |
... | ... | |
94 | 95 |
|
95 | 96 |
self.init_subscriber() |
96 | 97 |
|
98 |
def _createLoggerView(self): |
|
99 |
logger = LoggerView(self) |
|
100 |
formatter = logging.Formatter('%(asctime)s %(message)s', '%H:%M') |
|
101 |
logger.setFormatter(formatter) |
|
102 |
logger.setLevel(logging.INFO) |
|
103 |
logging.getLogger('').addHandler(logger) |
|
104 |
return logger |
|
105 |
|
|
106 |
def _createMenuBar(self): |
|
107 |
menuBar = QMenuBar(self) |
|
108 |
settingsAction = QAction("&Settings", self) |
|
109 |
settingsAction.triggered.connect(self.settings) |
|
110 |
menuBar.addAction(settingsAction) |
|
111 |
self.setMenuBar(menuBar) |
|
112 |
|
|
97 | 113 |
def plot(self, message: Message): |
98 | 114 |
if message.topic in self.dataDict: |
99 | 115 |
self.dataDict[message.topic].append(message.value) |
... | ... | |
130 | 146 |
|
131 | 147 |
self.chartsNum += 1 |
132 | 148 |
|
149 |
def closeEvent(self, a0: QtGui.QCloseEvent) -> None: |
|
150 |
self.worker.stop() |
|
151 |
|
|
152 |
def settings(self): |
|
153 |
dialog = SettingsDialog() |
|
154 |
if dialog.exec_(): |
|
155 |
self.reconnect() |
|
156 |
|
|
157 |
def disconnect(self): |
|
158 |
self.worker.stop() |
|
159 |
self.workerThread.quit() |
|
160 |
self.workerThread.wait() |
|
161 |
|
|
162 |
def reconnect(self): |
|
163 |
self.disconnect() |
|
164 |
self.worker.params = self.getConfigParams() |
|
165 |
self.workerThread.start() |
|
166 |
|
|
133 | 167 |
def init_subscriber(self): |
134 | 168 |
self.workerThread = QThread() |
135 |
self.worker = Worker() |
|
169 |
self.worker = Worker(self.getConfigParams())
|
|
136 | 170 |
self.worker.moveToThread(self.workerThread) |
137 | 171 |
self.workerThread.started.connect(self.worker.start) |
138 | 172 |
self.worker.newMessage.connect( |
... | ... | |
140 | 174 |
) |
141 | 175 |
self.worker.window = self |
142 | 176 |
self.workerThread.start() |
177 |
|
|
178 |
def getConfigParams(self) -> SubscriberParams: |
|
179 |
settings = QSettings("Vochomurka", "MQTTClient") |
|
180 |
|
|
181 |
connection = ConnectionParams( |
|
182 |
settings.value("connection_host", DEFAULT_HOST, str), |
|
183 |
settings.value("connection_port", DEFAULT_PORT, int), |
|
184 |
settings.value("connection_keepalive", DEFAULT_KEEPALIVE, int) |
|
185 |
) |
|
186 |
|
|
187 |
params = SubscriberParams( |
|
188 |
settings.value("topics_items", DEFAULT_TOPICS), |
|
189 |
settings.value("topics_timeout", DEFAULT_TIMEOUT), |
|
190 |
connection, |
|
191 |
settings.value("connection_anonymous", DEFAULT_ANONYMOUS, bool), |
|
192 |
settings.value("connection_username", DEFAULT_USERNAME, str), |
|
193 |
settings.value("connection_password", DEFAULT_USERNAME, str), |
|
194 |
) |
|
195 |
|
|
196 |
return params |
aswi2021vochomurka/view/settings.py | ||
---|---|---|
1 |
from PyQt5 import QtCore |
|
2 |
from PyQt5.QtCore import QSettings, QSize |
|
3 |
from PyQt5.QtWidgets import QDialog, QVBoxLayout, QDialogButtonBox, QGroupBox, QFormLayout, QLabel, QLineEdit, QSpinBox, \ |
|
4 |
QCheckBox, QPushButton, QListWidget, QListWidgetItem |
|
5 |
|
|
6 |
DEFAULT_HOST = "localhost" |
|
7 |
DEFAULT_PORT = 1883 |
|
8 |
DEFAULT_KEEPALIVE = 60 |
|
9 |
DEFAULT_ANONYMOUS = True |
|
10 |
DEFAULT_USERNAME = "" |
|
11 |
DEFAULT_PASSWORD = "" |
|
12 |
DEFAULT_TOPICS = ["/home/1", "/home/2"] |
|
13 |
DEFAULT_TIMEOUT = 60 |
|
14 |
|
|
15 |
|
|
16 |
class SettingsDialog(QDialog): |
|
17 |
topics = DEFAULT_TOPICS |
|
18 |
|
|
19 |
def __init__(self): |
|
20 |
super(SettingsDialog, self).__init__(None, |
|
21 |
QtCore.Qt.WindowCloseButtonHint | QtCore.Qt.WindowSystemMenuHint | QtCore.Qt.WindowTitleHint) |
|
22 |
self.settings = QSettings("Vochomurka", "MQTTClient") |
|
23 |
self.setWindowTitle("Settings") |
|
24 |
self.setMinimumSize(QSize(600, 500)) |
|
25 |
|
|
26 |
connectionGroupBox = QGroupBox("Connection") |
|
27 |
connectionLayout = QFormLayout() |
|
28 |
self.hostInput = QLineEdit(self.settings.value("connection_host", DEFAULT_HOST, str)) |
|
29 |
connectionLayout.addRow(QLabel("Host:"), self.hostInput) |
|
30 |
self.portInput = QSpinBox() |
|
31 |
self.portInput.setMaximum(65535) |
|
32 |
self.portInput.setValue(self.settings.value("connection_port", DEFAULT_PORT, int)) |
|
33 |
connectionLayout.addRow(QLabel("Port:"), self.portInput) |
|
34 |
self.keepaliveInput = QSpinBox() |
|
35 |
self.keepaliveInput.setMaximum(1000) |
|
36 |
self.keepaliveInput.setValue(self.settings.value("connection_keepalive", DEFAULT_KEEPALIVE, int)) |
|
37 |
connectionLayout.addRow(QLabel("Keepalive(s):"), self.keepaliveInput) |
|
38 |
self.anonymousInput = QCheckBox() |
|
39 |
self.anonymousInput.setChecked(self.settings.value("connection_anonymous", DEFAULT_ANONYMOUS, bool)) |
|
40 |
self.anonymousInput.stateChanged.connect(self.anonymousChanged) |
|
41 |
connectionLayout.addRow(QLabel("Anonymous:"), self.anonymousInput) |
|
42 |
self.usernameInput = QLineEdit(self.settings.value("connection_username", DEFAULT_USERNAME, str)) |
|
43 |
connectionLayout.addRow(QLabel("Username:"), self.usernameInput) |
|
44 |
self.passwordInput = QLineEdit(self.settings.value("connection_password", DEFAULT_PASSWORD, str)) |
|
45 |
connectionLayout.addRow(QLabel("Password:"), self.passwordInput) |
|
46 |
self.anonymousChanged() |
|
47 |
connectionGroupBox.setLayout(connectionLayout) |
|
48 |
|
|
49 |
topicsGroupBox = QGroupBox("Topics") |
|
50 |
topicsLayout = QFormLayout() |
|
51 |
|
|
52 |
self.topics = self.settings.value("topics_items", DEFAULT_TOPICS, list) |
|
53 |
self.topicsListWidget = QListWidget() |
|
54 |
for topic in self.topics: |
|
55 |
item = QListWidgetItem() |
|
56 |
item.setText(topic) |
|
57 |
item.setFlags(item.flags() | QtCore.Qt.ItemIsEditable) |
|
58 |
self.topicsListWidget.addItem(item) |
|
59 |
|
|
60 |
topicsLayout.addRow(self.topicsListWidget) |
|
61 |
add = QPushButton("Add") |
|
62 |
add.setFixedWidth(60) |
|
63 |
add.clicked.connect(self.addTopic) |
|
64 |
remove = QPushButton("Remove") |
|
65 |
remove.setFixedWidth(60) |
|
66 |
remove.clicked.connect(self.removeTopic) |
|
67 |
topicsLayout.addRow(add, remove) |
|
68 |
|
|
69 |
self.timeoutInput = QSpinBox() |
|
70 |
self.timeoutInput.setMaximum(1000) |
|
71 |
self.timeoutInput.setToolTip("Unsubscribe topic and close file when there is not new message after this " |
|
72 |
"timeout (in seconds) expires") |
|
73 |
timeoutLabel = QLabel("Topic timeout(s):") |
|
74 |
timeoutLabel.setToolTip("Unsubscribe topic and close file when there is not new message after this " |
|
75 |
"timeout (in seconds) expires") |
|
76 |
self.timeoutInput.setValue(self.settings.value("topics_timeout", DEFAULT_TIMEOUT, int)) |
|
77 |
topicsLayout.addRow(timeoutLabel, self.timeoutInput) |
|
78 |
|
|
79 |
topicsGroupBox.setLayout(topicsLayout) |
|
80 |
|
|
81 |
buttonBox = QDialogButtonBox() |
|
82 |
buttonBox.addButton("Save and Reconnect", QDialogButtonBox.AcceptRole) |
|
83 |
buttonBox.addButton("Cancel", QDialogButtonBox.RejectRole) |
|
84 |
buttonBox.accepted.connect(self.accept) |
|
85 |
buttonBox.rejected.connect(self.reject) |
|
86 |
|
|
87 |
mainLayout = QVBoxLayout() |
|
88 |
mainLayout.addWidget(connectionGroupBox) |
|
89 |
mainLayout.addWidget(topicsGroupBox) |
|
90 |
mainLayout.addWidget(buttonBox) |
|
91 |
self.setLayout(mainLayout) |
|
92 |
|
|
93 |
def addTopic(self): |
|
94 |
item = QListWidgetItem() |
|
95 |
item.setText("/topic") |
|
96 |
item.setFlags(item.flags() | QtCore.Qt.ItemIsEditable) |
|
97 |
self.topicsListWidget.addItem(item) |
|
98 |
|
|
99 |
def removeTopic(self): |
|
100 |
for item in self.topicsListWidget.selectedItems(): |
|
101 |
self.topicsListWidget.takeItem(self.topicsListWidget.row(item)) |
|
102 |
|
|
103 |
def anonymousChanged(self): |
|
104 |
self.usernameInput.setEnabled(not self.anonymousInput.isChecked()) |
|
105 |
self.passwordInput.setEnabled(not self.anonymousInput.isChecked()) |
|
106 |
|
|
107 |
def accept(self) -> None: |
|
108 |
super().accept() |
|
109 |
self.topics = [] |
|
110 |
for index in range(self.topicsListWidget.count()): |
|
111 |
self.topics.append(self.topicsListWidget.item(index).text()) |
|
112 |
|
|
113 |
self.settings.setValue("topics_items", self.topics) |
|
114 |
self.settings.setValue("topics_timeout", self.timeoutInput.value()) |
|
115 |
self.settings.setValue("connection_host", self.hostInput.text()) |
|
116 |
self.settings.setValue("connection_port", self.portInput.value()) |
|
117 |
self.settings.setValue("connection_keepalive", self.keepaliveInput.value()) |
|
118 |
self.settings.setValue("connection_anonymous", self.anonymousInput.isChecked()) |
|
119 |
self.settings.setValue("connection_username", self.usernameInput.text()) |
|
120 |
self.settings.setValue("connection_password", self.passwordInput.text()) |
Také k dispozici: Unified diff
Feature/8921 preferences dialog