Projekt

Obecné

Profil

Stáhnout (2.37 KB) Statistiky
| Větev: | Revize:
1
'use strict';
2

    
3
var stream = require('readable-stream');
4
var util = require('util');
5

    
6
var Readable = stream.Readable;
7

    
8
module.exports = ReaddirpReadable;
9

    
10
util.inherits(ReaddirpReadable, Readable);
11

    
12
function ReaddirpReadable (opts) {
13
  if (!(this instanceof ReaddirpReadable)) return new ReaddirpReadable(opts);
14

    
15
  opts = opts || {};
16

    
17
  opts.objectMode = true;
18
  Readable.call(this, opts);
19

    
20
  // backpressure not implemented at this point
21
  this.highWaterMark = Infinity;
22

    
23
  this._destroyed = false;
24
  this._paused = false;
25
  this._warnings = [];
26
  this._errors = [];
27

    
28
  this._pauseResumeErrors();
29
}
30

    
31
var proto = ReaddirpReadable.prototype;
32

    
33
proto._pauseResumeErrors = function () {
34
  var self = this;
35
  self.on('pause', function () { self._paused = true });
36
  self.on('resume', function () {
37
    if (self._destroyed) return;
38
    self._paused = false;
39

    
40
    self._warnings.forEach(function (err) { self.emit('warn', err) });
41
    self._warnings.length = 0;
42

    
43
    self._errors.forEach(function (err) { self.emit('error', err) });
44
    self._errors.length = 0;
45
  })
46
}
47

    
48
// called for each entry
49
proto._processEntry = function (entry) {
50
  if (this._destroyed) return;
51
  this.push(entry);
52
}
53

    
54
proto._read = function () { }
55

    
56
proto.destroy = function () {
57
  // when stream is destroyed it will emit nothing further, not even errors or warnings
58
  this.push(null);
59
  this.readable = false;
60
  this._destroyed = true;
61
  this.emit('close');
62
}
63

    
64
proto._done = function () {
65
  this.push(null);
66
}
67

    
68
// we emit errors and warnings async since we may handle errors like invalid args
69
// within the initial event loop before any event listeners subscribed
70
proto._handleError = function (err) {
71
  var self = this;
72
  setImmediate(function () {
73
    if (self._paused) return self._warnings.push(err);
74
    if (!self._destroyed) self.emit('warn', err);
75
  });
76
}
77

    
78
proto._handleFatalError = function (err) {
79
  var self = this;
80
  setImmediate(function () {
81
    if (self._paused) return self._errors.push(err);
82
    if (!self._destroyed) self.emit('error', err);
83
  });
84
}
85

    
86
function createStreamAPI () {
87
  var stream = new ReaddirpReadable();
88

    
89
  return {
90
      stream           :  stream
91
    , processEntry     :  stream._processEntry.bind(stream)
92
    , done             :  stream._done.bind(stream)
93
    , handleError      :  stream._handleError.bind(stream)
94
    , handleFatalError :  stream._handleFatalError.bind(stream)
95
  };
96
}
97

    
98
module.exports = createStreamAPI;
(5-5/5)