diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-03-24 19:07:12 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-03-24 19:07:12 +0800 |
commit | 1018bf6a00be475cb9af2812c1f61fa8530068a3 (patch) | |
tree | 2cffdec27ef688a8c5f4bd52c2a770daed702f74 | |
parent | 37e252587a3429ab71cbb3ace7dca09733fa6c7c (diff) | |
download | go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.gz go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.bz2 go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.lz go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.xz go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.tar.zst go-tangerine-1018bf6a00be475cb9af2812c1f61fa8530068a3.zip |
rpc: honour pending requests before tearing conn down (#3814)
-rw-r--r-- | rpc/server.go | 42 |
1 files changed, 28 insertions, 14 deletions
diff --git a/rpc/server.go b/rpc/server.go index ca7e3c01a..8627b5592 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "runtime" + "sync" "sync/atomic" "github.com/ethereum/go-ethereum/log" @@ -143,6 +144,8 @@ func hasOption(option CodecOption, options []CodecOption) bool { // requests until the codec returns an error when reading a request (in most cases // an EOF). It executes requests in parallel when singleShot is false. func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error { + var pend sync.WaitGroup + defer func() { if err := recover(); err != nil { const size = 64 << 10 @@ -150,7 +153,6 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO buf = buf[:runtime.Stack(buf, false)] log.Error(fmt.Sprint(string(buf))) } - s.codecsMu.Lock() s.codecs.Remove(codec) s.codecsMu.Unlock() @@ -179,8 +181,13 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO for atomic.LoadInt32(&s.run) == 1 { reqs, batch, err := s.readRequest(codec) if err != nil { - log.Debug(fmt.Sprintf("read error %v\n", err)) - codec.Write(codec.CreateErrorResponse(nil, err)) + // If a parsing error occurred, send an error + if err.Error() != "EOF" { + log.Debug(fmt.Sprintf("read error %v\n", err)) + codec.Write(codec.CreateErrorResponse(nil, err)) + } + // Error or end of stream, wait for requests and tear down + pend.Wait() return nil } @@ -199,20 +206,27 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO } return nil } - - if singleShot && batch { - s.execBatch(ctx, codec, reqs) - return nil - } else if singleShot && !batch { - s.exec(ctx, codec, reqs[0]) + // If a single shot request is executing, run and return immediately + if singleShot { + if batch { + s.execBatch(ctx, codec, reqs) + } else { + s.exec(ctx, codec, reqs[0]) + } return nil - } else if !singleShot && batch { - go s.execBatch(ctx, codec, reqs) - } else { - go s.exec(ctx, codec, reqs[0]) } - } + // For multi-shot connections, start a goroutine to serve and loop back + pend.Add(1) + go func(reqs []*serverRequest, batch bool) { + defer pend.Done() + if batch { + s.execBatch(ctx, codec, reqs) + } else { + s.exec(ctx, codec, reqs[0]) + } + }(reqs, batch) + } return nil } |