aboutsummaryrefslogtreecommitdiffstats
path: root/app/scripts/lib/createStreamSink.js
blob: b93dbc0893c3e1d2d4bc1ec141a6d76c79a3b4e4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const WritableStream = require('readable-stream').Writable
const promiseToCallback = require('promise-to-callback')

module.exports = createStreamSink


/**
 * Factory for a writable sink stream backed by a promise-returning function.
 *
 * @param {Function} asyncWriteFn - forwarded unchanged to AsyncWritableStream
 * @param {Object} [_opts] - forwarded unchanged to AsyncWritableStream
 * @returns {AsyncWritableStream} a freshly constructed sink
 */
function createStreamSink (asyncWriteFn, _opts) {
  const sink = new AsyncWritableStream(asyncWriteFn, _opts)
  return sink
}

/**
 * Writable stream whose per-chunk work is delegated to a promise-returning
 * function. Object mode is on by default; any option supplied in `_opts`
 * takes precedence over that default.
 */
class AsyncWritableStream extends WritableStream {

  /**
   * @param {Function} asyncWriteFn - invoked as `asyncWriteFn(chunk, encoding)`
   *   for every chunk written; expected to return a promise
   * @param {Object} [_opts] - stream options, merged over `{ objectMode: true }`
   */
  constructor (asyncWriteFn, _opts) {
    const opts = Object.assign({ objectMode: true }, _opts)
    super(opts)
    this._asyncWriteFn = asyncWriteFn
  }

  /**
   * Hands one incoming chunk to the async write function and reports the
   * outcome of the returned promise through `callback`.
   *
   * @param {*} chunk - the value being written
   * @param {string} encoding - encoding passed along by the stream
   * @param {Function} callback - node-style completion callback
   */
  _write (chunk, encoding, callback) {
    let deliverResult
    try {
      deliverResult = promiseToCallback(this._asyncWriteFn(chunk, encoding))
    } catch (err) {
      // A synchronous throw (from the write function itself, or from
      // promiseToCallback when handed something that is not a promise) would
      // otherwise escape through stream.write(). Report it through the
      // callback so it is handled like any other failed write.
      callback(err)
      return
    }
    // Kept outside the try block so a failure inside `callback` is never
    // mistaken for a write failure and reported a second time.
    deliverResult(callback)
  }

}