From e8e6df5159e415127d069e8954706f8e22aadb90 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 1 Mar 2016 12:47:36 +0100 Subject: rpc: simplify inproc client Fixes #2277 --- rpc/inproc.go | 88 +++++++++++++---------------------------------------------- 1 file changed, 19 insertions(+), 69 deletions(-) (limited to 'rpc/inproc.go') diff --git a/rpc/inproc.go b/rpc/inproc.go index e138ba2c3..3cfbea71c 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -16,96 +16,46 @@ package rpc -import "encoding/json" - -// NewInProcRPCClient creates an in-process buffer stream attachment to a given -// RPC server. -func NewInProcRPCClient(handler *Server) Client { - buffer := &inprocBuffer{ - requests: make(chan []byte, 16), - responses: make(chan []byte, 16), - } - client := &inProcClient{ - server: handler, - buffer: buffer, - } - go handler.ServeCodec(NewJSONCodec(client.buffer)) - return client -} +import ( + "encoding/json" + "io" + "net" +) // inProcClient is an in-process buffer stream attached to an RPC server. type inProcClient struct { server *Server - buffer *inprocBuffer + cl io.Closer + enc *json.Encoder + dec *json.Decoder } // Close tears down the request channel of the in-proc client. func (c *inProcClient) Close() { - c.buffer.Close() + c.cl.Close() +} + +// NewInProcRPCClient creates an in-process buffer stream attachment to a given +// RPC server. +func NewInProcRPCClient(handler *Server) Client { + p1, p2 := net.Pipe() + go handler.ServeCodec(NewJSONCodec(p1)) + return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)} } // Send marshals a message into a json format and injects in into the client // request channel. func (c *inProcClient) Send(msg interface{}) error { - d, err := json.Marshal(msg) - if err != nil { - return err - } - c.buffer.requests <- d - return nil + return c.enc.Encode(msg) } // Recv reads a message from the response channel and tries to parse it into the // given msg interface. func (c *inProcClient) Recv(msg interface{}) error { - data := <-c.buffer.responses - return json.Unmarshal(data, &msg) + return c.dec.Decode(msg) } // Returns the collection of modules the RPC server offers. func (c *inProcClient) SupportedModules() (map[string]string, error) { return SupportedModules(c) } - -// inprocBuffer represents the connection between the RPC server and console -type inprocBuffer struct { - readBuf []byte // store remaining request bytes after a partial read - requests chan []byte // list with raw serialized requests - responses chan []byte // list with raw serialized responses -} - -// Read will read the next request in json format. -func (b *inprocBuffer) Read(p []byte) (int, error) { - // last read didn't read entire request, return remaining bytes - if len(b.readBuf) > 0 { - n := copy(p, b.readBuf) - if n < len(b.readBuf) { - b.readBuf = b.readBuf[:n] - } else { - b.readBuf = b.readBuf[:0] - } - return n, nil - } - // read next request - req := <-b.requests - n := copy(p, req) - if n < len(req) { - // inprocBuffer too small, store remaining chunk for next read - b.readBuf = req[n:] - } - return n, nil -} - -// Write sends the given buffer to the backend. -func (b *inprocBuffer) Write(p []byte) (n int, err error) { - b.responses <- p - return len(p), nil -} - -// Close cleans up obtained resources. -func (b *inprocBuffer) Close() error { - close(b.requests) - close(b.responses) - - return nil -} -- cgit v1.2.3