aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-03-24 19:07:12 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-03-24 19:07:12 +0800
commit1018bf6a00be475cb9af2812c1f61fa8530068a3 (patch)
tree2cffdec27ef688a8c5f4bd52c2a770daed702f74
parent37e252587a3429ab71cbb3ace7dca09733fa6c7c (diff)
downloadgo-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar
go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.gz
go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.bz2
go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.lz
go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.xz
go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.zst
go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.zip
rpc: honour pending requests before tearing conn down (#3814)
-rw-r--r--rpc/server.go42
1 files changed, 28 insertions, 14 deletions
diff --git a/rpc/server.go b/rpc/server.go
index ca7e3c01a..8627b5592 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"runtime"
+ "sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
@@ -143,6 +144,8 @@ func hasOption(option CodecOption, options []CodecOption) bool {
// requests until the codec returns an error when reading a request (in most cases
// an EOF). It executes requests in parallel when singleShot is false.
func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
+ var pend sync.WaitGroup
+
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
@@ -150,7 +153,6 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
buf = buf[:runtime.Stack(buf, false)]
log.Error(fmt.Sprint(string(buf)))
}
-
s.codecsMu.Lock()
s.codecs.Remove(codec)
s.codecsMu.Unlock()
@@ -179,8 +181,13 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
if err != nil {
- log.Debug(fmt.Sprintf("read error %v\n", err))
- codec.Write(codec.CreateErrorResponse(nil, err))
+ // If a parsing error occurred, send an error
+ if err.Error() != "EOF" {
+ log.Debug(fmt.Sprintf("read error %v\n", err))
+ codec.Write(codec.CreateErrorResponse(nil, err))
+ }
+ // Error or end of stream, wait for requests and tear down
+ pend.Wait()
return nil
}
@@ -199,20 +206,27 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
}
return nil
}
-
- if singleShot && batch {
- s.execBatch(ctx, codec, reqs)
- return nil
- } else if singleShot && !batch {
- s.exec(ctx, codec, reqs[0])
+ // If a single shot request is executing, run and return immediately
+ if singleShot {
+ if batch {
+ s.execBatch(ctx, codec, reqs)
+ } else {
+ s.exec(ctx, codec, reqs[0])
+ }
return nil
- } else if !singleShot && batch {
- go s.execBatch(ctx, codec, reqs)
- } else {
- go s.exec(ctx, codec, reqs[0])
}
- }
+ // For multi-shot connections, start a goroutine to serve and loop back
+ pend.Add(1)
+ go func(reqs []*serverRequest, batch bool) {
+ defer pend.Done()
+ if batch {
+ s.execBatch(ctx, codec, reqs)
+ } else {
+ s.exec(ctx, codec, reqs[0])
+ }
+ }(reqs, batch)
+ }
return nil
}