Projekt

Obecné

Profil

Stáhnout (2.2 KB) Statistiky
| Větev: | Revize:
1
'use strict';
2

    
3
var util = require('util');
4
var EventEmitter = require('events').EventEmitter;
5

    
6
function Hose(socket, options, filter) {
7
  EventEmitter.call(this);
8

    
9
  if (typeof options === 'function') {
10
    filter = options;
11
    options = {};
12
  }
13

    
14
  this.socket = socket;
15
  this.options = options;
16
  this.filter = filter;
17

    
18
  this.buffer = null;
19

    
20
  var self = this;
21
  this.listeners = {
22
    error: function(err) {
23
      return self.onError(err);
24
    },
25
    data: function(chunk) {
26
      return self.onData(chunk);
27
    },
28
    end: function() {
29
      return self.onEnd();
30
    }
31
  };
32

    
33
  this.socket.on('error', this.listeners.error);
34
  this.socket.on('data', this.listeners.data);
35
  this.socket.on('end', this.listeners.end);
36
}
37
util.inherits(Hose, EventEmitter);
38
module.exports = Hose;
39

    
40
Hose.create = function create(socket, options, filter) {
41
  return new Hose(socket, options, filter);
42
};
43

    
44
Hose.prototype.detach = function detach() {
45
  // Stop the flow
46
  this.socket.pause();
47

    
48
  this.socket.removeListener('error', this.listeners.error);
49
  this.socket.removeListener('data', this.listeners.data);
50
  this.socket.removeListener('end', this.listeners.end);
51
};
52

    
53
Hose.prototype.reemit = function reemit() {
54
  var buffer = this.buffer;
55
  this.buffer = null;
56

    
57
  // Modern age
58
  if (this.socket.unshift) {
59
    this.socket.unshift(buffer);
60
    if (this.socket.listeners('data').length > 0)
61
      this.socket.resume();
62
    return;
63
  }
64

    
65
  // Rusty node v0.8
66
  if (this.socket.ondata)
67
    this.socket.ondata(buffer, 0, buffer.length);
68
  this.socket.emit('data', buffer);
69
  this.socket.resume();
70
};
71

    
72
Hose.prototype.onError = function onError(err) {
73
  this.detach();
74
  this.emit('error', err);
75
};
76

    
77
Hose.prototype.onData = function onData(chunk) {
78
  if (this.buffer)
79
    this.buffer = Buffer.concat([ this.buffer, chunk ]);
80
  else
81
    this.buffer = chunk;
82

    
83
  var self = this;
84
  this.filter(this.buffer, function(err, protocol) {
85
    if (err)
86
      return self.onError(err);
87

    
88
    // No protocol selected yet
89
    if (!protocol)
90
      return;
91

    
92
    self.detach();
93
    self.emit('select', protocol, self.socket);
94
    self.reemit();
95
  });
96
};
97

    
98
Hose.prototype.onEnd = function onEnd() {
99
  this.detach();
100
  this.emit('error', new Error('Not enough data to recognize protocol'));
101
};
    (1-1/1)