aboutsummaryrefslogtreecommitdiffstats
path: root/app/scripts/lib
diff options
context:
space:
mode:
Diffstat (limited to 'app/scripts/lib')
-rw-r--r--app/scripts/lib/obj-multiplex.js2
-rw-r--r--app/scripts/lib/remote-store.js97
-rw-r--r--app/scripts/lib/stream-utils.js16
3 files changed, 114 insertions, 1 deletions
diff --git a/app/scripts/lib/obj-multiplex.js b/app/scripts/lib/obj-multiplex.js
index 333b6061f..ad1d914f8 100644
--- a/app/scripts/lib/obj-multiplex.js
+++ b/app/scripts/lib/obj-multiplex.js
@@ -11,7 +11,7 @@ function ObjectMultiplex(opts){
var data = chunk.data
var substream = mx.streams[name]
if (!substream) {
- console.warn("orphaned data for stream " + name)
+ console.warn('orphaned data for stream ' + name)
} else {
substream.push(data)
}
diff --git a/app/scripts/lib/remote-store.js b/app/scripts/lib/remote-store.js
new file mode 100644
index 000000000..2dbdde811
--- /dev/null
+++ b/app/scripts/lib/remote-store.js
@@ -0,0 +1,97 @@
+const Dnode = require('dnode')
+const inherits = require('util').inherits
+
+module.exports = {
+ HostStore: HostStore,
+ RemoteStore: RemoteStore,
+}
+
+function BaseStore(initState){
+ this._state = initState || {}
+ this._subs = []
+}
+
+BaseStore.prototype.set = function(key, value){
+ throw Error('Not implemented.')
+}
+
+BaseStore.prototype.get = function(key){
+ return this._state[key]
+}
+
+BaseStore.prototype.subscribe = function(fn){
+ this._subs.push(fn)
+ var unsubscribe = this.unsubscribe.bind(this, fn)
+ return unsubscribe
+}
+
+BaseStore.prototype.unsubscribe = function(fn){
+ var index = this._subs.indexOf(fn)
+ if (index !== -1) this._subs.splice(index, 1)
+}
+
+BaseStore.prototype._emitUpdates = function(state){
+ this._subs.forEach(function(handler){
+ handler(state)
+ })
+}
+
+//
+// host
+//
+
+inherits(HostStore, BaseStore)
+function HostStore(initState, opts){
+ BaseStore.call(this, initState)
+}
+
+HostStore.prototype.set = function(key, value){
+ this._state[key] = value
+ process.nextTick(this._emitUpdates.bind(this, this._state))
+}
+
+HostStore.prototype.createStream = function(){
+ var dnode = Dnode({
+ // update: this._didUpdate.bind(this),
+ })
+ dnode.on('remote', this._didConnect.bind(this))
+ return dnode
+}
+
+HostStore.prototype._didConnect = function(remote){
+ this.subscribe(function(state){
+ remote.update(state)
+ })
+ remote.update(this._state)
+}
+
+//
+// remote
+//
+
+inherits(RemoteStore, BaseStore)
+function RemoteStore(initState, opts){
+ BaseStore.call(this, initState)
+ this._remote = null
+}
+
+RemoteStore.prototype.set = function(key, value){
+ this._remote.set(key, value)
+}
+
+RemoteStore.prototype.createStream = function(){
+ var dnode = Dnode({
+ update: this._didUpdate.bind(this),
+ })
+ dnode.once('remote', this._didConnect.bind(this))
+ return dnode
+}
+
+RemoteStore.prototype._didConnect = function(remote){
+ this._remote = remote
+}
+
+RemoteStore.prototype._didUpdate = function(state){
+ this._state = state
+ this._emitUpdates(state)
+}
diff --git a/app/scripts/lib/stream-utils.js b/app/scripts/lib/stream-utils.js
index 12560ffd8..fd4417d94 100644
--- a/app/scripts/lib/stream-utils.js
+++ b/app/scripts/lib/stream-utils.js
@@ -1,9 +1,11 @@
const Through = require('through2')
+const ObjectMultiplex = require('./obj-multiplex')
module.exports = {
jsonParseStream: jsonParseStream,
jsonStringifyStream: jsonStringifyStream,
+ setupMultiplex: setupMultiplex,
}
function jsonParseStream(){
@@ -19,3 +21,17 @@ function jsonStringifyStream(){
cb()
})
}
+
+function setupMultiplex(connectionStream){
+ var mx = ObjectMultiplex()
+ connectionStream.pipe(mx).pipe(connectionStream)
+ mx.on('error', function(err) {
+ console.error(err)
+ // connectionStream.destroy()
+ })
+ connectionStream.on('error', function(err) {
+ console.error(err)
+ mx.destroy()
+ })
+ return mx
+} \ No newline at end of file