Projekt

Obecné

Profil

Stáhnout (16.8 KB) Statistiky
| Větev: | Tag: | Revize:
1
#include "curvedataserver.h"
2

    
3
#include <QDebug>
4
#include <QHostAddress>
5
#include <QAbstractSocket>
6
#include <QVector4D>
7
#include <chrono>
8
#include <string>
9
#include <cstring>
10

    
11
#include "../element/trajectory.hpp"
12

    
13
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
14

    
15
static QByteArray message8(uint16_t messageId, uint8_t content = 0) {
16
    QByteArray result(2 + 1, 0);
17
    result[0] = ((uint8_t*) &messageId)[0];
18
    result[1] = ((uint8_t*) &messageId)[1];
19
    result[2] = content;
20
    return result;
21
}
22

    
23
static QByteArray message16(uint16_t messageId, uint16_t content = 0) {
24
    QByteArray result(2 + 2, 0);
25
    result[0] = ((uint8_t*) &messageId)[0];
26
    result[1] = ((uint8_t*) &messageId)[1];
27
    result[2] = ((uint8_t*) &content)[0];
28
    result[3] = ((uint8_t*) &content)[1];
29
    return result;
30
}
31

    
32
static QByteArray message32(uint16_t messageId, uint32_t content = 0) {
33
    QByteArray result(2 + 4, 0);
34
    result[0] = ((uint8_t*) &messageId)[0];
35
    result[1] = ((uint8_t*) &messageId)[1];
36
    result[2] = ((uint8_t*) &content)[0];
37
    result[3] = ((uint8_t*) &content)[1];
38
    result[4] = ((uint8_t*) &content)[2];
39
    result[5] = ((uint8_t*) &content)[3];
40
    return result;
41
}
42

    
43
static QByteArray message64(uint16_t messageId, uint64_t content = 0) {
44
    QByteArray result(2 + 8, 0);
45
    result[0] = ((uint8_t*) &messageId)[0];
46
    result[1] = ((uint8_t*) &messageId)[1];
47
    result[2] = ((uint8_t*) &content)[0];
48
    result[3] = ((uint8_t*) &content)[1];
49
    result[4] = ((uint8_t*) &content)[2];
50
    result[5] = ((uint8_t*) &content)[3];
51
    result[6] = ((uint8_t*) &content)[4];
52
    result[7] = ((uint8_t*) &content)[5];
53
    result[8] = ((uint8_t*) &content)[6];
54
    result[9] = ((uint8_t*) &content)[7];
55
    return result;
56
}
57

    
58
static QByteArray message128(uint16_t messageId,
59
                             uint32_t contentX = 0,
60
                             uint32_t contentY = 0,
61
                             uint32_t contentZ = 0,
62
                             uint32_t contentW = 0)
63
{
64
    QByteArray result(2 + 16, 0);
65
    result[0] = ((uint8_t*) &messageId)[0];
66
    result[1] = ((uint8_t*) &messageId)[1];
67

    
68
    result[2] = ((uint8_t*) &contentX)[0];
69
    result[3] = ((uint8_t*) &contentX)[1];
70
    result[4] = ((uint8_t*) &contentX)[2];
71
    result[5] = ((uint8_t*) &contentX)[3];
72

    
73
    result[6] = ((uint8_t*) &contentY)[0];
74
    result[7] = ((uint8_t*) &contentY)[1];
75
    result[8] = ((uint8_t*) &contentY)[2];
76
    result[9] = ((uint8_t*) &contentY)[3];
77

    
78
    result[10] = ((uint8_t*) &contentZ)[0];
79
    result[11] = ((uint8_t*) &contentZ)[1];
80
    result[12] = ((uint8_t*) &contentZ)[2];
81
    result[13] = ((uint8_t*) &contentZ)[3];
82

    
83
    result[14] = ((uint8_t*) &contentW)[0];
84
    result[15] = ((uint8_t*) &contentW)[1];
85
    result[16] = ((uint8_t*) &contentW)[2];
86
    result[17] = ((uint8_t*) &contentW)[3];
87
    return result;
88
}
89

    
90
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
91
    QByteArray result(2 + 4 + length, 0);
92
    result[0] = ((uint8_t*) &messageId)[0];
93
    result[1] = ((uint8_t*) &messageId)[1];
94
    result[2] = ((uint8_t*) &length)[0];
95
    result[3] = ((uint8_t*) &length)[1];
96
    result[4] = ((uint8_t*) &length)[2];
97
    result[5] = ((uint8_t*) &length)[3];
98

    
99
    if (content) {
100
        for (uint32_t i = 0; i < length; i++) {
101
            result[6 + i] = content[i];
102
        }
103
    }
104
    return result;
105
}
106

    
107
inline uint64_t currentTimeMillis() {
108
    return std::chrono::duration_cast<std::chrono::milliseconds>(
109
                std::chrono::steady_clock::now().time_since_epoch())
110
            .count();
111
}
112

    
113

    
114
// Client //////////////////////////////////////////////////////////////////////////////////////////////////////////////
115

    
116
CurveDataServer::Client::Client(QTcpSocket* socket) :
117
    socket(socket),
118
    lastPong(currentTimeMillis()),
119
    bufUsed(0)
120
{}
121

    
122

    
123
// Server //////////////////////////////////////////////////////////////////////////////////////////////////////////////
124

    
125
CurveDataServer::CurveDataServer() :
126
    _server(this),
127
    _pingThread([this](){threadFunction();}),
128
    _sockets()
129
{
130
    _server.listen(QHostAddress::Any, 4242);
131
    connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
132
}
133

    
134
CurveDataServer::~CurveDataServer()
135
{
136
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
137
    for (auto& client : _sockets)
138
    {
139
        terminateConnection(client, "Server is stopping");
140
    }
141
    _sockets.clear();
142

    
143
    _threadIsRunning = false;
144
}
145

    
146
void CurveDataServer::onNewConnection()
147
{
148
    QTcpSocket *clientSocket = _server.nextPendingConnection();
149
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
150
    connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
151

    
152
    sendPreamble(clientSocket);
153
    _sockets.push_back(Client(clientSocket));
154
}
155

    
156
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState)
157
{
158
    if (socketState == QAbstractSocket::UnconnectedState)
159
    {
160
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
161

    
162
        removeSocket(sender);
163
    }
164
}
165

    
166
void CurveDataServer::onReadyRead()
167
{
168
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
169

    
170
    QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
171
    auto it = clientOf(sender);
172
    if (it == _sockets.end()) {
173
        return;
174
    }
175
    Client& client = *it;
176

    
177
    while (sender->bytesAvailable()) {
178
        // read the message ID
179
        if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
180
            qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
181
            if (toRead > 0) {
182
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
183
            }
184
        }
185

    
186
        if (client.bufUsed < 2) {
187
            // no ID yet, wait for more data
188
            return;
189
        }
190

    
191
        uint16_t messageId = *((uint16_t*) client.buf);
192
        bool variableLength = false;
193
        qint64 contentSize;
194
        switch (messageId & 0xF000) {
195
        case 0x0000:
196
            contentSize = 1;
197
            break;
198
        case 0x1000:
199
            contentSize = 2;
200
            break;
201
        case 0x2000:
202
            contentSize = 4;
203
            break;
204
        case 0x3000:
205
            contentSize = 8;
206
            break;
207
        case 0x4000:
208
            contentSize = 16;
209
            break;
210
        case 0xF000:
211
            variableLength = true;
212
            contentSize = 0;
213
            break;
214
        default:
215
            // unsupported message size
216
            return;
217
        }
218

    
219
        size_t contentOffset = Client::MESSAGE_ID_SIZE;
220

    
221
        if (variableLength) {
222
            // read the message's content length
223
            qint64 toRead = qMin(
224
                        qint64(Client::MESSAGE_LENGTH_SIZE - (client.bufUsed - Client::MESSAGE_ID_SIZE)),
225
                        sender->bytesAvailable());
226
            if (toRead > 0) {
227
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
228
            }
229

    
230
            if (client.bufUsed < (Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE)) {
231
                // we do not yet know the content size, do not proceed any further
232
                return;
233
            }
234

    
235
            contentSize = qint64(*((uint32_t*) &client.buf[Client::MESSAGE_ID_SIZE]));
236
            contentOffset = Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE;
237
        }
238

    
239
        // read message content
240
        qint64 toRead = qMin(
241
                    qint64(contentSize - (client.bufUsed - contentOffset)),
242
                    sender->bytesAvailable());
243
        if (toRead > 0) {
244
            qint64 bufRemaining = qint64(Client::BUF_SIZE - client.bufUsed);
245
            if (toRead > bufRemaining) {
246
                // message would overflow the buffer - we'll read whatever fits and skip the rest
247
                if (bufRemaining > 0) {
248
                    qint64 actuallyRead = sender->read(&client.buf[client.bufUsed], bufRemaining);
249
                    client.bufUsed += actuallyRead;
250
                    toRead -= actuallyRead;
251
                }
252

    
253
                if (toRead > 0) {
254
                    sender->skip(toRead);
255
                }
256
            } else {
257
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
258
            }
259
        }
260

    
261
        if (client.bufUsed == (contentOffset + contentSize)) {
262
            handleMessage(client, messageId, &client.buf[contentOffset], contentSize);
263
            client.bufUsed = 0;
264
        }
265
    }
266
}
267

    
268
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content, size_t contentSize) {
269
    switch (messageId) {
270
    case 0x3001:
271
        qDebug() << "Magic arrived!";
272
        handleProtocolMagic(client, (char *) content);
273
        break;
274
    case 0x2002:
275
        qDebug() << "Version arrived!";
276
        handleVersion(client, *((uint8_t*) content));
277
        break;
278
    case 0x3005:
279
        qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
280
        handlePong(client, (uint64_t*) content);
281
        break;
282
    case 0xF006:
283
        qDebug() << "EOT!";
284
        handleEOT(client, (char *) content, contentSize);
285
        break;
286
    }
287
}
288

    
289
void CurveDataServer::handleProtocolMagic(Client& client, char *content)
290
{
291
    char *correct = "DeltaRVr";
292

    
293
    if (strncmp(content, correct, 8) == 0) {
294
        client.magic = true;
295
    }
296
}
297

    
298
void CurveDataServer::handleVersion(Client& client, uint8_t content)
299
{
300
    if (((uint8_t*) &content)[0] == 1)
301
    {
302
        client.version = true;
303
    }
304
}
305

    
306
void CurveDataServer::handlePong(Client& client, uint64_t* content)
307
{
308

    
309
    uint64_t time = currentTimeMillis();
310

    
311
    uint64_t number = *content;
312

    
313
    uint64_t timeSent = client.ping.at(number);
314

    
315
    client.ping.erase(number);
316

    
317
    client.latency = time - timeSent;
318
    client.lastPong = time;
319
    //qDebug() << "prisel pong" << number;
320
}
321

    
322
void CurveDataServer::handleEOT(Client& client, char* content, size_t contentSize)
323
{
324
    char cstrContent[contentSize + 1];
325
    std::memcpy(cstrContent, content, contentSize);
326
    cstrContent[contentSize] = '\0';
327

    
328
    qDebug() << "Client" << client.socket->peerName() << "disconnected:" << cstrContent;
329
    terminateConnection(client, "Recieved EOT from client");
330
}
331

    
332

    
333
void CurveDataServer::terminateConnection(Client& client, std::string&& reason)
334
{
335
    if (client.socket->isOpen())
336
    {
337
        sendEot(client.socket, std::forward<std::string&&>(reason));
338

    
339
        client.socket->close();
340
    }
341

    
342
    removeSocket(client.socket);
343
}
344

    
345
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
346
{
347
    QByteArray message = message128(0x4003,
348
                                    *((uint32_t*) &x),
349
                                    *((uint32_t*) &y),
350
                                    *((uint32_t*) &z));
351

    
352
    sendMessageToAllConnected(message);
353
}
354

    
355
void CurveDataServer::sendActualDirection(float x, float y, float z)
356
{
357
    QByteArray message = message128(0x4007,
358
                                    *((uint32_t*) &x),
359
                                    *((uint32_t*) &y),
360
                                    *((uint32_t*) &z));
361

    
362
    sendMessageToAllConnected(message);
363
}
364

    
365
void CurveDataServer::sendTargetDirection(float x, float y, float z)
366
{
367
    QByteArray message = message128(0x4008,
368
                                    *((uint32_t*) &x),
369
                                    *((uint32_t*) &y),
370
                                    *((uint32_t*) &z));
371

    
372
    sendMessageToAllConnected(message);
373
}
374

    
375
void CurveDataServer::sendNewCurve(QList<QVector3D> &points)
376
{
377
    int size = points.size();
378

    
379
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
380

    
381
    for (int i = 0; i < size; i++)
382
    {
383
        float xf = points[i].x();
384
        float yf = points[i].y();
385
        float zf = points[i].z();
386

    
387
        uint8_t *x = ((uint8_t*) &xf);
388
        uint8_t *y = ((uint8_t*) &yf);
389
        uint8_t *z = ((uint8_t*) &zf);
390

    
391
        message[2 + 4 + i*12 + 0] = x[0];
392
        message[2 + 4 + i*12 + 1] = x[1];
393
        message[2 + 4 + i*12 + 2] = x[2];
394
        message[2 + 4 + i*12 + 3] = x[3];
395

    
396
        message[2 + 4 + i*12 + 4] = y[0];
397
        message[2 + 4 + i*12 + 5] = y[1];
398
        message[2 + 4 + i*12 + 6] = y[2];
399
        message[2 + 4 + i*12 + 7] = y[3];
400

    
401
        message[2 + 4 + i*12 + 8] = z[0];
402
        message[2 + 4 + i*12 + 9] = z[1];
403
        message[2 + 4 + i*12 + 10] = z[2];
404
        message[2 + 4 + i*12 + 11] = z[3];
405
    }
406

    
407
    sendMessageToAllConnected(message);
408

    
409

    
410
    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
411
    _currentCurve.clear();
412
    _currentCurve.append(points);
413
}
414

    
415
void CurveDataServer::sendNewCurve(QList<QVector4D> &points)
416
{
417
    QList<QVector3D> curve3d;
418
    int itemCount = points.size();
419

    
420
    curve3d.clear();
421
    if (itemCount > MAX_CURVE_SIZE)
422
        itemCount = MAX_CURVE_SIZE;
423
    for (int i = 0; i < itemCount; i++)
424
    {
425
        curve3d.append(points[i].toVector3D());
426
    }
427

    
428
    sendNewCurve(curve3d);
429
}
430

    
431
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) {
432
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
433
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
434
        if (it->socket == socket) {
435
            return it;
436
        }
437
    }
438
    return _sockets.end();
439
}
440

    
441
void CurveDataServer::sendPreamble(QTcpSocket *socket)
442
{
443
    sendProtocolMagic(socket);
444
    sendVersion(socket);
445
}
446

    
447
void CurveDataServer::sendProtocolMagic(QTcpSocket *socket)
448
{
449
    char *messageContent = "DeltaRVr";
450
    QByteArray message = message64(0x3001, *((uint64_t*) messageContent));
451

    
452
    socket->write(message);
453

    
454
    qDebug() << "Sent Protocol Magic to" << socket->peerAddress();
455
}
456

    
457
void CurveDataServer::sendVersion(QTcpSocket *socket)
458
{
459
    QByteArray message = message32(0x2002, 1);
460

    
461
    socket->write(message);
462

    
463
    qDebug() << "Sent Protocol Version to" << socket->peerAddress();
464
}
465

    
466
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
467
{
468
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
469
    socket->write(message);
470
}
471

    
472

    
473

    
474
void CurveDataServer::sendMessageToAllConnected(QByteArray &message)
475
{
476
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
477
    for (auto& client : _sockets) {
478
        if (client.magic && client.version)
479
        {
480
            client.socket->write(message);
481
        }
482
    }
483
}
484

    
485
void CurveDataServer::threadFunction()
486
{
487
    while (_threadIsRunning)
488
    {
489
        //processPings();
490

    
491
        // TODO tim se zpravilo varovani se socketama
492

    
493
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
494
    }
495
}
496

    
497
void CurveDataServer::processPings()
498
{
499
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
500
    for (auto& client : _sockets) {
501

    
502
        if (client.magic && client.version)
503
        {
504
            if (client.lastPong  < currentTimeMillis() - 10000)
505
            {
506
                terminateConnection(client, "Timeout");
507
                continue;
508
            }
509

    
510
            sendPing(client);
511

    
512
            std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
513
            if (!client.initialized && !_currentCurve.isEmpty())
514
            {
515
                sendCurve(client);
516
                client.initialized = true;
517
            }
518
        }
519
        else if (client.lastPong  < currentTimeMillis() - 10000)
520
        {
521
            terminateConnection(client, "Preamble timeout");
522
            continue;
523
        }
524
    }
525
}
526

    
527
void CurveDataServer::removeSocket(QTcpSocket *socket)
528
{
529
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
530
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
531
        if (it->socket == socket) {
532
            _sockets.erase(it);
533
            break;
534
        }
535
    }
536
}
537

    
538
void CurveDataServer::sendPing(Client& client)
539
{
540
    uint64_t number = (((uint64_t) rand()) << 32) + (uint64_t) rand();
541

    
542
    //qDebug() << "posilam ping" << ((qint64) number) << ".";
543

    
544
    client.ping.emplace(std::make_pair(number, currentTimeMillis()));
545
    //client.ping.insert({number, currentTimeMillis()});
546

    
547
    client.socket->write(message64(0x3004, (uint64_t) number));
548
}
549

    
550
void CurveDataServer::sendCurve(Client &client)
551
{
552
    int size = _currentCurve.size();
553

    
554
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
555

    
556
    for (int i = 0; i < size; i++)
557
    {
558
        float xf = _currentCurve[i].x();
559
        float yf = _currentCurve[i].y();
560
        float zf = _currentCurve[i].z();
561

    
562
        uint8_t *x = ((uint8_t*) &xf);
563
        uint8_t *y = ((uint8_t*) &yf);
564
        uint8_t *z = ((uint8_t*) &zf);
565

    
566
        message[2 + 4 + i*12 + 0] = x[0];
567
        message[2 + 4 + i*12 + 1] = x[1];
568
        message[2 + 4 + i*12 + 2] = x[2];
569
        message[2 + 4 + i*12 + 3] = x[3];
570

    
571
        message[2 + 4 + i*12 + 4] = y[0];
572
        message[2 + 4 + i*12 + 5] = y[1];
573
        message[2 + 4 + i*12 + 6] = y[2];
574
        message[2 + 4 + i*12 + 7] = y[3];
575

    
576
        message[2 + 4 + i*12 + 8] = z[0];
577
        message[2 + 4 + i*12 + 9] = z[1];
578
        message[2 + 4 + i*12 + 10] = z[2];
579
        message[2 + 4 + i*12 + 11] = z[3];
580
    }
581

    
582
    client.socket->write(message);
583
}
(2-2/19)