3 |
3 |
#include <QDebug>
|
4 |
4 |
#include <QHostAddress>
|
5 |
5 |
#include <QAbstractSocket>
|
|
6 |
#include <chrono>
|
|
7 |
|
|
8 |
|
|
9 |
// Utility functions ///////////////////////////////////////////////////////////////////////////////////////////////////
|
6 |
10 |
|
7 |
11 |
static QByteArray message8(uint16_t messageId, uint8_t content = 0) {
|
8 |
12 |
QByteArray result(2 + 1, 0);
|
... | ... | |
79 |
83 |
return result;
|
80 |
84 |
}
|
81 |
85 |
|
82 |
|
static QByteArray messageVarLength(uint16_t messageId, uint32_t length) {
|
|
86 |
static QByteArray messageVarLength(uint16_t messageId, uint32_t length, const char* content = nullptr) {
|
83 |
87 |
QByteArray result(2 + 4 + length, 0);
|
84 |
88 |
result[0] = ((uint8_t*) &messageId)[0];
|
85 |
89 |
result[1] = ((uint8_t*) &messageId)[1];
|
... | ... | |
87 |
91 |
result[3] = ((uint8_t*) &length)[1];
|
88 |
92 |
result[4] = ((uint8_t*) &length)[2];
|
89 |
93 |
result[5] = ((uint8_t*) &length)[3];
|
|
94 |
|
|
95 |
if (content) {
|
|
96 |
for (uint32_t i = 0; i < length; i++) {
|
|
97 |
result[6 + i] = content[i];
|
|
98 |
}
|
|
99 |
}
|
90 |
100 |
return result;
|
91 |
101 |
}
|
92 |
102 |
|
|
103 |
inline long currentTimeMillis() {
|
|
104 |
return std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
105 |
std::chrono::steady_clock::now().time_since_epoch())
|
|
106 |
.count();
|
|
107 |
}
|
|
108 |
|
|
109 |
|
|
110 |
// Client //////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
111 |
|
|
112 |
CurveDataServer::Client::Client(QTcpSocket* socket) :
|
|
113 |
socket(socket),
|
|
114 |
lastPong(currentTimeMillis()),
|
|
115 |
bufUsed(0)
|
|
116 |
{}
|
|
117 |
|
|
118 |
|
|
119 |
// Server //////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
120 |
|
93 |
121 |
CurveDataServer::CurveDataServer() :
|
94 |
122 |
_server(this),
|
95 |
123 |
_sockets()
|
... | ... | |
98 |
126 |
connect(&_server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
|
99 |
127 |
}
|
100 |
128 |
|
|
129 |
CurveDataServer::~CurveDataServer()
|
|
130 |
{
|
|
131 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
|
|
132 |
for (auto& client : _sockets)
|
|
133 |
{
|
|
134 |
sendEot(client.socket, "Server is stopping");
|
|
135 |
client.socket->close();
|
|
136 |
}
|
|
137 |
_sockets.clear();
|
|
138 |
}
|
|
139 |
|
101 |
140 |
void CurveDataServer::onNewConnection()
|
102 |
141 |
{
|
103 |
142 |
QTcpSocket *clientSocket = _server.nextPendingConnection();
|
104 |
143 |
connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
|
105 |
144 |
connect(clientSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
|
106 |
145 |
|
107 |
|
_sockets.push_back(clientSocket);
|
108 |
|
|
109 |
146 |
sendPreamble(clientSocket);
|
|
147 |
_sockets.push_back(Client(clientSocket));
|
110 |
148 |
}
|
111 |
149 |
|
112 |
150 |
void CurveDataServer::onSocketStateChanged(QAbstractSocket::SocketState socketState)
|
... | ... | |
114 |
152 |
if (socketState == QAbstractSocket::UnconnectedState)
|
115 |
153 |
{
|
116 |
154 |
QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
|
117 |
|
_sockets.removeOne(sender);
|
|
155 |
|
|
156 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
|
|
157 |
for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
|
|
158 |
if (it->socket == sender) {
|
|
159 |
_sockets.erase(it);
|
|
160 |
break;
|
|
161 |
}
|
|
162 |
}
|
118 |
163 |
}
|
119 |
164 |
}
|
120 |
165 |
|
121 |
166 |
void CurveDataServer::onReadyRead()
|
122 |
167 |
{
|
123 |
|
// tady actually nakonec switch zpracovani ruznych zprav dle identifikatoru
|
|
168 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
|
|
169 |
|
124 |
170 |
QTcpSocket* sender = static_cast<QTcpSocket*>(QObject::sender());
|
125 |
|
QByteArray data = sender->readAll();
|
126 |
|
for (QTcpSocket* socket : _sockets) {
|
127 |
|
if (socket != sender)
|
128 |
|
socket->write(QByteArray::fromStdString(sender->peerAddress().toString().toStdString() + ": " + data.toStdString()));
|
|
171 |
auto it = clientOf(sender);
|
|
172 |
if (it == _sockets.end()) {
|
|
173 |
return;
|
|
174 |
}
|
|
175 |
Client& client = *it;
|
|
176 |
|
|
177 |
while (sender->bytesAvailable()) {
|
|
178 |
if (client.bufUsed < Client::MESSAGE_ID_SIZE) {
|
|
179 |
qint64 toRead = qMin(qint64(Client::MESSAGE_ID_SIZE - client.bufUsed), sender->bytesAvailable());
|
|
180 |
client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
|
|
181 |
}
|
|
182 |
|
|
183 |
if (client.bufUsed < 2) {
|
|
184 |
// no ID yet, wait for more data
|
|
185 |
return;
|
|
186 |
}
|
|
187 |
|
|
188 |
uint16_t messageId = *((uint16_t*) client.buf);
|
|
189 |
qint64 contentSize;
|
|
190 |
switch (messageId & 0xF000) {
|
|
191 |
case 0x0000:
|
|
192 |
contentSize = 1;
|
|
193 |
break;
|
|
194 |
case 0x1000:
|
|
195 |
contentSize = 2;
|
|
196 |
break;
|
|
197 |
case 0x2000:
|
|
198 |
contentSize = 4;
|
|
199 |
break;
|
|
200 |
case 0x3000:
|
|
201 |
contentSize = 8;
|
|
202 |
break;
|
|
203 |
case 0x4000:
|
|
204 |
contentSize = 16;
|
|
205 |
break;
|
|
206 |
case 0xF000:
|
|
207 |
// TODO - implement support for variable length messages
|
|
208 |
return;
|
|
209 |
default:
|
|
210 |
// unsupported message size
|
|
211 |
return;
|
|
212 |
}
|
|
213 |
|
|
214 |
qint64 toRead = qMin(qint64(contentSize - (client.bufUsed - Client::MESSAGE_ID_SIZE)), sender->bytesAvailable());
|
|
215 |
if (toRead > 0) {
|
|
216 |
client.bufUsed += sender->read(&client.buf[client.bufUsed], toRead);
|
|
217 |
}
|
|
218 |
|
|
219 |
if (client.bufUsed == (Client::MESSAGE_ID_SIZE + contentSize)) {
|
|
220 |
handleMessage(client, messageId, &client.buf[Client::MESSAGE_ID_SIZE]);
|
|
221 |
client.bufUsed = 0;
|
|
222 |
}
|
129 |
223 |
}
|
130 |
224 |
}
|
131 |
225 |
|
132 |
|
QByteArray *CurveDataServer::putFloatIntoMessage(QByteArray *message, int index, float f)
|
133 |
|
{
|
134 |
|
if (!message || message->size() < index + 4)
|
135 |
|
return message;
|
136 |
|
|
137 |
|
unsigned char *c = reinterpret_cast<unsigned char *>(&f);
|
138 |
|
|
139 |
|
for (int i = 0; i < 4; i++)
|
140 |
|
{
|
141 |
|
(*message)[index + i] = c[i];
|
|
226 |
void CurveDataServer::handleMessage(Client& client, uint16_t messageId, void *content) {
|
|
227 |
switch (messageId) {
|
|
228 |
case 0x3001:
|
|
229 |
qDebug() << "Magic arrived!";
|
|
230 |
break;
|
|
231 |
case 0x2002:
|
|
232 |
qDebug() << "Version arrived!";
|
|
233 |
break;
|
|
234 |
case 0x3005:
|
|
235 |
qDebug() << "Pong " << *((uint64_t*) content) << " arrived!";
|
|
236 |
break;
|
142 |
237 |
}
|
143 |
|
|
144 |
|
return message;
|
145 |
238 |
}
|
146 |
239 |
|
147 |
|
|
148 |
240 |
void CurveDataServer::sendActuatorPosition(float x, float y, float z)
|
149 |
241 |
{
|
150 |
242 |
QByteArray message = message128(0x4003,
|
... | ... | |
152 |
244 |
*((uint32_t*) &y),
|
153 |
245 |
*((uint32_t*) &z));
|
154 |
246 |
|
155 |
|
for (QTcpSocket* socket : _sockets) {
|
156 |
|
socket->write(message);
|
|
247 |
{
|
|
248 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
|
|
249 |
for (auto& client : _sockets) {
|
|
250 |
client.socket->write(message);
|
|
251 |
}
|
157 |
252 |
}
|
158 |
|
//qDebug() << message.toHex();
|
159 |
|
//qDebug() << x << " " << y << " " << z;
|
160 |
253 |
}
|
161 |
254 |
|
|
255 |
QList<CurveDataServer::Client>::iterator CurveDataServer::clientOf(QTcpSocket *socket) {
|
|
256 |
std::lock_guard<decltype (_socketsLock)> l(_socketsLock);
|
|
257 |
for (auto it = _sockets.begin(); it != _sockets.end(); it++) {
|
|
258 |
if (it->socket == socket) {
|
|
259 |
return it;
|
|
260 |
}
|
|
261 |
}
|
|
262 |
return _sockets.end();
|
|
263 |
}
|
162 |
264 |
|
163 |
265 |
void CurveDataServer::sendPreamble(QTcpSocket *socket)
|
164 |
266 |
{
|
... | ... | |
186 |
288 |
qDebug() << "Sent Protocol Version to" << socket->peerAddress();
|
187 |
289 |
//qDebug() << message.toHex();
|
188 |
290 |
}
|
|
291 |
|
|
292 |
void CurveDataServer::sendEot(QTcpSocket *socket, std::string&& reason)
|
|
293 |
{
|
|
294 |
QByteArray message = messageVarLength(0xF006, ((uint32_t) reason.length()), reason.c_str());
|
|
295 |
socket->write(message);
|
|
296 |
}
|
Re #8735 - Implement base server message retrieval