Projekt

Obecné

Profil

Stáhnout (23.4 KB) Statistiky
| Větev: | Revize:
1 3a515b92 cagy
'use strict';
2
3
const EventEmitter = require('events');
4
const crypto = require('crypto');
5
const https = require('https');
6
const http = require('http');
7
const net = require('net');
8
const tls = require('tls');
9
const url = require('url');
10
11
const PerMessageDeflate = require('./permessage-deflate');
12
const EventTarget = require('./event-target');
13
const extension = require('./extension');
14
const Receiver = require('./receiver');
15
const Sender = require('./sender');
16
const {
17
  BINARY_TYPES,
18
  EMPTY_BUFFER,
19
  GUID,
20
  kStatusCode,
21
  kWebSocket,
22
  NOOP
23
} = require('./constants');
24
25
const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
26
const protocolVersions = [8, 13];
27
const closeTimeout = 30 * 1000;
28
29
/**
30
 * Class representing a WebSocket.
31
 *
32
 * @extends EventEmitter
33
 */
34
class WebSocket extends EventEmitter {
35
  /**
36
   * Create a new `WebSocket`.
37
   *
38
   * @param {(String|url.Url|url.URL)} address The URL to which to connect
39
   * @param {(String|String[])} protocols The subprotocols
40
   * @param {Object} options Connection options
41
   */
42
  constructor(address, protocols, options) {
43
    super();
44
45
    this.readyState = WebSocket.CONNECTING;
46
    this.protocol = '';
47
48
    this._binaryType = BINARY_TYPES[0];
49
    this._closeFrameReceived = false;
50
    this._closeFrameSent = false;
51
    this._closeMessage = '';
52
    this._closeTimer = null;
53
    this._closeCode = 1006;
54
    this._extensions = {};
55
    this._receiver = null;
56
    this._sender = null;
57
    this._socket = null;
58
59
    if (address !== null) {
60
      this._isServer = false;
61
      this._redirects = 0;
62
63
      if (Array.isArray(protocols)) {
64
        protocols = protocols.join(', ');
65
      } else if (typeof protocols === 'object' && protocols !== null) {
66
        options = protocols;
67
        protocols = undefined;
68
      }
69
70
      initAsClient(this, address, protocols, options);
71
    } else {
72
      this._isServer = true;
73
    }
74
  }
75
76
  get CONNECTING() {
77
    return WebSocket.CONNECTING;
78
  }
79
  get CLOSING() {
80
    return WebSocket.CLOSING;
81
  }
82
  get CLOSED() {
83
    return WebSocket.CLOSED;
84
  }
85
  get OPEN() {
86
    return WebSocket.OPEN;
87
  }
88
89
  /**
90
   * This deviates from the WHATWG interface since ws doesn't support the
91
   * required default "blob" type (instead we define a custom "nodebuffer"
92
   * type).
93
   *
94
   * @type {String}
95
   */
96
  get binaryType() {
97
    return this._binaryType;
98
  }
99
100
  set binaryType(type) {
101
    if (!BINARY_TYPES.includes(type)) return;
102
103
    this._binaryType = type;
104
105
    //
106
    // Allow to change `binaryType` on the fly.
107
    //
108
    if (this._receiver) this._receiver._binaryType = type;
109
  }
110
111
  /**
112
   * @type {Number}
113
   */
114
  get bufferedAmount() {
115
    if (!this._socket) return 0;
116
117
    //
118
    // `socket.bufferSize` is `undefined` if the socket is closed.
119
    //
120
    return (this._socket.bufferSize || 0) + this._sender._bufferedBytes;
121
  }
122
123
  /**
124
   * @type {String}
125
   */
126
  get extensions() {
127
    return Object.keys(this._extensions).join();
128
  }
129
130
  /**
131
   * Set up the socket and the internal resources.
132
   *
133
   * @param {net.Socket} socket The network socket between the server and client
134
   * @param {Buffer} head The first packet of the upgraded stream
135
   * @param {Number} maxPayload The maximum allowed message size
136
   * @private
137
   */
138
  setSocket(socket, head, maxPayload) {
139
    const receiver = new Receiver(
140
      this._binaryType,
141
      this._extensions,
142
      maxPayload
143
    );
144
145
    this._sender = new Sender(socket, this._extensions);
146
    this._receiver = receiver;
147
    this._socket = socket;
148
149
    receiver[kWebSocket] = this;
150
    socket[kWebSocket] = this;
151
152
    receiver.on('conclude', receiverOnConclude);
153
    receiver.on('drain', receiverOnDrain);
154
    receiver.on('error', receiverOnError);
155
    receiver.on('message', receiverOnMessage);
156
    receiver.on('ping', receiverOnPing);
157
    receiver.on('pong', receiverOnPong);
158
159
    socket.setTimeout(0);
160
    socket.setNoDelay();
161
162
    if (head.length > 0) socket.unshift(head);
163
164
    socket.on('close', socketOnClose);
165
    socket.on('data', socketOnData);
166
    socket.on('end', socketOnEnd);
167
    socket.on('error', socketOnError);
168
169
    this.readyState = WebSocket.OPEN;
170
    this.emit('open');
171
  }
172
173
  /**
174
   * Emit the `'close'` event.
175
   *
176
   * @private
177
   */
178
  emitClose() {
179
    this.readyState = WebSocket.CLOSED;
180
181
    if (!this._socket) {
182
      this.emit('close', this._closeCode, this._closeMessage);
183
      return;
184
    }
185
186
    if (this._extensions[PerMessageDeflate.extensionName]) {
187
      this._extensions[PerMessageDeflate.extensionName].cleanup();
188
    }
189
190
    this._receiver.removeAllListeners();
191
    this.emit('close', this._closeCode, this._closeMessage);
192
  }
193
194
  /**
195
   * Start a closing handshake.
196
   *
197
   *          +----------+   +-----------+   +----------+
198
   *     - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
199
   *    |     +----------+   +-----------+   +----------+     |
200
   *          +----------+   +-----------+         |
201
   * CLOSING  |ws.close()|<--|close frame|<--+-----+       CLOSING
202
   *          +----------+   +-----------+   |
203
   *    |           |                        |   +---+        |
204
   *                +------------------------+-->|fin| - - - -
205
   *    |         +---+                      |   +---+
206
   *     - - - - -|fin|<---------------------+
207
   *              +---+
208
   *
209
   * @param {Number} code Status code explaining why the connection is closing
210
   * @param {String} data A string explaining why the connection is closing
211
   * @public
212
   */
213
  close(code, data) {
214
    if (this.readyState === WebSocket.CLOSED) return;
215
    if (this.readyState === WebSocket.CONNECTING) {
216
      const msg = 'WebSocket was closed before the connection was established';
217
      return abortHandshake(this, this._req, msg);
218
    }
219
220
    if (this.readyState === WebSocket.CLOSING) {
221
      if (this._closeFrameSent && this._closeFrameReceived) this._socket.end();
222
      return;
223
    }
224
225
    this.readyState = WebSocket.CLOSING;
226
    this._sender.close(code, data, !this._isServer, (err) => {
227
      //
228
      // This error is handled by the `'error'` listener on the socket. We only
229
      // want to know if the close frame has been sent here.
230
      //
231
      if (err) return;
232
233
      this._closeFrameSent = true;
234
      if (this._closeFrameReceived) this._socket.end();
235
    });
236
237
    //
238
    // Specify a timeout for the closing handshake to complete.
239
    //
240
    this._closeTimer = setTimeout(
241
      this._socket.destroy.bind(this._socket),
242
      closeTimeout
243
    );
244
  }
245
246
  /**
247
   * Send a ping.
248
   *
249
   * @param {*} data The data to send
250
   * @param {Boolean} mask Indicates whether or not to mask `data`
251
   * @param {Function} cb Callback which is executed when the ping is sent
252
   * @public
253
   */
254
  ping(data, mask, cb) {
255
    if (typeof data === 'function') {
256
      cb = data;
257
      data = mask = undefined;
258
    } else if (typeof mask === 'function') {
259
      cb = mask;
260
      mask = undefined;
261
    }
262
263
    if (this.readyState !== WebSocket.OPEN) {
264
      const err = new Error(
265
        `WebSocket is not open: readyState ${this.readyState} ` +
266
          `(${readyStates[this.readyState]})`
267
      );
268
269
      if (cb) return cb(err);
270
      throw err;
271
    }
272
273
    if (typeof data === 'number') data = data.toString();
274
    if (mask === undefined) mask = !this._isServer;
275
    this._sender.ping(data || EMPTY_BUFFER, mask, cb);
276
  }
277
278
  /**
279
   * Send a pong.
280
   *
281
   * @param {*} data The data to send
282
   * @param {Boolean} mask Indicates whether or not to mask `data`
283
   * @param {Function} cb Callback which is executed when the pong is sent
284
   * @public
285
   */
286
  pong(data, mask, cb) {
287
    if (typeof data === 'function') {
288
      cb = data;
289
      data = mask = undefined;
290
    } else if (typeof mask === 'function') {
291
      cb = mask;
292
      mask = undefined;
293
    }
294
295
    if (this.readyState !== WebSocket.OPEN) {
296
      const err = new Error(
297
        `WebSocket is not open: readyState ${this.readyState} ` +
298
          `(${readyStates[this.readyState]})`
299
      );
300
301
      if (cb) return cb(err);
302
      throw err;
303
    }
304
305
    if (typeof data === 'number') data = data.toString();
306
    if (mask === undefined) mask = !this._isServer;
307
    this._sender.pong(data || EMPTY_BUFFER, mask, cb);
308
  }
309
310
  /**
311
   * Send a data message.
312
   *
313
   * @param {*} data The message to send
314
   * @param {Object} options Options object
315
   * @param {Boolean} options.compress Specifies whether or not to compress `data`
316
   * @param {Boolean} options.binary Specifies whether `data` is binary or text
317
   * @param {Boolean} options.fin Specifies whether the fragment is the last one
318
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
319
   * @param {Function} cb Callback which is executed when data is written out
320
   * @public
321
   */
322
  send(data, options, cb) {
323
    if (typeof options === 'function') {
324
      cb = options;
325
      options = {};
326
    }
327
328
    if (this.readyState !== WebSocket.OPEN) {
329
      const err = new Error(
330
        `WebSocket is not open: readyState ${this.readyState} ` +
331
          `(${readyStates[this.readyState]})`
332
      );
333
334
      if (cb) return cb(err);
335
      throw err;
336
    }
337
338
    if (typeof data === 'number') data = data.toString();
339
340
    const opts = Object.assign(
341
      {
342
        binary: typeof data !== 'string',
343
        mask: !this._isServer,
344
        compress: true,
345
        fin: true
346
      },
347
      options
348
    );
349
350
    if (!this._extensions[PerMessageDeflate.extensionName]) {
351
      opts.compress = false;
352
    }
353
354
    this._sender.send(data || EMPTY_BUFFER, opts, cb);
355
  }
356
357
  /**
358
   * Forcibly close the connection.
359
   *
360
   * @public
361
   */
362
  terminate() {
363
    if (this.readyState === WebSocket.CLOSED) return;
364
    if (this.readyState === WebSocket.CONNECTING) {
365
      const msg = 'WebSocket was closed before the connection was established';
366
      return abortHandshake(this, this._req, msg);
367
    }
368
369
    if (this._socket) {
370
      this.readyState = WebSocket.CLOSING;
371
      this._socket.destroy();
372
    }
373
  }
374
}
375
376
readyStates.forEach((readyState, i) => {
377
  WebSocket[readyState] = i;
378
});
379
380
//
381
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
382
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
383
//
384
['open', 'error', 'close', 'message'].forEach((method) => {
385
  Object.defineProperty(WebSocket.prototype, `on${method}`, {
386
    /**
387
     * Return the listener of the event.
388
     *
389
     * @return {(Function|undefined)} The event listener or `undefined`
390
     * @public
391
     */
392
    get() {
393
      const listeners = this.listeners(method);
394
      for (var i = 0; i < listeners.length; i++) {
395
        if (listeners[i]._listener) return listeners[i]._listener;
396
      }
397
398
      return undefined;
399
    },
400
    /**
401
     * Add a listener for the event.
402
     *
403
     * @param {Function} listener The listener to add
404
     * @public
405
     */
406
    set(listener) {
407
      const listeners = this.listeners(method);
408
      for (var i = 0; i < listeners.length; i++) {
409
        //
410
        // Remove only the listeners added via `addEventListener`.
411
        //
412
        if (listeners[i]._listener) this.removeListener(method, listeners[i]);
413
      }
414
      this.addEventListener(method, listener);
415
    }
416
  });
417
});
418
419
WebSocket.prototype.addEventListener = EventTarget.addEventListener;
420
WebSocket.prototype.removeEventListener = EventTarget.removeEventListener;
421
422
module.exports = WebSocket;
423
424
/**
425
 * Initialize a WebSocket client.
426
 *
427
 * @param {WebSocket} websocket The client to initialize
428
 * @param {(String|url.Url|url.URL)} address The URL to which to connect
429
 * @param {String} protocols The subprotocols
430
 * @param {Object} options Connection options
431
 * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable
432
 *     permessage-deflate
433
 * @param {Number} options.handshakeTimeout Timeout in milliseconds for the
434
 *     handshake request
435
 * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version`
436
 *     header
437
 * @param {String} options.origin Value of the `Origin` or
438
 *     `Sec-WebSocket-Origin` header
439
 * @param {Number} options.maxPayload The maximum allowed message size
440
 * @param {Boolean} options.followRedirects Whether or not to follow redirects
441
 * @param {Number} options.maxRedirects The maximum number of redirects allowed
442
 * @private
443
 */
444
function initAsClient(websocket, address, protocols, options) {
445
  const opts = Object.assign(
446
    {
447
      protocolVersion: protocolVersions[1],
448
      maxPayload: 100 * 1024 * 1024,
449
      perMessageDeflate: true,
450
      followRedirects: false,
451
      maxRedirects: 10
452
    },
453
    options,
454
    {
455
      createConnection: undefined,
456
      socketPath: undefined,
457
      hostname: undefined,
458
      protocol: undefined,
459
      timeout: undefined,
460
      method: undefined,
461
      auth: undefined,
462
      host: undefined,
463
      path: undefined,
464
      port: undefined
465
    }
466
  );
467
468
  if (!protocolVersions.includes(opts.protocolVersion)) {
469
    throw new RangeError(
470
      `Unsupported protocol version: ${opts.protocolVersion} ` +
471
        `(supported versions: ${protocolVersions.join(', ')})`
472
    );
473
  }
474
475
  var parsedUrl;
476
477
  if (typeof address === 'object' && address.href !== undefined) {
478
    parsedUrl = address;
479
    websocket.url = address.href;
480
  } else {
481
    //
482
    // The WHATWG URL constructor is not available on Node.js < 6.13.0
483
    //
484
    parsedUrl = url.URL ? new url.URL(address) : url.parse(address);
485
    websocket.url = address;
486
  }
487
488
  const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
489
490
  if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
491
    throw new Error(`Invalid URL: ${websocket.url}`);
492
  }
493
494
  const isSecure =
495
    parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
496
  const defaultPort = isSecure ? 443 : 80;
497
  const key = crypto.randomBytes(16).toString('base64');
498
  const get = isSecure ? https.get : http.get;
499
  const path = parsedUrl.search
500
    ? `${parsedUrl.pathname || '/'}${parsedUrl.search}`
501
    : parsedUrl.pathname || '/';
502
  var perMessageDeflate;
503
504
  opts.createConnection = isSecure ? tlsConnect : netConnect;
505
  opts.defaultPort = opts.defaultPort || defaultPort;
506
  opts.port = parsedUrl.port || defaultPort;
507
  opts.host = parsedUrl.hostname.startsWith('[')
508
    ? parsedUrl.hostname.slice(1, -1)
509
    : parsedUrl.hostname;
510
  opts.headers = Object.assign(
511
    {
512
      'Sec-WebSocket-Version': opts.protocolVersion,
513
      'Sec-WebSocket-Key': key,
514
      Connection: 'Upgrade',
515
      Upgrade: 'websocket'
516
    },
517
    opts.headers
518
  );
519
  opts.path = path;
520
  opts.timeout = opts.handshakeTimeout;
521
522
  if (opts.perMessageDeflate) {
523
    perMessageDeflate = new PerMessageDeflate(
524
      opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
525
      false,
526
      opts.maxPayload
527
    );
528
    opts.headers['Sec-WebSocket-Extensions'] = extension.format({
529
      [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
530
    });
531
  }
532
  if (protocols) {
533
    opts.headers['Sec-WebSocket-Protocol'] = protocols;
534
  }
535
  if (opts.origin) {
536
    if (opts.protocolVersion < 13) {
537
      opts.headers['Sec-WebSocket-Origin'] = opts.origin;
538
    } else {
539
      opts.headers.Origin = opts.origin;
540
    }
541
  }
542
  if (parsedUrl.auth) {
543
    opts.auth = parsedUrl.auth;
544
  } else if (parsedUrl.username || parsedUrl.password) {
545
    opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
546
  }
547
548
  if (isUnixSocket) {
549
    const parts = path.split(':');
550
551
    opts.socketPath = parts[0];
552
    opts.path = parts[1];
553
  }
554
555
  var req = (websocket._req = get(opts));
556
557
  if (opts.timeout) {
558
    req.on('timeout', () => {
559
      abortHandshake(websocket, req, 'Opening handshake has timed out');
560
    });
561
  }
562
563
  req.on('error', (err) => {
564
    if (websocket._req.aborted) return;
565
566
    req = websocket._req = null;
567
    websocket.readyState = WebSocket.CLOSING;
568
    websocket.emit('error', err);
569
    websocket.emitClose();
570
  });
571
572
  req.on('response', (res) => {
573
    const location = res.headers.location;
574
    const statusCode = res.statusCode;
575
576
    if (
577
      location &&
578
      opts.followRedirects &&
579
      statusCode >= 300 &&
580
      statusCode < 400
581
    ) {
582
      if (++websocket._redirects > opts.maxRedirects) {
583
        abortHandshake(websocket, req, 'Maximum redirects exceeded');
584
        return;
585
      }
586
587
      req.abort();
588
589
      const addr = url.URL
590
        ? new url.URL(location, address)
591
        : url.resolve(address, location);
592
593
      initAsClient(websocket, addr, protocols, options);
594
    } else if (!websocket.emit('unexpected-response', req, res)) {
595
      abortHandshake(
596
        websocket,
597
        req,
598
        `Unexpected server response: ${res.statusCode}`
599
      );
600
    }
601
  });
602
603
  req.on('upgrade', (res, socket, head) => {
604
    websocket.emit('upgrade', res);
605
606
    //
607
    // The user may have closed the connection from a listener of the `upgrade`
608
    // event.
609
    //
610
    if (websocket.readyState !== WebSocket.CONNECTING) return;
611
612
    req = websocket._req = null;
613
614
    const digest = crypto
615
      .createHash('sha1')
616
      .update(key + GUID)
617
      .digest('base64');
618
619
    if (res.headers['sec-websocket-accept'] !== digest) {
620
      abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
621
      return;
622
    }
623
624
    const serverProt = res.headers['sec-websocket-protocol'];
625
    const protList = (protocols || '').split(/, */);
626
    var protError;
627
628
    if (!protocols && serverProt) {
629
      protError = 'Server sent a subprotocol but none was requested';
630
    } else if (protocols && !serverProt) {
631
      protError = 'Server sent no subprotocol';
632
    } else if (serverProt && !protList.includes(serverProt)) {
633
      protError = 'Server sent an invalid subprotocol';
634
    }
635
636
    if (protError) {
637
      abortHandshake(websocket, socket, protError);
638
      return;
639
    }
640
641
    if (serverProt) websocket.protocol = serverProt;
642
643
    if (perMessageDeflate) {
644
      try {
645
        const extensions = extension.parse(
646
          res.headers['sec-websocket-extensions']
647
        );
648
649
        if (extensions[PerMessageDeflate.extensionName]) {
650
          perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
651
          websocket._extensions[
652
            PerMessageDeflate.extensionName
653
          ] = perMessageDeflate;
654
        }
655
      } catch (err) {
656
        abortHandshake(
657
          websocket,
658
          socket,
659
          'Invalid Sec-WebSocket-Extensions header'
660
        );
661
        return;
662
      }
663
    }
664
665
    websocket.setSocket(socket, head, opts.maxPayload);
666
  });
667
}
668
669
/**
670
 * Create a `net.Socket` and initiate a connection.
671
 *
672
 * @param {Object} options Connection options
673
 * @return {net.Socket} The newly created socket used to start the connection
674
 * @private
675
 */
676
function netConnect(options) {
677
  //
678
  // Override `options.path` only if `options` is a copy of the original options
679
  // object. This is always true on Node.js >= 8 but not on Node.js 6 where
680
  // `options.socketPath` might be `undefined` even if the `socketPath` option
681
  // was originally set.
682
  //
683
  if (options.protocolVersion) options.path = options.socketPath;
684
  return net.connect(options);
685
}
686
687
/**
688
 * Create a `tls.TLSSocket` and initiate a connection.
689
 *
690
 * @param {Object} options Connection options
691
 * @return {tls.TLSSocket} The newly created socket used to start the connection
692
 * @private
693
 */
694
function tlsConnect(options) {
695
  options.path = undefined;
696
  options.servername = options.servername || options.host;
697
  return tls.connect(options);
698
}
699
700
/**
701
 * Abort the handshake and emit an error.
702
 *
703
 * @param {WebSocket} websocket The WebSocket instance
704
 * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
705
 *     socket to destroy
706
 * @param {String} message The error message
707
 * @private
708
 */
709
function abortHandshake(websocket, stream, message) {
710
  websocket.readyState = WebSocket.CLOSING;
711
712
  const err = new Error(message);
713
  Error.captureStackTrace(err, abortHandshake);
714
715
  if (stream.setHeader) {
716
    stream.abort();
717
    stream.once('abort', websocket.emitClose.bind(websocket));
718
    websocket.emit('error', err);
719
  } else {
720
    stream.destroy(err);
721
    stream.once('error', websocket.emit.bind(websocket, 'error'));
722
    stream.once('close', websocket.emitClose.bind(websocket));
723
  }
724
}
725
726
/**
727
 * The listener of the `Receiver` `'conclude'` event.
728
 *
729
 * @param {Number} code The status code
730
 * @param {String} reason The reason for closing
731
 * @private
732
 */
733
function receiverOnConclude(code, reason) {
734
  const websocket = this[kWebSocket];
735
736
  websocket._socket.removeListener('data', socketOnData);
737
  websocket._socket.resume();
738
739
  websocket._closeFrameReceived = true;
740
  websocket._closeMessage = reason;
741
  websocket._closeCode = code;
742
743
  if (code === 1005) websocket.close();
744
  else websocket.close(code, reason);
745
}
746
747
/**
748
 * The listener of the `Receiver` `'drain'` event.
749
 *
750
 * @private
751
 */
752
function receiverOnDrain() {
753
  this[kWebSocket]._socket.resume();
754
}
755
756
/**
757
 * The listener of the `Receiver` `'error'` event.
758
 *
759
 * @param {(RangeError|Error)} err The emitted error
760
 * @private
761
 */
762
function receiverOnError(err) {
763
  const websocket = this[kWebSocket];
764
765
  websocket._socket.removeListener('data', socketOnData);
766
767
  websocket.readyState = WebSocket.CLOSING;
768
  websocket._closeCode = err[kStatusCode];
769
  websocket.emit('error', err);
770
  websocket._socket.destroy();
771
}
772
773
/**
774
 * The listener of the `Receiver` `'finish'` event.
775
 *
776
 * @private
777
 */
778
function receiverOnFinish() {
779
  this[kWebSocket].emitClose();
780
}
781
782
/**
783
 * The listener of the `Receiver` `'message'` event.
784
 *
785
 * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
786
 * @private
787
 */
788
function receiverOnMessage(data) {
789
  this[kWebSocket].emit('message', data);
790
}
791
792
/**
793
 * The listener of the `Receiver` `'ping'` event.
794
 *
795
 * @param {Buffer} data The data included in the ping frame
796
 * @private
797
 */
798
function receiverOnPing(data) {
799
  const websocket = this[kWebSocket];
800
801
  websocket.pong(data, !websocket._isServer, NOOP);
802
  websocket.emit('ping', data);
803
}
804
805
/**
806
 * The listener of the `Receiver` `'pong'` event.
807
 *
808
 * @param {Buffer} data The data included in the pong frame
809
 * @private
810
 */
811
function receiverOnPong(data) {
812
  this[kWebSocket].emit('pong', data);
813
}
814
815
/**
816
 * The listener of the `net.Socket` `'close'` event.
817
 *
818
 * @private
819
 */
820
function socketOnClose() {
821
  const websocket = this[kWebSocket];
822
823
  this.removeListener('close', socketOnClose);
824
  this.removeListener('end', socketOnEnd);
825
826
  websocket.readyState = WebSocket.CLOSING;
827
828
  //
829
  // The close frame might not have been received or the `'end'` event emitted,
830
  // for example, if the socket was destroyed due to an error. Ensure that the
831
  // `receiver` stream is closed after writing any remaining buffered data to
832
  // it. If the readable side of the socket is in flowing mode then there is no
833
  // buffered data as everything has been already written and `readable.read()`
834
  // will return `null`. If instead, the socket is paused, any possible buffered
835
  // data will be read as a single chunk and emitted synchronously in a single
836
  // `'data'` event.
837
  //
838
  websocket._socket.read();
839
  websocket._receiver.end();
840
841
  this.removeListener('data', socketOnData);
842
  this[kWebSocket] = undefined;
843
844
  clearTimeout(websocket._closeTimer);
845
846
  if (
847
    websocket._receiver._writableState.finished ||
848
    websocket._receiver._writableState.errorEmitted
849
  ) {
850
    websocket.emitClose();
851
  } else {
852
    websocket._receiver.on('error', receiverOnFinish);
853
    websocket._receiver.on('finish', receiverOnFinish);
854
  }
855
}
856
857
/**
858
 * The listener of the `net.Socket` `'data'` event.
859
 *
860
 * @param {Buffer} chunk A chunk of data
861
 * @private
862
 */
863
function socketOnData(chunk) {
864
  if (!this[kWebSocket]._receiver.write(chunk)) {
865
    this.pause();
866
  }
867
}
868
869
/**
870
 * The listener of the `net.Socket` `'end'` event.
871
 *
872
 * @private
873
 */
874
function socketOnEnd() {
875
  const websocket = this[kWebSocket];
876
877
  websocket.readyState = WebSocket.CLOSING;
878
  websocket._receiver.end();
879
  this.end();
880
}
881
882
/**
883
 * The listener of the `net.Socket` `'error'` event.
884
 *
885
 * @private
886
 */
887
function socketOnError() {
888
  const websocket = this[kWebSocket];
889
890
  this.removeListener('error', socketOnError);
891
  this.on('error', NOOP);
892
893
  websocket.readyState = WebSocket.CLOSING;
894
  this.destroy();
895
}