diff options
Diffstat (limited to 'app/scripts/lib')
-rw-r--r-- | app/scripts/lib/obj-multiplex.js | 2 | ||||
-rw-r--r-- | app/scripts/lib/remote-store.js | 97 | ||||
-rw-r--r-- | app/scripts/lib/stream-utils.js | 16 |
3 files changed, 114 insertions, 1 deletions
diff --git a/app/scripts/lib/obj-multiplex.js b/app/scripts/lib/obj-multiplex.js index 333b6061f..ad1d914f8 100644 --- a/app/scripts/lib/obj-multiplex.js +++ b/app/scripts/lib/obj-multiplex.js @@ -11,7 +11,7 @@ function ObjectMultiplex(opts){ var data = chunk.data var substream = mx.streams[name] if (!substream) { - console.warn("orphaned data for stream " + name) + console.warn('orphaned data for stream ' + name) } else { substream.push(data) } diff --git a/app/scripts/lib/remote-store.js b/app/scripts/lib/remote-store.js new file mode 100644 index 000000000..2dbdde811 --- /dev/null +++ b/app/scripts/lib/remote-store.js @@ -0,0 +1,97 @@ +const Dnode = require('dnode') +const inherits = require('util').inherits + +module.exports = { + HostStore: HostStore, + RemoteStore: RemoteStore, +} + +function BaseStore(initState){ + this._state = initState || {} + this._subs = [] +} + +BaseStore.prototype.set = function(key, value){ + throw Error('Not implemented.') +} + +BaseStore.prototype.get = function(key){ + return this._state[key] +} + +BaseStore.prototype.subscribe = function(fn){ + this._subs.push(fn) + var unsubscribe = this.unsubscribe.bind(this, fn) + return unsubscribe +} + +BaseStore.prototype.unsubscribe = function(fn){ + var index = this._subs.indexOf(fn) + if (index !== -1) this._subs.splice(index, 1) +} + +BaseStore.prototype._emitUpdates = function(state){ + this._subs.forEach(function(handler){ + handler(state) + }) +} + +// +// host +// + +inherits(HostStore, BaseStore) +function HostStore(initState, opts){ + BaseStore.call(this, initState) +} + +HostStore.prototype.set = function(key, value){ + this._state[key] = value + process.nextTick(this._emitUpdates.bind(this, this._state)) +} + +HostStore.prototype.createStream = function(){ + var dnode = Dnode({ + // update: this._didUpdate.bind(this), + }) + dnode.on('remote', this._didConnect.bind(this)) + return dnode +} + +HostStore.prototype._didConnect = function(remote){ + this.subscribe(function(state){ + remote.update(state) + }) + remote.update(this._state) +} + +// +// remote +// + +inherits(RemoteStore, BaseStore) +function RemoteStore(initState, opts){ + BaseStore.call(this, initState) + this._remote = null +} + +RemoteStore.prototype.set = function(key, value){ + this._remote.set(key, value) +} + +RemoteStore.prototype.createStream = function(){ + var dnode = Dnode({ + update: this._didUpdate.bind(this), + }) + dnode.once('remote', this._didConnect.bind(this)) + return dnode +} + +RemoteStore.prototype._didConnect = function(remote){ + this._remote = remote +} + +RemoteStore.prototype._didUpdate = function(state){ + this._state = state + this._emitUpdates(state) +} diff --git a/app/scripts/lib/stream-utils.js b/app/scripts/lib/stream-utils.js index 12560ffd8..fd4417d94 100644 --- a/app/scripts/lib/stream-utils.js +++ b/app/scripts/lib/stream-utils.js @@ -1,9 +1,11 @@ const Through = require('through2') +const ObjectMultiplex = require('./obj-multiplex') module.exports = { jsonParseStream: jsonParseStream, jsonStringifyStream: jsonStringifyStream, + setupMultiplex: setupMultiplex, } function jsonParseStream(){ @@ -19,3 +21,17 @@ function jsonStringifyStream(){ cb() }) } + +function setupMultiplex(connectionStream){ + var mx = ObjectMultiplex() + connectionStream.pipe(mx).pipe(connectionStream) + mx.on('error', function(err) { + console.error(err) + // connectionStream.destroy() + }) + connectionStream.on('error', function(err) { + console.error(err) + mx.destroy() + }) + return mx +}
\ No newline at end of file |