Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 6d9543aa

Přidáno uživatelem Jakub Hejman před téměř 4 roky(ů)

Re #8899 - Server data reception

Zobrazit rozdíly:

deltarobot/curvedataserver.cpp
6 6
#include <QVector4D>
7 7
#include <chrono>
8 8

  
9
#include "../element/trajectory.hpp"
9 10

  
10 11
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
11 12

  
......
101 102
    return result;
102 103
}
103 104

  
104
inline long currentTimeMillis() {
105
inline uint64_t currentTimeMillis() {
105 106
    return std::chrono::duration_cast<std::chrono::milliseconds>(
106 107
                std::chrono::steady_clock::now().time_since_epoch())
107 108
            .count();
......
121 122

  
122 123
CurveDataServer::CurveDataServer() :
123 124
    _server(this),
125
    _pingThread([this](){threadFunction();}),
124 126
    _sockets()
125 127
{
126 128
    _server.listen(QHostAddress::Any, 4242);
......
132 134
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
133 135
    for (auto& client : _sockets)
134 136
    {
135
        sendEot(client.socket, "Server is stopping");
136
        client.socket->close();
137
        terminateConnection(client, "Server is stopping");
137 138
    }
138 139
    _sockets.clear();
140

  
141
    _threadIsRunning = false;
139 142
}
140 143

  
141 144
void CurveDataServer::onNewConnection()
......
154 157
    {
155 158
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
156 159

  
157
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
158
        for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
159
            if (it->socket == sender) {
160
                _sockets.erase(it);
161
                break;
162
            }
163
        }
160
        removeSocket(sender);
164 161
    }
165 162
}
166 163

  
......
206 203
            break;
207 204
        case 0xF000:
208 205
            // TODO - implement support for variable length messages
209
            return;
206
            contentSize = 0;
207
            break;
210 208
        default:
211 209
            // unsupported message size
212 210
            return;
......
228 226
    switch (messageId) {
229 227
    case 0x3001:
230 228
        qDebug() << "Magic arrived!";
229
        handleProtocolMagic(client, (char *) content);
231 230
        break;
232 231
    case 0x2002:
233 232
        qDebug() << "Version arrived!";
233
        handleVersion(client, *((uint8_t*) content));
234 234
        break;
235 235
    case 0x3005:
236 236
        qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
237
        handlePong(client, (uint64_t*) content);
237 238
        break;
239
    case 0xF006:
240
        qDebug() << "EOT!";
241
        handleEOT(client, (char *) content);
242
        break;
243
    }
244
}
245

  
246
void CurveDataServer::handleProtocolMagic(Client& client, char *content)
247
{
248
    char *correct = "DeltaRVr";
249

  
250
    if (strncmp(content, correct, 8) == 0) {
251
        client.magic = true;
238 252
    }
239 253
}
240 254

  
255
void CurveDataServer::handleVersion(Client& client, uint8_t content)
256
{
257
    if (((uint8_t*) &content)[0] == 1)
258
    {
259
        client.version = true;
260
    }
261
}
262

  
263
void CurveDataServer::handlePong(Client& client, uint64_t* content)
264
{
265

  
266
    uint64_t time = currentTimeMillis();
267

  
268
    uint64_t number = *content;
269

  
270
    uint64_t timeSent = client.ping.at(number);
271

  
272
    client.ping.erase(number);
273

  
274
    client.latency = time - timeSent;
275
    client.lastPong = time;
276
    //qDebug() << "prisel pong" << number;
277
}
278

  
279
void CurveDataServer::handleEOT(Client& client, char* content)
280
{
281
    qDebug() << "Client" << client.socket->peerName() << "disconnected:" << content;
282
    terminateConnection(client, "Recieved EOT from client");
283
}
284

  
285

  
286
void CurveDataServer::terminateConnection(Client& client, std::string&& reason)
287
{
288
    if (client.socket->isOpen())
289
    {
290
        sendEot(client.socket, std::forward<std::string&&>(reason));
291

  
292
        client.socket->close();
293
    }
294

  
295
    removeSocket(client.socket);
296
}
297

  
241 298
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
242 299
{
243 300
    QByteArray message = message128(0x4003,
......
245 302
                                    *((uint32_t*) &y),
246 303
                                    *((uint32_t*) &z));
247 304

  
248
    {
249
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
250
        for (auto& client : _sockets) {
251
                client.socket->write(message);
252
        }
253
    }
305
    sendMessageToAllConnected(message);
254 306
}
255 307

  
256 308
void CurveDataServer::sendActualDirection(float x, float y, float z)
......
260 312
                                    *((uint32_t*) &y),
261 313
                                    *((uint32_t*) &z));
262 314

  
263
    {
264
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
265
        for (auto& client : _sockets) {
266
                client.socket->write(message);
267
        }
268
    }
315
    sendMessageToAllConnected(message);
269 316
}
270 317

  
271 318
void CurveDataServer::sendTargetDirection(float x, float y, float z)
......
275 322
                                    *((uint32_t*) &y),
276 323
                                    *((uint32_t*) &z));
277 324

  
278
    {
279
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
280
        for (auto& client : _sockets) {
281
                client.socket->write(message);
282
        }
283
    }
325
    sendMessageToAllConnected(message);
284 326
}
285 327

  
286 328
void CurveDataServer::sendNewCurve(QList<QVector3D> &points)
......
315 357
        message[2 + 4 + i*12 + 11] = z[3];
316 358
    }
317 359

  
318
    {
319
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
320
        for (auto& client : _sockets) {
321
                client.socket->write(message);
322
        }
323
    }
360
    sendMessageToAllConnected(message);
361

  
362

  
363
    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
364
    _currentCurve.clear();
365
    _currentCurve.append(points);
324 366
}
325 367

  
326 368
void CurveDataServer::sendNewCurve(QList<QVector4D> &points)
......
363 405
    socket->write(message);
364 406

  
365 407
    qDebug() << "Sent Protocol Magic to" << socket->peerAddress();
366
    //qDebug() << message.toHex();
367 408
}
368 409

  
369 410
void CurveDataServer::sendVersion(QTcpSocket *socket)
......
373 414
    socket->write(message);
374 415

  
375 416
    qDebug() << "Sent Protocol Version to" << socket->peerAddress();
376
    //qDebug() << message.toHex();
377 417
}
378 418

  
379 419
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
......
381 421
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
382 422
    socket->write(message);
383 423
}
424

  
425

  
426

  
427
void CurveDataServer::sendMessageToAllConnected(QByteArray &message)
428
{
429
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
430
    for (auto& client : _sockets) {
431
        if (client.magic && client.version)
432
        {
433
            client.socket->write(message);
434
        }
435
    }
436
}
437

  
438
void CurveDataServer::threadFunction()
439
{
440
    while (_threadIsRunning)
441
    {
442
        //processPings();
443

  
444
        // TODO tim se zpravilo varovani se socketama
445

  
446
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
447
    }
448
}
449

  
450
void CurveDataServer::processPings()
451
{
452
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
453
    for (auto& client : _sockets) {
454

  
455
        if (client.magic && client.version)
456
        {
457
            if (client.lastPong  < currentTimeMillis() - 10000)
458
            {
459
                terminateConnection(client, "Timeout");
460
                continue;
461
            }
462

  
463
            sendPing(client);
464

  
465
            std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
466
            if (!client.initialized && !_currentCurve.isEmpty())
467
            {
468
                sendCurve(client);
469
                client.initialized = true;
470
            }
471
        }
472
        else if (client.lastPong  < currentTimeMillis() - 10000)
473
        {
474
            terminateConnection(client, "Preamble timeout");
475
            continue;
476
        }
477
    }
478
}
479

  
480
void CurveDataServer::removeSocket(QTcpSocket *socket)
481
{
482
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
483
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
484
        if (it->socket == socket) {
485
            _sockets.erase(it);
486
            break;
487
        }
488
    }
489
}
490

  
491
void CurveDataServer::sendPing(Client& client)
492
{
493
    uint64_t number = (((uint64_t) rand()) << 32) + (uint64_t) rand();
494

  
495
    //qDebug() << "posilam ping" << ((qint64) number) << ".";
496

  
497
    client.ping.emplace(std::make_pair(number, currentTimeMillis()));
498
    //client.ping.insert({number, currentTimeMillis()});
499

  
500
    client.socket->write(message64(0x3004, (uint64_t) number));
501
}
502

  
503
void CurveDataServer::sendCurve(Client &client)
504
{
505
    int size = _currentCurve.size();
506

  
507
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
508

  
509
    for (int i = 0; i < size; i++)
510
    {
511
        float xf = _currentCurve[i].x();
512
        float yf = _currentCurve[i].y();
513
        float zf = _currentCurve[i].z();
514

  
515
        uint8_t *x = ((uint8_t*) &xf);
516
        uint8_t *y = ((uint8_t*) &yf);
517
        uint8_t *z = ((uint8_t*) &zf);
518

  
519
        message[2 + 4 + i*12 + 0] = x[0];
520
        message[2 + 4 + i*12 + 1] = x[1];
521
        message[2 + 4 + i*12 + 2] = x[2];
522
        message[2 + 4 + i*12 + 3] = x[3];
523

  
524
        message[2 + 4 + i*12 + 4] = y[0];
525
        message[2 + 4 + i*12 + 5] = y[1];
526
        message[2 + 4 + i*12 + 6] = y[2];
527
        message[2 + 4 + i*12 + 7] = y[3];
528

  
529
        message[2 + 4 + i*12 + 8] = z[0];
530
        message[2 + 4 + i*12 + 9] = z[1];
531
        message[2 + 4 + i*12 + 10] = z[2];
532
        message[2 + 4 + i*12 + 11] = z[3];
533
    }
534

  
535
    client.socket->write(message);
536
}

Také k dispozici: Unified diff