Projekt

Obecné

Profil

Stáhnout (15.1 KB) Statistiky
| Větev: | Tag: | Revize:
1
#include "curvedataserver.h"
2

    
3
#include <QDebug>
4
#include <QHostAddress>
5
#include <QAbstractSocket>
6
#include <QVector4D>
7
#include <chrono>
8

    
9
#include "../element/trajectory.hpp"
10

    
11
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
12

    
13
static QByteArray message8(uint16_t messageId, uint8_t content = 0) {
14
    QByteArray result(2 + 1, 0);
15
    result[0] = ((uint8_t*) &messageId)[0];
16
    result[1] = ((uint8_t*) &messageId)[1];
17
    result[2] = content;
18
    return result;
19
}
20

    
21
static QByteArray message16(uint16_t messageId, uint16_t content = 0) {
22
    QByteArray result(2 + 2, 0);
23
    result[0] = ((uint8_t*) &messageId)[0];
24
    result[1] = ((uint8_t*) &messageId)[1];
25
    result[2] = ((uint8_t*) &content)[0];
26
    result[3] = ((uint8_t*) &content)[1];
27
    return result;
28
}
29

    
30
static QByteArray message32(uint16_t messageId, uint32_t content = 0) {
31
    QByteArray result(2 + 4, 0);
32
    result[0] = ((uint8_t*) &messageId)[0];
33
    result[1] = ((uint8_t*) &messageId)[1];
34
    result[2] = ((uint8_t*) &content)[0];
35
    result[3] = ((uint8_t*) &content)[1];
36
    result[4] = ((uint8_t*) &content)[2];
37
    result[5] = ((uint8_t*) &content)[3];
38
    return result;
39
}
40

    
41
static QByteArray message64(uint16_t messageId, uint64_t content = 0) {
42
    QByteArray result(2 + 8, 0);
43
    result[0] = ((uint8_t*) &messageId)[0];
44
    result[1] = ((uint8_t*) &messageId)[1];
45
    result[2] = ((uint8_t*) &content)[0];
46
    result[3] = ((uint8_t*) &content)[1];
47
    result[4] = ((uint8_t*) &content)[2];
48
    result[5] = ((uint8_t*) &content)[3];
49
    result[6] = ((uint8_t*) &content)[4];
50
    result[7] = ((uint8_t*) &content)[5];
51
    result[8] = ((uint8_t*) &content)[6];
52
    result[9] = ((uint8_t*) &content)[7];
53
    return result;
54
}
55

    
56
static QByteArray message128(uint16_t messageId,
57
                             uint32_t contentX = 0,
58
                             uint32_t contentY = 0,
59
                             uint32_t contentZ = 0,
60
                             uint32_t contentW = 0)
61
{
62
    QByteArray result(2 + 16, 0);
63
    result[0] = ((uint8_t*) &messageId)[0];
64
    result[1] = ((uint8_t*) &messageId)[1];
65

    
66
    result[2] = ((uint8_t*) &contentX)[0];
67
    result[3] = ((uint8_t*) &contentX)[1];
68
    result[4] = ((uint8_t*) &contentX)[2];
69
    result[5] = ((uint8_t*) &contentX)[3];
70

    
71
    result[6] = ((uint8_t*) &contentY)[0];
72
    result[7] = ((uint8_t*) &contentY)[1];
73
    result[8] = ((uint8_t*) &contentY)[2];
74
    result[9] = ((uint8_t*) &contentY)[3];
75

    
76
    result[10] = ((uint8_t*) &contentZ)[0];
77
    result[11] = ((uint8_t*) &contentZ)[1];
78
    result[12] = ((uint8_t*) &contentZ)[2];
79
    result[13] = ((uint8_t*) &contentZ)[3];
80

    
81
    result[14] = ((uint8_t*) &contentW)[0];
82
    result[15] = ((uint8_t*) &contentW)[1];
83
    result[16] = ((uint8_t*) &contentW)[2];
84
    result[17] = ((uint8_t*) &contentW)[3];
85
    return result;
86
}
87

    
88
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
89
    QByteArray result(2 + 4 + length, 0);
90
    result[0] = ((uint8_t*) &messageId)[0];
91
    result[1] = ((uint8_t*) &messageId)[1];
92
    result[2] = ((uint8_t*) &length)[0];
93
    result[3] = ((uint8_t*) &length)[1];
94
    result[4] = ((uint8_t*) &length)[2];
95
    result[5] = ((uint8_t*) &length)[3];
96

    
97
    if (content) {
98
        for (uint32_t i = 0; i < length; i++) {
99
            result[6 + i] = content[i];
100
        }
101
    }
102
    return result;
103
}
104

    
105
inline uint64_t currentTimeMillis() {
106
    return std::chrono::duration_cast<std::chrono::milliseconds>(
107
                std::chrono::steady_clock::now().time_since_epoch())
108
            .count();
109
}
110

    
111

    
112
// Client //////////////////////////////////////////////////////////////////////////////////////////////////////////////
113

    
114
CurveDataServer::Client::Client(QTcpSocket* socket) :
115
    socket(socket),
116
    lastPong(currentTimeMillis()),
117
    bufUsed(0)
118
{}
119

    
120

    
121
// Server //////////////////////////////////////////////////////////////////////////////////////////////////////////////
122

    
123
CurveDataServer::CurveDataServer() :
124
    _server(this),
125
    _pingThread([this](){threadFunction();}),
126
    _sockets()
127
{
128
    _server.listen(QHostAddress::Any, 4242);
129
    connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
130
}
131

    
132
CurveDataServer::~CurveDataServer()
133
{
134
    _threadIsRunning = false;
135
    _pingThread.join();
136

    
137
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
138
    for (auto& client : _sockets)
139
    {
140
        terminateConnection(client, "Server is stopping");
141
    }
142
    _sockets.clear();
143
}
144

    
145
void CurveDataServer::onNewConnection()
146
{
147
    QTcpSocket *clientSocket = _server.nextPendingConnection();
148
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
149
    connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
150

    
151
    sendPreamble(clientSocket);
152
    _sockets.push_back(Client(clientSocket));
153
}
154

    
155
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState)
156
{
157
    if (socketState == QAbstractSocket::UnconnectedState)
158
    {
159
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
160

    
161
        removeSocket(sender);
162
    }
163
}
164

    
165
void CurveDataServer::onReadyRead()
166
{
167
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
168

    
169
    QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
170
    auto it = clientOf(sender);
171
    if (it == _sockets.end()) {
172
        return;
173
    }
174
    Client& client = *it;
175

    
176
    while (sender->bytesAvailable()) {
177
        if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
178
            qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
179
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
180
        }
181

    
182
        if (client.bufUsed < 2) {
183
            // no ID yet, wait for more data
184
            return;
185
        }
186

    
187
        uint16_t messageId = *((uint16_t*) client.buf);
188
        qint64 contentSize;
189
        switch (messageId & 0xF000) {
190
        case 0x0000:
191
            contentSize = 1;
192
            break;
193
        case 0x1000:
194
            contentSize = 2;
195
            break;
196
        case 0x2000:
197
            contentSize = 4;
198
            break;
199
        case 0x3000:
200
            contentSize = 8;
201
            break;
202
        case 0x4000:
203
            contentSize = 16;
204
            break;
205
        case 0xF000:
206
            // TODO - implement support for variable length messages
207
            contentSize = 0;
208
            break;
209
        default:
210
            // unsupported message size
211
            return;
212
        }
213

    
214
        qint64 toRead = qMin(qint64(contentSize - (client.bufUsed - Client::MESSAGE_ID_SIZE)), sender->bytesAvailable());
215
        if (toRead > 0) {
216
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
217
        }
218

    
219
        if (client.bufUsed == (Client::MESSAGE_ID_SIZE + contentSize)) {
220
            handleMessage(client, messageId, &client.buf[Client::MESSAGE_ID_SIZE]);
221
            client.bufUsed = 0;
222
        }
223
    }
224
}
225

    
226
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content) {
227
    switch (messageId) {
228
    case 0x3001:
229
        qDebug() << "Magic arrived!";
230
        handleProtocolMagic(client, (char *) content);
231
        break;
232
    case 0x2002:
233
        qDebug() << "Version arrived!";
234
        handleVersion(client, *((uint8_t*) content));
235
        break;
236
    case 0x3005:
237
        qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
238
        handlePong(client, (uint64_t*) content);
239
        break;
240
    case 0xF006:
241
        qDebug() << "EOT!";
242
        handleEOT(client, (char *) content);
243
        break;
244
    }
245
}
246

    
247
void CurveDataServer::handleProtocolMagic(Client& client, char *content)
248
{
249
    char *correct = "DeltaRVr";
250

    
251
    if (strncmp(content, correct, 8) == 0) {
252
        client.magic = true;
253
    }
254
}
255

    
256
void CurveDataServer::handleVersion(Client& client, uint8_t content)
257
{
258
    if (((uint8_t*) &content)[0] == 1)
259
    {
260
        client.version = true;
261
    }
262
}
263

    
264
void CurveDataServer::handlePong(Client& client, uint64_t* content)
265
{
266

    
267
    uint64_t time = currentTimeMillis();
268

    
269
    uint64_t number = *content;
270

    
271
    uint64_t timeSent = client.ping.at(number);
272

    
273
    client.ping.erase(number);
274

    
275
    client.latency = time - timeSent;
276
    client.lastPong = time;
277
    //qDebug() << "prisel pong" << number;
278
}
279

    
280
void CurveDataServer::handleEOT(Client& client, char* content)
281
{
282
    qDebug() << "Client" << client.socket->peerName() << "disconnected:" << content;
283
    terminateConnection(client, "Recieved EOT from client");
284
}
285

    
286

    
287
void CurveDataServer::terminateConnection(Client& client, std::string&& reason)
288
{
289
    if (client.socket->isOpen())
290
    {
291
        sendEot(client.socket, std::forward<std::string&&>(reason));
292

    
293
        client.socket->close();
294
    }
295

    
296
    removeSocket(client.socket);
297
}
298

    
299
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
300
{
301
    QByteArray message = message128(0x4003,
302
                                    *((uint32_t*) &x),
303
                                    *((uint32_t*) &y),
304
                                    *((uint32_t*) &z));
305

    
306
    sendMessageToAllConnected(message);
307
}
308

    
309
void CurveDataServer::sendActualDirection(float x, float y, float z)
310
{
311
    QByteArray message = message128(0x4007,
312
                                    *((uint32_t*) &x),
313
                                    *((uint32_t*) &y),
314
                                    *((uint32_t*) &z));
315

    
316
    sendMessageToAllConnected(message);
317
}
318

    
319
void CurveDataServer::sendTargetDirection(float x, float y, float z)
320
{
321
    QByteArray message = message128(0x4008,
322
                                    *((uint32_t*) &x),
323
                                    *((uint32_t*) &y),
324
                                    *((uint32_t*) &z));
325

    
326
    sendMessageToAllConnected(message);
327
}
328

    
329
void CurveDataServer::sendNewCurve(QList<QVector3D> &points)
330
{
331
    int size = points.size();
332

    
333
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
334

    
335
    for (int i = 0; i < size; i++)
336
    {
337
        float xf = points[i].x();
338
        float yf = points[i].y();
339
        float zf = points[i].z();
340

    
341
        uint8_t *x = ((uint8_t*) &xf);
342
        uint8_t *y = ((uint8_t*) &yf);
343
        uint8_t *z = ((uint8_t*) &zf);
344

    
345
        message[2 + 4 + i*12 + 0] = x[0];
346
        message[2 + 4 + i*12 + 1] = x[1];
347
        message[2 + 4 + i*12 + 2] = x[2];
348
        message[2 + 4 + i*12 + 3] = x[3];
349

    
350
        message[2 + 4 + i*12 + 4] = y[0];
351
        message[2 + 4 + i*12 + 5] = y[1];
352
        message[2 + 4 + i*12 + 6] = y[2];
353
        message[2 + 4 + i*12 + 7] = y[3];
354

    
355
        message[2 + 4 + i*12 + 8] = z[0];
356
        message[2 + 4 + i*12 + 9] = z[1];
357
        message[2 + 4 + i*12 + 10] = z[2];
358
        message[2 + 4 + i*12 + 11] = z[3];
359
    }
360

    
361
    sendMessageToAllConnected(message);
362

    
363

    
364
    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
365
    _currentCurve.clear();
366
    _currentCurve.append(points);
367
}
368

    
369
void CurveDataServer::sendNewCurve(QList<QVector4D> &points)
370
{
371
    QList<QVector3D> curve3d;
372
    int itemCount = points.size();
373

    
374
    curve3d.clear();
375
    if (itemCount > MAX_CURVE_SIZE)
376
        itemCount = MAX_CURVE_SIZE;
377
    for (int i = 0; i < itemCount; i++)
378
    {
379
        curve3d.append(points[i].toVector3D());
380
    }
381

    
382
    sendNewCurve(curve3d);
383
}
384

    
385
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) {
386
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
387
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
388
        if (it->socket == socket) {
389
            return it;
390
        }
391
    }
392
    return _sockets.end();
393
}
394

    
395
void CurveDataServer::sendPreamble(QTcpSocket *socket)
396
{
397
    sendProtocolMagic(socket);
398
    sendVersion(socket);
399
}
400

    
401
void CurveDataServer::sendProtocolMagic(QTcpSocket *socket)
402
{
403
    char *messageContent = "DeltaRVr";
404
    QByteArray message = message64(0x3001, *((uint64_t*) messageContent));
405

    
406
    socket->write(message);
407

    
408
    qDebug() << "Sent Protocol Magic to" << socket->peerAddress();
409
}
410

    
411
void CurveDataServer::sendVersion(QTcpSocket *socket)
412
{
413
    QByteArray message = message32(0x2002, 1);
414

    
415
    socket->write(message);
416

    
417
    qDebug() << "Sent Protocol Version to" << socket->peerAddress();
418
}
419

    
420
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
421
{
422
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
423
    socket->write(message);
424
}
425

    
426

    
427

    
428
void CurveDataServer::sendMessageToAllConnected(QByteArray &message)
429
{
430
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
431
    for (auto& client : _sockets) {
432
        if (client.magic && client.version)
433
        {
434
            client.socket->write(message);
435
        }
436
    }
437
}
438

    
439
void CurveDataServer::threadFunction()
440
{
441
    /*
442
    while (_threadIsRunning)
443
    {
444
        //processPings();
445

    
446
        // TODO tim se zpravilo varovani se socketama
447

    
448
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
449
    }
450
    */
451
}
452

    
453
void CurveDataServer::processPings()
454
{
455
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
456
    for (auto& client : _sockets) {
457

    
458
        if (client.magic && client.version)
459
        {
460
            if (client.lastPong  < currentTimeMillis() - 10000)
461
            {
462
                terminateConnection(client, "Timeout");
463
                continue;
464
            }
465

    
466
            sendPing(client);
467

    
468
            std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
469
            if (!client.initialized && !_currentCurve.isEmpty())
470
            {
471
                sendCurve(client);
472
                client.initialized = true;
473
            }
474
        }
475
        else if (client.lastPong  < currentTimeMillis() - 10000)
476
        {
477
            terminateConnection(client, "Preamble timeout");
478
            continue;
479
        }
480
    }
481
}
482

    
483
void CurveDataServer::removeSocket(QTcpSocket *socket)
484
{
485
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
486
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
487
        if (it->socket == socket) {
488
            _sockets.erase(it);
489
            break;
490
        }
491
    }
492
}
493

    
494
void CurveDataServer::sendPing(Client& client)
495
{
496
    uint64_t number = (((uint64_t) rand()) << 32) + (uint64_t) rand();
497

    
498
    //qDebug() << "posilam ping" << ((qint64) number) << ".";
499

    
500
    client.ping.emplace(std::make_pair(number, currentTimeMillis()));
501
    //client.ping.insert({number, currentTimeMillis()});
502

    
503
    client.socket->write(message64(0x3004, (uint64_t) number));
504
}
505

    
506
void CurveDataServer::sendCurve(Client &client)
507
{
508
    int size = _currentCurve.size();
509

    
510
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
511

    
512
    for (int i = 0; i < size; i++)
513
    {
514
        float xf = _currentCurve[i].x();
515
        float yf = _currentCurve[i].y();
516
        float zf = _currentCurve[i].z();
517

    
518
        uint8_t *x = ((uint8_t*) &xf);
519
        uint8_t *y = ((uint8_t*) &yf);
520
        uint8_t *z = ((uint8_t*) &zf);
521

    
522
        message[2 + 4 + i*12 + 0] = x[0];
523
        message[2 + 4 + i*12 + 1] = x[1];
524
        message[2 + 4 + i*12 + 2] = x[2];
525
        message[2 + 4 + i*12 + 3] = x[3];
526

    
527
        message[2 + 4 + i*12 + 4] = y[0];
528
        message[2 + 4 + i*12 + 5] = y[1];
529
        message[2 + 4 + i*12 + 6] = y[2];
530
        message[2 + 4 + i*12 + 7] = y[3];
531

    
532
        message[2 + 4 + i*12 + 8] = z[0];
533
        message[2 + 4 + i*12 + 9] = z[1];
534
        message[2 + 4 + i*12 + 10] = z[2];
535
        message[2 + 4 + i*12 + 11] = z[3];
536
    }
537

    
538
    client.socket->write(message);
539
}
(2-2/19)