diff options
author | Bas van Kervel <bas@ethdev.com> | 2016-02-24 18:19:00 +0800 |
---|---|---|
committer | Bas van Kervel <bas@ethdev.com> | 2016-03-23 18:27:08 +0800 |
commit | a7bae3b2a645653a149b9bcbb9bdc857e27027e2 (patch) | |
tree | 72a0014c4f4c3e6f64eabe6e7d541890b437246d /rpc/server.go | |
parent | 6d3cd03a03167ccac851676a912ce31c76d5f75c (diff) | |
download | go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.gz go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.bz2 go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.lz go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.xz go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.zst go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.zip |
rpc/http: improve request handling
Diffstat (limited to 'rpc/server.go')
-rw-r--r-- | rpc/server.go | 60 |
1 files changed, 46 insertions, 14 deletions
diff --git a/rpc/server.go b/rpc/server.go index f42ee2d37..22448f8e3 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -117,14 +117,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return nil } -// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the -// response back using the given codec. It will block until the codec is closed. -// -// This server will: -// 1. allow for asynchronous and parallel request execution -// 2. supports notifications (pub/sub) -// 3. supports request batches -func (s *Server) ServeCodec(codec ServerCodec) { +// serveRequest will reads requests from the codec, calls the RPC callback and +// writes the response to the given codec. +// If singleShot is true it will process a single request, otherwise it will handle +// requests until the codec returns an error when reading a request (in most cases +// an EOF). It executes requests in parallel when singleShot is false. +func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { defer func() { if err := recover(); err != nil { const size = 64 << 10 @@ -132,7 +130,12 @@ func (s *Server) ServeCodec(codec ServerCodec) { buf = buf[:runtime.Stack(buf, false)] glog.Errorln(string(buf)) } - codec.Close() + + s.codecsMu.Lock() + s.codecs.Remove(codec) + s.codecsMu.Unlock() + + return }() ctx, cancel := context.WithCancel(context.Background()) @@ -141,20 +144,22 @@ func (s *Server) ServeCodec(codec ServerCodec) { s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped s.codecsMu.Unlock() - return + return &shutdownError{} } s.codecs.Add(codec) s.codecsMu.Unlock() + // test if the server is ordered to stop for atomic.LoadInt32(&s.run) == 1 { reqs, batch, err := s.readRequest(codec) - if err != nil { glog.V(logger.Debug).Infof("%v\n", err) codec.Write(codec.CreateErrorResponse(nil, err)) - break + return nil } + // check if server is ordered to shutdown and return an error + // telling the client that his request failed. if atomic.LoadInt32(&s.run) != 1 { err = &shutdownError{} if batch { @@ -166,15 +171,42 @@ func (s *Server) ServeCodec(codec ServerCodec) { } else { codec.Write(codec.CreateErrorResponse(&reqs[0].id, err)) } - break + return nil } - if batch { + if singleShot && batch { + s.execBatch(ctx, codec, reqs) + return nil + } else if singleShot && !batch { + s.exec(ctx, codec, reqs[0]) + return nil + } else if !singleShot && batch { go s.execBatch(ctx, codec, reqs) } else { go s.exec(ctx, codec, reqs[0]) } } + + return nil +} + +// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the +// response back using the given codec. It will block until the codec is closed or the server is +// stopped. In either case the codec is closed. +// +// This server will: +// 1. allow for asynchronous and parallel request execution +// 2. supports notifications (pub/sub) +// 3. supports request batches +func (s *Server) ServeCodec(codec ServerCodec) { + defer codec.Close() + s.serveRequest(codec, false) +} + +// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not +// close the codec unless a non-recoverable error has occurred. +func (s *Server) ServeSingleRequest(codec ServerCodec) { + s.serveRequest(codec, true) } // Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish, |