aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/json.go
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2016-04-05 15:43:32 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2016-04-05 15:43:32 +0800
commited92f116f72646e73613afc2f2e7b83472a61434 (patch)
tree6c6d3cf414e21ee37d93e30e782ec028ff0144f8 /rpc/json.go
parent6a185531d2cd2003bb4352c391f9dca023894d5a (diff)
parentf7328c5ecbd1076582a71ef7bf436485f3868b1f (diff)
downloaddexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.gz
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.bz2
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.lz
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.xz
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.zst
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.zip
Merge pull request #2407 from bas-vk/rpc-notifications
RPC pub sub
Diffstat (limited to 'rpc/json.go')
-rw-r--r--rpc/json.go24
1 files changed, 14 insertions, 10 deletions
diff --git a/rpc/json.go b/rpc/json.go
index 1ed943c00..a0bfcac04 100644
--- a/rpc/json.go
+++ b/rpc/json.go
@@ -22,7 +22,7 @@ import (
"io"
"reflect"
"strings"
- "sync/atomic"
+ "sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
@@ -81,19 +81,20 @@ type jsonNotification struct {
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments
// and serializing (result) objects.
type jsonCodec struct {
- closed chan interface{}
- isClosed int32
- d *json.Decoder
- e *json.Encoder
- req JSONRequest
- rw io.ReadWriteCloser
+ closed chan interface{}
+ closer sync.Once
+ d *json.Decoder
+ muEncoder sync.Mutex
+ e *json.Encoder
+ req JSONRequest
+ rw io.ReadWriteCloser
}
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc)
d.UseNumber()
- return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0}
+ return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc}
}
// isBatch returns true when the first non-whitespace characters is '['
@@ -326,15 +327,18 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac
// Write message to client
func (c *jsonCodec) Write(res interface{}) error {
+ c.muEncoder.Lock()
+ defer c.muEncoder.Unlock()
+
return c.e.Encode(res)
}
// Close the underlying connection
func (c *jsonCodec) Close() {
- if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) {
+ c.closer.Do(func() {
close(c.closed)
c.rw.Close()
- }
+ })
}
// Closed returns a channel which will be closed when Close is called