Projekt

Obecné

Profil

Stáhnout (8.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
#include "curvedataserver.h"
2

    
3
#include <QDebug>
4
#include <QHostAddress>
5
#include <QAbstractSocket>
6
#include <chrono>
7

    
8

    
9
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
10

    
11
static QByteArray message8(uint16_t messageId, uint8_t content = 0) {
12
    QByteArray result(2 + 1, 0);
13
    result[0] = ((uint8_t*) &messageId)[0];
14
    result[1] = ((uint8_t*) &messageId)[1];
15
    result[2] = content;
16
    return result;
17
}
18

    
19
static QByteArray message16(uint16_t messageId, uint16_t content = 0) {
20
    QByteArray result(2 + 2, 0);
21
    result[0] = ((uint8_t*) &messageId)[0];
22
    result[1] = ((uint8_t*) &messageId)[1];
23
    result[2] = ((uint8_t*) &content)[0];
24
    result[3] = ((uint8_t*) &content)[1];
25
    return result;
26
}
27

    
28
static QByteArray message32(uint16_t messageId, uint32_t content = 0) {
29
    QByteArray result(2 + 4, 0);
30
    result[0] = ((uint8_t*) &messageId)[0];
31
    result[1] = ((uint8_t*) &messageId)[1];
32
    result[2] = ((uint8_t*) &content)[0];
33
    result[3] = ((uint8_t*) &content)[1];
34
    result[4] = ((uint8_t*) &content)[2];
35
    result[5] = ((uint8_t*) &content)[3];
36
    return result;
37
}
38

    
39
static QByteArray message64(uint16_t messageId, uint64_t content = 0) {
40
    QByteArray result(2 + 8, 0);
41
    result[0] = ((uint8_t*) &messageId)[0];
42
    result[1] = ((uint8_t*) &messageId)[1];
43
    result[2] = ((uint8_t*) &content)[0];
44
    result[3] = ((uint8_t*) &content)[1];
45
    result[4] = ((uint8_t*) &content)[2];
46
    result[5] = ((uint8_t*) &content)[3];
47
    result[6] = ((uint8_t*) &content)[4];
48
    result[7] = ((uint8_t*) &content)[5];
49
    result[8] = ((uint8_t*) &content)[6];
50
    result[9] = ((uint8_t*) &content)[7];
51
    return result;
52
}
53

    
54
static QByteArray message128(uint16_t messageId,
55
                             uint32_t contentX = 0,
56
                             uint32_t contentY = 0,
57
                             uint32_t contentZ = 0,
58
                             uint32_t contentW = 0)
59
{
60
    QByteArray result(2 + 16, 0);
61
    result[0] = ((uint8_t*) &messageId)[0];
62
    result[1] = ((uint8_t*) &messageId)[1];
63

    
64
    result[2] = ((uint8_t*) &contentX)[0];
65
    result[3] = ((uint8_t*) &contentX)[1];
66
    result[4] = ((uint8_t*) &contentX)[2];
67
    result[5] = ((uint8_t*) &contentX)[3];
68

    
69
    result[6] = ((uint8_t*) &contentY)[0];
70
    result[7] = ((uint8_t*) &contentY)[1];
71
    result[8] = ((uint8_t*) &contentY)[2];
72
    result[9] = ((uint8_t*) &contentY)[3];
73

    
74
    result[10] = ((uint8_t*) &contentZ)[0];
75
    result[11] = ((uint8_t*) &contentZ)[1];
76
    result[12] = ((uint8_t*) &contentZ)[2];
77
    result[13] = ((uint8_t*) &contentZ)[3];
78

    
79
    result[14] = ((uint8_t*) &contentW)[0];
80
    result[15] = ((uint8_t*) &contentW)[1];
81
    result[16] = ((uint8_t*) &contentW)[2];
82
    result[17] = ((uint8_t*) &contentW)[3];
83
    return result;
84
}
85

    
86
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
87
    QByteArray result(2 + 4 + length, 0);
88
    result[0] = ((uint8_t*) &messageId)[0];
89
    result[1] = ((uint8_t*) &messageId)[1];
90
    result[2] = ((uint8_t*) &length)[0];
91
    result[3] = ((uint8_t*) &length)[1];
92
    result[4] = ((uint8_t*) &length)[2];
93
    result[5] = ((uint8_t*) &length)[3];
94

    
95
    if (content) {
96
        for (uint32_t i = 0; i < length; i++) {
97
            result[6 + i] = content[i];
98
        }
99
    }
100
    return result;
101
}
102

    
103
inline long currentTimeMillis() {
104
    return std::chrono::duration_cast<std::chrono::milliseconds>(
105
                std::chrono::steady_clock::now().time_since_epoch())
106
            .count();
107
}
108

    
109

    
110
// Client //////////////////////////////////////////////////////////////////////////////////////////////////////////////
111

    
112
CurveDataServer::Client::Client(QTcpSocket* socket) :
113
    socket(socket),
114
    lastPong(currentTimeMillis()),
115
    bufUsed(0)
116
{}
117

    
118

    
119
// Server //////////////////////////////////////////////////////////////////////////////////////////////////////////////
120

    
121
CurveDataServer::CurveDataServer() :
122
    _server(this),
123
    _sockets()
124
{
125
    _server.listen(QHostAddress::Any, 4242);
126
    connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
127
}
128

    
129
CurveDataServer::~CurveDataServer()
130
{
131
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
132
    for (auto& client : _sockets)
133
    {
134
        sendEot(client.socket, "Server is stopping");
135
        client.socket->close();
136
    }
137
    _sockets.clear();
138
}
139

    
140
void CurveDataServer::onNewConnection()
141
{
142
    QTcpSocket *clientSocket = _server.nextPendingConnection();
143
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
144
    connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
145

    
146
    sendPreamble(clientSocket);
147
    _sockets.push_back(Client(clientSocket));
148
}
149

    
150
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState)
151
{
152
    if (socketState == QAbstractSocket::UnconnectedState)
153
    {
154
        QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
155

    
156
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
157
        for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
158
            if (it->socket == sender) {
159
                _sockets.erase(it);
160
                break;
161
            }
162
        }
163
    }
164
}
165

    
166
void CurveDataServer::onReadyRead()
167
{
168
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
169

    
170
    QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
171
    auto it = clientOf(sender);
172
    if (it == _sockets.end()) {
173
        return;
174
    }
175
    Client& client = *it;
176

    
177
    while (sender->bytesAvailable()) {
178
        if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
179
            qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
180
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
181
        }
182

    
183
        if (client.bufUsed < 2) {
184
            // no ID yet, wait for more data
185
            return;
186
        }
187

    
188
        uint16_t messageId = *((uint16_t*) client.buf);
189
        qint64 contentSize;
190
        switch (messageId & 0xF000) {
191
        case 0x0000:
192
            contentSize = 1;
193
            break;
194
        case 0x1000:
195
            contentSize = 2;
196
            break;
197
        case 0x2000:
198
            contentSize = 4;
199
            break;
200
        case 0x3000:
201
            contentSize = 8;
202
            break;
203
        case 0x4000:
204
            contentSize = 16;
205
            break;
206
        case 0xF000:
207
            // TODO - implement support for variable length messages
208
            return;
209
        default:
210
            // unsupported message size
211
            return;
212
        }
213

    
214
        qint64 toRead = qMin(qint64(contentSize - (client.bufUsed - Client::MESSAGE_ID_SIZE)), sender->bytesAvailable());
215
        if (toRead > 0) {
216
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
217
        }
218

    
219
        if (client.bufUsed == (Client::MESSAGE_ID_SIZE + contentSize)) {
220
            handleMessage(client, messageId, &client.buf[Client::MESSAGE_ID_SIZE]);
221
            client.bufUsed = 0;
222
        }
223
    }
224
}
225

    
226
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content) {
227
    switch (messageId) {
228
    case 0x3001:
229
        qDebug() << "Magic arrived!";
230
        break;
231
    case 0x2002:
232
        qDebug() << "Version arrived!";
233
        break;
234
    case 0x3005:
235
        qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
236
        break;
237
    }
238
}
239

    
240
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
241
{
242
    QByteArray message = message128(0x4003,
243
                                    *((uint32_t*) &x),
244
                                    *((uint32_t*) &y),
245
                                    *((uint32_t*) &z));
246

    
247
    {
248
        std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
249
        for (auto& client : _sockets) {
250
                client.socket->write(message);
251
        }
252
    }
253
}
254

    
255
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) {
256
    std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
257
    for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
258
        if (it->socket == socket) {
259
            return it;
260
        }
261
    }
262
    return _sockets.end();
263
}
264

    
265
void CurveDataServer::sendPreamble(QTcpSocket *socket)
266
{
267
    sendProtocolMagic(socket);
268
    sendVersion(socket);
269
}
270

    
271
void CurveDataServer::sendProtocolMagic(QTcpSocket *socket)
272
{
273
    char *messageContent = "DeltaRVr";
274
    QByteArray message = message64(0x3001, *((uint64_t*) messageContent));
275

    
276
    socket->write(message);
277

    
278
    qDebug() << "Sent Protocol Magic to" << socket->peerAddress();
279
    //qDebug() << message.toHex();
280
}
281

    
282
void CurveDataServer::sendVersion(QTcpSocket *socket)
283
{
284
    QByteArray message = message32(0x2002, 1);
285

    
286
    socket->write(message);
287

    
288
    qDebug() << "Sent Protocol Version to" << socket->peerAddress();
289
    //qDebug() << message.toHex();
290
}
291

    
292
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
293
{
294
    QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
295
    socket->write(message);
296
}
(2-2/19)