Projekt

Obecné

Profil

Stáhnout (13.8 KB) Statistiky
| Větev: | Revize:
1
'use strict';
2

    
3
const Limiter = require('async-limiter');
4
const zlib = require('zlib');
5

    
6
const bufferUtil = require('./buffer-util');
7
const { kStatusCode, NOOP } = require('./constants');
8

    
9
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
10
const EMPTY_BLOCK = Buffer.from([0x00]);
11

    
12
const kPerMessageDeflate = Symbol('permessage-deflate');
13
const kTotalLength = Symbol('total-length');
14
const kCallback = Symbol('callback');
15
const kBuffers = Symbol('buffers');
16
const kError = Symbol('error');
17

    
18
//
19
// We limit zlib concurrency, which prevents severe memory fragmentation
20
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
21
// and https://github.com/websockets/ws/issues/1202
22
//
23
// Intentionally global; it's the global thread pool that's an issue.
24
//
25
let zlibLimiter;
26

    
27
/**
28
 * permessage-deflate implementation.
29
 */
30
class PerMessageDeflate {
31
  /**
32
   * Creates a PerMessageDeflate instance.
33
   *
34
   * @param {Object} options Configuration options
35
   * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
36
   *     of server context takeover
37
   * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
38
   *     disabling of client context takeover
39
   * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
40
   *     use of a custom server window size
41
   * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
42
   *     for, or request, a custom client window size
43
   * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate
44
   * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate
45
   * @param {Number} options.threshold Size (in bytes) below which messages
46
   *     should not be compressed
47
   * @param {Number} options.concurrencyLimit The number of concurrent calls to
48
   *     zlib
49
   * @param {Boolean} isServer Create the instance in either server or client
50
   *     mode
51
   * @param {Number} maxPayload The maximum allowed message length
52
   */
53
  constructor(options, isServer, maxPayload) {
54
    this._maxPayload = maxPayload | 0;
55
    this._options = options || {};
56
    this._threshold =
57
      this._options.threshold !== undefined ? this._options.threshold : 1024;
58
    this._isServer = !!isServer;
59
    this._deflate = null;
60
    this._inflate = null;
61

    
62
    this.params = null;
63

    
64
    if (!zlibLimiter) {
65
      const concurrency =
66
        this._options.concurrencyLimit !== undefined
67
          ? this._options.concurrencyLimit
68
          : 10;
69
      zlibLimiter = new Limiter({ concurrency });
70
    }
71
  }
72

    
73
  /**
74
   * @type {String}
75
   */
76
  static get extensionName() {
77
    return 'permessage-deflate';
78
  }
79

    
80
  /**
81
   * Create an extension negotiation offer.
82
   *
83
   * @return {Object} Extension parameters
84
   * @public
85
   */
86
  offer() {
87
    const params = {};
88

    
89
    if (this._options.serverNoContextTakeover) {
90
      params.server_no_context_takeover = true;
91
    }
92
    if (this._options.clientNoContextTakeover) {
93
      params.client_no_context_takeover = true;
94
    }
95
    if (this._options.serverMaxWindowBits) {
96
      params.server_max_window_bits = this._options.serverMaxWindowBits;
97
    }
98
    if (this._options.clientMaxWindowBits) {
99
      params.client_max_window_bits = this._options.clientMaxWindowBits;
100
    } else if (this._options.clientMaxWindowBits == null) {
101
      params.client_max_window_bits = true;
102
    }
103

    
104
    return params;
105
  }
106

    
107
  /**
108
   * Accept an extension negotiation offer/response.
109
   *
110
   * @param {Array} configurations The extension negotiation offers/reponse
111
   * @return {Object} Accepted configuration
112
   * @public
113
   */
114
  accept(configurations) {
115
    configurations = this.normalizeParams(configurations);
116

    
117
    this.params = this._isServer
118
      ? this.acceptAsServer(configurations)
119
      : this.acceptAsClient(configurations);
120

    
121
    return this.params;
122
  }
123

    
124
  /**
125
   * Releases all resources used by the extension.
126
   *
127
   * @public
128
   */
129
  cleanup() {
130
    if (this._inflate) {
131
      this._inflate.close();
132
      this._inflate = null;
133
    }
134

    
135
    if (this._deflate) {
136
      this._deflate.close();
137
      this._deflate = null;
138
    }
139
  }
140

    
141
  /**
142
   *  Accept an extension negotiation offer.
143
   *
144
   * @param {Array} offers The extension negotiation offers
145
   * @return {Object} Accepted configuration
146
   * @private
147
   */
148
  acceptAsServer(offers) {
149
    const opts = this._options;
150
    const accepted = offers.find((params) => {
151
      if (
152
        (opts.serverNoContextTakeover === false &&
153
          params.server_no_context_takeover) ||
154
        (params.server_max_window_bits &&
155
          (opts.serverMaxWindowBits === false ||
156
            (typeof opts.serverMaxWindowBits === 'number' &&
157
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
158
        (typeof opts.clientMaxWindowBits === 'number' &&
159
          !params.client_max_window_bits)
160
      ) {
161
        return false;
162
      }
163

    
164
      return true;
165
    });
166

    
167
    if (!accepted) {
168
      throw new Error('None of the extension offers can be accepted');
169
    }
170

    
171
    if (opts.serverNoContextTakeover) {
172
      accepted.server_no_context_takeover = true;
173
    }
174
    if (opts.clientNoContextTakeover) {
175
      accepted.client_no_context_takeover = true;
176
    }
177
    if (typeof opts.serverMaxWindowBits === 'number') {
178
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
179
    }
180
    if (typeof opts.clientMaxWindowBits === 'number') {
181
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
182
    } else if (
183
      accepted.client_max_window_bits === true ||
184
      opts.clientMaxWindowBits === false
185
    ) {
186
      delete accepted.client_max_window_bits;
187
    }
188

    
189
    return accepted;
190
  }
191

    
192
  /**
193
   * Accept the extension negotiation response.
194
   *
195
   * @param {Array} response The extension negotiation response
196
   * @return {Object} Accepted configuration
197
   * @private
198
   */
199
  acceptAsClient(response) {
200
    const params = response[0];
201

    
202
    if (
203
      this._options.clientNoContextTakeover === false &&
204
      params.client_no_context_takeover
205
    ) {
206
      throw new Error('Unexpected parameter "client_no_context_takeover"');
207
    }
208

    
209
    if (!params.client_max_window_bits) {
210
      if (typeof this._options.clientMaxWindowBits === 'number') {
211
        params.client_max_window_bits = this._options.clientMaxWindowBits;
212
      }
213
    } else if (
214
      this._options.clientMaxWindowBits === false ||
215
      (typeof this._options.clientMaxWindowBits === 'number' &&
216
        params.client_max_window_bits > this._options.clientMaxWindowBits)
217
    ) {
218
      throw new Error(
219
        'Unexpected or invalid parameter "client_max_window_bits"'
220
      );
221
    }
222

    
223
    return params;
224
  }
225

    
226
  /**
227
   * Normalize parameters.
228
   *
229
   * @param {Array} configurations The extension negotiation offers/reponse
230
   * @return {Array} The offers/response with normalized parameters
231
   * @private
232
   */
233
  normalizeParams(configurations) {
234
    configurations.forEach((params) => {
235
      Object.keys(params).forEach((key) => {
236
        var value = params[key];
237

    
238
        if (value.length > 1) {
239
          throw new Error(`Parameter "${key}" must have only a single value`);
240
        }
241

    
242
        value = value[0];
243

    
244
        if (key === 'client_max_window_bits') {
245
          if (value !== true) {
246
            const num = +value;
247
            if (!Number.isInteger(num) || num < 8 || num > 15) {
248
              throw new TypeError(
249
                `Invalid value for parameter "${key}": ${value}`
250
              );
251
            }
252
            value = num;
253
          } else if (!this._isServer) {
254
            throw new TypeError(
255
              `Invalid value for parameter "${key}": ${value}`
256
            );
257
          }
258
        } else if (key === 'server_max_window_bits') {
259
          const num = +value;
260
          if (!Number.isInteger(num) || num < 8 || num > 15) {
261
            throw new TypeError(
262
              `Invalid value for parameter "${key}": ${value}`
263
            );
264
          }
265
          value = num;
266
        } else if (
267
          key === 'client_no_context_takeover' ||
268
          key === 'server_no_context_takeover'
269
        ) {
270
          if (value !== true) {
271
            throw new TypeError(
272
              `Invalid value for parameter "${key}": ${value}`
273
            );
274
          }
275
        } else {
276
          throw new Error(`Unknown parameter "${key}"`);
277
        }
278

    
279
        params[key] = value;
280
      });
281
    });
282

    
283
    return configurations;
284
  }
285

    
286
  /**
287
   * Decompress data. Concurrency limited by async-limiter.
288
   *
289
   * @param {Buffer} data Compressed data
290
   * @param {Boolean} fin Specifies whether or not this is the last fragment
291
   * @param {Function} callback Callback
292
   * @public
293
   */
294
  decompress(data, fin, callback) {
295
    zlibLimiter.push((done) => {
296
      this._decompress(data, fin, (err, result) => {
297
        done();
298
        callback(err, result);
299
      });
300
    });
301
  }
302

    
303
  /**
304
   * Compress data. Concurrency limited by async-limiter.
305
   *
306
   * @param {Buffer} data Data to compress
307
   * @param {Boolean} fin Specifies whether or not this is the last fragment
308
   * @param {Function} callback Callback
309
   * @public
310
   */
311
  compress(data, fin, callback) {
312
    zlibLimiter.push((done) => {
313
      this._compress(data, fin, (err, result) => {
314
        done();
315
        callback(err, result);
316
      });
317
    });
318
  }
319

    
320
  /**
321
   * Decompress data.
322
   *
323
   * @param {Buffer} data Compressed data
324
   * @param {Boolean} fin Specifies whether or not this is the last fragment
325
   * @param {Function} callback Callback
326
   * @private
327
   */
328
  _decompress(data, fin, callback) {
329
    const endpoint = this._isServer ? 'client' : 'server';
330

    
331
    if (!this._inflate) {
332
      const key = `${endpoint}_max_window_bits`;
333
      const windowBits =
334
        typeof this.params[key] !== 'number'
335
          ? zlib.Z_DEFAULT_WINDOWBITS
336
          : this.params[key];
337

    
338
      this._inflate = zlib.createInflateRaw(
339
        Object.assign({}, this._options.zlibInflateOptions, { windowBits })
340
      );
341
      this._inflate[kPerMessageDeflate] = this;
342
      this._inflate[kTotalLength] = 0;
343
      this._inflate[kBuffers] = [];
344
      this._inflate.on('error', inflateOnError);
345
      this._inflate.on('data', inflateOnData);
346
    }
347

    
348
    this._inflate[kCallback] = callback;
349

    
350
    this._inflate.write(data);
351
    if (fin) this._inflate.write(TRAILER);
352

    
353
    this._inflate.flush(() => {
354
      const err = this._inflate[kError];
355

    
356
      if (err) {
357
        this._inflate.close();
358
        this._inflate = null;
359
        callback(err);
360
        return;
361
      }
362

    
363
      const data = bufferUtil.concat(
364
        this._inflate[kBuffers],
365
        this._inflate[kTotalLength]
366
      );
367

    
368
      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
369
        this._inflate.close();
370
        this._inflate = null;
371
      } else {
372
        this._inflate[kTotalLength] = 0;
373
        this._inflate[kBuffers] = [];
374
      }
375

    
376
      callback(null, data);
377
    });
378
  }
379

    
380
  /**
381
   * Compress data.
382
   *
383
   * @param {Buffer} data Data to compress
384
   * @param {Boolean} fin Specifies whether or not this is the last fragment
385
   * @param {Function} callback Callback
386
   * @private
387
   */
388
  _compress(data, fin, callback) {
389
    if (!data || data.length === 0) {
390
      process.nextTick(callback, null, EMPTY_BLOCK);
391
      return;
392
    }
393

    
394
    const endpoint = this._isServer ? 'server' : 'client';
395

    
396
    if (!this._deflate) {
397
      const key = `${endpoint}_max_window_bits`;
398
      const windowBits =
399
        typeof this.params[key] !== 'number'
400
          ? zlib.Z_DEFAULT_WINDOWBITS
401
          : this.params[key];
402

    
403
      this._deflate = zlib.createDeflateRaw(
404
        Object.assign({}, this._options.zlibDeflateOptions, { windowBits })
405
      );
406

    
407
      this._deflate[kTotalLength] = 0;
408
      this._deflate[kBuffers] = [];
409

    
410
      //
411
      // An `'error'` event is emitted, only on Node.js < 10.0.0, if the
412
      // `zlib.DeflateRaw` instance is closed while data is being processed.
413
      // This can happen if `PerMessageDeflate#cleanup()` is called at the wrong
414
      // time due to an abnormal WebSocket closure.
415
      //
416
      this._deflate.on('error', NOOP);
417
      this._deflate.on('data', deflateOnData);
418
    }
419

    
420
    this._deflate.write(data);
421
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
422
      if (!this._deflate) {
423
        //
424
        // This `if` statement is only needed for Node.js < 10.0.0 because as of
425
        // commit https://github.com/nodejs/node/commit/5e3f5164, the flush
426
        // callback is no longer called if the deflate stream is closed while
427
        // data is being processed.
428
        //
429
        return;
430
      }
431

    
432
      var data = bufferUtil.concat(
433
        this._deflate[kBuffers],
434
        this._deflate[kTotalLength]
435
      );
436

    
437
      if (fin) data = data.slice(0, data.length - 4);
438

    
439
      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
440
        this._deflate.close();
441
        this._deflate = null;
442
      } else {
443
        this._deflate[kTotalLength] = 0;
444
        this._deflate[kBuffers] = [];
445
      }
446

    
447
      callback(null, data);
448
    });
449
  }
450
}
451

    
452
module.exports = PerMessageDeflate;
453

    
454
/**
455
 * The listener of the `zlib.DeflateRaw` stream `'data'` event.
456
 *
457
 * @param {Buffer} chunk A chunk of data
458
 * @private
459
 */
460
function deflateOnData(chunk) {
461
  this[kBuffers].push(chunk);
462
  this[kTotalLength] += chunk.length;
463
}
464

    
465
/**
466
 * The listener of the `zlib.InflateRaw` stream `'data'` event.
467
 *
468
 * @param {Buffer} chunk A chunk of data
469
 * @private
470
 */
471
function inflateOnData(chunk) {
472
  this[kTotalLength] += chunk.length;
473

    
474
  if (
475
    this[kPerMessageDeflate]._maxPayload < 1 ||
476
    this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
477
  ) {
478
    this[kBuffers].push(chunk);
479
    return;
480
  }
481

    
482
  this[kError] = new RangeError('Max payload size exceeded');
483
  this[kError][kStatusCode] = 1009;
484
  this.removeListener('data', inflateOnData);
485
  this.reset();
486
}
487

    
488
/**
489
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
490
 *
491
 * @param {Error} err The emitted error
492
 * @private
493
 */
494
function inflateOnError(err) {
495
  //
496
  // There is no need to call `Zlib#close()` as the handle is automatically
497
  // closed when an error is emitted.
498
  //
499
  this[kPerMessageDeflate]._inflate = null;
500
  err[kStatusCode] = 1007;
501
  this[kCallback](err);
502
}
(5-5/10)