From 6d92fdc0df12337c2ffb8e6d19c83653f1a00ff2 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 12:01:28 +0200 Subject: added support for batch requests --- rpc/codec/codec.go | 2 +- rpc/codec/json.go | 59 ++++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 49 insertions(+), 12 deletions(-) (limited to 'rpc/codec') diff --git a/rpc/codec/codec.go b/rpc/codec/codec.go index 5e8f38438..3177f77e4 100644 --- a/rpc/codec/codec.go +++ b/rpc/codec/codec.go @@ -12,7 +12,7 @@ type Codec int // (de)serialization support for rpc interface type ApiCoder interface { // Parse message to request from underlying stream - ReadRequest() (*shared.Request, error) + ReadRequest() ([]*shared.Request, bool, error) // Parse response message from underlying stream ReadResponse() (interface{}, error) // Encode response to encoded form in underlying stream diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 31024ee74..380b4cba7 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -8,33 +8,53 @@ import ( ) const ( - MAX_RESPONSE_SIZE = 64 * 1024 + MAX_REQUEST_SIZE = 1024 * 1024 + MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { c net.Conn - d *json.Decoder - e *json.Encoder + buffer []byte + bytesInBuffer int } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ c: conn, - d: json.NewDecoder(conn), - e: json.NewEncoder(conn), + buffer: make([]byte, MAX_REQUEST_SIZE), + bytesInBuffer: 0, } } // Serialize obj to JSON and write it to conn -func (self *JsonCodec) ReadRequest() (*shared.Request, error) { - req := shared.Request{} - err := self.d.Decode(&req) +func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { + n, err := self.c.Read(self.buffer[self.bytesInBuffer:]) + if err != nil { + self.bytesInBuffer = 0 + return nil, false, err + } + + self.bytesInBuffer += n + + singleRequest := shared.Request{} + err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest) if err == nil { - return &req, nil + self.bytesInBuffer = 0 + requests := make([]*shared.Request, 1) + requests[0] = &singleRequest + return requests, false, nil } - return nil, err + + requests = make([]*shared.Request, 0) + err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests) + if err == nil { + self.bytesInBuffer = 0 + return requests, true, nil + } + + return nil, false, err } func (self *JsonCodec) ReadResponse() (interface{}, error) { @@ -66,7 +86,24 @@ func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) { // Parse JSON data from conn to obj func (self *JsonCodec) WriteResponse(res interface{}) error { - return self.e.Encode(&res) + data, err := json.Marshal(res) + if err != nil { + self.c.Close() + return err + } + + bytesWritten := 0 + + for bytesWritten < len(data) { + n, err := self.c.Write(data[bytesWritten:]) + if err != nil { + self.c.Close() + return err + } + bytesWritten += n + } + + return nil } // Close decoder and encoder -- cgit v1.2.3 From ffbe5656ff2cba43c813f46f743fde4d1ab2dd58 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 13:18:10 +0200 Subject: support for large requests/responses --- rpc/codec/json.go | 46 ++++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 16 deletions(-) (limited to 'rpc/codec') diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 380b4cba7..1de649c21 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -2,28 +2,30 @@ package codec import ( "encoding/json" + "fmt" "net" + "time" "github.com/ethereum/go-ethereum/rpc/shared" ) const ( - MAX_REQUEST_SIZE = 1024 * 1024 + MAX_REQUEST_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { - c net.Conn - buffer []byte + c net.Conn + buffer []byte bytesInBuffer int } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ - c: conn, - buffer: make([]byte, MAX_REQUEST_SIZE), + c: conn, + buffer: make([]byte, MAX_REQUEST_SIZE), bytesInBuffer: 0, } } @@ -58,28 +60,40 @@ func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, } func (self *JsonCodec) ReadResponse() (interface{}, error) { - var err error + bytesInBuffer := 0 buf := make([]byte, MAX_RESPONSE_SIZE) - n, _ := self.c.Read(buf) - var failure shared.ErrorResponse - if err = json.Unmarshal(buf[:n], &failure); err == nil && failure.Error != nil { - return failure, nil - } + deadline := time.Now().Add(15 * time.Second) + self.c.SetDeadline(deadline) + + for { + n, err := self.c.Read(buf[bytesInBuffer:]) + if err != nil { + return nil, err + } + bytesInBuffer += n - var success shared.SuccessResponse - if err = json.Unmarshal(buf[:n], &success); err == nil { - return success, nil + var success shared.SuccessResponse + if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil { + return success, nil + } + + var failure shared.ErrorResponse + if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil { + return failure, nil + } } - return nil, err + self.c.Close() + return nil, fmt.Errorf("Unable to read response") } -// Encode response to encoded form in underlying stream +// Decode data func (self *JsonCodec) Decode(data []byte, msg interface{}) error { return json.Unmarshal(data, msg) } +// Encode message func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) { return json.Marshal(msg) } -- cgit v1.2.3 From 5757a0edb59f854433d11982b2ba4831cceb167e Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 14:32:22 +0200 Subject: added IPC timeout support --- rpc/codec/json.go | 60 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 26 deletions(-) (limited to 'rpc/codec') diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 1de649c21..0b1a90562 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -10,61 +10,69 @@ import ( ) const ( + READ_TIMEOUT = 15 // read timeout in seconds MAX_REQUEST_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { - c net.Conn - buffer []byte - bytesInBuffer int + c net.Conn } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ - c: conn, - buffer: make([]byte, MAX_REQUEST_SIZE), - bytesInBuffer: 0, + c: conn, } } // Serialize obj to JSON and write it to conn func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { - n, err := self.c.Read(self.buffer[self.bytesInBuffer:]) - if err != nil { - self.bytesInBuffer = 0 + bytesInBuffer := 0 + buf := make([]byte, MAX_REQUEST_SIZE) + + deadline := time.Now().Add(READ_TIMEOUT * time.Second) + if err := self.c.SetDeadline(deadline); err != nil { return nil, false, err } - self.bytesInBuffer += n + for { + n, err := self.c.Read(buf[bytesInBuffer:]) + if err != nil { + self.c.Close() + return nil, false, err + } + + bytesInBuffer += n - singleRequest := shared.Request{} - err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest) - if err == nil { - self.bytesInBuffer = 0 - requests := make([]*shared.Request, 1) - requests[0] = &singleRequest - return requests, false, nil - } + singleRequest := shared.Request{} + err = json.Unmarshal(buf[:bytesInBuffer], &singleRequest) + if err == nil { + requests := make([]*shared.Request, 1) + requests[0] = &singleRequest + return requests, false, nil + } - requests = make([]*shared.Request, 0) - err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests) - if err == nil { - self.bytesInBuffer = 0 - return requests, true, nil + requests = make([]*shared.Request, 0) + err = json.Unmarshal(buf[:bytesInBuffer], &requests) + if err == nil { + return requests, true, nil + } } - return nil, false, err + self.c.Close() // timeout + return nil, false, fmt.Errorf("Unable to read response") } func (self *JsonCodec) ReadResponse() (interface{}, error) { bytesInBuffer := 0 buf := make([]byte, MAX_RESPONSE_SIZE) - deadline := time.Now().Add(15 * time.Second) - self.c.SetDeadline(deadline) + deadline := time.Now().Add(READ_TIMEOUT * time.Second) + if err := self.c.SetDeadline(deadline); err != nil { + return nil, err + } for { n, err := self.c.Read(buf[bytesInBuffer:]) -- cgit v1.2.3