diff options
author | Bas van Kervel <bas@ethdev.com> | 2016-02-24 18:19:00 +0800 |
---|---|---|
committer | Bas van Kervel <bas@ethdev.com> | 2016-03-23 18:27:08 +0800 |
commit | a7bae3b2a645653a149b9bcbb9bdc857e27027e2 (patch) | |
tree | 72a0014c4f4c3e6f64eabe6e7d541890b437246d /rpc | |
parent | 6d3cd03a03167ccac851676a912ce31c76d5f75c (diff) | |
download | go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.gz go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.bz2 go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.lz go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.xz go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.zst go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.zip |
rpc/http: improve request handling
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/http.go | 277 | ||||
-rw-r--r-- | rpc/server.go | 60 |
2 files changed, 102 insertions, 235 deletions
diff --git a/rpc/http.go b/rpc/http.go index d9053b003..af3d29014 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -17,240 +17,23 @@ package rpc import ( - "bufio" "bytes" "encoding/json" "fmt" - "io" "io/ioutil" - "net" "net/http" "net/url" - "strconv" "strings" - "time" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "gopkg.in/fatih/set.v0" + "io" + + "github.com/rs/cors" ) const ( - httpReadDeadLine = 60 * time.Second // wait max httpReadDeadeline for next request + maxHTTPRequestContentLength = 1024 * 128 ) -// httpMessageStream is the glue between a HTTP connection which is message based -// and the RPC codecs that expect json requests to be read from a stream. It will -// parse HTTP messages and offer the bodies of these requests as a stream through -// the Read method. This will require full control of the connection and thus need -// a "hijacked" HTTP connection. -type httpMessageStream struct { - conn net.Conn // TCP connection - rw *bufio.ReadWriter // buffered where HTTP requests/responses are read/written from/to - currentReq *http.Request // pending request, codec can pass in a too small buffer for a single read we need to keep track of the current requests if it was not read at once - payloadBytesRead int64 // number of bytes which are read from the current request - allowedOrigins *set.Set // allowed CORS domains - origin string // origin of this connection/request -} - -// NewHttpMessageStream will create a new http message stream parser that can be -// used by the codes in the RPC package. It will take full control of the given -// connection and thus needs to be hijacked. It will read and write HTTP messages -// from the passed rwbuf. The allowed origins are the RPC CORS domains the user has supplied. -func NewHTTPMessageStream(c net.Conn, rwbuf *bufio.ReadWriter, initialReq *http.Request, allowdOrigins []string) *httpMessageStream { - r := &httpMessageStream{conn: c, rw: rwbuf, currentReq: initialReq, allowedOrigins: set.New()} - for _, origin := range allowdOrigins { - r.allowedOrigins.Add(origin) - } - return r -} - -// handleOptionsRequest handles the HTTP preflight requests (OPTIONS) that browsers -// make to enforce CORS rules. Only the POST method is allowed and the origin must -// be on the rpccorsdomain list the user has specified. -func (h *httpMessageStream) handleOptionsRequest(req *http.Request) error { - headers := req.Header - - if !strings.EqualFold(req.Method, "OPTIONS") { - return fmt.Errorf("preflight aborted: %s!=OPTIONS", req.Method) - } - - origin := headers.Get("Origin") - if origin == "" { - return fmt.Errorf("preflight aborted: empty origin") - } - - responseHeaders := make(http.Header) - responseHeaders.Set("Access-Control-Allow-Methods", "POST") - if h.allowedOrigins.Has(origin) || h.allowedOrigins.Has("*") { - responseHeaders.Set("Access-Control-Allow-Origin", origin) - } else { - glog.V(logger.Info).Infof("origin '%s' not allowed", origin) - } - responseHeaders.Set("Access-Control-Allow-Headers", "Content-Type") - responseHeaders.Set("Date", string(httpTimestamp(time.Now()))) - responseHeaders.Set("Content-Type", "text/plain; charset=utf-8") - responseHeaders.Set("Content-Length", "0") - responseHeaders.Set("Vary", "Origin") - - defer h.rw.Flush() - - if _, err := h.rw.WriteString("HTTP/1.1 200 OK\r\n"); err != nil { - glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err) - return err - } - if err := responseHeaders.Write(h.rw); err != nil { - glog.V(logger.Error).Infof("unable to write OPTIONS headers: %v\n", err) - } - if _, err := h.rw.WriteString("\r\n"); err != nil { - glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err) - } - - return nil -} - -// Read will read incoming HTTP requests and reads the body data from these requests -// as an endless stream of data. -func (h *httpMessageStream) Read(buf []byte) (n int, err error) { - h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine)) - for { - // if the last request was read completely try to read the next request - if h.currentReq == nil { - if h.currentReq, err = http.ReadRequest(bufio.NewReader(h.rw)); err != nil { - return 0, err - } - } - - // The "options" method is http specific and not interested for the RPC server. - // Handle it internally and wait for the next request. - if strings.EqualFold(h.currentReq.Method, "OPTIONS") { - if err = h.handleOptionsRequest(h.currentReq); err != nil { - glog.V(logger.Info).Infof("RPC/HTTP OPTIONS error: %v\n", err) - h.currentReq = nil - return 0, err - } - - // processed valid request -> reset deadline - h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine)) - h.currentReq = nil - continue - } - - if strings.EqualFold(h.currentReq.Method, "GET") || strings.EqualFold(h.currentReq.Method, "POST") { - n, err := h.currentReq.Body.Read(buf) - h.payloadBytesRead += int64(n) - - // entire payload read, read new request next time - if err == io.EOF || h.payloadBytesRead >= h.currentReq.ContentLength { - h.origin = h.currentReq.Header.Get("origin") - h.payloadBytesRead = 0 - h.currentReq.Body.Close() - h.currentReq = nil - err = nil // io.EOF is not an error - } else if err != nil { - // unable to read body - h.currentReq.Body.Close() - h.currentReq = nil - h.payloadBytesRead = 0 - } - // partial read of body - return n, err - } - return 0, fmt.Errorf("unsupported HTTP method '%s'", h.currentReq.Method) - } -} - -// Write will create a HTTP response with the given payload and send it to the peer. -func (h *httpMessageStream) Write(payload []byte) (int, error) { - defer h.rw.Flush() - - responseHeaders := make(http.Header) - responseHeaders.Set("Content-Type", "application/json") - responseHeaders.Set("Content-Length", strconv.Itoa(len(payload))) - if h.origin != "" { - responseHeaders.Set("Access-Control-Allow-Origin", h.origin) - } - - h.rw.WriteString("HTTP/1.1 200 OK\r\n") - responseHeaders.Write(h.rw) - h.rw.WriteString("\r\n") - - return h.rw.Write(payload) -} - -// Close will close the underlying TCP connection this instance has taken ownership over. -func (h *httpMessageStream) Close() error { - h.rw.Flush() - return h.conn.Close() -} - -// TimeFormat is the time format to use with time.Parse and time.Time.Format when -// parsing or generating times in HTTP headers. It is like time.RFC1123 but hard -// codes GMT as the time zone. -const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT" - -// httpTimestamp formats the given t as specified in RFC1123. -func httpTimestamp(t time.Time) []byte { - const days = "SunMonTueWedThuFriSat" - const months = "JanFebMarAprMayJunJulAugSepOctNovDec" - - b := make([]byte, 0) - t = t.UTC() - yy, mm, dd := t.Date() - hh, mn, ss := t.Clock() - day := days[3*t.Weekday():] - mon := months[3*(mm-1):] - - return append(b, - day[0], day[1], day[2], ',', ' ', - byte('0'+dd/10), byte('0'+dd%10), ' ', - mon[0], mon[1], mon[2], ' ', - byte('0'+yy/1000), byte('0'+(yy/100)%10), byte('0'+(yy/10)%10), byte('0'+yy%10), ' ', - byte('0'+hh/10), byte('0'+hh%10), ':', - byte('0'+mn/10), byte('0'+mn%10), ':', - byte('0'+ss/10), byte('0'+ss%10), ' ', - 'G', 'M', 'T') -} - -// httpConnHijacker is a http.Handler implementation that will hijack the HTTP -// connection, wraps it in a HttpMessageStream that is then wrapped in a JSON -// codec which will be served on the rpcServer. -type httpConnHijacker struct { - corsdomains []string - rpcServer *Server -} - -// ServeHTTP will hijack the connection, wraps the captured connection in a -// HttpMessageStream which is then used as codec. -func (h *httpConnHijacker) ServeHTTP(w http.ResponseWriter, req *http.Request) { - hj, ok := w.(http.Hijacker) - if !ok { - http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError) - return - } - - conn, rwbuf, err := hj.Hijack() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - httpRequestStream := NewHTTPMessageStream(conn, rwbuf, req, h.corsdomains) - - codec := NewJSONCodec(httpRequestStream) - go h.rpcServer.ServeCodec(codec) -} - -// NewHTTPServer creates a new HTTP RPC server around an API provider. -func NewHTTPServer(cors string, handler *Server) *http.Server { - return &http.Server{ - Handler: &httpConnHijacker{ - corsdomains: strings.Split(cors, ","), - rpcServer: handler, - }, - } -} - // httpClient connects to a geth RPC server over HTTP. type httpClient struct { endpoint *url.URL // HTTP-RPC server endpoint @@ -313,3 +96,55 @@ func (client *httpClient) Close() { func (client *httpClient) SupportedModules() (map[string]string, error) { return SupportedModules(client) } + +// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method. +type httpReadWriteNopCloser struct { + io.Reader + io.Writer +} + +// Close does nothing and returns always nil +func (t *httpReadWriteNopCloser) Close() error { + return nil +} + +// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests, +// send the request to the given API provider and sends the response back to the caller. +func newJSONHTTPHandler(srv *Server) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > maxHTTPRequestContentLength { + http.Error(w, + fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), + http.StatusRequestEntityTooLarge) + return + } + + w.Header().Set("content-type", "application/json") + + // create a codec that reads direct from the request body until + // EOF and writes the response to w and order the server to process + // a single request. + codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) + defer codec.Close() + srv.ServeSingleRequest(codec) + } +} + +// NewHTTPServer creates a new HTTP RPC server around an API provider. +func NewHTTPServer(corsString string, srv *Server) *http.Server { + var allowedOrigins []string + for _, domain := range strings.Split(corsString, ",") { + allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain)) + } + + c := cors.New(cors.Options{ + AllowedOrigins: allowedOrigins, + AllowedMethods: []string{"POST", "GET"}, + }) + + handler := c.Handler(newJSONHTTPHandler(srv)) + + return &http.Server{ + Handler: handler, + } +} diff --git a/rpc/server.go b/rpc/server.go index f42ee2d37..22448f8e3 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -117,14 +117,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return nil } -// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the -// response back using the given codec. It will block until the codec is closed. -// -// This server will: -// 1. allow for asynchronous and parallel request execution -// 2. supports notifications (pub/sub) -// 3. supports request batches -func (s *Server) ServeCodec(codec ServerCodec) { +// serveRequest will reads requests from the codec, calls the RPC callback and +// writes the response to the given codec. +// If singleShot is true it will process a single request, otherwise it will handle +// requests until the codec returns an error when reading a request (in most cases +// an EOF). It executes requests in parallel when singleShot is false. +func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { defer func() { if err := recover(); err != nil { const size = 64 << 10 @@ -132,7 +130,12 @@ func (s *Server) ServeCodec(codec ServerCodec) { buf = buf[:runtime.Stack(buf, false)] glog.Errorln(string(buf)) } - codec.Close() + + s.codecsMu.Lock() + s.codecs.Remove(codec) + s.codecsMu.Unlock() + + return }() ctx, cancel := context.WithCancel(context.Background()) @@ -141,20 +144,22 @@ func (s *Server) ServeCodec(codec ServerCodec) { s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped s.codecsMu.Unlock() - return + return &shutdownError{} } s.codecs.Add(codec) s.codecsMu.Unlock() + // test if the server is ordered to stop for atomic.LoadInt32(&s.run) == 1 { reqs, batch, err := s.readRequest(codec) - if err != nil { glog.V(logger.Debug).Infof("%v\n", err) codec.Write(codec.CreateErrorResponse(nil, err)) - break + return nil } + // check if server is ordered to shutdown and return an error + // telling the client that his request failed. if atomic.LoadInt32(&s.run) != 1 { err = &shutdownError{} if batch { @@ -166,15 +171,42 @@ func (s *Server) ServeCodec(codec ServerCodec) { } else { codec.Write(codec.CreateErrorResponse(&reqs[0].id, err)) } - break + return nil } - if batch { + if singleShot && batch { + s.execBatch(ctx, codec, reqs) + return nil + } else if singleShot && !batch { + s.exec(ctx, codec, reqs[0]) + return nil + } else if !singleShot && batch { go s.execBatch(ctx, codec, reqs) } else { go s.exec(ctx, codec, reqs[0]) } } + + return nil +} + +// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the +// response back using the given codec. It will block until the codec is closed or the server is +// stopped. In either case the codec is closed. +// +// This server will: +// 1. allow for asynchronous and parallel request execution +// 2. supports notifications (pub/sub) +// 3. supports request batches +func (s *Server) ServeCodec(codec ServerCodec) { + defer codec.Close() + s.serveRequest(codec, false) +} + +// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not +// close the codec unless a non-recoverable error has occurred. +func (s *Server) ServeSingleRequest(codec ServerCodec) { + s.serveRequest(codec, true) } // Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish, |