Projekt

Obecné

Profil

Stáhnout (12.9 KB) Statistiky
| Větev: | Revize:
1 3a515b92 cagy
# mississippi
2
3
a collection of useful stream utility modules. learn how the modules work using this and then pick the ones you want and use them individually
4
5
the goal of the modules included in mississippi is to make working with streams easy without sacrificing speed, error handling or composability.
6
7
## usage
8
9
```js
10
var miss = require('mississippi')
11
```
12
13
## methods
14
15
- [pipe](#pipe)
16
- [each](#each)
17
- [pipeline](#pipeline)
18
- [duplex](#duplex)
19
- [through](#through)
20
- [from](#from)
21
- [to](#to)
22
- [concat](#concat)
23
- [finished](#finished)
24
- [parallel](#parallel)
25
26
### pipe
27
28
##### `miss.pipe(stream1, stream2, stream3, ..., cb)`
29
30
Pipes streams together and destroys all of them if one of them closes. Calls `cb` with `(error)` if there was an error in any of the streams.
31
32
When using standard `source.pipe(destination)` the source will _not_ be destroyed if the destination emits close or error. You are also not able to provide a callback to tell when the pipe has finished.
33
34
`miss.pipe` does these two things for you, ensuring you handle stream errors 100% of the time (unhandled errors are probably the most common bug in most node streams code)
35
36
#### original module
37
38
`miss.pipe` is provided by [`require('pump')`](https://www.npmjs.com/package/pump)
39
40
#### example
41
42
```js
43
// lets do a simple file copy
44
var fs = require('fs')
45
46
var read = fs.createReadStream('./original.zip')
47
var write = fs.createWriteStream('./copy.zip')
48
49
// use miss.pipe instead of read.pipe(write)
50
miss.pipe(read, write, function (err) {
51
  if (err) return console.error('Copy error!', err)
52
  console.log('Copied successfully')
53
})
54
```
55
56
### each
57
58
##### `miss.each(stream, each, [done])`
59
60
Iterate the data in `stream` one chunk at a time. Your `each` function will be called with `(data, next)` where data is a data chunk and next is a callback. Call `next` when you are ready to consume the next chunk.
61
62
Optionally you can call `next` with an error to destroy the stream. You can also pass the optional third argument, `done`, which is a function that will be called with `(err)` when the stream ends. The `err` argument will be populated with an error if the stream emitted an error.
63
64
#### original module
65
66
`miss.each` is provided by [`require('stream-each')`](https://www.npmjs.com/package/stream-each)
67
68
#### example
69
70
```js
71
var fs = require('fs')
72
var split = require('split2')
73
74
var newLineSeparatedNumbers = fs.createReadStream('numbers.txt')
75
76
var pipeline = miss.pipeline(newLineSeparatedNumbers, split())
77
miss.each(pipeline, eachLine, done)
78
var sum = 0
79
80
function eachLine (line, next) {
81
  sum += parseInt(line.toString())
82
  next()
83
}
84
85
function done (err) {
86
  if (err) throw err
87
  console.log('sum is', sum)
88
}
89
```
90
91
### pipeline
92
93
##### `var pipeline = miss.pipeline(stream1, stream2, stream3, ...)`
94
95
Builds a pipeline from all the transform streams passed in as arguments by piping them together and returning a single stream object that lets you write to the first stream and read from the last stream.
96
97
If you are pumping object streams together use `pipeline = miss.pipeline.obj(s1, s2, ...)`.
98
99
If any of the streams in the pipeline emits an error or gets destroyed, or you destroy the stream it returns, all of the streams will be destroyed and cleaned up for you.
100
101
#### original module
102
103
`miss.pipeline` is provided by [`require('pumpify')`](https://www.npmjs.com/package/pumpify)
104
105
#### example
106
107
```js
108
// first create some transform streams (note: these two modules are fictional)
109
var imageResize = require('image-resizer-stream')({width: 400})
110
var pngOptimizer = require('png-optimizer-stream')({quality: 60})
111
112
// instead of doing a.pipe(b), use pipelin
113
var resizeAndOptimize = miss.pipeline(imageResize, pngOptimizer)
114
// `resizeAndOptimize` is a transform stream. when you write to it, it writes
115
// to `imageResize`. when you read from it, it reads from `pngOptimizer`.
116
// it handles piping all the streams together for you
117
118
// use it like any other transform stream
119
var fs = require('fs')
120
121
var read = fs.createReadStream('./image.png')
122
var write = fs.createWriteStream('./resized-and-optimized.png')
123
124
miss.pipe(read, resizeAndOptimize, write, function (err) {
125
  if (err) return console.error('Image processing error!', err)
126
  console.log('Image processed successfully')
127
})
128
```
129
130
### duplex
131
132
##### `var duplex = miss.duplex([writable, readable, opts])`
133
134
Take two separate streams, a writable and a readable, and turn them into a single [duplex (readable and writable) stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex).
135
136
The returned stream will emit data from the readable. When you write to it it writes to the writable.
137
138
You can either choose to supply the writable and the readable at the time you create the stream, or you can do it later using the `.setWritable` and `.setReadable` methods and data written to the stream in the meantime will be buffered for you.
139
140
#### original module
141
142
`miss.duplex` is provided by [`require('duplexify')`](https://www.npmjs.com/package/duplexify)
143
144
#### example
145
146
```js
147
// lets spawn a process and take its stdout and stdin and combine them into 1 stream
148
var child = require('child_process')
149
150
// @- tells it to read from stdin, --data-binary sets 'raw' binary mode
151
var curl = child.spawn('curl -X POST --data-binary @- http://foo.com')
152
153
// duplexCurl will write to stdin and read from stdout
154
var duplexCurl = miss.duplex(curl.stdin, curl.stdout)
155
```
156
157
### through
158
159
##### `var transformer = miss.through([options, transformFunction, flushFunction])`
160
161
Make a custom [transform stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_transform).
162
163
The `options` object is passed to the internal transform stream and can be used to create an `objectMode` stream (or use the shortcut `miss.through.obj([...])`)
164
165
The `transformFunction` is called when data is available for the writable side and has the signature `(chunk, encoding, cb)`. Within the function, add data to the readable side any number of times with `this.push(data)`. Call `cb()` to indicate processing of the `chunk` is complete. Or to easily emit a single error or chunk, call `cb(err, chunk)`
166
167
The `flushFunction`, with signature `(cb)`, is called just before the stream is complete and should be used to wrap up stream processing.
168
169
#### original module
170
171
`miss.through` is provided by [`require('through2')`](https://www.npmjs.com/package/through2)
172
173
#### example
174
175
```js
176
var fs = require('fs')
177
178
var read = fs.createReadStream('./boring_lowercase.txt')
179
var write = fs.createWriteStream('./AWESOMECASE.TXT')
180
181
// Leaving out the options object
182
var uppercaser = miss.through(
183
  function (chunk, enc, cb) {
184
    cb(null, chunk.toString().toUpperCase())
185
  },
186
  function (cb) {
187
    cb(null, 'ONE LAST BIT OF UPPERCASE')
188
  }
189
)
190
191
miss.pipe(read, uppercaser, write, function (err) {
192
  if (err) return console.error('Trouble uppercasing!')
193
  console.log('Splendid uppercasing!')
194
})
195
```
196
197
### from
198
199
##### `miss.from([opts], read)`
200
201
Make a custom [readable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_readable).
202
203
`opts` contains the options to pass on to the ReadableStream constructor e.g. for creating a readable object stream (or use the shortcut `miss.from.obj([...])`).
204
205
Returns a readable stream that calls `read(size, next)` when data is requested from the stream.
206
207
- `size` is the recommended amount of data (in bytes) to retrieve.
208
- `next(err, chunk)` should be called when you're ready to emit more data.
209
210
#### original module
211
212
`miss.from` is provided by [`require('from2')`](https://www.npmjs.com/package/from2)
213
214
#### example
215
216
```js
217
218
219
function fromString(string) {
220
  return miss.from(function(size, next) {
221
    // if there's no more content
222
    // left in the string, close the stream.
223
    if (string.length <= 0) return next(null, null)
224
225
    // Pull in a new chunk of text,
226
    // removing it from the string.
227
    var chunk = string.slice(0, size)
228
    string = string.slice(size)
229
230
    // Emit "chunk" from the stream.
231
    next(null, chunk)
232
  })
233
}
234
235
// pipe "hello world" out
236
// to stdout.
237
fromString('hello world').pipe(process.stdout)
238
```
239
240
### to
241
242
##### `miss.to([options], write, [flush])`
243
244
Make a custom [writable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_writable).
245
246
`opts` contains the options to pass on to the WritableStream constructor e.g. for creating a writable object stream (or use the shortcut `miss.to.obj([...])`).
247
248
Returns a writable stream that calls `write(data, enc, cb)` when data is written to the stream.
249
250
- `data` is the received data to write the destination.
251
- `enc` encoding of the piece of data received.
252
- `cb(err, data)` should be called when you're ready to write more data, or encountered an error.
253
254
`flush(cb)` is called before `finish` is emitted and allows for cleanup steps to occur.
255
256
#### original module
257
258
`miss.to` is provided by [`require('flush-write-stream')`](https://www.npmjs.com/package/flush-write-stream)
259
260
#### example
261
262
```js
263
var ws = miss.to(write, flush)
264
265
ws.on('finish', function () {
266
  console.log('finished')
267
})
268
269
ws.write('hello')
270
ws.write('world')
271
ws.end()
272
273
function write (data, enc, cb) {
274
  // i am your normal ._write method
275
  console.log('writing', data.toString())
276
  cb()
277
}
278
279
function flush (cb) {
280
  // i am called before finish is emitted
281
  setTimeout(cb, 1000) // wait 1 sec
282
}
283
```
284
285
If you run the above it will produce the following output
286
287
```
288
writing hello
289
writing world
290
(nothing happens for 1 sec)
291
finished
292
```
293
294
### concat
295
296
##### `var concat = miss.concat(cb)`
297
298
Returns a writable stream that concatenates all data written to the stream and calls a callback with the single result.
299
300
Calling `miss.concat(cb)` returns a writable stream. `cb` is called when the writable stream is finished, e.g. when all data is done being written to it. `cb` is called with a single argument, `(data)`, which will contain the result of concatenating all the data written to the stream.
301
302
Note that `miss.concat` will not handle stream errors for you. To handle errors, use `miss.pipe` or handle the `error` event manually.
303
304
#### original module
305
306
`miss.concat` is provided by [`require('concat-stream')`](https://www.npmjs.com/package/concat-stream)
307
308
#### example
309
310
```js
311
var fs = require('fs')
312
313
var readStream = fs.createReadStream('cat.png')
314
var concatStream = miss.concat(gotPicture)
315
316
function callback (err) {
317
  if (err) {
318
    console.error(err)
319
    process.exit(1)
320
  }
321
}
322
323
miss.pipe(readStream, concatStream, callback)
324
325
function gotPicture(imageBuffer) {
326
  // imageBuffer is all of `cat.png` as a node.js Buffer
327
}
328
329
function handleError(err) {
330
  // handle your error appropriately here, e.g.:
331
  console.error(err) // print the error to STDERR
332
  process.exit(1) // exit program with non-zero exit code
333
}
334
```
335
336
### finished
337
338
##### `miss.finished(stream, cb)`
339
340
Waits for `stream` to finish or error and then calls `cb` with `(err)`. `cb` will only be called once. `err` will be null if the stream finished without error, or else it will be populated with the error from the streams `error` event.
341
342
This function is useful for simplifying stream handling code as it lets you handle success or error conditions in a single code path. It's used internally `miss.pipe`.
343
344
#### original module
345
346
`miss.finished` is provided by [`require('end-of-stream')`](https://www.npmjs.com/package/end-of-stream)
347
348
#### example
349
350
```js
351
var copySource = fs.createReadStream('./movie.mp4')
352
var copyDest = fs.createWriteStream('./movie-copy.mp4')
353
354
copySource.pipe(copyDest)
355
356
miss.finished(copyDest, function(err) {
357
  if (err) return console.log('write failed', err)
358
  console.log('write success')
359
})
360
```
361
362
### parallel
363
364
##### `miss.parallel(concurrency, each)`
365
366
This works like `through` except you can process items in parallel, while still preserving the original input order.
367
368
This is handy if you wanna take advantage of node's async I/O and process streams of items in batches. With this module you can build your very own streaming parallel job queue.
369
370
Note that `miss.parallel` preserves input ordering, if you don't need that then you can use [through2-concurrent](https://github.com/almost/through2-concurrent) instead, which is very similar to this otherwise.
371
372
#### original module
373
374
`miss.parallel` is provided by [`require('parallel-transform')`](https://npmjs.org/parallel-transform)
375
376
#### example
377
378
This example fetches the GET HTTP headers for a stream of input URLs 5 at a time in parallel.
379
380
```js
381
function getResponse (item, cb) {
382
  var r = request(item.url)
383
  r.on('error', function (err) {
384
    cb(err)
385
  })
386
  r.on('response', function (re) {
387
    cb(null, {url: item.url, date: new Date(), status: re.statusCode, headers: re.headers})
388
    r.abort()
389
  })
390
}
391
392
miss.pipe(
393
  fs.createReadStream('./urls.txt'), // one url per line
394
  split(),
395
  miss.parallel(5, getResponse),
396
  miss.through(function (row, enc, next) {
397
    console.log(JSON.stringify(row))
398
    next()
399
  })
400
)
401
```
402
403
## see also
404
405
- [substack/stream-handbook](https://github.com/substack/stream-handbook)
406
- [nodejs.org/api/stream.html](https://nodejs.org/api/stream.html)
407
- [awesome-nodejs-streams](https://github.com/thejmazz/awesome-nodejs-streams)
408
409
## license
410
411
Licensed under the BSD 2-clause license.