1
|
'use strict';
|
2
|
|
3
|
const { Writable } = require('stream');
|
4
|
|
5
|
const PerMessageDeflate = require('./permessage-deflate');
|
6
|
const {
|
7
|
BINARY_TYPES,
|
8
|
EMPTY_BUFFER,
|
9
|
kStatusCode,
|
10
|
kWebSocket
|
11
|
} = require('./constants');
|
12
|
const { concat, toArrayBuffer, unmask } = require('./buffer-util');
|
13
|
const { isValidStatusCode, isValidUTF8 } = require('./validation');
|
14
|
|
15
|
const GET_INFO = 0;
|
16
|
const GET_PAYLOAD_LENGTH_16 = 1;
|
17
|
const GET_PAYLOAD_LENGTH_64 = 2;
|
18
|
const GET_MASK = 3;
|
19
|
const GET_DATA = 4;
|
20
|
const INFLATING = 5;
|
21
|
|
22
|
/**
|
23
|
* HyBi Receiver implementation.
|
24
|
*
|
25
|
* @extends stream.Writable
|
26
|
*/
|
27
|
class Receiver extends Writable {
|
28
|
/**
|
29
|
* Creates a Receiver instance.
|
30
|
*
|
31
|
* @param {String} binaryType The type for binary data
|
32
|
* @param {Object} extensions An object containing the negotiated extensions
|
33
|
* @param {Number} maxPayload The maximum allowed message length
|
34
|
*/
|
35
|
constructor(binaryType, extensions, maxPayload) {
|
36
|
super();
|
37
|
|
38
|
this._binaryType = binaryType || BINARY_TYPES[0];
|
39
|
this[kWebSocket] = undefined;
|
40
|
this._extensions = extensions || {};
|
41
|
this._maxPayload = maxPayload | 0;
|
42
|
|
43
|
this._bufferedBytes = 0;
|
44
|
this._buffers = [];
|
45
|
|
46
|
this._compressed = false;
|
47
|
this._payloadLength = 0;
|
48
|
this._mask = undefined;
|
49
|
this._fragmented = 0;
|
50
|
this._masked = false;
|
51
|
this._fin = false;
|
52
|
this._opcode = 0;
|
53
|
|
54
|
this._totalPayloadLength = 0;
|
55
|
this._messageLength = 0;
|
56
|
this._fragments = [];
|
57
|
|
58
|
this._state = GET_INFO;
|
59
|
this._loop = false;
|
60
|
}
|
61
|
|
62
|
/**
|
63
|
* Implements `Writable.prototype._write()`.
|
64
|
*
|
65
|
* @param {Buffer} chunk The chunk of data to write
|
66
|
* @param {String} encoding The character encoding of `chunk`
|
67
|
* @param {Function} cb Callback
|
68
|
*/
|
69
|
_write(chunk, encoding, cb) {
|
70
|
if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
|
71
|
|
72
|
this._bufferedBytes += chunk.length;
|
73
|
this._buffers.push(chunk);
|
74
|
this.startLoop(cb);
|
75
|
}
|
76
|
|
77
|
/**
|
78
|
* Consumes `n` bytes from the buffered data.
|
79
|
*
|
80
|
* @param {Number} n The number of bytes to consume
|
81
|
* @return {Buffer} The consumed bytes
|
82
|
* @private
|
83
|
*/
|
84
|
consume(n) {
|
85
|
this._bufferedBytes -= n;
|
86
|
|
87
|
if (n === this._buffers[0].length) return this._buffers.shift();
|
88
|
|
89
|
if (n < this._buffers[0].length) {
|
90
|
const buf = this._buffers[0];
|
91
|
this._buffers[0] = buf.slice(n);
|
92
|
return buf.slice(0, n);
|
93
|
}
|
94
|
|
95
|
const dst = Buffer.allocUnsafe(n);
|
96
|
|
97
|
do {
|
98
|
const buf = this._buffers[0];
|
99
|
|
100
|
if (n >= buf.length) {
|
101
|
this._buffers.shift().copy(dst, dst.length - n);
|
102
|
} else {
|
103
|
buf.copy(dst, dst.length - n, 0, n);
|
104
|
this._buffers[0] = buf.slice(n);
|
105
|
}
|
106
|
|
107
|
n -= buf.length;
|
108
|
} while (n > 0);
|
109
|
|
110
|
return dst;
|
111
|
}
|
112
|
|
113
|
/**
|
114
|
* Starts the parsing loop.
|
115
|
*
|
116
|
* @param {Function} cb Callback
|
117
|
* @private
|
118
|
*/
|
119
|
startLoop(cb) {
|
120
|
var err;
|
121
|
this._loop = true;
|
122
|
|
123
|
do {
|
124
|
switch (this._state) {
|
125
|
case GET_INFO:
|
126
|
err = this.getInfo();
|
127
|
break;
|
128
|
case GET_PAYLOAD_LENGTH_16:
|
129
|
err = this.getPayloadLength16();
|
130
|
break;
|
131
|
case GET_PAYLOAD_LENGTH_64:
|
132
|
err = this.getPayloadLength64();
|
133
|
break;
|
134
|
case GET_MASK:
|
135
|
this.getMask();
|
136
|
break;
|
137
|
case GET_DATA:
|
138
|
err = this.getData(cb);
|
139
|
break;
|
140
|
default:
|
141
|
// `INFLATING`
|
142
|
this._loop = false;
|
143
|
return;
|
144
|
}
|
145
|
} while (this._loop);
|
146
|
|
147
|
cb(err);
|
148
|
}
|
149
|
|
150
|
/**
|
151
|
* Reads the first two bytes of a frame.
|
152
|
*
|
153
|
* @return {(RangeError|undefined)} A possible error
|
154
|
* @private
|
155
|
*/
|
156
|
getInfo() {
|
157
|
if (this._bufferedBytes < 2) {
|
158
|
this._loop = false;
|
159
|
return;
|
160
|
}
|
161
|
|
162
|
const buf = this.consume(2);
|
163
|
|
164
|
if ((buf[0] & 0x30) !== 0x00) {
|
165
|
this._loop = false;
|
166
|
return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
|
167
|
}
|
168
|
|
169
|
const compressed = (buf[0] & 0x40) === 0x40;
|
170
|
|
171
|
if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
|
172
|
this._loop = false;
|
173
|
return error(RangeError, 'RSV1 must be clear', true, 1002);
|
174
|
}
|
175
|
|
176
|
this._fin = (buf[0] & 0x80) === 0x80;
|
177
|
this._opcode = buf[0] & 0x0f;
|
178
|
this._payloadLength = buf[1] & 0x7f;
|
179
|
|
180
|
if (this._opcode === 0x00) {
|
181
|
if (compressed) {
|
182
|
this._loop = false;
|
183
|
return error(RangeError, 'RSV1 must be clear', true, 1002);
|
184
|
}
|
185
|
|
186
|
if (!this._fragmented) {
|
187
|
this._loop = false;
|
188
|
return error(RangeError, 'invalid opcode 0', true, 1002);
|
189
|
}
|
190
|
|
191
|
this._opcode = this._fragmented;
|
192
|
} else if (this._opcode === 0x01 || this._opcode === 0x02) {
|
193
|
if (this._fragmented) {
|
194
|
this._loop = false;
|
195
|
return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
|
196
|
}
|
197
|
|
198
|
this._compressed = compressed;
|
199
|
} else if (this._opcode > 0x07 && this._opcode < 0x0b) {
|
200
|
if (!this._fin) {
|
201
|
this._loop = false;
|
202
|
return error(RangeError, 'FIN must be set', true, 1002);
|
203
|
}
|
204
|
|
205
|
if (compressed) {
|
206
|
this._loop = false;
|
207
|
return error(RangeError, 'RSV1 must be clear', true, 1002);
|
208
|
}
|
209
|
|
210
|
if (this._payloadLength > 0x7d) {
|
211
|
this._loop = false;
|
212
|
return error(
|
213
|
RangeError,
|
214
|
`invalid payload length ${this._payloadLength}`,
|
215
|
true,
|
216
|
1002
|
217
|
);
|
218
|
}
|
219
|
} else {
|
220
|
this._loop = false;
|
221
|
return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
|
222
|
}
|
223
|
|
224
|
if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
|
225
|
this._masked = (buf[1] & 0x80) === 0x80;
|
226
|
|
227
|
if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
|
228
|
else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
|
229
|
else return this.haveLength();
|
230
|
}
|
231
|
|
232
|
/**
|
233
|
* Gets extended payload length (7+16).
|
234
|
*
|
235
|
* @return {(RangeError|undefined)} A possible error
|
236
|
* @private
|
237
|
*/
|
238
|
getPayloadLength16() {
|
239
|
if (this._bufferedBytes < 2) {
|
240
|
this._loop = false;
|
241
|
return;
|
242
|
}
|
243
|
|
244
|
this._payloadLength = this.consume(2).readUInt16BE(0);
|
245
|
return this.haveLength();
|
246
|
}
|
247
|
|
248
|
/**
|
249
|
* Gets extended payload length (7+64).
|
250
|
*
|
251
|
* @return {(RangeError|undefined)} A possible error
|
252
|
* @private
|
253
|
*/
|
254
|
getPayloadLength64() {
|
255
|
if (this._bufferedBytes < 8) {
|
256
|
this._loop = false;
|
257
|
return;
|
258
|
}
|
259
|
|
260
|
const buf = this.consume(8);
|
261
|
const num = buf.readUInt32BE(0);
|
262
|
|
263
|
//
|
264
|
// The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
|
265
|
// if payload length is greater than this number.
|
266
|
//
|
267
|
if (num > Math.pow(2, 53 - 32) - 1) {
|
268
|
this._loop = false;
|
269
|
return error(
|
270
|
RangeError,
|
271
|
'Unsupported WebSocket frame: payload length > 2^53 - 1',
|
272
|
false,
|
273
|
1009
|
274
|
);
|
275
|
}
|
276
|
|
277
|
this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
|
278
|
return this.haveLength();
|
279
|
}
|
280
|
|
281
|
/**
|
282
|
* Payload length has been read.
|
283
|
*
|
284
|
* @return {(RangeError|undefined)} A possible error
|
285
|
* @private
|
286
|
*/
|
287
|
haveLength() {
|
288
|
if (this._payloadLength && this._opcode < 0x08) {
|
289
|
this._totalPayloadLength += this._payloadLength;
|
290
|
if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
|
291
|
this._loop = false;
|
292
|
return error(RangeError, 'Max payload size exceeded', false, 1009);
|
293
|
}
|
294
|
}
|
295
|
|
296
|
if (this._masked) this._state = GET_MASK;
|
297
|
else this._state = GET_DATA;
|
298
|
}
|
299
|
|
300
|
/**
|
301
|
* Reads mask bytes.
|
302
|
*
|
303
|
* @private
|
304
|
*/
|
305
|
getMask() {
|
306
|
if (this._bufferedBytes < 4) {
|
307
|
this._loop = false;
|
308
|
return;
|
309
|
}
|
310
|
|
311
|
this._mask = this.consume(4);
|
312
|
this._state = GET_DATA;
|
313
|
}
|
314
|
|
315
|
/**
|
316
|
* Reads data bytes.
|
317
|
*
|
318
|
* @param {Function} cb Callback
|
319
|
* @return {(Error|RangeError|undefined)} A possible error
|
320
|
* @private
|
321
|
*/
|
322
|
getData(cb) {
|
323
|
var data = EMPTY_BUFFER;
|
324
|
|
325
|
if (this._payloadLength) {
|
326
|
if (this._bufferedBytes < this._payloadLength) {
|
327
|
this._loop = false;
|
328
|
return;
|
329
|
}
|
330
|
|
331
|
data = this.consume(this._payloadLength);
|
332
|
if (this._masked) unmask(data, this._mask);
|
333
|
}
|
334
|
|
335
|
if (this._opcode > 0x07) return this.controlMessage(data);
|
336
|
|
337
|
if (this._compressed) {
|
338
|
this._state = INFLATING;
|
339
|
this.decompress(data, cb);
|
340
|
return;
|
341
|
}
|
342
|
|
343
|
if (data.length) {
|
344
|
//
|
345
|
// This message is not compressed so its lenght is the sum of the payload
|
346
|
// length of all fragments.
|
347
|
//
|
348
|
this._messageLength = this._totalPayloadLength;
|
349
|
this._fragments.push(data);
|
350
|
}
|
351
|
|
352
|
return this.dataMessage();
|
353
|
}
|
354
|
|
355
|
/**
|
356
|
* Decompresses data.
|
357
|
*
|
358
|
* @param {Buffer} data Compressed data
|
359
|
* @param {Function} cb Callback
|
360
|
* @private
|
361
|
*/
|
362
|
decompress(data, cb) {
|
363
|
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
|
364
|
|
365
|
perMessageDeflate.decompress(data, this._fin, (err, buf) => {
|
366
|
if (err) return cb(err);
|
367
|
|
368
|
if (buf.length) {
|
369
|
this._messageLength += buf.length;
|
370
|
if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
|
371
|
return cb(
|
372
|
error(RangeError, 'Max payload size exceeded', false, 1009)
|
373
|
);
|
374
|
}
|
375
|
|
376
|
this._fragments.push(buf);
|
377
|
}
|
378
|
|
379
|
const er = this.dataMessage();
|
380
|
if (er) return cb(er);
|
381
|
|
382
|
this.startLoop(cb);
|
383
|
});
|
384
|
}
|
385
|
|
386
|
/**
|
387
|
* Handles a data message.
|
388
|
*
|
389
|
* @return {(Error|undefined)} A possible error
|
390
|
* @private
|
391
|
*/
|
392
|
dataMessage() {
|
393
|
if (this._fin) {
|
394
|
const messageLength = this._messageLength;
|
395
|
const fragments = this._fragments;
|
396
|
|
397
|
this._totalPayloadLength = 0;
|
398
|
this._messageLength = 0;
|
399
|
this._fragmented = 0;
|
400
|
this._fragments = [];
|
401
|
|
402
|
if (this._opcode === 2) {
|
403
|
var data;
|
404
|
|
405
|
if (this._binaryType === 'nodebuffer') {
|
406
|
data = concat(fragments, messageLength);
|
407
|
} else if (this._binaryType === 'arraybuffer') {
|
408
|
data = toArrayBuffer(concat(fragments, messageLength));
|
409
|
} else {
|
410
|
data = fragments;
|
411
|
}
|
412
|
|
413
|
this.emit('message', data);
|
414
|
} else {
|
415
|
const buf = concat(fragments, messageLength);
|
416
|
|
417
|
if (!isValidUTF8(buf)) {
|
418
|
this._loop = false;
|
419
|
return error(Error, 'invalid UTF-8 sequence', true, 1007);
|
420
|
}
|
421
|
|
422
|
this.emit('message', buf.toString());
|
423
|
}
|
424
|
}
|
425
|
|
426
|
this._state = GET_INFO;
|
427
|
}
|
428
|
|
429
|
/**
|
430
|
* Handles a control message.
|
431
|
*
|
432
|
* @param {Buffer} data Data to handle
|
433
|
* @return {(Error|RangeError|undefined)} A possible error
|
434
|
* @private
|
435
|
*/
|
436
|
controlMessage(data) {
|
437
|
if (this._opcode === 0x08) {
|
438
|
this._loop = false;
|
439
|
|
440
|
if (data.length === 0) {
|
441
|
this.emit('conclude', 1005, '');
|
442
|
this.end();
|
443
|
} else if (data.length === 1) {
|
444
|
return error(RangeError, 'invalid payload length 1', true, 1002);
|
445
|
} else {
|
446
|
const code = data.readUInt16BE(0);
|
447
|
|
448
|
if (!isValidStatusCode(code)) {
|
449
|
return error(RangeError, `invalid status code ${code}`, true, 1002);
|
450
|
}
|
451
|
|
452
|
const buf = data.slice(2);
|
453
|
|
454
|
if (!isValidUTF8(buf)) {
|
455
|
return error(Error, 'invalid UTF-8 sequence', true, 1007);
|
456
|
}
|
457
|
|
458
|
this.emit('conclude', code, buf.toString());
|
459
|
this.end();
|
460
|
}
|
461
|
} else if (this._opcode === 0x09) {
|
462
|
this.emit('ping', data);
|
463
|
} else {
|
464
|
this.emit('pong', data);
|
465
|
}
|
466
|
|
467
|
this._state = GET_INFO;
|
468
|
}
|
469
|
}
|
470
|
|
471
|
module.exports = Receiver;
|
472
|
|
473
|
/**
|
474
|
* Builds an error object.
|
475
|
*
|
476
|
* @param {(Error|RangeError)} ErrorCtor The error constructor
|
477
|
* @param {String} message The error message
|
478
|
* @param {Boolean} prefix Specifies whether or not to add a default prefix to
|
479
|
* `message`
|
480
|
* @param {Number} statusCode The status code
|
481
|
* @return {(Error|RangeError)} The error
|
482
|
* @private
|
483
|
*/
|
484
|
function error(ErrorCtor, message, prefix, statusCode) {
|
485
|
const err = new ErrorCtor(
|
486
|
prefix ? `Invalid WebSocket frame: ${message}` : message
|
487
|
);
|
488
|
|
489
|
Error.captureStackTrace(err, error);
|
490
|
err[kStatusCode] = statusCode;
|
491
|
return err;
|
492
|
}
|