aboutsummaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorkumavis <aaron@kumavis.me>2017-09-08 12:17:49 +0800
committerkumavis <aaron@kumavis.me>2017-09-08 12:17:49 +0800
commit57e4805c621155cd86169064f4aaba34b73644c6 (patch)
tree8743a9621d2f4596c42df1ede748fa1b69aa8691 /app
parent440a42bbc38ed53b64dc017fd56bd3281355df33 (diff)
downloadtangerine-wallet-browser-57e4805c621155cd86169064f4aaba34b73644c6.tar
tangerine-wallet-browser-57e4805c621155cd86169064f4aaba34b73644c6.tar.gz
tangerine-wallet-browser-57e4805c621155cd86169064f4aaba34b73644c6.tar.bz2
tangerine-wallet-browser-57e4805c621155cd86169064f4aaba34b73644c6.tar.lz
tangerine-wallet-browser-57e4805c621155cd86169064f4aaba34b73644c6.tar.xz
tangerine-wallet-browser-57e4805c621155cd86169064f4aaba34b73644c6.tar.zst
tangerine-wallet-browser-57e4805c621155cd86169064f4aaba34b73644c6.zip
streams - use pump and published obj-multiplex
Diffstat (limited to 'app')
-rw-r--r--app/scripts/lib/obj-multiplex.js48
-rw-r--r--app/scripts/lib/port-stream.js16
-rw-r--r--app/scripts/lib/stream-utils.js23
-rw-r--r--app/scripts/metamask-controller.js34
4 files changed, 41 insertions, 80 deletions
diff --git a/app/scripts/lib/obj-multiplex.js b/app/scripts/lib/obj-multiplex.js
deleted file mode 100644
index 0034febe0..000000000
--- a/app/scripts/lib/obj-multiplex.js
+++ /dev/null
@@ -1,48 +0,0 @@
-const through = require('through2')
-
-module.exports = ObjectMultiplex
-
-function ObjectMultiplex (opts) {
- opts = opts || {}
- // create multiplexer
- const mx = through.obj(function (chunk, enc, cb) {
- const name = chunk.name
- const data = chunk.data
- if (!name) {
- console.warn(`ObjectMultiplex - Malformed chunk without name "${chunk}"`)
- return cb()
- }
- const substream = mx.streams[name]
- if (!substream) {
- console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`)
- } else {
- if (substream.push) substream.push(data)
- }
- return cb()
- })
- mx.streams = {}
- // create substreams
- mx.createStream = function (name) {
- const substream = mx.streams[name] = through.obj(function (chunk, enc, cb) {
- mx.push({
- name: name,
- data: chunk,
- })
- return cb()
- })
- mx.on('end', function () {
- return substream.emit('end')
- })
- if (opts.error) {
- mx.on('error', function () {
- return substream.emit('error')
- })
- }
- return substream
- }
- // ignore streams (dont display orphaned data warning)
- mx.ignoreStream = function (name) {
- mx.streams[name] = true
- }
- return mx
-}
diff --git a/app/scripts/lib/port-stream.js b/app/scripts/lib/port-stream.js
index 607a9c9ed..648d88087 100644
--- a/app/scripts/lib/port-stream.js
+++ b/app/scripts/lib/port-stream.js
@@ -1,5 +1,6 @@
const Duplex = require('readable-stream').Duplex
const inherits = require('util').inherits
+const noop = function(){}
module.exports = PortDuplexStream
@@ -20,20 +21,14 @@ PortDuplexStream.prototype._onMessage = function (msg) {
if (Buffer.isBuffer(msg)) {
delete msg._isBuffer
var data = new Buffer(msg)
- // console.log('PortDuplexStream - saw message as buffer', data)
this.push(data)
} else {
- // console.log('PortDuplexStream - saw message', msg)
this.push(msg)
}
}
PortDuplexStream.prototype._onDisconnect = function () {
- try {
- this.push(null)
- } catch (err) {
- this.emit('error', err)
- }
+ this.destroy()
}
// stream plumbing
@@ -45,19 +40,12 @@ PortDuplexStream.prototype._write = function (msg, encoding, cb) {
if (Buffer.isBuffer(msg)) {
var data = msg.toJSON()
data._isBuffer = true
- // console.log('PortDuplexStream - sent message as buffer', data)
this._port.postMessage(data)
} else {
- // console.log('PortDuplexStream - sent message', msg)
this._port.postMessage(msg)
}
} catch (err) {
- // console.error(err)
return cb(new Error('PortDuplexStream - disconnected'))
}
cb()
}
-
-// util
-
-function noop () {}
diff --git a/app/scripts/lib/stream-utils.js b/app/scripts/lib/stream-utils.js
index ba79990cc..89e2a359e 100644
--- a/app/scripts/lib/stream-utils.js
+++ b/app/scripts/lib/stream-utils.js
@@ -1,6 +1,7 @@
const Through = require('through2')
const endOfStream = require('end-of-stream')
-const ObjectMultiplex = require('./obj-multiplex')
+const ObjectMultiplex = require('obj-multiplex')
+const pump = require('pump')
module.exports = {
jsonParseStream: jsonParseStream,
@@ -23,14 +24,14 @@ function jsonStringifyStream () {
}
function setupMultiplex (connectionStream) {
- var mx = ObjectMultiplex()
- connectionStream.pipe(mx).pipe(connectionStream)
- endOfStream(mx, function (err) {
- if (err) console.error(err)
- })
- endOfStream(connectionStream, function (err) {
- if (err) console.error(err)
- mx.destroy()
- })
- return mx
+ const mux = new ObjectMultiplex()
+ pump(
+ connectionStream,
+ mux,
+ connectionStream,
+ (err) => {
+ if (err) console.error(err)
+ }
+ )
+ return mux
}
diff --git a/app/scripts/metamask-controller.js b/app/scripts/metamask-controller.js
index e4b1b5975..1a6732338 100644
--- a/app/scripts/metamask-controller.js
+++ b/app/scripts/metamask-controller.js
@@ -1,7 +1,7 @@
const EventEmitter = require('events')
const extend = require('xtend')
const promiseToCallback = require('promise-to-callback')
-const pipe = require('pump')
+const pump = require('pump')
const Dnode = require('dnode')
const ObservableStore = require('obs-store')
const EthStore = require('./lib/eth-store')
@@ -367,7 +367,14 @@ module.exports = class MetamaskController extends EventEmitter {
setupControllerConnection (outStream) {
const api = this.getApi()
const dnode = Dnode(api)
- outStream.pipe(dnode).pipe(outStream)
+ pump(
+ outStream,
+ dnode,
+ outStream,
+ (err) => {
+ if (err) console.error(err)
+ }
+ )
dnode.on('remote', (remote) => {
// push updates to popup
const sendUpdate = remote.sendUpdate.bind(remote)
@@ -376,20 +383,29 @@ module.exports = class MetamaskController extends EventEmitter {
}
setupProviderConnection (outStream, originDomain) {
+ // setup json rpc engine stack
const engine = new RpcEngine()
engine.push(originMiddleware)
engine.push(loggerMiddleware)
engine.push(createProviderMiddleware({ provider: this.provider }))
-
+
// setup connection
const providerStream = createEngineStream({ engine })
- outStream.pipe(providerStream).pipe(outStream)
-
+ pump(
+ outStream,
+ providerStream,
+ outStream,
+ (err) => {
+ if (err) console.error(err)
+ }
+ )
+
// append dapp origin domain to request
function originMiddleware (req, res, next, end) {
req.origin = originDomain
next()
}
+
// log rpc activity
function loggerMiddleware (req, res, next, end) {
next((cb) => {
@@ -401,6 +417,7 @@ module.exports = class MetamaskController extends EventEmitter {
cb()
})
}
+
// forward requests to provider
function createProviderMiddleware({ provider }) {
return (req, res, next, end) => {
@@ -414,9 +431,12 @@ module.exports = class MetamaskController extends EventEmitter {
}
setupPublicConfig (outStream) {
- pipe(
+ pump(
this.publicConfigStore,
- outStream
+ outStream,
+ (err) => {
+ if (err) console.error(err)
+ }
)
}