diff options
Diffstat (limited to 'app')
-rw-r--r-- | app/scripts/lib/inpage-provider.js | 69 | ||||
-rw-r--r-- | app/scripts/lib/obj-multiplex.js | 48 | ||||
-rw-r--r-- | app/scripts/lib/port-stream.js | 16 | ||||
-rw-r--r-- | app/scripts/lib/stream-utils.js | 24 | ||||
-rw-r--r-- | app/scripts/metamask-controller.js | 80 |
5 files changed, 100 insertions, 137 deletions
diff --git a/app/scripts/lib/inpage-provider.js b/app/scripts/lib/inpage-provider.js index 13888dc67..da75c4be2 100644 --- a/app/scripts/lib/inpage-provider.js +++ b/app/scripts/lib/inpage-provider.js @@ -1,8 +1,9 @@ -const pipe = require('pump') -const StreamProvider = require('web3-stream-provider') +const pump = require('pump') +const RpcEngine = require('json-rpc-engine') +const createIdRemapMiddleware = require('json-rpc-engine/src/idRemapMiddleware') +const createStreamMiddleware = require('json-rpc-middleware-stream') const LocalStorageStore = require('obs-store') -const ObjectMultiplex = require('./obj-multiplex') -const createRandomId = require('./random-id') +const ObjectMultiplex = require('obj-multiplex') module.exports = MetamaskInpageProvider @@ -10,64 +11,46 @@ function MetamaskInpageProvider (connectionStream) { const self = this // setup connectionStream multiplexing - var multiStream = self.multiStream = ObjectMultiplex() - pipe( + const mux = self.mux = new ObjectMultiplex() + pump( connectionStream, - multiStream, + mux, connectionStream, (err) => logStreamDisconnectWarning('MetaMask', err) ) // subscribe to metamask public config (one-way) self.publicConfigStore = new LocalStorageStore({ storageKey: 'MetaMask-Config' }) - pipe( - multiStream.createStream('publicConfig'), + pump( + mux.createStream('publicConfig'), self.publicConfigStore, (err) => logStreamDisconnectWarning('MetaMask PublicConfigStore', err) ) // ignore phishing warning message (handled elsewhere) - multiStream.ignoreStream('phishing') + mux.ignoreStream('phishing') // connect to async provider - const asyncProvider = self.asyncProvider = new StreamProvider() - pipe( - asyncProvider, - multiStream.createStream('provider'), - asyncProvider, + const streamMiddleware = createStreamMiddleware() + pump( + streamMiddleware.stream, + mux.createStream('provider'), + streamMiddleware.stream, (err) => logStreamDisconnectWarning('MetaMask RpcProvider', err) ) - // start and stop polling to unblock first block lock - self.idMap = {} + // handle sendAsync requests via dapp-side rpc engine + const rpcEngine = new RpcEngine() + rpcEngine.push(createIdRemapMiddleware()) + rpcEngine.push(streamMiddleware) + self.rpcEngine = rpcEngine } // handle sendAsync requests via asyncProvider // also remap ids inbound and outbound MetamaskInpageProvider.prototype.sendAsync = function (payload, cb) { const self = this - - // rewrite request ids - const request = eachJsonMessage(payload, (_message) => { - const message = Object.assign({}, _message) - const newId = createRandomId() - self.idMap[newId] = message.id - message.id = newId - return message - }) - - // forward to asyncProvider - self.asyncProvider.sendAsync(request, (err, _res) => { - if (err) return cb(err) - // transform messages to original ids - const res = eachJsonMessage(_res, (message) => { - const oldId = self.idMap[message.id] - delete self.idMap[message.id] - message.id = oldId - return message - }) - cb(null, res) - }) + self.rpcEngine.handle(payload, cb) } @@ -124,14 +107,6 @@ MetamaskInpageProvider.prototype.isMetaMask = true // util -function eachJsonMessage (payload, transformFn) { - if (Array.isArray(payload)) { - return payload.map(transformFn) - } else { - return transformFn(payload) - } -} - function logStreamDisconnectWarning (remoteLabel, err) { let warningMsg = `MetamaskInpageProvider - lost connection to ${remoteLabel}` if (err) warningMsg += '\n' + err.stack diff --git a/app/scripts/lib/obj-multiplex.js b/app/scripts/lib/obj-multiplex.js deleted file mode 100644 index 0034febe0..000000000 --- a/app/scripts/lib/obj-multiplex.js +++ /dev/null @@ -1,48 +0,0 @@ -const through = require('through2') - -module.exports = ObjectMultiplex - -function ObjectMultiplex (opts) { - opts = opts || {} - // create multiplexer - const mx = through.obj(function (chunk, enc, cb) { - const name = chunk.name - const data = chunk.data - if (!name) { - console.warn(`ObjectMultiplex - Malformed chunk without name "${chunk}"`) - return cb() - } - const substream = mx.streams[name] - if (!substream) { - console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`) - } else { - if (substream.push) substream.push(data) - } - return cb() - }) - mx.streams = {} - // create substreams - mx.createStream = function (name) { - const substream = mx.streams[name] = through.obj(function (chunk, enc, cb) { - mx.push({ - name: name, - data: chunk, - }) - return cb() - }) - mx.on('end', function () { - return substream.emit('end') - }) - if (opts.error) { - mx.on('error', function () { - return substream.emit('error') - }) - } - return substream - } - // ignore streams (dont display orphaned data warning) - mx.ignoreStream = function (name) { - mx.streams[name] = true - } - return mx -} diff --git a/app/scripts/lib/port-stream.js b/app/scripts/lib/port-stream.js index 607a9c9ed..648d88087 100644 --- a/app/scripts/lib/port-stream.js +++ b/app/scripts/lib/port-stream.js @@ -1,5 +1,6 @@ const Duplex = require('readable-stream').Duplex const inherits = require('util').inherits +const noop = function(){} module.exports = PortDuplexStream @@ -20,20 +21,14 @@ PortDuplexStream.prototype._onMessage = function (msg) { if (Buffer.isBuffer(msg)) { delete msg._isBuffer var data = new Buffer(msg) - // console.log('PortDuplexStream - saw message as buffer', data) this.push(data) } else { - // console.log('PortDuplexStream - saw message', msg) this.push(msg) } } PortDuplexStream.prototype._onDisconnect = function () { - try { - this.push(null) - } catch (err) { - this.emit('error', err) - } + this.destroy() } // stream plumbing @@ -45,19 +40,12 @@ PortDuplexStream.prototype._write = function (msg, encoding, cb) { if (Buffer.isBuffer(msg)) { var data = msg.toJSON() data._isBuffer = true - // console.log('PortDuplexStream - sent message as buffer', data) this._port.postMessage(data) } else { - // console.log('PortDuplexStream - sent message', msg) this._port.postMessage(msg) } } catch (err) { - // console.error(err) return cb(new Error('PortDuplexStream - disconnected')) } cb() } - -// util - -function noop () {} diff --git a/app/scripts/lib/stream-utils.js b/app/scripts/lib/stream-utils.js index ba79990cc..8bb0b4f3c 100644 --- a/app/scripts/lib/stream-utils.js +++ b/app/scripts/lib/stream-utils.js @@ -1,6 +1,6 @@ const Through = require('through2') -const endOfStream = require('end-of-stream') -const ObjectMultiplex = require('./obj-multiplex') +const ObjectMultiplex = require('obj-multiplex') +const pump = require('pump') module.exports = { jsonParseStream: jsonParseStream, @@ -23,14 +23,14 @@ function jsonStringifyStream () { } function setupMultiplex (connectionStream) { - var mx = ObjectMultiplex() - connectionStream.pipe(mx).pipe(connectionStream) - endOfStream(mx, function (err) { - if (err) console.error(err) - }) - endOfStream(connectionStream, function (err) { - if (err) console.error(err) - mx.destroy() - }) - return mx + const mux = new ObjectMultiplex() + pump( + connectionStream, + mux, + connectionStream, + (err) => { + if (err) console.error(err) + } + ) + return mux } diff --git a/app/scripts/metamask-controller.js b/app/scripts/metamask-controller.js index a007d6fc5..735fc4af0 100644 --- a/app/scripts/metamask-controller.js +++ b/app/scripts/metamask-controller.js @@ -1,12 +1,14 @@ const EventEmitter = require('events') const extend = require('xtend') const promiseToCallback = require('promise-to-callback') -const pipe = require('pump') +const pump = require('pump') const Dnode = require('dnode') const ObservableStore = require('obs-store') const EthStore = require('./lib/eth-store') const EthQuery = require('eth-query') -const streamIntoProvider = require('web3-stream-provider/handler') +const RpcEngine = require('json-rpc-engine') +const createEngineStream = require('json-rpc-middleware-stream/engineStream') +const createFilterMiddleware = require('eth-json-rpc-filters') const setupMultiplex = require('./lib/stream-utils.js').setupMultiplex const KeyringController = require('./keyring-controller') const NetworkController = require('./controllers/network') @@ -77,12 +79,13 @@ module.exports = class MetamaskController extends EventEmitter { // rpc provider this.provider = this.initializeProvider() + this.blockTracker = this.provider // eth data query tools this.ethQuery = new EthQuery(this.provider) this.ethStore = new EthStore({ provider: this.provider, - blockTracker: this.provider, + blockTracker: this.blockTracker, }) // key mgmt @@ -109,7 +112,7 @@ module.exports = class MetamaskController extends EventEmitter { getNetwork: this.networkController.getNetworkState.bind(this), signTransaction: this.keyringController.signTransaction.bind(this.keyringController), provider: this.provider, - blockTracker: this.provider, + blockTracker: this.blockTracker, ethQuery: this.ethQuery, ethStore: this.ethStore, }) @@ -366,7 +369,14 @@ module.exports = class MetamaskController extends EventEmitter { setupControllerConnection (outStream) { const api = this.getApi() const dnode = Dnode(api) - outStream.pipe(dnode).pipe(outStream) + pump( + outStream, + dnode, + outStream, + (err) => { + if (err) console.error(err) + } + ) dnode.on('remote', (remote) => { // push updates to popup const sendUpdate = remote.sendUpdate.bind(remote) @@ -375,26 +385,64 @@ module.exports = class MetamaskController extends EventEmitter { } setupProviderConnection (outStream, originDomain) { - streamIntoProvider(outStream, this.provider, onRequest, onResponse) + // setup json rpc engine stack + const engine = new RpcEngine() + engine.push(originMiddleware) + engine.push(loggerMiddleware) + engine.push(createFilterMiddleware({ + provider: this.provider, + blockTracker: this.blockTracker, + })) + engine.push(createProviderMiddleware({ provider: this.provider })) + + // setup connection + const providerStream = createEngineStream({ engine }) + pump( + outStream, + providerStream, + outStream, + (err) => { + if (err) console.error(err) + } + ) + // append dapp origin domain to request - function onRequest (request) { - request.origin = originDomain + function originMiddleware (req, res, next, end) { + req.origin = originDomain + next() } + // log rpc activity - function onResponse (err, request, response) { - if (err) return console.error(err) - if (response.error) { - console.error('Error in RPC response:\n', response) + function loggerMiddleware (req, res, next, end) { + next((cb) => { + if (res.error) { + console.error('Error in RPC response:\n', res) + } + if (req.isMetamaskInternal) return + log.info(`RPC (${originDomain}):`, req, '->', res) + cb() + }) + } + + // forward requests to provider + function createProviderMiddleware({ provider }) { + return (req, res, next, end) => { + provider.sendAsync(req, (err, _res) => { + if (err) return end(err) + res.result = _res.result + end() + }) } - if (request.isMetamaskInternal) return - log.info(`RPC (${originDomain}):`, request, '->', response) } } setupPublicConfig (outStream) { - pipe( + pump( this.publicConfigStore, - outStream + outStream, + (err) => { + if (err) console.error(err) + } ) } |