From e8e6df5159e415127d069e8954706f8e22aadb90 Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@twurst.com>
Date: Tue, 1 Mar 2016 12:47:36 +0100
Subject: rpc: simplify inproc client

Fixes #2277
---
 rpc/inproc.go | 88 +++++++++++++----------------------------------------------
 1 file changed, 19 insertions(+), 69 deletions(-)

(limited to 'rpc/inproc.go')

diff --git a/rpc/inproc.go b/rpc/inproc.go
index e138ba2c3..3cfbea71c 100644
--- a/rpc/inproc.go
+++ b/rpc/inproc.go
@@ -16,96 +16,46 @@
 
 package rpc
 
-import "encoding/json"
-
-// NewInProcRPCClient creates an in-process buffer stream attachment to a given
-// RPC server.
-func NewInProcRPCClient(handler *Server) Client {
-	buffer := &inprocBuffer{
-		requests:  make(chan []byte, 16),
-		responses: make(chan []byte, 16),
-	}
-	client := &inProcClient{
-		server: handler,
-		buffer: buffer,
-	}
-	go handler.ServeCodec(NewJSONCodec(client.buffer))
-	return client
-}
+import (
+	"encoding/json"
+	"io"
+	"net"
+)
 
 // inProcClient is an in-process buffer stream attached to an RPC server.
 type inProcClient struct {
 	server *Server
-	buffer *inprocBuffer
+	cl     io.Closer
+	enc    *json.Encoder
+	dec    *json.Decoder
 }
 
 // Close tears down the request channel of the in-proc client.
 func (c *inProcClient) Close() {
-	c.buffer.Close()
+	c.cl.Close()
+}
+
+// NewInProcRPCClient creates an in-process buffer stream attachment to a given
+// RPC server.
+func NewInProcRPCClient(handler *Server) Client {
+	p1, p2 := net.Pipe()
+	go handler.ServeCodec(NewJSONCodec(p1))
+	return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
 }
 
 // Send marshals a message into a json format and injects in into the client
 // request channel.
 func (c *inProcClient) Send(msg interface{}) error {
-	d, err := json.Marshal(msg)
-	if err != nil {
-		return err
-	}
-	c.buffer.requests <- d
-	return nil
+	return c.enc.Encode(msg)
 }
 
 // Recv reads a message from the response channel and tries to parse it into the
 // given msg interface.
 func (c *inProcClient) Recv(msg interface{}) error {
-	data := <-c.buffer.responses
-	return json.Unmarshal(data, &msg)
+	return c.dec.Decode(msg)
 }
 
 // Returns the collection of modules the RPC server offers.
 func (c *inProcClient) SupportedModules() (map[string]string, error) {
 	return SupportedModules(c)
 }
-
-// inprocBuffer represents the connection between the RPC server and console
-type inprocBuffer struct {
-	readBuf   []byte      // store remaining request bytes after a partial read
-	requests  chan []byte // list with raw serialized requests
-	responses chan []byte // list with raw serialized responses
-}
-
-// Read will read the next request in json format.
-func (b *inprocBuffer) Read(p []byte) (int, error) {
-	// last read didn't read entire request, return remaining bytes
-	if len(b.readBuf) > 0 {
-		n := copy(p, b.readBuf)
-		if n < len(b.readBuf) {
-			b.readBuf = b.readBuf[:n]
-		} else {
-			b.readBuf = b.readBuf[:0]
-		}
-		return n, nil
-	}
-	// read next request
-	req := <-b.requests
-	n := copy(p, req)
-	if n < len(req) {
-		// inprocBuffer too small, store remaining chunk for next read
-		b.readBuf = req[n:]
-	}
-	return n, nil
-}
-
-// Write sends the given buffer to the backend.
-func (b *inprocBuffer) Write(p []byte) (n int, err error) {
-	b.responses <- p
-	return len(p), nil
-}
-
-// Close cleans up obtained resources.
-func (b *inprocBuffer) Close() error {
-	close(b.requests)
-	close(b.responses)
-
-	return nil
-}
-- 
cgit v1.2.3