Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 9360266c

Přidáno uživatelem Jakub Hejman před téměř 4 roky(ů)

  • ID 9360266c1be920813d9c8f9016e2003a5f8ae1f1
  • Rodič 93572855

Re #8735 - Server data retrieval structure

Zobrazit rozdíly:

deltarobot/curvedataserver.cpp
6 6
#include <QVector4D>
7 7
#include <chrono>
8 8

  
9
#include "../element/trajectory.hpp"
9 10

  
10 11
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
11 12

  
......
101 102
    return result;
102 103
}
103 104

  
104
inline long currentTimeMillis() {
105
inline uint64_t currentTimeMillis() {
105 106
    return std::chrono::duration_cast<std::chrono::milliseconds>(
106 107
                std::chrono::steady_clock::now().time_since_epoch())
107 108
            .count();
......
121 122

  
122 123
CurveDataServer::CurveDataServer() :
123 124
    _server(this),
125
    _pingThread([this](){threadFunction();}),
124 126
    _sockets()
125 127
{
126 128
    _server.listen(QHostAddress::Any, 4242);
......
132 134
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
133 135
    for (auto& client : _sockets)
134 136
    {
135
        sendEot(client.socket, "Server is stopping");
136
        client.socket->close();
137
        terminateConnection(client, "Server is stopping");
137 138
    }
138 139
    _sockets.clear();
140

  
141
    _threadIsRunning = false;
139 142
}
140 143

  
141 144
void CurveDataServer::onNewConnection()
......
154 157
    {
155 158
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
156 159

  
157
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
158
        for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
159
            if (it->socket == sender) {
160
                _sockets.erase(it);
161
                break;
162
            }
163
        }
160
        removeSocket(sender);
164 161
    }
165 162
}
166 163

  
......
206 203
            break;
207 204
        case 0xF000:
208 205
            // TODO - implement support for variable length messages
209
            return;
206
            contentSize = 0;
207
            break;
210 208
        default:
211 209
            // unsupported message size
212 210
            return;
......
228 226
    switch (messageId) {
229 227
    case 0x3001:
230 228
        qDebug() << "Magic arrived!";
229
        handleProtocolMagic(client, (char *) content);
231 230
        break;
232 231
    case 0x2002:
233 232
        qDebug() << "Version arrived!";
233
        handleVersion(client, *((uint8_t*) content));
234 234
        break;
235 235
    case 0x3005:
236 236
        qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
237
        handlePong(client, content);
238
        break;
239
    case 0xF006:
240
        qDebug() << "EOT!";
241
        handleEOT(client, content);
237 242
        break;
238 243
    }
239 244
}
240 245

  
246
void CurveDataServer::handleProtocolMagic(Client& client, char *content)
247
{
248
    char *correct = "DeltaRVr";
249

  
250
    if (strncmp(content, correct, 8) == 0) {
251
        client.magic = true;
252
    }
253
}
254

  
255
void CurveDataServer::handleVersion(Client& client, uint8_t content)
256
{
257
    if (((uint8_t*) &content)[0] == 1)
258
    {
259
        client.version = true;
260
    }
261
}
262

  
263
void CurveDataServer::handlePong(Client& client, void* content)
264
{
265
    long time = currentTimeMillis();
266

  
267
    long number = *((long*) content);
268

  
269
    long timeSent = client.ping.at(number);
270

  
271
    client.ping.erase(number);
272

  
273
    client.latency = time - timeSent;
274
    client.lastPong = time;
275
}
276

  
277
void CurveDataServer::handleEOT(Client& client, void* content)
278
{
279
    qDebug() << "Client" << client.socket->peerName() << "disconnected:" << content;
280
    terminateConnection(client, "Recieved EOT from client");
281
}
282

  
283

  
284
void CurveDataServer::terminateConnection(Client& client, std::string&& reason)
285
{
286
    removeSocket(client.socket);
287

  
288
    sendEot(client.socket, std::forward<std::string&&>(reason));
289

  
290
    client.socket->close();
291
}
292

  
241 293
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
242 294
{
243 295
    QByteArray message = message128(0x4003,
......
245 297
                                    *((uint32_t*) &y),
246 298
                                    *((uint32_t*) &z));
247 299

  
248
    {
249
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
250
        for (auto& client : _sockets) {
251
                client.socket->write(message);
252
        }
253
    }
300
    sendMessageToAllConnected(message);
254 301
}
255 302

  
256 303
void CurveDataServer::sendActualDirection(float x, float y, float z)
......
260 307
                                    *((uint32_t*) &y),
261 308
                                    *((uint32_t*) &z));
262 309

  
263
    {
264
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
265
        for (auto& client : _sockets) {
266
                client.socket->write(message);
267
        }
268
    }
310
    sendMessageToAllConnected(message);
269 311
}
270 312

  
271 313
void CurveDataServer::sendTargetDirection(float x, float y, float z)
......
275 317
                                    *((uint32_t*) &y),
276 318
                                    *((uint32_t*) &z));
277 319

  
278
    {
279
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
280
        for (auto& client : _sockets) {
281
                client.socket->write(message);
282
        }
283
    }
320
    sendMessageToAllConnected(message);
284 321
}
285 322

  
286 323
void CurveDataServer::sendNewCurve(QList<QVector3D> &points)
......
315 352
        message[2 + 4 + i*12 + 11] = z[3];
316 353
    }
317 354

  
318
    {
319
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
320
        for (auto& client : _sockets) {
321
                client.socket->write(message);
322
        }
323
    }
355
    sendMessageToAllConnected(message);
356

  
357

  
358
    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
359
    _currentCurve.clear();
360
    _currentCurve.append(points);
324 361
}
325 362

  
326 363
void CurveDataServer::sendNewCurve(QList<QVector4D> &points)
......
363 400
    socket->write(message);
364 401

  
365 402
    qDebug() << "Sent Protocol Magic to" << socket->peerAddress();
366
    //qDebug() << message.toHex();
367 403
}
368 404

  
369 405
void CurveDataServer::sendVersion(QTcpSocket *socket)
......
373 409
    socket->write(message);
374 410

  
375 411
    qDebug() << "Sent Protocol Version to" << socket->peerAddress();
376
    //qDebug() << message.toHex();
377 412
}
378 413

  
379 414
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
......
381 416
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
382 417
    socket->write(message);
383 418
}
419

  
420

  
421

  
422
void CurveDataServer::sendMessageToAllConnected(QByteArray &message)
423
{
424
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
425
    for (auto& client : _sockets) {
426
        if (client.magic && client.version)
427
        {
428
            client.socket->write(message);
429
        }
430
    }
431
}
432

  
433
void CurveDataServer::threadFunction()
434
{
435
    while (_threadIsRunning)
436
    {
437
        // odeslat ping, popr krivku vsem, take kontrola odpojeni
438
        {
439
            std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
440
            for (auto& client : _sockets) {
441
                if (client.magic && client.version)
442
                {
443
                    if (client.lastPong  < currentTimeMillis() - 10000)
444
                    {
445
                        terminateConnection(client, "Timeout");
446
                    }
447

  
448
                    sendPing(client);
449

  
450
                    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
451
                    if (!client.initialized && !_currentCurve.isEmpty())
452
                    {
453
                        sendCurve(client);
454
                        client.initialized = true;
455
                    }
456
                }
457
                else if (client.lastPong  < currentTimeMillis() - 10000)
458
                {
459
                    terminateConnection(client, "Preamble timeout");
460
                }
461
            }
462
        }
463

  
464
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
465
    }
466
}
467

  
468
void CurveDataServer::removeSocket(QTcpSocket *socket)
469
{
470
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
471
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
472
        if (it->socket == socket) {
473
            _sockets.erase(it);
474
            break;
475
        }
476
    }
477
}
478

  
479
void CurveDataServer::sendPing(Client& client)
480
{
481
    uint64_t number = (((uint64_t) rand()) << 32) + (uint64_t) rand();
482

  
483
    client.ping.emplace(std::make_pair(number, currentTimeMillis()));
484
    //client.ping.insert({number, currentTimeMillis()});
485

  
486
    client.socket->write(message64(0x3004, (uint64_t) number));
487
}
488

  
489
void CurveDataServer::sendCurve(Client &client)
490
{
491
    int size = _currentCurve.size();
492

  
493
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
494

  
495
    for (int i = 0; i < size; i++)
496
    {
497
        float xf = _currentCurve[i].x();
498
        float yf = _currentCurve[i].y();
499
        float zf = _currentCurve[i].z();
500

  
501
        uint8_t *x = ((uint8_t*) &xf);
502
        uint8_t *y = ((uint8_t*) &yf);
503
        uint8_t *z = ((uint8_t*) &zf);
504

  
505
        message[2 + 4 + i*12 + 0] = x[0];
506
        message[2 + 4 + i*12 + 1] = x[1];
507
        message[2 + 4 + i*12 + 2] = x[2];
508
        message[2 + 4 + i*12 + 3] = x[3];
509

  
510
        message[2 + 4 + i*12 + 4] = y[0];
511
        message[2 + 4 + i*12 + 5] = y[1];
512
        message[2 + 4 + i*12 + 6] = y[2];
513
        message[2 + 4 + i*12 + 7] = y[3];
514

  
515
        message[2 + 4 + i*12 + 8] = z[0];
516
        message[2 + 4 + i*12 + 9] = z[1];
517
        message[2 + 4 + i*12 + 10] = z[2];
518
        message[2 + 4 + i*12 + 11] = z[3];
519
    }
520

  
521
    client.socket->write(message);
522
}
deltarobot/curvedataserver.h
6 6
#include <QTcpSocket>
7 7
#include <map>
8 8
#include <mutex>
9

  
10

  
11
#include "../element/trajectory.hpp" //obsahuje MAX_CURVE_SIZE kterou se ridi sendNewCurve 4D
9
#include <thread>
12 10

  
13 11
class CurveDataServer : QObject
14 12
{
......
16 14

  
17 15
public:
18 16
    struct Client {
19
        static const size_t BUF_SIZE = 32;
17
        static const size_t BUF_SIZE = 4096;
20 18
        static const size_t MESSAGE_ID_SIZE = 2;
21 19

  
22 20
        QTcpSocket* socket;
23
        std::map<long, long> ping;  ///< key: ping ID;  value: ping timestamp (msec);
24
        long latency;               ///< client's latency
25
        long lastPong;              ///< timestamp of last pong
21
        std::map<uint64_t, uint64_t> ping;  ///< key: ping ID;  value: ping timestamp (msec);
22
        uint64_t latency;               ///< client's latency
23
        uint64_t lastPong;              ///< timestamp of last pong
26 24

  
27 25
        char buf[BUF_SIZE];
28 26
        size_t bufUsed;
29 27

  
28
        bool magic = false;
29
        bool version = false;
30
        bool initialized = false;
31

  
30 32
        Client(QTcpSocket* socket);
31 33
    };
32 34

  
......
38 40
    void sendTargetDirection(float x, float y, float z);
39 41
    void sendNewCurve(QList<QVector4D> &points);
40 42
    void sendNewCurve(QList<QVector3D> &points);
43
    void sendCurve(Client &client);
41 44

  
42 45
public slots:
43 46
    void onNewConnection();
......
48 51
    QList<Client>::iterator clientOf(QTcpSocket *socket);
49 52

  
50 53
    void handleMessage(Client& client, uint16_t messageId, void* content);
54
    void handleProtocolMagic(Client& client, char* content);
55
    void handleVersion(Client& client, uint8_t content);
56
    void handlePong(Client& client, void* content);
57
    void handleEOT(Client& client, void* content);
58

  
59
    void validPreamble(Client& client);
60
    void removeSocket(QTcpSocket *socket);
61
    void terminateConnection(Client& client, std::string&& reason);
62
    void sendPing(Client& client);
51 63

  
52 64
    void sendPreamble(QTcpSocket *socket);
53 65
    void sendProtocolMagic(QTcpSocket *socket);
54 66
    void sendVersion(QTcpSocket *socket);
55 67
    void sendEot(QTcpSocket *socket, std::string&& message);
56 68

  
69
    void sendMessageToAllConnected(QByteArray &message);
70

  
71
    void threadFunction();
72

  
57 73
    QTcpServer _server;
74
    std::thread _pingThread;
75

  
76
    volatile bool _threadIsRunning = true;
58 77

  
59 78
    std::recursive_mutex _socketsLock;
60 79
    QList<Client> _sockets;
80

  
81
    std::recursive_mutex _currentCurveLock;
82
    QList<QVector3D> _currentCurve;
61 83
};
62 84

  
63 85
#endif // CURVEDATASERVER_H

Také k dispozici: Unified diff