Revize 0cda8e8d
Přidáno uživatelem Oto Šťáva před téměř 4 roky(ů)
deltarobot/curvedataserver.cpp | ||
---|---|---|
3 | 3 |
#include <QDebug> |
4 | 4 |
#include <QHostAddress> |
5 | 5 |
#include <QAbstractSocket> |
6 |
#include <chrono> |
|
7 |
|
|
8 |
|
|
9 |
// Utility functions /////////////////////////////////////////////////////////////////////////////////////////////////// |
|
6 | 10 |
|
7 | 11 |
static QByteArray message8(uint16_t messageId, uint8_t content = 0) { |
8 | 12 |
QByteArray result(2 + 1, 0); |
... | ... | |
79 | 83 |
return result; |
80 | 84 |
} |
81 | 85 |
|
82 |
static QByteArray messageVarLength(uint16_t messageId, uint32_t length) { |
|
86 |
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
|
|
83 | 87 |
QByteArray result(2 + 4 + length, 0); |
84 | 88 |
result[0] = ((uint8_t*) &messageId)[0]; |
85 | 89 |
result[1] = ((uint8_t*) &messageId)[1]; |
... | ... | |
87 | 91 |
result[3] = ((uint8_t*) &length)[1]; |
88 | 92 |
result[4] = ((uint8_t*) &length)[2]; |
89 | 93 |
result[5] = ((uint8_t*) &length)[3]; |
94 |
|
|
95 |
if (content) { |
|
96 |
for (uint32_t i = 0; i < length; i++) { |
|
97 |
result[6 + i] = content[i]; |
|
98 |
} |
|
99 |
} |
|
90 | 100 |
return result; |
91 | 101 |
} |
92 | 102 |
|
103 |
inline long currentTimeMillis() { |
|
104 |
return std::chrono::duration_cast<std::chrono::milliseconds>( |
|
105 |
std::chrono::steady_clock::now().time_since_epoch()) |
|
106 |
.count(); |
|
107 |
} |
|
108 |
|
|
109 |
|
|
110 |
// Client ////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
|
111 |
|
|
112 |
CurveDataServer::Client::Client(QTcpSocket* socket) : |
|
113 |
socket(socket), |
|
114 |
lastPong(currentTimeMillis()), |
|
115 |
bufUsed(0) |
|
116 |
{} |
|
117 |
|
|
118 |
|
|
119 |
// Server ////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
|
120 |
|
|
93 | 121 |
CurveDataServer::CurveDataServer() : |
94 | 122 |
_server(this), |
95 | 123 |
_sockets() |
... | ... | |
98 | 126 |
connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection())); |
99 | 127 |
} |
100 | 128 |
|
129 |
CurveDataServer::~CurveDataServer() |
|
130 |
{ |
|
131 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock); |
|
132 |
for (auto& client : _sockets) |
|
133 |
{ |
|
134 |
sendEot(client.socket, "Server is stopping"); |
|
135 |
client.socket->close(); |
|
136 |
} |
|
137 |
_sockets.clear(); |
|
138 |
} |
|
139 |
|
|
101 | 140 |
void CurveDataServer::onNewConnection() |
102 | 141 |
{ |
103 | 142 |
QTcpSocket *clientSocket = _server.nextPendingConnection(); |
104 | 143 |
connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead())); |
105 | 144 |
connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState))); |
106 | 145 |
|
107 |
_sockets.push_back(clientSocket); |
|
108 |
|
|
109 | 146 |
sendPreamble(clientSocket); |
147 |
_sockets.push_back(Client(clientSocket)); |
|
110 | 148 |
} |
111 | 149 |
|
112 | 150 |
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState) |
... | ... | |
114 | 152 |
if (socketState == QAbstractSocket::UnconnectedState) |
115 | 153 |
{ |
116 | 154 |
QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender()); |
117 |
_sockets.removeOne(sender); |
|
155 |
|
|
156 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock); |
|
157 |
for (auto it = _sockets.begin(); it != _sockets.end(); it++) { |
|
158 |
if (it->socket == sender) { |
|
159 |
_sockets.erase(it); |
|
160 |
break; |
|
161 |
} |
|
162 |
} |
|
118 | 163 |
} |
119 | 164 |
} |
120 | 165 |
|
121 | 166 |
void CurveDataServer::onReadyRead() |
122 | 167 |
{ |
123 |
// tady actually nakonec switch zpracovani ruznych zprav dle identifikatoru |
|
168 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock); |
|
169 |
|
|
124 | 170 |
QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender()); |
125 |
QByteArray data = sender->readAll(); |
|
126 |
for (QTcpSocket* socket : _sockets) { |
|
127 |
if (socket != sender) |
|
128 |
socket->write(QByteArray::fromStdString(sender->peerAddress().toString().toStdString() + ": " + data.toStdString())); |
|
171 |
auto it = clientOf(sender); |
|
172 |
if (it == _sockets.end()) { |
|
173 |
return; |
|
174 |
} |
|
175 |
Client& client = *it; |
|
176 |
|
|
177 |
while (sender->bytesAvailable()) { |
|
178 |
if (client.bufUsed < Client::MESSAGE_ID_SIZE) { |
|
179 |
qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable()); |
|
180 |
client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead); |
|
181 |
} |
|
182 |
|
|
183 |
if (client.bufUsed < 2) { |
|
184 |
// no ID yet, wait for more data |
|
185 |
return; |
|
186 |
} |
|
187 |
|
|
188 |
uint16_t messageId = *((uint16_t*) client.buf); |
|
189 |
qint64 contentSize; |
|
190 |
switch (messageId & 0xF000) { |
|
191 |
case 0x0000: |
|
192 |
contentSize = 1; |
|
193 |
break; |
|
194 |
case 0x1000: |
|
195 |
contentSize = 2; |
|
196 |
break; |
|
197 |
case 0x2000: |
|
198 |
contentSize = 4; |
|
199 |
break; |
|
200 |
case 0x3000: |
|
201 |
contentSize = 8; |
|
202 |
break; |
|
203 |
case 0x4000: |
|
204 |
contentSize = 16; |
|
205 |
break; |
|
206 |
case 0xF000: |
|
207 |
// TODO - implement support for variable length messages |
|
208 |
return; |
|
209 |
default: |
|
210 |
// unsupported message size |
|
211 |
return; |
|
212 |
} |
|
213 |
|
|
214 |
qint64 toRead = qMin(qint64(contentSize - (client.bufUsed - Client::MESSAGE_ID_SIZE)), sender->bytesAvailable()); |
|
215 |
if (toRead > 0) { |
|
216 |
client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead); |
|
217 |
} |
|
218 |
|
|
219 |
if (client.bufUsed == (Client::MESSAGE_ID_SIZE + contentSize)) { |
|
220 |
handleMessage(client, messageId, &client.buf[Client::MESSAGE_ID_SIZE]); |
|
221 |
client.bufUsed = 0; |
|
222 |
} |
|
129 | 223 |
} |
130 | 224 |
} |
131 | 225 |
|
132 |
QByteArray *CurveDataServer::putFloatIntoMessage(QByteArray *message, int index, float f) |
|
133 |
{ |
|
134 |
if (!message || message->size() < index + 4) |
|
135 |
return message; |
|
136 |
|
|
137 |
unsigned char *c = reinterpret_cast<unsigned char *>(&f); |
|
138 |
|
|
139 |
for (int i = 0; i < 4; i++) |
|
140 |
{ |
|
141 |
(*message)[index + i] = c[i]; |
|
226 |
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content) { |
|
227 |
switch (messageId) { |
|
228 |
case 0x3001: |
|
229 |
qDebug() << "Magic arrived!"; |
|
230 |
break; |
|
231 |
case 0x2002: |
|
232 |
qDebug() << "Version arrived!"; |
|
233 |
break; |
|
234 |
case 0x3005: |
|
235 |
qDebug() << "Pong " << *((uint64_t*) content) << " arrived!"; |
|
236 |
break; |
|
142 | 237 |
} |
143 |
|
|
144 |
return message; |
|
145 | 238 |
} |
146 | 239 |
|
147 |
|
|
148 | 240 |
void CurveDataServer::sendActuatorPosition(float x, float y, float z) |
149 | 241 |
{ |
150 | 242 |
QByteArray message = message128(0x4003, |
... | ... | |
152 | 244 |
*((uint32_t*) &y), |
153 | 245 |
*((uint32_t*) &z)); |
154 | 246 |
|
155 |
for (QTcpSocket* socket : _sockets) { |
|
156 |
socket->write(message); |
|
247 |
{ |
|
248 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock); |
|
249 |
for (auto& client : _sockets) { |
|
250 |
client.socket->write(message); |
|
251 |
} |
|
157 | 252 |
} |
158 |
//qDebug() << message.toHex(); |
|
159 |
//qDebug() << x << " " << y << " " << z; |
|
160 | 253 |
} |
161 | 254 |
|
255 |
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) { |
|
256 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock); |
|
257 |
for (auto it = _sockets.begin(); it != _sockets.end(); it++) { |
|
258 |
if (it->socket == socket) { |
|
259 |
return it; |
|
260 |
} |
|
261 |
} |
|
262 |
return _sockets.end(); |
|
263 |
} |
|
162 | 264 |
|
163 | 265 |
void CurveDataServer::sendPreamble(QTcpSocket *socket) |
164 | 266 |
{ |
... | ... | |
186 | 288 |
qDebug() << "Sent Protocol Version to" << socket->peerAddress(); |
187 | 289 |
//qDebug() << message.toHex(); |
188 | 290 |
} |
291 |
|
|
292 |
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason) |
|
293 |
{ |
|
294 |
QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str()); |
|
295 |
socket->write(message); |
|
296 |
} |
Také k dispozici: Unified diff
Re #8735 - Implement base server message retrieval