# mississippi

a collection of useful stream utility modules. learn how the modules work here, then pick the ones you want and use them individually.

the goal of the modules included in mississippi is to make working with streams easy without sacrificing speed, error handling or composability.

## usage

```js
var miss = require('mississippi')
```
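
As the intro says, each of these helpers is also published as its own module (named under "original module" in each section below), so you can skip mississippi entirely and depend on just the piece you need. For example, `pipe` on its own:

```js
// miss.pipe without the rest of mississippi
var pipe = require('pump')
```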

## methods

- [pipe](#pipe)
- [each](#each)
- [pipeline](#pipeline)
- [duplex](#duplex)
- [through](#through)
- [from](#from)
- [to](#to)
- [concat](#concat)
- [finished](#finished)
- [parallel](#parallel)

### pipe

##### `miss.pipe(stream1, stream2, stream3, ..., cb)`

Pipes streams together and destroys all of them if one of them closes. Calls `cb` with `(error)` if there was an error in any of the streams.

When using standard `source.pipe(destination)` the source will _not_ be destroyed if the destination emits close or error. You also can't provide a callback that tells you when the pipe has finished.

`miss.pipe` does these two things for you, ensuring you handle stream errors 100% of the time (unhandled errors are probably the most common bug in most node streams code).

#### original module

`miss.pipe` is provided by [`require('pump')`](https://www.npmjs.com/package/pump)

#### example

```js
// let's do a simple file copy
var fs = require('fs')

var read = fs.createReadStream('./original.zip')
var write = fs.createWriteStream('./copy.zip')

// use miss.pipe instead of read.pipe(write)
miss.pipe(read, write, function (err) {
  if (err) return console.error('Copy error!', err)
  console.log('Copied successfully')
})
```

### each

##### `miss.each(stream, each, [done])`

Iterate the data in `stream` one chunk at a time. Your `each` function will be called with `(data, next)` where data is a data chunk and next is a callback. Call `next` when you are ready to consume the next chunk.

Optionally you can call `next` with an error to destroy the stream. You can also pass the optional third argument, `done`, which is a function that will be called with `(err)` when the stream ends. The `err` argument will be populated with an error if the stream emitted an error.

#### original module

`miss.each` is provided by [`require('stream-each')`](https://www.npmjs.com/package/stream-each)

#### example

```js
var fs = require('fs')
var split = require('split2')

var newLineSeparatedNumbers = fs.createReadStream('numbers.txt')

var pipeline = miss.pipeline(newLineSeparatedNumbers, split())
var sum = 0
miss.each(pipeline, eachLine, done)

function eachLine (line, next) {
  sum += parseInt(line.toString())
  next()
}

function done (err) {
  if (err) throw err
  console.log('sum is', sum)
}
```
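
As mentioned above, you can also abort iteration early by passing an error to `next`, which destroys the stream; `done` should then be called with an error. A minimal sketch against the same hypothetical `numbers.txt` input:

```js
var fs = require('fs')
var split = require('split2')

var lines = miss.pipeline(fs.createReadStream('numbers.txt'), split())

miss.each(lines, function (line, next) {
  var n = parseInt(line.toString())
  // calling next with an error destroys the stream and stops iteration
  if (isNaN(n)) return next(new Error('not a number: ' + line))
  next()
}, function (err) {
  if (err) return console.error('stopped early:', err.message)
  console.log('every line was a number')
})
```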

### pipeline

##### `var pipeline = miss.pipeline(stream1, stream2, stream3, ...)`

Builds a pipeline from all the transform streams passed in as arguments by piping them together and returning a single stream object that lets you write to the first stream and read from the last stream.

If you are pumping object streams together use `pipeline = miss.pipeline.obj(s1, s2, ...)`.

If any of the streams in the pipeline emits an error or gets destroyed, or you destroy the stream it returns, all of the streams will be destroyed and cleaned up for you.

#### original module

`miss.pipeline` is provided by [`require('pumpify')`](https://www.npmjs.com/package/pumpify)

#### example

```js
// first create some transform streams (note: these two modules are fictional)
var imageResize = require('image-resizer-stream')({width: 400})
var pngOptimizer = require('png-optimizer-stream')({quality: 60})

// instead of doing a.pipe(b), use pipeline
var resizeAndOptimize = miss.pipeline(imageResize, pngOptimizer)
// `resizeAndOptimize` is a transform stream. when you write to it, it writes
// to `imageResize`. when you read from it, it reads from `pngOptimizer`.
// it handles piping all the streams together for you

// use it like any other transform stream
var fs = require('fs')

var read = fs.createReadStream('./image.png')
var write = fs.createWriteStream('./resized-and-optimized.png')

miss.pipe(read, resizeAndOptimize, write, function (err) {
  if (err) return console.error('Image processing error!', err)
  console.log('Image processed successfully')
})
```
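
For object streams, the `miss.pipeline.obj` variant mentioned above works the same way. A minimal sketch that chains two object transforms built with `miss.through.obj` (the newline-delimited JSON input and its `ok` field are made up for illustration):

```js
var fs = require('fs')
var split = require('split2')

// parse each line of newline-delimited JSON into an object
var parse = miss.through.obj(function (line, enc, cb) {
  cb(null, JSON.parse(line))
})

// keep only the rows that have an `ok` flag
var onlyOk = miss.through.obj(function (row, enc, cb) {
  if (row.ok) this.push(row)
  cb()
})

// writing to `parseAndFilter` writes to `parse`; reading from it reads from `onlyOk`
var parseAndFilter = miss.pipeline.obj(parse, onlyOk)

miss.pipe(
  fs.createReadStream('./rows.ndjson'),
  split(),
  parseAndFilter,
  miss.to.obj(function (row, enc, cb) {
    console.log('ok row:', row)
    cb()
  }),
  function (err) {
    if (err) return console.error('pipeline error!', err)
  }
)
```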

### duplex

##### `var duplex = miss.duplex([writable, readable, opts])`

Take two separate streams, a writable and a readable, and turn them into a single [duplex (readable and writable) stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex).

The returned stream will emit data from the readable. When you write to it, it writes to the writable.

You can either choose to supply the writable and the readable at the time you create the stream, or you can do it later using the `.setWritable` and `.setReadable` methods and data written to the stream in the meantime will be buffered for you.

#### original module

`miss.duplex` is provided by [`require('duplexify')`](https://www.npmjs.com/package/duplexify)

#### example

```js
// let's spawn a process and take its stdout and stdin and combine them into one stream
var child = require('child_process')

// @- tells it to read from stdin, --data-binary sets 'raw' binary mode
var curl = child.spawn('curl', ['-X', 'POST', '--data-binary', '@-', 'http://foo.com'])

// duplexCurl will write to stdin and read from stdout
var duplexCurl = miss.duplex(curl.stdin, curl.stdout)
```
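
The deferred form mentioned above also works: create an empty duplex, attach the two ends later, and anything written in the meantime is buffered. A minimal sketch reusing the curl example (the early write is just for illustration):

```js
var child = require('child_process')

// create the duplex first, wire it up later
var duplexCurl = miss.duplex()

// this write is buffered until a writable end is attached
duplexCurl.write('hello ')

var curl = child.spawn('curl', ['-X', 'POST', '--data-binary', '@-', 'http://foo.com'])
duplexCurl.setWritable(curl.stdin)
duplexCurl.setReadable(curl.stdout)
```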

### through

##### `var transformer = miss.through([options, transformFunction, flushFunction])`

Make a custom [transform stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_transform).

The `options` object is passed to the internal transform stream and can be used to create an `objectMode` stream (or use the shortcut `miss.through.obj([...])`).

The `transformFunction` is called when data is available for the writable side and has the signature `(chunk, encoding, cb)`. Within the function, add data to the readable side any number of times with `this.push(data)`. Call `cb()` to indicate processing of the `chunk` is complete. Or to easily emit a single error or chunk, call `cb(err, chunk)`.

The `flushFunction`, with signature `(cb)`, is called just before the stream is complete and should be used to wrap up stream processing.

#### original module

`miss.through` is provided by [`require('through2')`](https://www.npmjs.com/package/through2)

#### example

```js
var fs = require('fs')

var read = fs.createReadStream('./boring_lowercase.txt')
var write = fs.createWriteStream('./AWESOMECASE.TXT')

// Leaving out the options object
var uppercaser = miss.through(
  function (chunk, enc, cb) {
    cb(null, chunk.toString().toUpperCase())
  },
  function (cb) {
    cb(null, 'ONE LAST BIT OF UPPERCASE')
  }
)

miss.pipe(read, uppercaser, write, function (err) {
  if (err) return console.error('Trouble uppercasing!')
  console.log('Splendid uppercasing!')
})
```
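
The `miss.through.obj` shortcut mentioned above behaves the same way but with `objectMode` enabled, so chunks can be arbitrary values instead of Buffers and strings. A minimal sketch (the object shape here is made up):

```js
// double a numeric field on every object passing through
var doubler = miss.through.obj(function (row, enc, cb) {
  cb(null, {n: row.n * 2})
})

doubler.on('data', function (row) {
  console.log(row) // {n: 2}, then {n: 4}
})

doubler.write({n: 1})
doubler.write({n: 2})
doubler.end()
```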

### from

##### `miss.from([opts], read)`

Make a custom [readable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_readable).

`opts` contains the options to pass on to the ReadableStream constructor e.g. for creating a readable object stream (or use the shortcut `miss.from.obj([...])`).

Returns a readable stream that calls `read(size, next)` when data is requested from the stream.

- `size` is the recommended amount of data (in bytes) to retrieve.
- `next(err, chunk)` should be called when you're ready to emit more data.

#### original module

`miss.from` is provided by [`require('from2')`](https://www.npmjs.com/package/from2)

#### example

```js
function fromString(string) {
  return miss.from(function(size, next) {
    // if there's no more content
    // left in the string, close the stream.
    if (string.length <= 0) return next(null, null)

    // Pull in a new chunk of text,
    // removing it from the string.
    var chunk = string.slice(0, size)
    string = string.slice(size)

    // Emit "chunk" from the stream.
    next(null, chunk)
  })
}

// pipe "hello world" out
// to stdout.
fromString('hello world').pipe(process.stdout)
```
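
`miss.from.obj` works the same way for object streams: `read` hands back whole values rather than slices of bytes. A minimal sketch:

```js
var animals = [{name: 'cat'}, {name: 'dog'}]

var objects = miss.from.obj(function (size, next) {
  // signal the end of the stream once the list is drained
  if (animals.length === 0) return next(null, null)
  next(null, animals.shift())
})

objects.on('data', function (obj) {
  console.log(obj.name) // cat, then dog
})
```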

### to

##### `miss.to([options], write, [flush])`

Make a custom [writable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_writable).

`options` contains the options to pass on to the WritableStream constructor e.g. for creating a writable object stream (or use the shortcut `miss.to.obj([...])`).

Returns a writable stream that calls `write(data, enc, cb)` when data is written to the stream.

- `data` is the received data to write to the destination.
- `enc` is the encoding of the piece of data received.
- `cb(err, data)` should be called when you're ready to write more data or have encountered an error.

`flush(cb)` is called before `finish` is emitted and allows for cleanup steps to occur.

#### original module

`miss.to` is provided by [`require('flush-write-stream')`](https://www.npmjs.com/package/flush-write-stream)

#### example

```js
var ws = miss.to(write, flush)

ws.on('finish', function () {
  console.log('finished')
})

ws.write('hello')
ws.write('world')
ws.end()

function write (data, enc, cb) {
  // i am your normal ._write method
  console.log('writing', data.toString())
  cb()
}

function flush (cb) {
  // i am called before finish is emitted
  setTimeout(cb, 1000) // wait 1 sec
}
```

If you run the above it will produce the following output:

```
writing hello
writing world
(nothing happens for 1 sec)
finished
```
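
For object streams there is the `miss.to.obj` shortcut mentioned above; `write` then receives whole values rather than Buffers. A minimal sketch:

```js
var rows = miss.to.obj(function (row, enc, cb) {
  // `row` is the object that was written, not a Buffer
  console.log('row:', row)
  cb()
})

rows.write({id: 1})
rows.write({id: 2})
rows.end()
```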

### concat

##### `var concat = miss.concat(cb)`

Returns a writable stream that concatenates all data written to the stream and calls a callback with the single result.

Calling `miss.concat(cb)` returns a writable stream. `cb` is called when the writable stream is finished, e.g. when all data is done being written to it. `cb` is called with a single argument, `(data)`, which will contain the result of concatenating all the data written to the stream.

Note that `miss.concat` will not handle stream errors for you. To handle errors, use `miss.pipe` or handle the `error` event manually.

#### original module

`miss.concat` is provided by [`require('concat-stream')`](https://www.npmjs.com/package/concat-stream)

#### example

```js
var fs = require('fs')

var readStream = fs.createReadStream('cat.png')
var concatStream = miss.concat(gotPicture)

miss.pipe(readStream, concatStream, handleError)

function gotPicture(imageBuffer) {
  // imageBuffer is all of `cat.png` as a node.js Buffer
}

function handleError(err) {
  if (!err) return
  // handle your error appropriately here, e.g.:
  console.error(err) // print the error to STDERR
  process.exit(1) // exit program with non-zero exit code
}
```
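
concat-stream is not limited to Buffers; if you write strings, the callback should receive one concatenated string. A minimal sketch:

```js
var words = miss.concat(function (result) {
  console.log(result) // 'hello world'
})

words.write('hello ')
words.write('world')
words.end()
```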

### finished

##### `miss.finished(stream, cb)`

Waits for `stream` to finish or error and then calls `cb` with `(err)`. `cb` will only be called once. `err` will be null if the stream finished without error, or else it will be populated with the error from the stream's `error` event.

This function is useful for simplifying stream handling code as it lets you handle success or error conditions in a single code path. It's used internally by `miss.pipe`.

#### original module

`miss.finished` is provided by [`require('end-of-stream')`](https://www.npmjs.com/package/end-of-stream)

#### example

```js
var fs = require('fs')

var copySource = fs.createReadStream('./movie.mp4')
var copyDest = fs.createWriteStream('./movie-copy.mp4')

copySource.pipe(copyDest)

miss.finished(copyDest, function(err) {
  if (err) return console.log('write failed', err)
  console.log('write success')
})
```

### parallel

##### `miss.parallel(concurrency, each)`

This works like `through` except you can process items in parallel, while still preserving the original input order.

This is handy if you want to take advantage of node's async I/O and process streams of items in batches. With this module you can build your very own streaming parallel job queue.

Note that `miss.parallel` preserves input ordering; if you don't need that, you can use [through2-concurrent](https://github.com/almost/through2-concurrent) instead, which is otherwise very similar.

#### original module

`miss.parallel` is provided by [`require('parallel-transform')`](https://npmjs.org/parallel-transform)

#### example

This example fetches the GET HTTP headers for a stream of input URLs 5 at a time in parallel.

```js
var fs = require('fs')
var split = require('split2')
var request = require('request')

function getResponse (url, cb) {
  var r = request(url.toString())
  r.on('error', function (err) {
    cb(err)
  })
  r.on('response', function (re) {
    cb(null, {url: url.toString(), date: new Date(), status: re.statusCode, headers: re.headers})
    r.abort()
  })
}

miss.pipe(
  fs.createReadStream('./urls.txt'), // one url per line
  split(),
  miss.parallel(5, getResponse),
  miss.through.obj(function (row, enc, next) {
    console.log(JSON.stringify(row))
    next()
  }),
  function (err) {
    if (err) return console.error('error fetching headers', err)
  }
)
```

## see also

- [substack/stream-handbook](https://github.com/substack/stream-handbook)
- [nodejs.org/api/stream.html](https://nodejs.org/api/stream.html)
- [awesome-nodejs-streams](https://github.com/thejmazz/awesome-nodejs-streams)

## license

Licensed under the BSD 2-clause license.