diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-04-05 15:43:32 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-04-05 15:43:32 +0800 |
commit | ed92f116f72646e73613afc2f2e7b83472a61434 (patch) | |
tree | 6c6d3cf414e21ee37d93e30e782ec028ff0144f8 /rpc/json.go | |
parent | 6a185531d2cd2003bb4352c391f9dca023894d5a (diff) | |
parent | f7328c5ecbd1076582a71ef7bf436485f3868b1f (diff) | |
download | dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.gz dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.bz2 dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.lz dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.xz dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.zst dexon-ed92f116f72646e73613afc2f2e7b83472a61434.zip |
Merge pull request #2407 from bas-vk/rpc-notifications
RPC pub sub
Diffstat (limited to 'rpc/json.go')
-rw-r--r-- | rpc/json.go | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/rpc/json.go b/rpc/json.go index 1ed943c00..a0bfcac04 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -22,7 +22,7 @@ import ( "io" "reflect" "strings" - "sync/atomic" + "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -81,19 +81,20 @@ type jsonNotification struct { // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments // and serializing (result) objects. type jsonCodec struct { - closed chan interface{} - isClosed int32 - d *json.Decoder - e *json.Encoder - req JSONRequest - rw io.ReadWriteCloser + closed chan interface{} + closer sync.Once + d *json.Decoder + muEncoder sync.Mutex + e *json.Encoder + req JSONRequest + rw io.ReadWriteCloser } // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { d := json.NewDecoder(rwc) d.UseNumber() - return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0} + return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc} } // isBatch returns true when the first non-whitespace characters is '[' @@ -326,15 +327,18 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac // Write message to client func (c *jsonCodec) Write(res interface{}) error { + c.muEncoder.Lock() + defer c.muEncoder.Unlock() + return c.e.Encode(res) } // Close the underlying connection func (c *jsonCodec) Close() { - if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) { + c.closer.Do(func() { close(c.closed) c.rw.Close() - } + }) } // Closed returns a channel which will be closed when Close is called |