Projekt

Obecné

Profil

Stáhnout (16.6 KB) Statistiky
| Větev: | Tag: | Revize:
1 681d1a97 Jakub Hejman
#include "curvedataserver.h"
2
3
#include <QDebug>
4
#include <QHostAddress>
5
#include <QAbstractSocket>
6 f06d5ae1 Jakub Hejman
#include <QVector4D>
7 0cda8e8d Oto Šťáva
#include <chrono>
8 285c6fe5 Oto Šťáva
#include <string>
9
#include <cstring>
10 0cda8e8d Oto Šťáva
11 6d9543aa Jakub Hejman
#include "../element/trajectory.hpp"
12 0cda8e8d Oto Šťáva
13
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
14 681d1a97 Jakub Hejman
15 e6053a29 Oto Šťáva
static QByteArray message8(uint16_t messageId, uint8_t content = 0) {
16
    QByteArray result(2 + 1, 0);
17
    result[0] = ((uint8_t*) &messageId)[0];
18
    result[1] = ((uint8_t*) &messageId)[1];
19
    result[2] = content;
20
    return result;
21
}
22
23
static QByteArray message16(uint16_t messageId, uint16_t content = 0) {
24
    QByteArray result(2 + 2, 0);
25
    result[0] = ((uint8_t*) &messageId)[0];
26
    result[1] = ((uint8_t*) &messageId)[1];
27
    result[2] = ((uint8_t*) &content)[0];
28
    result[3] = ((uint8_t*) &content)[1];
29
    return result;
30
}
31
32
static QByteArray message32(uint16_t messageId, uint32_t content = 0) {
33
    QByteArray result(2 + 4, 0);
34
    result[0] = ((uint8_t*) &messageId)[0];
35
    result[1] = ((uint8_t*) &messageId)[1];
36
    result[2] = ((uint8_t*) &content)[0];
37
    result[3] = ((uint8_t*) &content)[1];
38
    result[4] = ((uint8_t*) &content)[2];
39
    result[5] = ((uint8_t*) &content)[3];
40
    return result;
41
}
42
43
static QByteArray message64(uint16_t messageId, uint64_t content = 0) {
44
    QByteArray result(2 + 8, 0);
45
    result[0] = ((uint8_t*) &messageId)[0];
46
    result[1] = ((uint8_t*) &messageId)[1];
47
    result[2] = ((uint8_t*) &content)[0];
48
    result[3] = ((uint8_t*) &content)[1];
49
    result[4] = ((uint8_t*) &content)[2];
50
    result[5] = ((uint8_t*) &content)[3];
51
    result[6] = ((uint8_t*) &content)[4];
52
    result[7] = ((uint8_t*) &content)[5];
53
    result[8] = ((uint8_t*) &content)[6];
54
    result[9] = ((uint8_t*) &content)[7];
55
    return result;
56
}
57
58
static QByteArray message128(uint16_t messageId,
59
                             uint32_t contentX = 0,
60
                             uint32_t contentY = 0,
61
                             uint32_t contentZ = 0,
62
                             uint32_t contentW = 0)
63
{
64
    QByteArray result(2 + 16, 0);
65
    result[0] = ((uint8_t*) &messageId)[0];
66
    result[1] = ((uint8_t*) &messageId)[1];
67
68
    result[2] = ((uint8_t*) &contentX)[0];
69
    result[3] = ((uint8_t*) &contentX)[1];
70
    result[4] = ((uint8_t*) &contentX)[2];
71
    result[5] = ((uint8_t*) &contentX)[3];
72
73
    result[6] = ((uint8_t*) &contentY)[0];
74
    result[7] = ((uint8_t*) &contentY)[1];
75
    result[8] = ((uint8_t*) &contentY)[2];
76
    result[9] = ((uint8_t*) &contentY)[3];
77
78
    result[10] = ((uint8_t*) &contentZ)[0];
79
    result[11] = ((uint8_t*) &contentZ)[1];
80
    result[12] = ((uint8_t*) &contentZ)[2];
81
    result[13] = ((uint8_t*) &contentZ)[3];
82
83
    result[14] = ((uint8_t*) &contentW)[0];
84
    result[15] = ((uint8_t*) &contentW)[1];
85
    result[16] = ((uint8_t*) &contentW)[2];
86
    result[17] = ((uint8_t*) &contentW)[3];
87
    return result;
88
}
89
90 0cda8e8d Oto Šťáva
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
91 e6053a29 Oto Šťáva
    QByteArray result(2 + 4 + length, 0);
92
    result[0] = ((uint8_t*) &messageId)[0];
93
    result[1] = ((uint8_t*) &messageId)[1];
94
    result[2] = ((uint8_t*) &length)[0];
95
    result[3] = ((uint8_t*) &length)[1];
96
    result[4] = ((uint8_t*) &length)[2];
97
    result[5] = ((uint8_t*) &length)[3];
98 0cda8e8d Oto Šťáva
99
    if (content) {
100
        for (uint32_t i = 0; i < length; i++) {
101
            result[6 + i] = content[i];
102
        }
103
    }
104 e6053a29 Oto Šťáva
    return result;
105
}
106
107 6d9543aa Jakub Hejman
inline uint64_t currentTimeMillis() {
108 0cda8e8d Oto Šťáva
    return std::chrono::duration_cast<std::chrono::milliseconds>(
109
                std::chrono::steady_clock::now().time_since_epoch())
110
            .count();
111
}
112
113
114
// Client //////////////////////////////////////////////////////////////////////////////////////////////////////////////
115
116
CurveDataServer::Client::Client(QTcpSocket* socket) :
117
    socket(socket),
118
    lastPong(currentTimeMillis()),
119
    bufUsed(0)
120
{}
121
122
123
// Server //////////////////////////////////////////////////////////////////////////////////////////////////////////////
124
125 207b75a4 Oto Šťáva
CurveDataServer::CurveDataServer(quint16 port) :
126 681d1a97 Jakub Hejman
    _server(this),
127
    _sockets()
128
{
129 207b75a4 Oto Šťáva
    qDebug() << "Curve server listening on port" << port;
130
    _server.listen(QHostAddress::Any, port);
131 681d1a97 Jakub Hejman
    connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
132
}
133
134 0cda8e8d Oto Šťáva
CurveDataServer::~CurveDataServer()
135
{
136
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
137
    for (auto& client : _sockets)
138
    {
139 6d9543aa Jakub Hejman
        terminateConnection(client, "Server is stopping");
140 0cda8e8d Oto Šťáva
    }
141
    _sockets.clear();
142
}
143
144 681d1a97 Jakub Hejman
void CurveDataServer::onNewConnection()
145
{
146
    QTcpSocket *clientSocket = _server.nextPendingConnection();
147
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
148
    connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
149
150 04839796 Oto Šťáva
    qDebug() << "Client" << clientSocket->peerAddress() << "is connecting...";
151 e60ade62 Jakub Hejman
    sendPreamble(clientSocket);
152 0cda8e8d Oto Šťáva
    _sockets.push_back(Client(clientSocket));
153 681d1a97 Jakub Hejman
}
154
155
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState)
156
{
157
    if (socketState == QAbstractSocket::UnconnectedState)
158
    {
159
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
160 0cda8e8d Oto Šťáva
161 6d9543aa Jakub Hejman
        removeSocket(sender);
162 681d1a97 Jakub Hejman
    }
163
}
164
165
void CurveDataServer::onReadyRead()
166
{
167 0cda8e8d Oto Šťáva
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
168
169 681d1a97 Jakub Hejman
    QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
170 0cda8e8d Oto Šťáva
    auto it = clientOf(sender);
171
    if (it == _sockets.end()) {
172
        return;
173
    }
174
    Client& client = *it;
175
176
    while (sender->bytesAvailable()) {
177 285c6fe5 Oto Šťáva
        // read the message ID
178 0cda8e8d Oto Šťáva
        if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
179
            qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
180 285c6fe5 Oto Šťáva
            if (toRead > 0) {
181
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
182
            }
183 0cda8e8d Oto Šťáva
        }
184
185
        if (client.bufUsed < 2) {
186
            // no ID yet, wait for more data
187
            return;
188
        }
189
190
        uint16_t messageId = *((uint16_t*) client.buf);
191 285c6fe5 Oto Šťáva
        bool variableLength = false;
192 0cda8e8d Oto Šťáva
        qint64 contentSize;
193
        switch (messageId & 0xF000) {
194
        case 0x0000:
195
            contentSize = 1;
196
            break;
197
        case 0x1000:
198
            contentSize = 2;
199
            break;
200
        case 0x2000:
201
            contentSize = 4;
202
            break;
203
        case 0x3000:
204
            contentSize = 8;
205
            break;
206
        case 0x4000:
207
            contentSize = 16;
208
            break;
209
        case 0xF000:
210 285c6fe5 Oto Šťáva
            variableLength = true;
211 6d9543aa Jakub Hejman
            contentSize = 0;
212
            break;
213 0cda8e8d Oto Šťáva
        default:
214
            // unsupported message size
215
            return;
216
        }
217
218 285c6fe5 Oto Šťáva
        size_t contentOffset = Client::MESSAGE_ID_SIZE;
219
220
        if (variableLength) {
221
            // read the message's content length
222
            qint64 toRead = qMin(
223
                        qint64(Client::MESSAGE_LENGTH_SIZE - (client.bufUsed - Client::MESSAGE_ID_SIZE)),
224
                        sender->bytesAvailable());
225
            if (toRead > 0) {
226
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
227
            }
228
229
            if (client.bufUsed < (Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE)) {
230
                // we do not yet know the content size, do not proceed any further
231
                return;
232
            }
233
234
            contentSize = qint64(*((uint32_t*) &client.buf[Client::MESSAGE_ID_SIZE]));
235
            contentOffset = Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE;
236
        }
237
238
        // read message content
239
        qint64 toRead = qMin(
240
                    qint64(contentSize - (client.bufUsed - contentOffset)),
241
                    sender->bytesAvailable());
242 0cda8e8d Oto Šťáva
        if (toRead > 0) {
243 285c6fe5 Oto Šťáva
            qint64 bufRemaining = qint64(Client::BUF_SIZE - client.bufUsed);
244
            if (toRead > bufRemaining) {
245
                // message would overflow the buffer - we'll read whatever fits and skip the rest
246
                if (bufRemaining > 0) {
247
                    qint64 actuallyRead = sender->read(&client.buf[client.bufUsed], bufRemaining);
248
                    client.bufUsed += actuallyRead;
249
                    toRead -= actuallyRead;
250
                }
251
252
                if (toRead > 0) {
253
                    sender->skip(toRead);
254
                }
255
            } else {
256
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
257
            }
258 0cda8e8d Oto Šťáva
        }
259
260 285c6fe5 Oto Šťáva
        if (client.bufUsed == (contentOffset + contentSize)) {
261
            handleMessage(client, messageId, &client.buf[contentOffset], contentSize);
262 0cda8e8d Oto Šťáva
            client.bufUsed = 0;
263
        }
264 681d1a97 Jakub Hejman
    }
265
}
266
267 285c6fe5 Oto Šťáva
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content, size_t contentSize) {
268 0cda8e8d Oto Šťáva
    switch (messageId) {
269
    case 0x3001:
270 6d9543aa Jakub Hejman
        handleProtocolMagic(client, (char *) content);
271 0cda8e8d Oto Šťáva
        break;
272
    case 0x2002:
273 6d9543aa Jakub Hejman
        handleVersion(client, *((uint8_t*) content));
274 0cda8e8d Oto Šťáva
        break;
275
    case 0x3005:
276 6d9543aa Jakub Hejman
        handlePong(client, (uint64_t*) content);
277 0cda8e8d Oto Šťáva
        break;
278 6d9543aa Jakub Hejman
    case 0xF006:
279
        qDebug() << "EOT!";
280 285c6fe5 Oto Šťáva
        handleEOT(client, (char *) content, contentSize);
281 6d9543aa Jakub Hejman
        break;
282
    }
283
}
284
285
void CurveDataServer::handleProtocolMagic(Client& client, char *content)
286
{
287
    char *correct = "DeltaRVr";
288
289
    if (strncmp(content, correct, 8) == 0) {
290
        client.magic = true;
291 681d1a97 Jakub Hejman
    }
292 04839796 Oto Šťáva
    validateProtocol(client);
293 681d1a97 Jakub Hejman
}
294
295 6d9543aa Jakub Hejman
void CurveDataServer::handleVersion(Client& client, uint8_t content)
296
{
297
    if (((uint8_t*) &content)[0] == 1)
298
    {
299
        client.version = true;
300
    }
301 04839796 Oto Šťáva
    validateProtocol(client);
302 6d9543aa Jakub Hejman
}
303
304
void CurveDataServer::handlePong(Client& client, uint64_t* content)
305
{
306
307
    uint64_t time = currentTimeMillis();
308
309
    uint64_t number = *content;
310
311
    uint64_t timeSent = client.ping.at(number);
312
313
    client.ping.erase(number);
314
315
    client.latency = time - timeSent;
316
    client.lastPong = time;
317
}
318
319 285c6fe5 Oto Šťáva
void CurveDataServer::handleEOT(Client& client, char* content, size_t contentSize)
320 6d9543aa Jakub Hejman
{
321 285c6fe5 Oto Šťáva
    char cstrContent[contentSize + 1];
322
    std::memcpy(cstrContent, content, contentSize);
323
    cstrContent[contentSize] = '\0';
324
325 04839796 Oto Šťáva
    qDebug() << "Client" << client.socket->peerAddress() << "disconnected:" << cstrContent;
326 6d9543aa Jakub Hejman
    terminateConnection(client, "Recieved EOT from client");
327
}
328
329
330 04839796 Oto Šťáva
void CurveDataServer::validateProtocol(Client &client) {
331
    if (!client.protocolValid && client.magic && client.version) {
332
        client.protocolValid = true;
333
        qDebug() << "Client" << client.socket->peerAddress() << "has successfully connected.";
334
    }
335
}
336
337
338 6d9543aa Jakub Hejman
void CurveDataServer::terminateConnection(Client& client, std::string&& reason)
339
{
340
    if (client.socket->isOpen())
341
    {
342
        sendEot(client.socket, std::forward<std::string&&>(reason));
343
344
        client.socket->close();
345
    }
346
347 04839796 Oto Šťáva
    qDebug() << "Terminating connection to client" << client.socket->peerAddress() << "reason:" << reason.c_str();
348 6d9543aa Jakub Hejman
    removeSocket(client.socket);
349
}
350
351 e6053a29 Oto Šťáva
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
352 681d1a97 Jakub Hejman
{
353 e6053a29 Oto Šťáva
    QByteArray message = message128(0x4003,
354
                                    *((uint32_t*) &x),
355
                                    *((uint32_t*) &y),
356
                                    *((uint32_t*) &z));
357 681d1a97 Jakub Hejman
358 6d9543aa Jakub Hejman
    sendMessageToAllConnected(message);
359 e60ade62 Jakub Hejman
}
360
361 f06d5ae1 Jakub Hejman
void CurveDataServer::sendActualDirection(float x, float y, float z)
362
{
363
    QByteArray message = message128(0x4007,
364
                                    *((uint32_t*) &x),
365
                                    *((uint32_t*) &y),
366
                                    *((uint32_t*) &z));
367
368 6d9543aa Jakub Hejman
    sendMessageToAllConnected(message);
369 f06d5ae1 Jakub Hejman
}
370
371
void CurveDataServer::sendTargetDirection(float x, float y, float z)
372
{
373
    QByteArray message = message128(0x4008,
374
                                    *((uint32_t*) &x),
375
                                    *((uint32_t*) &y),
376
                                    *((uint32_t*) &z));
377
378 6d9543aa Jakub Hejman
    sendMessageToAllConnected(message);
379 f06d5ae1 Jakub Hejman
}
380
381 705930ed Jakub Hejman
void CurveDataServer::sendNewCurve(QList<QVector3D> &points)
382 f06d5ae1 Jakub Hejman
{
383
    int size = points.size();
384
385
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
386
387
    for (int i = 0; i < size; i++)
388
    {
389
        float xf = points[i].x();
390
        float yf = points[i].y();
391
        float zf = points[i].z();
392
393
        uint8_t *x = ((uint8_t*) &xf);
394
        uint8_t *y = ((uint8_t*) &yf);
395
        uint8_t *z = ((uint8_t*) &zf);
396
397
        message[2 + 4 + i*12 + 0] = x[0];
398
        message[2 + 4 + i*12 + 1] = x[1];
399
        message[2 + 4 + i*12 + 2] = x[2];
400
        message[2 + 4 + i*12 + 3] = x[3];
401
402
        message[2 + 4 + i*12 + 4] = y[0];
403
        message[2 + 4 + i*12 + 5] = y[1];
404
        message[2 + 4 + i*12 + 6] = y[2];
405
        message[2 + 4 + i*12 + 7] = y[3];
406
407
        message[2 + 4 + i*12 + 8] = z[0];
408
        message[2 + 4 + i*12 + 9] = z[1];
409
        message[2 + 4 + i*12 + 10] = z[2];
410
        message[2 + 4 + i*12 + 11] = z[3];
411
    }
412
413 6d9543aa Jakub Hejman
    sendMessageToAllConnected(message);
414
415
416
    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
417
    _currentCurve.clear();
418
    _currentCurve.append(points);
419 f06d5ae1 Jakub Hejman
}
420
421 705930ed Jakub Hejman
void CurveDataServer::sendNewCurve(QList<QVector4D> &points)
422 f06d5ae1 Jakub Hejman
{
423
    QList<QVector3D> curve3d;
424
    int itemCount = points.size();
425
426
    curve3d.clear();
427
    if (itemCount > MAX_CURVE_SIZE)
428
        itemCount = MAX_CURVE_SIZE;
429
    for (int i = 0; i < itemCount; i++)
430
    {
431
        curve3d.append(points[i].toVector3D());
432
    }
433
434
    sendNewCurve(curve3d);
435
}
436
437 0cda8e8d Oto Šťáva
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) {
438
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
439
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
440
        if (it->socket == socket) {
441
            return it;
442
        }
443
    }
444
    return _sockets.end();
445
}
446 e60ade62 Jakub Hejman
447
void CurveDataServer::sendPreamble(QTcpSocket *socket)
448
{
449
    sendProtocolMagic(socket);
450
    sendVersion(socket);
451
}
452
453
void CurveDataServer::sendProtocolMagic(QTcpSocket *socket)
454
{
455
    char *messageContent = "DeltaRVr";
456 e6053a29 Oto Šťáva
    QByteArray message = message64(0x3001, *((uint64_t*) messageContent));
457 e60ade62 Jakub Hejman
458
    socket->write(message);
459
}
460
461
void CurveDataServer::sendVersion(QTcpSocket *socket)
462
{
463 e6053a29 Oto Šťáva
    QByteArray message = message32(0x2002, 1);
464 e60ade62 Jakub Hejman
465
    socket->write(message);
466 681d1a97 Jakub Hejman
}
467 0cda8e8d Oto Šťáva
468
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
469
{
470
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
471
    socket->write(message);
472
}
473 6d9543aa Jakub Hejman
474
void CurveDataServer::sendMessageToAllConnected(QByteArray &message)
475
{
476
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
477
    for (auto& client : _sockets) {
478 04839796 Oto Šťáva
        if (client.protocolValid)
479 6d9543aa Jakub Hejman
        {
480
            client.socket->write(message);
481
        }
482
    }
483
}
484
485
void CurveDataServer::processPings()
486
{
487
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
488
    for (auto& client : _sockets) {
489
490 04839796 Oto Šťáva
        if (client.protocolValid)
491 6d9543aa Jakub Hejman
        {
492
            if (client.lastPong  < currentTimeMillis() - 10000)
493
            {
494
                terminateConnection(client, "Timeout");
495
                continue;
496
            }
497
498
            sendPing(client);
499
500
            std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
501
            if (!client.initialized && !_currentCurve.isEmpty())
502
            {
503
                sendCurve(client);
504
                client.initialized = true;
505
            }
506
        }
507
        else if (client.lastPong  < currentTimeMillis() - 10000)
508
        {
509
            terminateConnection(client, "Preamble timeout");
510
            continue;
511
        }
512
    }
513
}
514
515
void CurveDataServer::removeSocket(QTcpSocket *socket)
516
{
517
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
518
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
519
        if (it->socket == socket) {
520
            _sockets.erase(it);
521
            break;
522
        }
523
    }
524
}
525
526
void CurveDataServer::sendPing(Client& client)
527
{
528
    uint64_t number = (((uint64_t) rand()) << 32) + (uint64_t) rand();
529
530
    client.ping.emplace(std::make_pair(number, currentTimeMillis()));
531
532
    client.socket->write(message64(0x3004, (uint64_t) number));
533
}
534
535
void CurveDataServer::sendCurve(Client &client)
536
{
537
    int size = _currentCurve.size();
538
539
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
540
541
    for (int i = 0; i < size; i++)
542
    {
543
        float xf = _currentCurve[i].x();
544
        float yf = _currentCurve[i].y();
545
        float zf = _currentCurve[i].z();
546
547
        uint8_t *x = ((uint8_t*) &xf);
548
        uint8_t *y = ((uint8_t*) &yf);
549
        uint8_t *z = ((uint8_t*) &zf);
550
551
        message[2 + 4 + i*12 + 0] = x[0];
552
        message[2 + 4 + i*12 + 1] = x[1];
553
        message[2 + 4 + i*12 + 2] = x[2];
554
        message[2 + 4 + i*12 + 3] = x[3];
555
556
        message[2 + 4 + i*12 + 4] = y[0];
557
        message[2 + 4 + i*12 + 5] = y[1];
558
        message[2 + 4 + i*12 + 6] = y[2];
559
        message[2 + 4 + i*12 + 7] = y[3];
560
561
        message[2 + 4 + i*12 + 8] = z[0];
562
        message[2 + 4 + i*12 + 9] = z[1];
563
        message[2 + 4 + i*12 + 10] = z[2];
564
        message[2 + 4 + i*12 + 11] = z[3];
565
    }
566
567
    client.socket->write(message);
568
}