Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 6d9543aa

Přidáno uživatelem Jakub Hejman před téměř 4 roky(ů)

Re #8899 - Server data reception

Zobrazit rozdíly:

deltarobot-vr/Assets/DeltaRobotVr/Client.cs
36 36
        private const UInt16 ProtocolMagicId = 0x3001;
37 37
        private const UInt16 ProtocolVersionId = 0x2002;
38 38
        private const UInt16 PingId = 0x3004;
39
        private const UInt16 PongId = 0x3005;
39 40
        private const UInt16 EotId = 0xF006;
40 41
        private const UInt16 CurrentActuatorPositionId = 0x4003;
41 42
        private const UInt16 CurrentDirectionVectorId = 0x4007;
......
416 417
        private void ProcessPing(BinaryReader reader, BinaryWriter writer)
417 418
        {
418 419
            UInt64 pingValue = reader.ReadUInt64();
420

  
421
            writer.Write(PongId);
419 422
            writer.Write(pingValue);
420 423
        }
421 424
    }
deltarobot/curvedataserver.cpp
6 6
#include <QVector4D>
7 7
#include <chrono>
8 8

  
9
#include "../element/trajectory.hpp"
9 10

  
10 11
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
11 12

  
......
101 102
    return result;
102 103
}
103 104

  
104
inline long currentTimeMillis() {
105
inline uint64_t currentTimeMillis() {
105 106
    return std::chrono::duration_cast<std::chrono::milliseconds>(
106 107
                std::chrono::steady_clock::now().time_since_epoch())
107 108
            .count();
......
121 122

  
122 123
CurveDataServer::CurveDataServer() :
123 124
    _server(this),
125
    _pingThread([this](){threadFunction();}),
124 126
    _sockets()
125 127
{
126 128
    _server.listen(QHostAddress::Any, 4242);
......
132 134
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
133 135
    for (auto& client : _sockets)
134 136
    {
135
        sendEot(client.socket, "Server is stopping");
136
        client.socket->close();
137
        terminateConnection(client, "Server is stopping");
137 138
    }
138 139
    _sockets.clear();
140

  
141
    _threadIsRunning = false;
139 142
}
140 143

  
141 144
void CurveDataServer::onNewConnection()
......
154 157
    {
155 158
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
156 159

  
157
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
158
        for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
159
            if (it->socket == sender) {
160
                _sockets.erase(it);
161
                break;
162
            }
163
        }
160
        removeSocket(sender);
164 161
    }
165 162
}
166 163

  
......
206 203
            break;
207 204
        case 0xF000:
208 205
            // TODO - implement support for variable length messages
209
            return;
206
            contentSize = 0;
207
            break;
210 208
        default:
211 209
            // unsupported message size
212 210
            return;
......
228 226
    switch (messageId) {
229 227
    case 0x3001:
230 228
        qDebug() << "Magic arrived!";
229
        handleProtocolMagic(client, (char *) content);
231 230
        break;
232 231
    case 0x2002:
233 232
        qDebug() << "Version arrived!";
233
        handleVersion(client, *((uint8_t*) content));
234 234
        break;
235 235
    case 0x3005:
236 236
        qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
237
        handlePong(client, (uint64_t*) content);
237 238
        break;
239
    case 0xF006:
240
        qDebug() << "EOT!";
241
        handleEOT(client, (char *) content);
242
        break;
243
    }
244
}
245

  
246
void CurveDataServer::handleProtocolMagic(Client& client, char *content)
247
{
248
    char *correct = "DeltaRVr";
249

  
250
    if (strncmp(content, correct, 8) == 0) {
251
        client.magic = true;
238 252
    }
239 253
}
240 254

  
255
void CurveDataServer::handleVersion(Client& client, uint8_t content)
256
{
257
    if (((uint8_t*) &content)[0] == 1)
258
    {
259
        client.version = true;
260
    }
261
}
262

  
263
void CurveDataServer::handlePong(Client& client, uint64_t* content)
264
{
265

  
266
    uint64_t time = currentTimeMillis();
267

  
268
    uint64_t number = *content;
269

  
270
    uint64_t timeSent = client.ping.at(number);
271

  
272
    client.ping.erase(number);
273

  
274
    client.latency = time - timeSent;
275
    client.lastPong = time;
276
    //qDebug() << "prisel pong" << number;
277
}
278

  
279
void CurveDataServer::handleEOT(Client& client, char* content)
280
{
281
    qDebug() << "Client" << client.socket->peerName() << "disconnected:" << content;
282
    terminateConnection(client, "Recieved EOT from client");
283
}
284

  
285

  
286
void CurveDataServer::terminateConnection(Client& client, std::string&& reason)
287
{
288
    if (client.socket->isOpen())
289
    {
290
        sendEot(client.socket, std::forward<std::string&&>(reason));
291

  
292
        client.socket->close();
293
    }
294

  
295
    removeSocket(client.socket);
296
}
297

  
241 298
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
242 299
{
243 300
    QByteArray message = message128(0x4003,
......
245 302
                                    *((uint32_t*) &y),
246 303
                                    *((uint32_t*) &z));
247 304

  
248
    {
249
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
250
        for (auto& client : _sockets) {
251
                client.socket->write(message);
252
        }
253
    }
305
    sendMessageToAllConnected(message);
254 306
}
255 307

  
256 308
void CurveDataServer::sendActualDirection(float x, float y, float z)
......
260 312
                                    *((uint32_t*) &y),
261 313
                                    *((uint32_t*) &z));
262 314

  
263
    {
264
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
265
        for (auto& client : _sockets) {
266
                client.socket->write(message);
267
        }
268
    }
315
    sendMessageToAllConnected(message);
269 316
}
270 317

  
271 318
void CurveDataServer::sendTargetDirection(float x, float y, float z)
......
275 322
                                    *((uint32_t*) &y),
276 323
                                    *((uint32_t*) &z));
277 324

  
278
    {
279
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
280
        for (auto& client : _sockets) {
281
                client.socket->write(message);
282
        }
283
    }
325
    sendMessageToAllConnected(message);
284 326
}
285 327

  
286 328
void CurveDataServer::sendNewCurve(QList<QVector3D> &points)
......
315 357
        message[2 + 4 + i*12 + 11] = z[3];
316 358
    }
317 359

  
318
    {
319
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
320
        for (auto& client : _sockets) {
321
                client.socket->write(message);
322
        }
323
    }
360
    sendMessageToAllConnected(message);
361

  
362

  
363
    std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
364
    _currentCurve.clear();
365
    _currentCurve.append(points);
324 366
}
325 367

  
326 368
void CurveDataServer::sendNewCurve(QList<QVector4D> &points)
......
363 405
    socket->write(message);
364 406

  
365 407
    qDebug() << "Sent Protocol Magic to" << socket->peerAddress();
366
    //qDebug() << message.toHex();
367 408
}
368 409

  
369 410
void CurveDataServer::sendVersion(QTcpSocket *socket)
......
373 414
    socket->write(message);
374 415

  
375 416
    qDebug() << "Sent Protocol Version to" << socket->peerAddress();
376
    //qDebug() << message.toHex();
377 417
}
378 418

  
379 419
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
......
381 421
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
382 422
    socket->write(message);
383 423
}
424

  
425

  
426

  
427
void CurveDataServer::sendMessageToAllConnected(QByteArray &message)
428
{
429
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
430
    for (auto& client : _sockets) {
431
        if (client.magic && client.version)
432
        {
433
            client.socket->write(message);
434
        }
435
    }
436
}
437

  
438
void CurveDataServer::threadFunction()
439
{
440
    while (_threadIsRunning)
441
    {
442
        //processPings();
443

  
444
        // TODO tim se zpravilo varovani se socketama
445

  
446
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
447
    }
448
}
449

  
450
void CurveDataServer::processPings()
451
{
452
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
453
    for (auto& client : _sockets) {
454

  
455
        if (client.magic && client.version)
456
        {
457
            if (client.lastPong  < currentTimeMillis() - 10000)
458
            {
459
                terminateConnection(client, "Timeout");
460
                continue;
461
            }
462

  
463
            sendPing(client);
464

  
465
            std::lock_guard<decltype (_currentCurveLock)> l(_currentCurveLock);
466
            if (!client.initialized && !_currentCurve.isEmpty())
467
            {
468
                sendCurve(client);
469
                client.initialized = true;
470
            }
471
        }
472
        else if (client.lastPong  < currentTimeMillis() - 10000)
473
        {
474
            terminateConnection(client, "Preamble timeout");
475
            continue;
476
        }
477
    }
478
}
479

  
480
void CurveDataServer::removeSocket(QTcpSocket *socket)
481
{
482
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
483
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
484
        if (it->socket == socket) {
485
            _sockets.erase(it);
486
            break;
487
        }
488
    }
489
}
490

  
491
void CurveDataServer::sendPing(Client& client)
492
{
493
    uint64_t number = (((uint64_t) rand()) << 32) + (uint64_t) rand();
494

  
495
    //qDebug() << "posilam ping" << ((qint64) number) << ".";
496

  
497
    client.ping.emplace(std::make_pair(number, currentTimeMillis()));
498
    //client.ping.insert({number, currentTimeMillis()});
499

  
500
    client.socket->write(message64(0x3004, (uint64_t) number));
501
}
502

  
503
void CurveDataServer::sendCurve(Client &client)
504
{
505
    int size = _currentCurve.size();
506

  
507
    QByteArray message = messageVarLength(0xF009, (uint32_t) size * 12);
508

  
509
    for (int i = 0; i < size; i++)
510
    {
511
        float xf = _currentCurve[i].x();
512
        float yf = _currentCurve[i].y();
513
        float zf = _currentCurve[i].z();
514

  
515
        uint8_t *x = ((uint8_t*) &xf);
516
        uint8_t *y = ((uint8_t*) &yf);
517
        uint8_t *z = ((uint8_t*) &zf);
518

  
519
        message[2 + 4 + i*12 + 0] = x[0];
520
        message[2 + 4 + i*12 + 1] = x[1];
521
        message[2 + 4 + i*12 + 2] = x[2];
522
        message[2 + 4 + i*12 + 3] = x[3];
523

  
524
        message[2 + 4 + i*12 + 4] = y[0];
525
        message[2 + 4 + i*12 + 5] = y[1];
526
        message[2 + 4 + i*12 + 6] = y[2];
527
        message[2 + 4 + i*12 + 7] = y[3];
528

  
529
        message[2 + 4 + i*12 + 8] = z[0];
530
        message[2 + 4 + i*12 + 9] = z[1];
531
        message[2 + 4 + i*12 + 10] = z[2];
532
        message[2 + 4 + i*12 + 11] = z[3];
533
    }
534

  
535
    client.socket->write(message);
536
}
deltarobot/curvedataserver.h
6 6
#include <QTcpSocket>
7 7
#include <map>
8 8
#include <mutex>
9

  
10

  
11
#include "../element/trajectory.hpp" //obsahuje MAX_CURVE_SIZE kterou se ridi sendNewCurve 4D
9
#include <thread>
12 10

  
13 11
class CurveDataServer : QObject
14 12
{
......
16 14

  
17 15
public:
18 16
    struct Client {
19
        static const size_t BUF_SIZE = 32;
17
        static const size_t BUF_SIZE = 4096;
20 18
        static const size_t MESSAGE_ID_SIZE = 2;
21 19

  
22 20
        QTcpSocket* socket;
23
        std::map<long, long> ping;  ///< key: ping ID;  value: ping timestamp (msec);
24
        long latency;               ///< client's latency
25
        long lastPong;              ///< timestamp of last pong
21
        std::map<uint64_t, uint64_t> ping;  ///< key: ping ID;  value: ping timestamp (msec);
22
        uint64_t latency;               ///< client's latency
23
        uint64_t lastPong;              ///< timestamp of last pong
26 24

  
27 25
        char buf[BUF_SIZE];
28 26
        size_t bufUsed;
29 27

  
28
        bool magic = false;
29
        bool version = false;
30
        bool initialized = false;
31

  
30 32
        Client(QTcpSocket* socket);
31 33
    };
32 34

  
......
38 40
    void sendTargetDirection(float x, float y, float z);
39 41
    void sendNewCurve(QList<QVector4D> &points);
40 42
    void sendNewCurve(QList<QVector3D> &points);
43
    void sendCurve(Client &client);
44

  
45
    void processPings();            ///< send pings, check timeouts
41 46

  
42 47
public slots:
43 48
    void onNewConnection();
......
48 53
    QList<Client>::iterator clientOf(QTcpSocket *socket);
49 54

  
50 55
    void handleMessage(Client& client, uint16_t messageId, void* content);
56
    void handleProtocolMagic(Client& client, char* content);
57
    void handleVersion(Client& client, uint8_t content);
58
    void handlePong(Client& client, uint64_t* content);
59
    void handleEOT(Client& client, char* content);
60

  
61
    void validPreamble(Client& client);
62
    void removeSocket(QTcpSocket *socket);
63
    void terminateConnection(Client& client, std::string&& reason);
64
    void sendPing(Client& client);
51 65

  
52 66
    void sendPreamble(QTcpSocket *socket);
53 67
    void sendProtocolMagic(QTcpSocket *socket);
54 68
    void sendVersion(QTcpSocket *socket);
55 69
    void sendEot(QTcpSocket *socket, std::string&& message);
56 70

  
71
    void sendMessageToAllConnected(QByteArray &message);
72

  
73
    void threadFunction();
74

  
57 75
    QTcpServer _server;
76
    std::thread _pingThread;
77

  
78
    volatile bool _threadIsRunning = true;
58 79

  
59 80
    std::recursive_mutex _socketsLock;
60 81
    QList<Client> _sockets;
82

  
83
    std::recursive_mutex _currentCurveLock;
84
    QList<QVector3D> _currentCurve;
61 85
};
62 86

  
63 87
#endif // CURVEDATASERVER_H
deltarobot/mainwindow.cpp
1074 1074
    cdServer->sendActuatorPosition(spherePos.x(), spherePos.y(), spherePos.z());
1075 1075
    cdServer->sendActualDirection(actualArrow.x(), actualArrow.y(), actualArrow.z());
1076 1076
    cdServer->sendTargetDirection(dirArrow.x(), dirArrow.y(), dirArrow.z());
1077
    cdServer->processPings();
1077 1078
}
1078 1079

  
1079 1080
//! Příjem dat ze stroje

Také k dispozici: Unified diff