diff options
streams - use pump and published obj-multiplex
Diffstat (limited to 'app/scripts')
-rw-r--r-- | app/scripts/lib/obj-multiplex.js | 48 | ||||
-rw-r--r-- | app/scripts/lib/port-stream.js | 16 | ||||
-rw-r--r-- | app/scripts/lib/stream-utils.js | 23 | ||||
-rw-r--r-- | app/scripts/metamask-controller.js | 34 |
4 files changed, 41 insertions, 80 deletions
diff --git a/app/scripts/lib/obj-multiplex.js b/app/scripts/lib/obj-multiplex.js deleted file mode 100644 index 0034febe0..000000000 --- a/app/scripts/lib/obj-multiplex.js +++ /dev/null @@ -1,48 +0,0 @@ -const through = require('through2') - -module.exports = ObjectMultiplex - -function ObjectMultiplex (opts) { - opts = opts || {} - // create multiplexer - const mx = through.obj(function (chunk, enc, cb) { - const name = chunk.name - const data = chunk.data - if (!name) { - console.warn(`ObjectMultiplex - Malformed chunk without name "${chunk}"`) - return cb() - } - const substream = mx.streams[name] - if (!substream) { - console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`) - } else { - if (substream.push) substream.push(data) - } - return cb() - }) - mx.streams = {} - // create substreams - mx.createStream = function (name) { - const substream = mx.streams[name] = through.obj(function (chunk, enc, cb) { - mx.push({ - name: name, - data: chunk, - }) - return cb() - }) - mx.on('end', function () { - return substream.emit('end') - }) - if (opts.error) { - mx.on('error', function () { - return substream.emit('error') - }) - } - return substream - } - // ignore streams (dont display orphaned data warning) - mx.ignoreStream = function (name) { - mx.streams[name] = true - } - return mx -} diff --git a/app/scripts/lib/port-stream.js b/app/scripts/lib/port-stream.js index 607a9c9ed..648d88087 100644 --- a/app/scripts/lib/port-stream.js +++ b/app/scripts/lib/port-stream.js @@ -1,5 +1,6 @@ const Duplex = require('readable-stream').Duplex const inherits = require('util').inherits +const noop = function(){} module.exports = PortDuplexStream @@ -20,20 +21,14 @@ PortDuplexStream.prototype._onMessage = function (msg) { if (Buffer.isBuffer(msg)) { delete msg._isBuffer var data = new Buffer(msg) - // console.log('PortDuplexStream - saw message as buffer', data) this.push(data) } else { - // console.log('PortDuplexStream - saw message', msg) this.push(msg) } } PortDuplexStream.prototype._onDisconnect = function () { - try { - this.push(null) - } catch (err) { - this.emit('error', err) - } + this.destroy() } // stream plumbing @@ -45,19 +40,12 @@ PortDuplexStream.prototype._write = function (msg, encoding, cb) { if (Buffer.isBuffer(msg)) { var data = msg.toJSON() data._isBuffer = true - // console.log('PortDuplexStream - sent message as buffer', data) this._port.postMessage(data) } else { - // console.log('PortDuplexStream - sent message', msg) this._port.postMessage(msg) } } catch (err) { - // console.error(err) return cb(new Error('PortDuplexStream - disconnected')) } cb() } - -// util - -function noop () {} diff --git a/app/scripts/lib/stream-utils.js b/app/scripts/lib/stream-utils.js index ba79990cc..89e2a359e 100644 --- a/app/scripts/lib/stream-utils.js +++ b/app/scripts/lib/stream-utils.js @@ -1,6 +1,7 @@ const Through = require('through2') const endOfStream = require('end-of-stream') -const ObjectMultiplex = require('./obj-multiplex') +const ObjectMultiplex = require('obj-multiplex') +const pump = require('pump') module.exports = { jsonParseStream: jsonParseStream, @@ -23,14 +24,14 @@ function jsonStringifyStream () { } function setupMultiplex (connectionStream) { - var mx = ObjectMultiplex() - connectionStream.pipe(mx).pipe(connectionStream) - endOfStream(mx, function (err) { - if (err) console.error(err) - }) - endOfStream(connectionStream, function (err) { - if (err) console.error(err) - mx.destroy() - }) - return mx + const mux = new ObjectMultiplex() + pump( + connectionStream, + mux, + connectionStream, + (err) => { + if (err) console.error(err) + } + ) + return mux } diff --git a/app/scripts/metamask-controller.js b/app/scripts/metamask-controller.js index e4b1b5975..1a6732338 100644 --- a/app/scripts/metamask-controller.js +++ b/app/scripts/metamask-controller.js @@ -1,7 +1,7 @@ const EventEmitter = require('events') const extend = require('xtend') const promiseToCallback = require('promise-to-callback') -const pipe = require('pump') +const pump = require('pump') const Dnode = require('dnode') const ObservableStore = require('obs-store') const EthStore = require('./lib/eth-store') @@ -367,7 +367,14 @@ module.exports = class MetamaskController extends EventEmitter { setupControllerConnection (outStream) { const api = this.getApi() const dnode = Dnode(api) - outStream.pipe(dnode).pipe(outStream) + pump( + outStream, + dnode, + outStream, + (err) => { + if (err) console.error(err) + } + ) dnode.on('remote', (remote) => { // push updates to popup const sendUpdate = remote.sendUpdate.bind(remote) @@ -376,20 +383,29 @@ module.exports = class MetamaskController extends EventEmitter { } setupProviderConnection (outStream, originDomain) { + // setup json rpc engine stack const engine = new RpcEngine() engine.push(originMiddleware) engine.push(loggerMiddleware) engine.push(createProviderMiddleware({ provider: this.provider })) - + // setup connection const providerStream = createEngineStream({ engine }) - outStream.pipe(providerStream).pipe(outStream) - + pump( + outStream, + providerStream, + outStream, + (err) => { + if (err) console.error(err) + } + ) + // append dapp origin domain to request function originMiddleware (req, res, next, end) { req.origin = originDomain next() } + // log rpc activity function loggerMiddleware (req, res, next, end) { next((cb) => { @@ -401,6 +417,7 @@ module.exports = class MetamaskController extends EventEmitter { cb() }) } + // forward requests to provider function createProviderMiddleware({ provider }) { return (req, res, next, end) => { @@ -414,9 +431,12 @@ module.exports = class MetamaskController extends EventEmitter { } setupPublicConfig (outStream) { - pipe( + pump( this.publicConfigStore, - outStream + outStream, + (err) => { + if (err) console.error(err) + } ) } |