Projekt

Obecné

Profil

Stáhnout (9.26 KB) Statistiky
| Větev: | Revize:
1
'use strict';
2

    
3
const { randomBytes } = require('crypto');
4

    
5
const PerMessageDeflate = require('./permessage-deflate');
6
const { EMPTY_BUFFER } = require('./constants');
7
const { isValidStatusCode } = require('./validation');
8
const { mask: applyMask, toBuffer } = require('./buffer-util');
9

    
10
/**
11
 * HyBi Sender implementation.
12
 */
13
class Sender {
14
  /**
15
   * Creates a Sender instance.
16
   *
17
   * @param {net.Socket} socket The connection socket
18
   * @param {Object} extensions An object containing the negotiated extensions
19
   */
20
  constructor(socket, extensions) {
21
    this._extensions = extensions || {};
22
    this._socket = socket;
23

    
24
    this._firstFragment = true;
25
    this._compress = false;
26

    
27
    this._bufferedBytes = 0;
28
    this._deflating = false;
29
    this._queue = [];
30
  }
31

    
32
  /**
33
   * Frames a piece of data according to the HyBi WebSocket protocol.
34
   *
35
   * @param {Buffer} data The data to frame
36
   * @param {Object} options Options object
37
   * @param {Number} options.opcode The opcode
38
   * @param {Boolean} options.readOnly Specifies whether `data` can be modified
39
   * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
40
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
41
   * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
42
   * @return {Buffer[]} The framed data as a list of `Buffer` instances
43
   * @public
44
   */
45
  static frame(data, options) {
46
    const merge = options.mask && options.readOnly;
47
    var offset = options.mask ? 6 : 2;
48
    var payloadLength = data.length;
49

    
50
    if (data.length >= 65536) {
51
      offset += 8;
52
      payloadLength = 127;
53
    } else if (data.length > 125) {
54
      offset += 2;
55
      payloadLength = 126;
56
    }
57

    
58
    const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);
59

    
60
    target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
61
    if (options.rsv1) target[0] |= 0x40;
62

    
63
    target[1] = payloadLength;
64

    
65
    if (payloadLength === 126) {
66
      target.writeUInt16BE(data.length, 2);
67
    } else if (payloadLength === 127) {
68
      target.writeUInt32BE(0, 2);
69
      target.writeUInt32BE(data.length, 6);
70
    }
71

    
72
    if (!options.mask) return [target, data];
73

    
74
    const mask = randomBytes(4);
75

    
76
    target[1] |= 0x80;
77
    target[offset - 4] = mask[0];
78
    target[offset - 3] = mask[1];
79
    target[offset - 2] = mask[2];
80
    target[offset - 1] = mask[3];
81

    
82
    if (merge) {
83
      applyMask(data, mask, target, offset, data.length);
84
      return [target];
85
    }
86

    
87
    applyMask(data, mask, data, 0, data.length);
88
    return [target, data];
89
  }
90

    
91
  /**
92
   * Sends a close message to the other peer.
93
   *
94
   * @param {(Number|undefined)} code The status code component of the body
95
   * @param {String} data The message component of the body
96
   * @param {Boolean} mask Specifies whether or not to mask the message
97
   * @param {Function} cb Callback
98
   * @public
99
   */
100
  close(code, data, mask, cb) {
101
    var buf;
102

    
103
    if (code === undefined) {
104
      buf = EMPTY_BUFFER;
105
    } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
106
      throw new TypeError('First argument must be a valid error code number');
107
    } else if (data === undefined || data === '') {
108
      buf = Buffer.allocUnsafe(2);
109
      buf.writeUInt16BE(code, 0);
110
    } else {
111
      buf = Buffer.allocUnsafe(2 + Buffer.byteLength(data));
112
      buf.writeUInt16BE(code, 0);
113
      buf.write(data, 2);
114
    }
115

    
116
    if (this._deflating) {
117
      this.enqueue([this.doClose, buf, mask, cb]);
118
    } else {
119
      this.doClose(buf, mask, cb);
120
    }
121
  }
122

    
123
  /**
124
   * Frames and sends a close message.
125
   *
126
   * @param {Buffer} data The message to send
127
   * @param {Boolean} mask Specifies whether or not to mask `data`
128
   * @param {Function} cb Callback
129
   * @private
130
   */
131
  doClose(data, mask, cb) {
132
    this.sendFrame(
133
      Sender.frame(data, {
134
        fin: true,
135
        rsv1: false,
136
        opcode: 0x08,
137
        mask,
138
        readOnly: false
139
      }),
140
      cb
141
    );
142
  }
143

    
144
  /**
145
   * Sends a ping message to the other peer.
146
   *
147
   * @param {*} data The message to send
148
   * @param {Boolean} mask Specifies whether or not to mask `data`
149
   * @param {Function} cb Callback
150
   * @public
151
   */
152
  ping(data, mask, cb) {
153
    const buf = toBuffer(data);
154

    
155
    if (this._deflating) {
156
      this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
157
    } else {
158
      this.doPing(buf, mask, toBuffer.readOnly, cb);
159
    }
160
  }
161

    
162
  /**
163
   * Frames and sends a ping message.
164
   *
165
   * @param {*} data The message to send
166
   * @param {Boolean} mask Specifies whether or not to mask `data`
167
   * @param {Boolean} readOnly Specifies whether `data` can be modified
168
   * @param {Function} cb Callback
169
   * @private
170
   */
171
  doPing(data, mask, readOnly, cb) {
172
    this.sendFrame(
173
      Sender.frame(data, {
174
        fin: true,
175
        rsv1: false,
176
        opcode: 0x09,
177
        mask,
178
        readOnly
179
      }),
180
      cb
181
    );
182
  }
183

    
184
  /**
185
   * Sends a pong message to the other peer.
186
   *
187
   * @param {*} data The message to send
188
   * @param {Boolean} mask Specifies whether or not to mask `data`
189
   * @param {Function} cb Callback
190
   * @public
191
   */
192
  pong(data, mask, cb) {
193
    const buf = toBuffer(data);
194

    
195
    if (this._deflating) {
196
      this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
197
    } else {
198
      this.doPong(buf, mask, toBuffer.readOnly, cb);
199
    }
200
  }
201

    
202
  /**
203
   * Frames and sends a pong message.
204
   *
205
   * @param {*} data The message to send
206
   * @param {Boolean} mask Specifies whether or not to mask `data`
207
   * @param {Boolean} readOnly Specifies whether `data` can be modified
208
   * @param {Function} cb Callback
209
   * @private
210
   */
211
  doPong(data, mask, readOnly, cb) {
212
    this.sendFrame(
213
      Sender.frame(data, {
214
        fin: true,
215
        rsv1: false,
216
        opcode: 0x0a,
217
        mask,
218
        readOnly
219
      }),
220
      cb
221
    );
222
  }
223

    
224
  /**
225
   * Sends a data message to the other peer.
226
   *
227
   * @param {*} data The message to send
228
   * @param {Object} options Options object
229
   * @param {Boolean} options.compress Specifies whether or not to compress `data`
230
   * @param {Boolean} options.binary Specifies whether `data` is binary or text
231
   * @param {Boolean} options.fin Specifies whether the fragment is the last one
232
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
233
   * @param {Function} cb Callback
234
   * @public
235
   */
236
  send(data, options, cb) {
237
    const buf = toBuffer(data);
238
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
239
    var opcode = options.binary ? 2 : 1;
240
    var rsv1 = options.compress;
241

    
242
    if (this._firstFragment) {
243
      this._firstFragment = false;
244
      if (rsv1 && perMessageDeflate) {
245
        rsv1 = buf.length >= perMessageDeflate._threshold;
246
      }
247
      this._compress = rsv1;
248
    } else {
249
      rsv1 = false;
250
      opcode = 0;
251
    }
252

    
253
    if (options.fin) this._firstFragment = true;
254

    
255
    if (perMessageDeflate) {
256
      const opts = {
257
        fin: options.fin,
258
        rsv1,
259
        opcode,
260
        mask: options.mask,
261
        readOnly: toBuffer.readOnly
262
      };
263

    
264
      if (this._deflating) {
265
        this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
266
      } else {
267
        this.dispatch(buf, this._compress, opts, cb);
268
      }
269
    } else {
270
      this.sendFrame(
271
        Sender.frame(buf, {
272
          fin: options.fin,
273
          rsv1: false,
274
          opcode,
275
          mask: options.mask,
276
          readOnly: toBuffer.readOnly
277
        }),
278
        cb
279
      );
280
    }
281
  }
282

    
283
  /**
284
   * Dispatches a data message.
285
   *
286
   * @param {Buffer} data The message to send
287
   * @param {Boolean} compress Specifies whether or not to compress `data`
288
   * @param {Object} options Options object
289
   * @param {Number} options.opcode The opcode
290
   * @param {Boolean} options.readOnly Specifies whether `data` can be modified
291
   * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
292
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
293
   * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
294
   * @param {Function} cb Callback
295
   * @private
296
   */
297
  dispatch(data, compress, options, cb) {
298
    if (!compress) {
299
      this.sendFrame(Sender.frame(data, options), cb);
300
      return;
301
    }
302

    
303
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
304

    
305
    this._deflating = true;
306
    perMessageDeflate.compress(data, options.fin, (_, buf) => {
307
      this._deflating = false;
308
      options.readOnly = false;
309
      this.sendFrame(Sender.frame(buf, options), cb);
310
      this.dequeue();
311
    });
312
  }
313

    
314
  /**
315
   * Executes queued send operations.
316
   *
317
   * @private
318
   */
319
  dequeue() {
320
    while (!this._deflating && this._queue.length) {
321
      const params = this._queue.shift();
322

    
323
      this._bufferedBytes -= params[1].length;
324
      params[0].apply(this, params.slice(1));
325
    }
326
  }
327

    
328
  /**
329
   * Enqueues a send operation.
330
   *
331
   * @param {Array} params Send operation parameters.
332
   * @private
333
   */
334
  enqueue(params) {
335
    this._bufferedBytes += params[1].length;
336
    this._queue.push(params);
337
  }
338

    
339
  /**
340
   * Sends a frame.
341
   *
342
   * @param {Buffer[]} list The frame to send
343
   * @param {Function} cb Callback
344
   * @private
345
   */
346
  sendFrame(list, cb) {
347
    if (list.length === 2) {
348
      this._socket.cork();
349
      this._socket.write(list[0]);
350
      this._socket.write(list[1], cb);
351
      this._socket.uncork();
352
    } else {
353
      this._socket.write(list[0], cb);
354
    }
355
  }
356
}
357

    
358
module.exports = Sender;
(7-7/10)