Projekt

Obecné

Profil

Stáhnout (16.6 KB) Statistiky
| Větev: | Tag: | Revize:
1
#include "curvedataserver.h"
2

    
3
#include <QDebug>
4
#include <QHostAddress>
5
#include <QAbstractSocket>
6
#include <QVector4D>
7
#include <chrono>
8
#include <string>
9
#include <cstring>
10

    
11
#include "../element/trajectory.hpp"
12

    
13
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
14

    
15
static QByteArray message8(uint16_t messageId, uint8_t content = 0) {
16
    QByteArray result(2 + 1, 0);
17
    result[0] = ((uint8_t*) &messageId)[0];
18
    result[1] = ((uint8_t*) &messageId)[1];
19
    result[2] = content;
20
    return result;
21
}
22

    
23
static QByteArray message16(uint16_t messageId, uint16_t content = 0) {
24
    QByteArray result(2 + 2, 0);
25
    result[0] = ((uint8_t*) &messageId)[0];
26
    result[1] = ((uint8_t*) &messageId)[1];
27
    result[2] = ((uint8_t*) &content)[0];
28
    result[3] = ((uint8_t*) &content)[1];
29
    return result;
30
}
31

    
32
static QByteArray message32(uint16_t messageId, uint32_t content = 0) {
33
    QByteArray result(2 + 4, 0);
34
    result[0] = ((uint8_t*) &messageId)[0];
35
    result[1] = ((uint8_t*) &messageId)[1];
36
    result[2] = ((uint8_t*) &content)[0];
37
    result[3] = ((uint8_t*) &content)[1];
38
    result[4] = ((uint8_t*) &content)[2];
39
    result[5] = ((uint8_t*) &content)[3];
40
    return result;
41
}
42

    
43
static QByteArray message64(uint16_t messageId, uint64_t content = 0) {
44
    QByteArray result(2 + 8, 0);
45
    result[0] = ((uint8_t*) &messageId)[0];
46
    result[1] = ((uint8_t*) &messageId)[1];
47
    result[2] = ((uint8_t*) &content)[0];
48
    result[3] = ((uint8_t*) &content)[1];
49
    result[4] = ((uint8_t*) &content)[2];
50
    result[5] = ((uint8_t*) &content)[3];
51
    result[6] = ((uint8_t*) &content)[4];
52
    result[7] = ((uint8_t*) &content)[5];
53
    result[8] = ((uint8_t*) &content)[6];
54
    result[9] = ((uint8_t*) &content)[7];
55
    return result;
56
}
57

    
58
static QByteArray message128(uint16_t messageId,
59
                             uint32_t contentX = 0,
60
                             uint32_t contentY = 0,
61
                             uint32_t contentZ = 0,
62
                             uint32_t contentW = 0)
63
{
64
    QByteArray result(2 + 16, 0);
65
    result[0] = ((uint8_t*) &messageId)[0];
66
    result[1] = ((uint8_t*) &messageId)[1];
67

    
68
    result[2] = ((uint8_t*) &contentX)[0];
69
    result[3] = ((uint8_t*) &contentX)[1];
70
    result[4] = ((uint8_t*) &contentX)[2];
71
    result[5] = ((uint8_t*) &contentX)[3];
72

    
73
    result[6] = ((uint8_t*) &contentY)[0];
74
    result[7] = ((uint8_t*) &contentY)[1];
75
    result[8] = ((uint8_t*) &contentY)[2];
76
    result[9] = ((uint8_t*) &contentY)[3];
77

    
78
    result[10] = ((uint8_t*) &contentZ)[0];
79
    result[11] = ((uint8_t*) &contentZ)[1];
80
    result[12] = ((uint8_t*) &contentZ)[2];
81
    result[13] = ((uint8_t*) &contentZ)[3];
82

    
83
    result[14] = ((uint8_t*) &contentW)[0];
84
    result[15] = ((uint8_t*) &contentW)[1];
85
    result[16] = ((uint8_t*) &contentW)[2];
86
    result[17] = ((uint8_t*) &contentW)[3];
87
    return result;
88
}
89

    
90
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
91
    QByteArray result(2 + 4 + length, 0);
92
    result[0] = ((uint8_t*) &messageId)[0];
93
    result[1] = ((uint8_t*) &messageId)[1];
94
    result[2] = ((uint8_t*) &length)[0];
95
    result[3] = ((uint8_t*) &length)[1];
96
    result[4] = ((uint8_t*) &length)[2];
97
    result[5] = ((uint8_t*) &length)[3];
98

    
99
    if (content) {
100
        for (uint32_t i = 0; i < length; i++) {
101
            result[6 + i] = content[i];
102
        }
103
    }
104
    return result;
105
}
106

    
107
inline uint64_t currentTimeMillis() {
108
    return std::chrono::duration_cast<std::chrono::milliseconds>(
109
                std::chrono::steady_clock::now().time_since_epoch())
110
            .count();
111
}
112

    
113

    
114
// Client //////////////////////////////////////////////////////////////////////////////////////////////////////////////
115

    
116
CurveDataServer::Client::Client(QTcpSocket* socket) :
117
    socket(socket),
118
    lastPong(currentTimeMillis()),
119
    bufUsed(0)
120
{}
121

    
122

    
123
// Server //////////////////////////////////////////////////////////////////////////////////////////////////////////////
124

    
125
CurveDataServer::CurveDataServer() :
126
    _server(this),
127
    _sockets()
128
{
129
    _server.listen(QHostAddress::Any, 4242);
130
    connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
131
}
132

    
133
CurveDataServer::~CurveDataServer()
134
{
135
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
136
    for (auto& client : _sockets)
137
    {
138
        terminateConnection(client, "Server is stopping");
139
    }
140
    _sockets.clear();
141
}
142

    
143
void CurveDataServer::onNewConnection()
144
{
145
    QTcpSocket *clientSocket = _server.nextPendingConnection();
146
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
147
    connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
148

    
149
    qDebug() << "Client" << clientSocket->peerAddress() << "is connecting...";
150
    sendPreamble(clientSocket);
151
    _sockets.push_back(Client(clientSocket));
152
}
153

    
154
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState)
155
{
156
    if (socketState == QAbstractSocket::UnconnectedState)
157
    {
158
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
159

    
160
        removeSocket(sender);
161
    }
162
}
163

    
164
void CurveDataServer::onReadyRead()
165
{
166
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
167

    
168
    QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
169
    auto it = clientOf(sender);
170
    if (it == _sockets.end()) {
171
        return;
172
    }
173
    Client& client = *it;
174

    
175
    while (sender->bytesAvailable()) {
176
        // read the message ID
177
        if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
178
            qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
179
            if (toRead > 0) {
180
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
181
            }
182
        }
183

    
184
        if (client.bufUsed < 2) {
185
            // no ID yet, wait for more data
186
            return;
187
        }
188

    
189
        uint16_t messageId = *((uint16_t*) client.buf);
190
        bool variableLength = false;
191
        qint64 contentSize;
192
        switch (messageId & 0xF000) {
193
        case 0x0000:
194
            contentSize = 1;
195
            break;
196
        case 0x1000:
197
            contentSize = 2;
198
            break;
199
        case 0x2000:
200
            contentSize = 4;
201
            break;
202
        case 0x3000:
203
            contentSize = 8;
204
            break;
205
        case 0x4000:
206
            contentSize = 16;
207
            break;
208
        case 0xF000:
209
            variableLength = true;
210
            contentSize = 0;
211
            break;
212
        default:
213
            // unsupported message size
214
            return;
215
        }
216

    
217
        size_t contentOffset = Client::MESSAGE_ID_SIZE;
218

    
219
        if (variableLength) {
220
            // read the message's content length
221
            qint64 toRead = qMin(
222
                        qint64(Client::MESSAGE_LENGTH_SIZE - (client.bufUsed - Client::MESSAGE_ID_SIZE)),
223
                        sender->bytesAvailable());
224
            if (toRead > 0) {
225
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
226
            }
227

    
228
            if (client.bufUsed < (Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE)) {
229
                // we do not yet know the content size, do not proceed any further
230
                return;
231
            }
232

    
233
            contentSize = qint64(*((uint32_t*) &client.buf[Client::MESSAGE_ID_SIZE]));
234
            contentOffset = Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE;
235
        }
236

    
237
        // read message content
238
        qint64 toRead = qMin(
239
                    qint64(contentSize - (client.bufUsed - contentOffset)),
240
                    sender->bytesAvailable());
241
        if (toRead > 0) {
242
            qint64 bufRemaining = qint64(Client::BUF_SIZE - client.bufUsed);
243
            if (toRead > bufRemaining) {
244
                // message would overflow the buffer - we'll read whatever fits and skip the rest
245
                if (bufRemaining > 0) {
246
                    qint64 actuallyRead = sender->read(&client.buf[client.bufUsed], bufRemaining);
247
                    client.bufUsed += actuallyRead;
248
                    toRead -= actuallyRead;
249
                }
250

    
251
                if (toRead > 0) {
252
                    sender->skip(toRead);
253
                }
254
            } else {
255
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
256
            }
257
        }
258

    
259
        if (client.bufUsed == (contentOffset + contentSize)) {
260
            handleMessage(client, messageId, &client.buf[contentOffset], contentSize);
261
            client.bufUsed = 0;
262
        }
263
    }
264
}
265

    
266
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content, size_t contentSize) {
267
    switch (messageId) {
268
    case 0x3001:
269
        handleProtocolMagic(client, (char *) content);
270
        break;
271
    case 0x2002:
272
        handleVersion(client, *((uint8_t*) content));
273
        break;
274
    case 0x3005:
275
        handlePong(client, (uint64_t*) content);
276
        break;
277
    case 0xF006:
278
        qDebug() << "EOT!";
279
        handleEOT(client, (char *) content, contentSize);
280
        break;
281
    }
282
}
283

    
284
void CurveDataServer::handleProtocolMagic(Client& client, char *content)
285
{
286
    char *correct = "DeltaRVr";
287

    
288
    if (strncmp(content, correct, 8) == 0) {
289
        client.magic = true;
290
    }
291
    validateProtocol(client);
292
}
293

    
294
void CurveDataServer::handleVersion(Client& client, uint8_t content)
295
{
296
    if (((uint8_t*) &content)[0] == 1)
297
    {
298
        client.version = true;
299
    }
300
    validateProtocol(client);
301
}
302

    
303
void CurveDataServer::handlePong(Client& client, uint64_t* content)
304
{
305

    
306
    uint64_t time = currentTimeMillis();
307

    
308
    uint64_t number = *content;
309

    
310
    uint64_t timeSent = client.ping.at(number);
311

    
312
    client.ping.erase(number);
313

    
314
    client.latency = time - timeSent;
315
    client.lastPong = time;
316
}
317

    
318
void CurveDataServer::handleEOT(Client& client, char* content, size_t contentSize)
319
{
320
    char cstrContent[contentSize + 1];
321
    std::memcpy(cstrContent, content, contentSize);
322
    cstrContent[contentSize] = '\0';
323

    
324
    qDebug() << "Client" << client.socket->peerAddress() << "disconnected:" << cstrContent;
325
    terminateConnection(client, "Recieved EOT from client");
326
}
327

    
328

    
329
void CurveDataServer::validateProtocol(Client &client) {
330
    if (!client.protocolValid && client.magic && client.version) {
331
        client.protocolValid = true;
332
        qDebug() << "Client" << client.socket->peerAddress() << "has successfully connected.";
333
    }
334
}
335

    
336

    
337
void CurveDataServer::terminateConnection(Client& client, std::string&& reason)
338
{
339
    if (client.socket->isOpen())
340
    {
341
        sendEot(client.socket, std::forward<std::string&&>(reason));
342

    
343
        client.socket->close();
344
    }
345

    
346
    qDebug() << "Terminating connection to client" << client.socket->peerAddress() << "reason:" << reason.c_str();
347
    removeSocket(client.socket);
348
}
349

    
350
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
351
{
352
    QByteArray message = message128(0x4003,
353
                                    *((uint32_t*) &x),
354
                                    *((uint32_t*) &y),
355
                                    *((uint32_t*) &z));
356

    
357
    sendMessageToAllConnected(message);
358
}
359

    
360
void CurveDataServer::sendActualDirection(float x, float y, float z)
361
{
362
    QByteArray message = message128(0x4007,
363
                                    *((uint32_t*) &x),
364
                                    *((uint32_t*) &y),
365
                                    *((uint32_t*) &z));
366

    
367
    sendMessageToAllConnected(message);
368
}
369

    
370
void CurveDataServer::sendTargetDirection(float x, float y, float z)
371
{
372
    QByteArray message = message128(0x4008,
373
                                    *((uint32_t*) &x),
374
                                    *((uint32_t*) &y),
375
                                    *((uint32_t*) &z));
376

    
377
    sendMessageToAllConnected(message);
378
}
379

    
380
void CurveDataServer::sendNewCurve(QList<QVector3D> &points)
381
{
382
    int size = points.size();
383

    
384
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
385

    
386
    for (int i = 0; i < size; i++)
387
    {
388
        float xf = points[i].x();
389
        float yf = points[i].y();
390
        float zf = points[i].z();
391

    
392
        uint8_t *x = ((uint8_t*) &xf);
393
        uint8_t *y = ((uint8_t*) &yf);
394
        uint8_t *z = ((uint8_t*) &zf);
395

    
396
        message[2 + 4 + i*12 + 0] = x[0];
397
        message[2 + 4 + i*12 + 1] = x[1];
398
        message[2 + 4 + i*12 + 2] = x[2];
399
        message[2 + 4 + i*12 + 3] = x[3];
400

    
401
        message[2 + 4 + i*12 + 4] = y[0];
402
        message[2 + 4 + i*12 + 5] = y[1];
403
        message[2 + 4 + i*12 + 6] = y[2];
404
        message[2 + 4 + i*12 + 7] = y[3];
405

    
406
        message[2 + 4 + i*12 + 8] = z[0];
407
        message[2 + 4 + i*12 + 9] = z[1];
408
        message[2 + 4 + i*12 + 10] = z[2];
409
        message[2 + 4 + i*12 + 11] = z[3];
410
    }
411

    
412
    sendMessageToAllConnected(message);
413

    
414

    
415
    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
416
    _currentCurve.clear();
417
    _currentCurve.append(points);
418
}
419

    
420
void CurveDataServer::sendNewCurve(QList<QVector4D> &points)
421
{
422
    QList<QVector3D> curve3d;
423
    int itemCount = points.size();
424

    
425
    curve3d.clear();
426
    if (itemCount > MAX_CURVE_SIZE)
427
        itemCount = MAX_CURVE_SIZE;
428
    for (int i = 0; i < itemCount; i++)
429
    {
430
        curve3d.append(points[i].toVector3D());
431
    }
432

    
433
    sendNewCurve(curve3d);
434
}
435

    
436
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) {
437
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
438
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
439
        if (it->socket == socket) {
440
            return it;
441
        }
442
    }
443
    return _sockets.end();
444
}
445

    
446
void CurveDataServer::sendPreamble(QTcpSocket *socket)
447
{
448
    sendProtocolMagic(socket);
449
    sendVersion(socket);
450
}
451

    
452
void CurveDataServer::sendProtocolMagic(QTcpSocket *socket)
453
{
454
    char *messageContent = "DeltaRVr";
455
    QByteArray message = message64(0x3001, *((uint64_t*) messageContent));
456

    
457
    socket->write(message);
458
}
459

    
460
void CurveDataServer::sendVersion(QTcpSocket *socket)
461
{
462
    QByteArray message = message32(0x2002, 1);
463

    
464
    socket->write(message);
465
}
466

    
467
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
468
{
469
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
470
    socket->write(message);
471
}
472

    
473
void CurveDataServer::sendMessageToAllConnected(QByteArray &message)
474
{
475
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
476
    for (auto& client : _sockets) {
477
        if (client.protocolValid)
478
        {
479
            client.socket->write(message);
480
        }
481
    }
482
}
483

    
484
void CurveDataServer::processPings()
485
{
486
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
487
    for (auto& client : _sockets) {
488

    
489
        if (client.protocolValid)
490
        {
491
            if (client.lastPong  < currentTimeMillis() - 10000)
492
            {
493
                terminateConnection(client, "Timeout");
494
                continue;
495
            }
496

    
497
            sendPing(client);
498

    
499
            std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
500
            if (!client.initialized && !_currentCurve.isEmpty())
501
            {
502
                sendCurve(client);
503
                client.initialized = true;
504
            }
505
        }
506
        else if (client.lastPong  < currentTimeMillis() - 10000)
507
        {
508
            terminateConnection(client, "Preamble timeout");
509
            continue;
510
        }
511
    }
512
}
513

    
514
void CurveDataServer::removeSocket(QTcpSocket *socket)
515
{
516
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
517
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
518
        if (it->socket == socket) {
519
            _sockets.erase(it);
520
            break;
521
        }
522
    }
523
}
524

    
525
void CurveDataServer::sendPing(Client& client)
526
{
527
    uint64_t number = (((uint64_t) rand()) << 32) + (uint64_t) rand();
528

    
529
    client.ping.emplace(std::make_pair(number, currentTimeMillis()));
530

    
531
    client.socket->write(message64(0x3004, (uint64_t) number));
532
}
533

    
534
void CurveDataServer::sendCurve(Client &client)
535
{
536
    int size = _currentCurve.size();
537

    
538
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
539

    
540
    for (int i = 0; i < size; i++)
541
    {
542
        float xf = _currentCurve[i].x();
543
        float yf = _currentCurve[i].y();
544
        float zf = _currentCurve[i].z();
545

    
546
        uint8_t *x = ((uint8_t*) &xf);
547
        uint8_t *y = ((uint8_t*) &yf);
548
        uint8_t *z = ((uint8_t*) &zf);
549

    
550
        message[2 + 4 + i*12 + 0] = x[0];
551
        message[2 + 4 + i*12 + 1] = x[1];
552
        message[2 + 4 + i*12 + 2] = x[2];
553
        message[2 + 4 + i*12 + 3] = x[3];
554

    
555
        message[2 + 4 + i*12 + 4] = y[0];
556
        message[2 + 4 + i*12 + 5] = y[1];
557
        message[2 + 4 + i*12 + 6] = y[2];
558
        message[2 + 4 + i*12 + 7] = y[3];
559

    
560
        message[2 + 4 + i*12 + 8] = z[0];
561
        message[2 + 4 + i*12 + 9] = z[1];
562
        message[2 + 4 + i*12 + 10] = z[2];
563
        message[2 + 4 + i*12 + 11] = z[3];
564
    }
565

    
566
    client.socket->write(message);
567
}
(2-2/19)