Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 0cda8e8d

Přidáno uživatelem Oto Šťáva před téměř 4 roky(ů)

Re #8735 - Implement base server message retrieval

Zobrazit rozdíly:

deltarobot/curvedataserver.cpp
3 3
#include <QDebug>
4 4
#include <QHostAddress>
5 5
#include <QAbstractSocket>
6
#include <chrono>
7

  
8

  
9
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
6 10

  
7 11
static QByteArray message8(uint16_t messageId, uint8_t content = 0) {
8 12
    QByteArray result(2 + 1, 0);
......
79 83
    return result;
80 84
}
81 85

  
82
static QByteArray messageVarLength(uint16_t messageId, uint32_t length) {
86
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
83 87
    QByteArray result(2 + 4 + length, 0);
84 88
    result[0] = ((uint8_t*) &messageId)[0];
85 89
    result[1] = ((uint8_t*) &messageId)[1];
......
87 91
    result[3] = ((uint8_t*) &length)[1];
88 92
    result[4] = ((uint8_t*) &length)[2];
89 93
    result[5] = ((uint8_t*) &length)[3];
94

  
95
    if (content) {
96
        for (uint32_t i = 0; i < length; i++) {
97
            result[6 + i] = content[i];
98
        }
99
    }
90 100
    return result;
91 101
}
92 102

  
103
inline long currentTimeMillis() {
104
    return std::chrono::duration_cast<std::chrono::milliseconds>(
105
                std::chrono::steady_clock::now().time_since_epoch())
106
            .count();
107
}
108

  
109

  
110
// Client //////////////////////////////////////////////////////////////////////////////////////////////////////////////
111

  
112
CurveDataServer::Client::Client(QTcpSocket* socket) :
113
    socket(socket),
114
    lastPong(currentTimeMillis()),
115
    bufUsed(0)
116
{}
117

  
118

  
119
// Server //////////////////////////////////////////////////////////////////////////////////////////////////////////////
120

  
93 121
CurveDataServer::CurveDataServer() :
94 122
    _server(this),
95 123
    _sockets()
......
98 126
    connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
99 127
}
100 128

  
129
CurveDataServer::~CurveDataServer()
130
{
131
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
132
    for (auto& client : _sockets)
133
    {
134
        sendEot(client.socket, "Server is stopping");
135
        client.socket->close();
136
    }
137
    _sockets.clear();
138
}
139

  
101 140
void CurveDataServer::onNewConnection()
102 141
{
103 142
    QTcpSocket *clientSocket = _server.nextPendingConnection();
104 143
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
105 144
    connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
106 145

  
107
    _sockets.push_back(clientSocket);
108

  
109 146
    sendPreamble(clientSocket);
147
    _sockets.push_back(Client(clientSocket));
110 148
}
111 149

  
112 150
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState)
......
114 152
    if (socketState == QAbstractSocket::UnconnectedState)
115 153
    {
116 154
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
117
        _sockets.removeOne(sender);
155

  
156
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
157
        for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
158
            if (it->socket == sender) {
159
                _sockets.erase(it);
160
                break;
161
            }
162
        }
118 163
    }
119 164
}
120 165

  
121 166
void CurveDataServer::onReadyRead()
122 167
{
123
    // tady actually nakonec switch zpracovani ruznych zprav dle identifikatoru
168
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
169

  
124 170
    QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
125
    QByteArray data = sender->readAll();
126
    for (QTcpSocket* socket : _sockets) {
127
        if (socket != sender)
128
            socket->write(QByteArray::fromStdString(sender->peerAddress().toString().toStdString() + ": " + data.toStdString()));
171
    auto it = clientOf(sender);
172
    if (it == _sockets.end()) {
173
        return;
174
    }
175
    Client& client = *it;
176

  
177
    while (sender->bytesAvailable()) {
178
        if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
179
            qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
180
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
181
        }
182

  
183
        if (client.bufUsed < 2) {
184
            // no ID yet, wait for more data
185
            return;
186
        }
187

  
188
        uint16_t messageId = *((uint16_t*) client.buf);
189
        qint64 contentSize;
190
        switch (messageId & 0xF000) {
191
        case 0x0000:
192
            contentSize = 1;
193
            break;
194
        case 0x1000:
195
            contentSize = 2;
196
            break;
197
        case 0x2000:
198
            contentSize = 4;
199
            break;
200
        case 0x3000:
201
            contentSize = 8;
202
            break;
203
        case 0x4000:
204
            contentSize = 16;
205
            break;
206
        case 0xF000:
207
            // TODO - implement support for variable length messages
208
            return;
209
        default:
210
            // unsupported message size
211
            return;
212
        }
213

  
214
        qint64 toRead = qMin(qint64(contentSize - (client.bufUsed - Client::MESSAGE_ID_SIZE)), sender->bytesAvailable());
215
        if (toRead > 0) {
216
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
217
        }
218

  
219
        if (client.bufUsed == (Client::MESSAGE_ID_SIZE + contentSize)) {
220
            handleMessage(client, messageId, &client.buf[Client::MESSAGE_ID_SIZE]);
221
            client.bufUsed = 0;
222
        }
129 223
    }
130 224
}
131 225

  
132
QByteArray *CurveDataServer::putFloatIntoMessage(QByteArray *message, int index, float f)
133
{
134
    if (!message || message->size() < index + 4)
135
        return message;
136

  
137
    unsigned char *c = reinterpret_cast<unsigned char *>(&f);
138

  
139
    for (int i = 0; i < 4; i++)
140
    {
141
        (*message)[index + i] = c[i];
226
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content) {
227
    switch (messageId) {
228
    case 0x3001:
229
        qDebug() << "Magic arrived!";
230
        break;
231
    case 0x2002:
232
        qDebug() << "Version arrived!";
233
        break;
234
    case 0x3005:
235
        qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
236
        break;
142 237
    }
143

  
144
    return message;
145 238
}
146 239

  
147

  
148 240
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
149 241
{
150 242
    QByteArray message = message128(0x4003,
......
152 244
                                    *((uint32_t*) &y),
153 245
                                    *((uint32_t*) &z));
154 246

  
155
    for (QTcpSocket* socket : _sockets) {
156
            socket->write(message);
247
    {
248
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
249
        for (auto& client : _sockets) {
250
                client.socket->write(message);
251
        }
157 252
    }
158
    //qDebug() << message.toHex();
159
    //qDebug() << x << " " << y << " " << z;
160 253
}
161 254

  
255
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) {
256
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
257
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
258
        if (it->socket == socket) {
259
            return it;
260
        }
261
    }
262
    return _sockets.end();
263
}
162 264

  
163 265
void CurveDataServer::sendPreamble(QTcpSocket *socket)
164 266
{
......
186 288
    qDebug() << "Sent Protocol Version to" << socket->peerAddress();
187 289
    //qDebug() << message.toHex();
188 290
}
291

  
292
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
293
{
294
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
295
    socket->write(message);
296
}
deltarobot/curvedataserver.h
4 4
#include <QObject>
5 5
#include <QTcpServer>
6 6
#include <QTcpSocket>
7
#include <map>
8
#include <mutex>
7 9

  
8 10
class CurveDataServer : QObject
9 11
{
10 12
    Q_OBJECT
11 13

  
12 14
public:
13
    CurveDataServer();
15
    struct Client {
16
        static const size_t BUF_SIZE = 32;
17
        static const size_t MESSAGE_ID_SIZE = 2;
18

  
19
        QTcpSocket* socket;
20
        std::map<long, long> ping;  ///< key: ping ID;  value: ping timestamp (msec);
21
        long latency;               ///< client's latency
22
        long lastPong;              ///< timestamp of last pong
23

  
24
        char buf[BUF_SIZE];
25
        size_t bufUsed;
26

  
27
        Client(QTcpSocket* socket);
28
    };
29

  
30
    explicit CurveDataServer();
31
    ~CurveDataServer();
32

  
14 33
    void sendActuatorPosition(float x, float y, float z);
15 34

  
16 35
public slots:
......
19 38
    void onReadyRead();
20 39

  
21 40
private:
22
    QByteArray *putFloatIntoMessage(QByteArray *message, int index, float f);
41
    QList<Client>::iterator clientOf(QTcpSocket *socket);
42

  
43
    void handleMessage(Client& client, uint16_t messageId, void* content);
44

  
23 45
    void sendPreamble(QTcpSocket *socket);
24 46
    void sendProtocolMagic(QTcpSocket *socket);
25 47
    void sendVersion(QTcpSocket *socket);
48
    void sendEot(QTcpSocket *socket, std::string&& message);
26 49

  
27 50
    QTcpServer _server;
28
    QList<QTcpSocket*> _sockets;
51

  
52
    std::recursive_mutex _socketsLock;
53
    QList<Client> _sockets;
29 54
};
30 55

  
31 56
#endif // CURVEDATASERVER_H

Také k dispozici: Unified diff