From 5757a0edb59f854433d11982b2ba4831cceb167e Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 14:32:22 +0200 Subject: added IPC timeout support --- rpc/codec/json.go | 60 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 26 deletions(-) (limited to 'rpc/codec') diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 1de649c21..0b1a90562 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -10,61 +10,69 @@ import ( ) const ( + READ_TIMEOUT = 15 // read timeout in seconds MAX_REQUEST_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { - c net.Conn - buffer []byte - bytesInBuffer int + c net.Conn } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ - c: conn, - buffer: make([]byte, MAX_REQUEST_SIZE), - bytesInBuffer: 0, + c: conn, } } // Serialize obj to JSON and write it to conn func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { - n, err := self.c.Read(self.buffer[self.bytesInBuffer:]) - if err != nil { - self.bytesInBuffer = 0 + bytesInBuffer := 0 + buf := make([]byte, MAX_REQUEST_SIZE) + + deadline := time.Now().Add(READ_TIMEOUT * time.Second) + if err := self.c.SetDeadline(deadline); err != nil { return nil, false, err } - self.bytesInBuffer += n + for { + n, err := self.c.Read(buf[bytesInBuffer:]) + if err != nil { + self.c.Close() + return nil, false, err + } + + bytesInBuffer += n - singleRequest := shared.Request{} - err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest) - if err == nil { - self.bytesInBuffer = 0 - requests := make([]*shared.Request, 1) - requests[0] = &singleRequest - return requests, false, nil - } + singleRequest := shared.Request{} + err = json.Unmarshal(buf[:bytesInBuffer], &singleRequest) + if err == nil { + requests := make([]*shared.Request, 1) + requests[0] = &singleRequest + return requests, false, nil + } - requests = make([]*shared.Request, 0) - err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests) - if err == nil { - self.bytesInBuffer = 0 - return requests, true, nil + requests = make([]*shared.Request, 0) + err = json.Unmarshal(buf[:bytesInBuffer], &requests) + if err == nil { + return requests, true, nil + } } - return nil, false, err + self.c.Close() // timeout + return nil, false, fmt.Errorf("Unable to read response") } func (self *JsonCodec) ReadResponse() (interface{}, error) { bytesInBuffer := 0 buf := make([]byte, MAX_RESPONSE_SIZE) - deadline := time.Now().Add(15 * time.Second) - self.c.SetDeadline(deadline) + deadline := time.Now().Add(READ_TIMEOUT * time.Second) + if err := self.c.SetDeadline(deadline); err != nil { + return nil, err + } for { n, err := self.c.Read(buf[bytesInBuffer:]) -- cgit v1.2.3