1
|
var Readable = require('readable-stream').Readable
|
2
|
var inherits = require('inherits')
|
3
|
|
4
|
module.exports = from2
|
5
|
|
6
|
from2.ctor = ctor
|
7
|
from2.obj = obj
|
8
|
|
9
|
var Proto = ctor()
|
10
|
|
11
|
function toFunction(list) {
|
12
|
list = list.slice()
|
13
|
return function (_, cb) {
|
14
|
var err = null
|
15
|
var item = list.length ? list.shift() : null
|
16
|
if (item instanceof Error) {
|
17
|
err = item
|
18
|
item = null
|
19
|
}
|
20
|
|
21
|
cb(err, item)
|
22
|
}
|
23
|
}
|
24
|
|
25
|
function from2(opts, read) {
|
26
|
if (typeof opts !== 'object' || Array.isArray(opts)) {
|
27
|
read = opts
|
28
|
opts = {}
|
29
|
}
|
30
|
|
31
|
var rs = new Proto(opts)
|
32
|
rs._from = Array.isArray(read) ? toFunction(read) : (read || noop)
|
33
|
return rs
|
34
|
}
|
35
|
|
36
|
function ctor(opts, read) {
|
37
|
if (typeof opts === 'function') {
|
38
|
read = opts
|
39
|
opts = {}
|
40
|
}
|
41
|
|
42
|
opts = defaults(opts)
|
43
|
|
44
|
inherits(Class, Readable)
|
45
|
function Class(override) {
|
46
|
if (!(this instanceof Class)) return new Class(override)
|
47
|
this._reading = false
|
48
|
this._callback = check
|
49
|
this.destroyed = false
|
50
|
Readable.call(this, override || opts)
|
51
|
|
52
|
var self = this
|
53
|
var hwm = this._readableState.highWaterMark
|
54
|
|
55
|
function check(err, data) {
|
56
|
if (self.destroyed) return
|
57
|
if (err) return self.destroy(err)
|
58
|
if (data === null) return self.push(null)
|
59
|
self._reading = false
|
60
|
if (self.push(data)) self._read(hwm)
|
61
|
}
|
62
|
}
|
63
|
|
64
|
Class.prototype._from = read || noop
|
65
|
Class.prototype._read = function(size) {
|
66
|
if (this._reading || this.destroyed) return
|
67
|
this._reading = true
|
68
|
this._from(size, this._callback)
|
69
|
}
|
70
|
|
71
|
Class.prototype.destroy = function(err) {
|
72
|
if (this.destroyed) return
|
73
|
this.destroyed = true
|
74
|
|
75
|
var self = this
|
76
|
process.nextTick(function() {
|
77
|
if (err) self.emit('error', err)
|
78
|
self.emit('close')
|
79
|
})
|
80
|
}
|
81
|
|
82
|
return Class
|
83
|
}
|
84
|
|
85
|
function obj(opts, read) {
|
86
|
if (typeof opts === 'function' || Array.isArray(opts)) {
|
87
|
read = opts
|
88
|
opts = {}
|
89
|
}
|
90
|
|
91
|
opts = defaults(opts)
|
92
|
opts.objectMode = true
|
93
|
opts.highWaterMark = 16
|
94
|
|
95
|
return from2(opts, read)
|
96
|
}
|
97
|
|
98
|
function noop () {}
|
99
|
|
100
|
function defaults(opts) {
|
101
|
opts = opts || {}
|
102
|
return opts
|
103
|
}
|