aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/inproc.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-03-01 19:47:36 +0800
committerFelix Lange <fjl@twurst.com>2016-03-01 19:47:36 +0800
commite8e6df5159e415127d069e8954706f8e22aadb90 (patch)
treed2738690d55bcc9f5c7e6ac6f887a05f8c1bfb5e /rpc/inproc.go
parent8255afbc756f09c69676d79de4fcefddaa482ee8 (diff)
downloadgo-tangerine-e8e6df5159e415127d069e8954706f8e22aadb90.tar
go-tangerine-e8e6df5159e415127d069e8954706f8e22aadb90.tar.gz
go-tangerine-e8e6df5159e415127d069e8954706f8e22aadb90.tar.bz2
go-tangerine-e8e6df5159e415127d069e8954706f8e22aadb90.tar.lz
go-tangerine-e8e6df5159e415127d069e8954706f8e22aadb90.tar.xz
go-tangerine-e8e6df5159e415127d069e8954706f8e22aadb90.tar.zst
go-tangerine-e8e6df5159e415127d069e8954706f8e22aadb90.zip
rpc: simplify inproc client
Fixes #2277
Diffstat (limited to 'rpc/inproc.go')
-rw-r--r--rpc/inproc.go88
1 files changed, 19 insertions, 69 deletions
diff --git a/rpc/inproc.go b/rpc/inproc.go
index e138ba2c3..3cfbea71c 100644
--- a/rpc/inproc.go
+++ b/rpc/inproc.go
@@ -16,96 +16,46 @@
package rpc
-import "encoding/json"
-
-// NewInProcRPCClient creates an in-process buffer stream attachment to a given
-// RPC server.
-func NewInProcRPCClient(handler *Server) Client {
- buffer := &inprocBuffer{
- requests: make(chan []byte, 16),
- responses: make(chan []byte, 16),
- }
- client := &inProcClient{
- server: handler,
- buffer: buffer,
- }
- go handler.ServeCodec(NewJSONCodec(client.buffer))
- return client
-}
+import (
+ "encoding/json"
+ "io"
+ "net"
+)
// inProcClient is an in-process buffer stream attached to an RPC server.
type inProcClient struct {
server *Server
- buffer *inprocBuffer
+ cl io.Closer
+ enc *json.Encoder
+ dec *json.Decoder
}
// Close tears down the request channel of the in-proc client.
func (c *inProcClient) Close() {
- c.buffer.Close()
+ c.cl.Close()
+}
+
+// NewInProcRPCClient creates an in-process buffer stream attachment to a given
+// RPC server.
+func NewInProcRPCClient(handler *Server) Client {
+ p1, p2 := net.Pipe()
+ go handler.ServeCodec(NewJSONCodec(p1))
+ return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
}
// Send marshals a message into a json format and injects in into the client
// request channel.
func (c *inProcClient) Send(msg interface{}) error {
- d, err := json.Marshal(msg)
- if err != nil {
- return err
- }
- c.buffer.requests <- d
- return nil
+ return c.enc.Encode(msg)
}
// Recv reads a message from the response channel and tries to parse it into the
// given msg interface.
func (c *inProcClient) Recv(msg interface{}) error {
- data := <-c.buffer.responses
- return json.Unmarshal(data, &msg)
+ return c.dec.Decode(msg)
}
// Returns the collection of modules the RPC server offers.
func (c *inProcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(c)
}
-
-// inprocBuffer represents the connection between the RPC server and console
-type inprocBuffer struct {
- readBuf []byte // store remaining request bytes after a partial read
- requests chan []byte // list with raw serialized requests
- responses chan []byte // list with raw serialized responses
-}
-
-// Read will read the next request in json format.
-func (b *inprocBuffer) Read(p []byte) (int, error) {
- // last read didn't read entire request, return remaining bytes
- if len(b.readBuf) > 0 {
- n := copy(p, b.readBuf)
- if n < len(b.readBuf) {
- b.readBuf = b.readBuf[:n]
- } else {
- b.readBuf = b.readBuf[:0]
- }
- return n, nil
- }
- // read next request
- req := <-b.requests
- n := copy(p, req)
- if n < len(req) {
- // inprocBuffer too small, store remaining chunk for next read
- b.readBuf = req[n:]
- }
- return n, nil
-}
-
-// Write sends the given buffer to the backend.
-func (b *inprocBuffer) Write(p []byte) (n int, err error) {
- b.responses <- p
- return len(p), nil
-}
-
-// Close cleans up obtained resources.
-func (b *inprocBuffer) Close() error {
- close(b.requests)
- close(b.responses)
-
- return nil
-}