Projekt

Obecné

Profil

Stáhnout (11.6 KB) Statistiky
| Větev: | Revize:
1 3a515b92 cagy
'use strict';
2
3
const { Writable } = require('stream');
4
5
const PerMessageDeflate = require('./permessage-deflate');
6
const {
7
  BINARY_TYPES,
8
  EMPTY_BUFFER,
9
  kStatusCode,
10
  kWebSocket
11
} = require('./constants');
12
const { concat, toArrayBuffer, unmask } = require('./buffer-util');
13
const { isValidStatusCode, isValidUTF8 } = require('./validation');
14
15
const GET_INFO = 0;
16
const GET_PAYLOAD_LENGTH_16 = 1;
17
const GET_PAYLOAD_LENGTH_64 = 2;
18
const GET_MASK = 3;
19
const GET_DATA = 4;
20
const INFLATING = 5;
21
22
/**
23
 * HyBi Receiver implementation.
24
 *
25
 * @extends stream.Writable
26
 */
27
class Receiver extends Writable {
28
  /**
29
   * Creates a Receiver instance.
30
   *
31
   * @param {String} binaryType The type for binary data
32
   * @param {Object} extensions An object containing the negotiated extensions
33
   * @param {Number} maxPayload The maximum allowed message length
34
   */
35
  constructor(binaryType, extensions, maxPayload) {
36
    super();
37
38
    this._binaryType = binaryType || BINARY_TYPES[0];
39
    this[kWebSocket] = undefined;
40
    this._extensions = extensions || {};
41
    this._maxPayload = maxPayload | 0;
42
43
    this._bufferedBytes = 0;
44
    this._buffers = [];
45
46
    this._compressed = false;
47
    this._payloadLength = 0;
48
    this._mask = undefined;
49
    this._fragmented = 0;
50
    this._masked = false;
51
    this._fin = false;
52
    this._opcode = 0;
53
54
    this._totalPayloadLength = 0;
55
    this._messageLength = 0;
56
    this._fragments = [];
57
58
    this._state = GET_INFO;
59
    this._loop = false;
60
  }
61
62
  /**
63
   * Implements `Writable.prototype._write()`.
64
   *
65
   * @param {Buffer} chunk The chunk of data to write
66
   * @param {String} encoding The character encoding of `chunk`
67
   * @param {Function} cb Callback
68
   */
69
  _write(chunk, encoding, cb) {
70
    if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
71
72
    this._bufferedBytes += chunk.length;
73
    this._buffers.push(chunk);
74
    this.startLoop(cb);
75
  }
76
77
  /**
78
   * Consumes `n` bytes from the buffered data.
79
   *
80
   * @param {Number} n The number of bytes to consume
81
   * @return {Buffer} The consumed bytes
82
   * @private
83
   */
84
  consume(n) {
85
    this._bufferedBytes -= n;
86
87
    if (n === this._buffers[0].length) return this._buffers.shift();
88
89
    if (n < this._buffers[0].length) {
90
      const buf = this._buffers[0];
91
      this._buffers[0] = buf.slice(n);
92
      return buf.slice(0, n);
93
    }
94
95
    const dst = Buffer.allocUnsafe(n);
96
97
    do {
98
      const buf = this._buffers[0];
99
100
      if (n >= buf.length) {
101
        this._buffers.shift().copy(dst, dst.length - n);
102
      } else {
103
        buf.copy(dst, dst.length - n, 0, n);
104
        this._buffers[0] = buf.slice(n);
105
      }
106
107
      n -= buf.length;
108
    } while (n > 0);
109
110
    return dst;
111
  }
112
113
  /**
114
   * Starts the parsing loop.
115
   *
116
   * @param {Function} cb Callback
117
   * @private
118
   */
119
  startLoop(cb) {
120
    var err;
121
    this._loop = true;
122
123
    do {
124
      switch (this._state) {
125
        case GET_INFO:
126
          err = this.getInfo();
127
          break;
128
        case GET_PAYLOAD_LENGTH_16:
129
          err = this.getPayloadLength16();
130
          break;
131
        case GET_PAYLOAD_LENGTH_64:
132
          err = this.getPayloadLength64();
133
          break;
134
        case GET_MASK:
135
          this.getMask();
136
          break;
137
        case GET_DATA:
138
          err = this.getData(cb);
139
          break;
140
        default:
141
          // `INFLATING`
142
          this._loop = false;
143
          return;
144
      }
145
    } while (this._loop);
146
147
    cb(err);
148
  }
149
150
  /**
151
   * Reads the first two bytes of a frame.
152
   *
153
   * @return {(RangeError|undefined)} A possible error
154
   * @private
155
   */
156
  getInfo() {
157
    if (this._bufferedBytes < 2) {
158
      this._loop = false;
159
      return;
160
    }
161
162
    const buf = this.consume(2);
163
164
    if ((buf[0] & 0x30) !== 0x00) {
165
      this._loop = false;
166
      return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
167
    }
168
169
    const compressed = (buf[0] & 0x40) === 0x40;
170
171
    if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
172
      this._loop = false;
173
      return error(RangeError, 'RSV1 must be clear', true, 1002);
174
    }
175
176
    this._fin = (buf[0] & 0x80) === 0x80;
177
    this._opcode = buf[0] & 0x0f;
178
    this._payloadLength = buf[1] & 0x7f;
179
180
    if (this._opcode === 0x00) {
181
      if (compressed) {
182
        this._loop = false;
183
        return error(RangeError, 'RSV1 must be clear', true, 1002);
184
      }
185
186
      if (!this._fragmented) {
187
        this._loop = false;
188
        return error(RangeError, 'invalid opcode 0', true, 1002);
189
      }
190
191
      this._opcode = this._fragmented;
192
    } else if (this._opcode === 0x01 || this._opcode === 0x02) {
193
      if (this._fragmented) {
194
        this._loop = false;
195
        return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
196
      }
197
198
      this._compressed = compressed;
199
    } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
200
      if (!this._fin) {
201
        this._loop = false;
202
        return error(RangeError, 'FIN must be set', true, 1002);
203
      }
204
205
      if (compressed) {
206
        this._loop = false;
207
        return error(RangeError, 'RSV1 must be clear', true, 1002);
208
      }
209
210
      if (this._payloadLength > 0x7d) {
211
        this._loop = false;
212
        return error(
213
          RangeError,
214
          `invalid payload length ${this._payloadLength}`,
215
          true,
216
          1002
217
        );
218
      }
219
    } else {
220
      this._loop = false;
221
      return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
222
    }
223
224
    if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
225
    this._masked = (buf[1] & 0x80) === 0x80;
226
227
    if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
228
    else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
229
    else return this.haveLength();
230
  }
231
232
  /**
233
   * Gets extended payload length (7+16).
234
   *
235
   * @return {(RangeError|undefined)} A possible error
236
   * @private
237
   */
238
  getPayloadLength16() {
239
    if (this._bufferedBytes < 2) {
240
      this._loop = false;
241
      return;
242
    }
243
244
    this._payloadLength = this.consume(2).readUInt16BE(0);
245
    return this.haveLength();
246
  }
247
248
  /**
249
   * Gets extended payload length (7+64).
250
   *
251
   * @return {(RangeError|undefined)} A possible error
252
   * @private
253
   */
254
  getPayloadLength64() {
255
    if (this._bufferedBytes < 8) {
256
      this._loop = false;
257
      return;
258
    }
259
260
    const buf = this.consume(8);
261
    const num = buf.readUInt32BE(0);
262
263
    //
264
    // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
265
    // if payload length is greater than this number.
266
    //
267
    if (num > Math.pow(2, 53 - 32) - 1) {
268
      this._loop = false;
269
      return error(
270
        RangeError,
271
        'Unsupported WebSocket frame: payload length > 2^53 - 1',
272
        false,
273
        1009
274
      );
275
    }
276
277
    this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
278
    return this.haveLength();
279
  }
280
281
  /**
282
   * Payload length has been read.
283
   *
284
   * @return {(RangeError|undefined)} A possible error
285
   * @private
286
   */
287
  haveLength() {
288
    if (this._payloadLength && this._opcode < 0x08) {
289
      this._totalPayloadLength += this._payloadLength;
290
      if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
291
        this._loop = false;
292
        return error(RangeError, 'Max payload size exceeded', false, 1009);
293
      }
294
    }
295
296
    if (this._masked) this._state = GET_MASK;
297
    else this._state = GET_DATA;
298
  }
299
300
  /**
301
   * Reads mask bytes.
302
   *
303
   * @private
304
   */
305
  getMask() {
306
    if (this._bufferedBytes < 4) {
307
      this._loop = false;
308
      return;
309
    }
310
311
    this._mask = this.consume(4);
312
    this._state = GET_DATA;
313
  }
314
315
  /**
316
   * Reads data bytes.
317
   *
318
   * @param {Function} cb Callback
319
   * @return {(Error|RangeError|undefined)} A possible error
320
   * @private
321
   */
322
  getData(cb) {
323
    var data = EMPTY_BUFFER;
324
325
    if (this._payloadLength) {
326
      if (this._bufferedBytes < this._payloadLength) {
327
        this._loop = false;
328
        return;
329
      }
330
331
      data = this.consume(this._payloadLength);
332
      if (this._masked) unmask(data, this._mask);
333
    }
334
335
    if (this._opcode > 0x07) return this.controlMessage(data);
336
337
    if (this._compressed) {
338
      this._state = INFLATING;
339
      this.decompress(data, cb);
340
      return;
341
    }
342
343
    if (data.length) {
344
      //
345
      // This message is not compressed so its lenght is the sum of the payload
346
      // length of all fragments.
347
      //
348
      this._messageLength = this._totalPayloadLength;
349
      this._fragments.push(data);
350
    }
351
352
    return this.dataMessage();
353
  }
354
355
  /**
356
   * Decompresses data.
357
   *
358
   * @param {Buffer} data Compressed data
359
   * @param {Function} cb Callback
360
   * @private
361
   */
362
  decompress(data, cb) {
363
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
364
365
    perMessageDeflate.decompress(data, this._fin, (err, buf) => {
366
      if (err) return cb(err);
367
368
      if (buf.length) {
369
        this._messageLength += buf.length;
370
        if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
371
          return cb(
372
            error(RangeError, 'Max payload size exceeded', false, 1009)
373
          );
374
        }
375
376
        this._fragments.push(buf);
377
      }
378
379
      const er = this.dataMessage();
380
      if (er) return cb(er);
381
382
      this.startLoop(cb);
383
    });
384
  }
385
386
  /**
387
   * Handles a data message.
388
   *
389
   * @return {(Error|undefined)} A possible error
390
   * @private
391
   */
392
  dataMessage() {
393
    if (this._fin) {
394
      const messageLength = this._messageLength;
395
      const fragments = this._fragments;
396
397
      this._totalPayloadLength = 0;
398
      this._messageLength = 0;
399
      this._fragmented = 0;
400
      this._fragments = [];
401
402
      if (this._opcode === 2) {
403
        var data;
404
405
        if (this._binaryType === 'nodebuffer') {
406
          data = concat(fragments, messageLength);
407
        } else if (this._binaryType === 'arraybuffer') {
408
          data = toArrayBuffer(concat(fragments, messageLength));
409
        } else {
410
          data = fragments;
411
        }
412
413
        this.emit('message', data);
414
      } else {
415
        const buf = concat(fragments, messageLength);
416
417
        if (!isValidUTF8(buf)) {
418
          this._loop = false;
419
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
420
        }
421
422
        this.emit('message', buf.toString());
423
      }
424
    }
425
426
    this._state = GET_INFO;
427
  }
428
429
  /**
430
   * Handles a control message.
431
   *
432
   * @param {Buffer} data Data to handle
433
   * @return {(Error|RangeError|undefined)} A possible error
434
   * @private
435
   */
436
  controlMessage(data) {
437
    if (this._opcode === 0x08) {
438
      this._loop = false;
439
440
      if (data.length === 0) {
441
        this.emit('conclude', 1005, '');
442
        this.end();
443
      } else if (data.length === 1) {
444
        return error(RangeError, 'invalid payload length 1', true, 1002);
445
      } else {
446
        const code = data.readUInt16BE(0);
447
448
        if (!isValidStatusCode(code)) {
449
          return error(RangeError, `invalid status code ${code}`, true, 1002);
450
        }
451
452
        const buf = data.slice(2);
453
454
        if (!isValidUTF8(buf)) {
455
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
456
        }
457
458
        this.emit('conclude', code, buf.toString());
459
        this.end();
460
      }
461
    } else if (this._opcode === 0x09) {
462
      this.emit('ping', data);
463
    } else {
464
      this.emit('pong', data);
465
    }
466
467
    this._state = GET_INFO;
468
  }
469
}
470
471
module.exports = Receiver;
472
473
/**
474
 * Builds an error object.
475
 *
476
 * @param {(Error|RangeError)} ErrorCtor The error constructor
477
 * @param {String} message The error message
478
 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
479
 *     `message`
480
 * @param {Number} statusCode The status code
481
 * @return {(Error|RangeError)} The error
482
 * @private
483
 */
484
function error(ErrorCtor, message, prefix, statusCode) {
485
  const err = new ErrorCtor(
486
    prefix ? `Invalid WebSocket frame: ${message}` : message
487
  );
488
489
  Error.captureStackTrace(err, error);
490
  err[kStatusCode] = statusCode;
491
  return err;
492
}