Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 285c6fe5

Přidáno uživatelem Oto Šťáva před téměř 4 roky(ů)

Re #8899 - Server - Implement variable-length message reception

Zobrazit rozdíly:

deltarobot-vr/Assets/DeltaRobotVr/Client.cs
265 265
                    {
266 266
                        ReadMessages(reader, writer);
267 267
                    }
268

  
269
                    if (client.Connected)
270
                    {
271
                        SendEot(writer, "Client stopped by user");
272
                    }
268 273
                }
269 274
                catch (Exception e)
270 275
                {
......
276 281
                    {
277 282
                        Debug.LogError($"Exception in communication thread:\n{e}");
278 283
                    }
279
                }

280
                finally

281
                {

282
                    IsConnected = false;

284
                }
285
                finally
286
                {
287
                    IsConnected = false;
283 288
                }
284 289
                
285 290
                // wait before reconnection - short polls to prevent blocking if application is closed
......
473 478
            writer.Write(PongId);
474 479
            writer.Write(pingValue);
475 480
        }
481

  
482
        private void SendEot(BinaryWriter writer, string reason)
483
        {
484
            writer.Write(EotId);
485
            writer.Write((UInt32) reason.Length);
486
            writer.Write(Encoding.ASCII.GetBytes(reason));
487
        }
476 488
    }
477 489
}
deltarobot/curvedataserver.cpp
5 5
#include <QAbstractSocket>
6 6
#include <QVector4D>
7 7
#include <chrono>
8
#include <string>
9
#include <cstring>
8 10

  
9 11
#include "../element/trajectory.hpp"
10 12

  
......
173 175
    Client& client = *it;
174 176

  
175 177
    while (sender->bytesAvailable()) {
178
        // read the message ID
176 179
        if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
177 180
            qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
178
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
181
            if (toRead > 0) {
182
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
183
            }
179 184
        }
180 185

  
181 186
        if (client.bufUsed < 2) {
......
184 189
        }
185 190

  
186 191
        uint16_t messageId = *((uint16_t*) client.buf);
192
        bool variableLength = false;
187 193
        qint64 contentSize;
188 194
        switch (messageId & 0xF000) {
189 195
        case 0x0000:
......
202 208
            contentSize = 16;
203 209
            break;
204 210
        case 0xF000:
205
            // TODO - implement support for variable length messages
211
            variableLength = true;
206 212
            contentSize = 0;
207 213
            break;
208 214
        default:
......
210 216
            return;
211 217
        }
212 218

  
213
        qint64 toRead = qMin(qint64(contentSize - (client.bufUsed - Client::MESSAGE_ID_SIZE)), sender->bytesAvailable());
219
        size_t contentOffset = Client::MESSAGE_ID_SIZE;
220

  
221
        if (variableLength) {
222
            // read the message's content length
223
            qint64 toRead = qMin(
224
                        qint64(Client::MESSAGE_LENGTH_SIZE - (client.bufUsed - Client::MESSAGE_ID_SIZE)),
225
                        sender->bytesAvailable());
226
            if (toRead > 0) {
227
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
228
            }
229

  
230
            if (client.bufUsed < (Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE)) {
231
                // we do not yet know the content size, do not proceed any further
232
                return;
233
            }
234

  
235
            contentSize = qint64(*((uint32_t*) &client.buf[Client::MESSAGE_ID_SIZE]));
236
            contentOffset = Client::MESSAGE_ID_SIZE + Client::MESSAGE_LENGTH_SIZE;
237
        }
238

  
239
        // read message content
240
        qint64 toRead = qMin(
241
                    qint64(contentSize - (client.bufUsed - contentOffset)),
242
                    sender->bytesAvailable());
214 243
        if (toRead > 0) {
215
            client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
244
            qint64 bufRemaining = qint64(Client::BUF_SIZE - client.bufUsed);
245
            if (toRead > bufRemaining) {
246
                // message would overflow the buffer - we'll read whatever fits and skip the rest
247
                if (bufRemaining > 0) {
248
                    qint64 actuallyRead = sender->read(&client.buf[client.bufUsed], bufRemaining);
249
                    client.bufUsed += actuallyRead;
250
                    toRead -= actuallyRead;
251
                }
252

  
253
                if (toRead > 0) {
254
                    sender->skip(toRead);
255
                }
256
            } else {
257
                client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
258
            }
216 259
        }
217 260

  
218
        if (client.bufUsed == (Client::MESSAGE_ID_SIZE + contentSize)) {
219
            handleMessage(client, messageId, &client.buf[Client::MESSAGE_ID_SIZE]);
261
        if (client.bufUsed == (contentOffset + contentSize)) {
262
            handleMessage(client, messageId, &client.buf[contentOffset], contentSize);
220 263
            client.bufUsed = 0;
221 264
        }
222 265
    }
223 266
}
224 267

  
225
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content) {
268
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content, size_t contentSize) {
226 269
    switch (messageId) {
227 270
    case 0x3001:
228 271
        qDebug() << "Magic arrived!";
......
238 281
        break;
239 282
    case 0xF006:
240 283
        qDebug() << "EOT!";
241
        handleEOT(client, (char *) content);
284
        handleEOT(client, (char *) content, contentSize);
242 285
        break;
243 286
    }
244 287
}
......
276 319
    //qDebug() << "prisel pong" << number;
277 320
}
278 321

  
279
void CurveDataServer::handleEOT(Client& client, char* content)
322
void CurveDataServer::handleEOT(Client& client, char* content, size_t contentSize)
280 323
{
281
    qDebug() << "Client" << client.socket->peerName() << "disconnected:" << content;
324
    char cstrContent[contentSize + 1];
325
    std::memcpy(cstrContent, content, contentSize);
326
    cstrContent[contentSize] = '\0';
327

  
328
    qDebug() << "Client" << client.socket->peerName() << "disconnected:" << cstrContent;
282 329
    terminateConnection(client, "Recieved EOT from client");
283 330
}
284 331

  
deltarobot/curvedataserver.h
14 14

  
15 15
public:
16 16
    struct Client {
17
        /**
18
         * Size of the client buffer.
19
         *
20
         * @note Current implementation truncates any variable-length messages longer than what would fit in the buffer.
21
         */
17 22
        static const size_t BUF_SIZE = 4096;
18
        static const size_t MESSAGE_ID_SIZE = 2;
23

  
24
        static const size_t MESSAGE_ID_SIZE = sizeof(uint16_t);      ///< Size of the protocol message identifier.
25
        static const size_t MESSAGE_LENGTH_SIZE = sizeof(uint32_t);  ///< Size of the variable-length message's content length.
19 26

  
20 27
        QTcpSocket* socket;
21 28
        std::map<uint64_t, uint64_t> ping;  ///< key: ping ID;  value: ping timestamp (msec);
......
52 59
private:
53 60
    QList<Client>::iterator clientOf(QTcpSocket *socket);
54 61

  
55
    void handleMessage(Client& client, uint16_t messageId, void* content);
62
    void handleMessage(Client& client, uint16_t messageId, void* content, size_t contentSize);
56 63
    void handleProtocolMagic(Client& client, char* content);
57 64
    void handleVersion(Client& client, uint8_t content);
58 65
    void handlePong(Client& client, uint64_t* content);
59
    void handleEOT(Client& client, char* content);
66
    void handleEOT(Client& client, char* content, size_t contentSize);
60 67

  
61 68
    void validPreamble(Client& client);
62 69
    void removeSocket(QTcpSocket *socket);

Také k dispozici: Unified diff