aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-02-24 18:19:00 +0800
committerBas van Kervel <bas@ethdev.com>2016-03-23 18:27:08 +0800
commita7bae3b2a645653a149b9bcbb9bdc857e27027e2 (patch)
tree72a0014c4f4c3e6f64eabe6e7d541890b437246d /rpc
parent6d3cd03a03167ccac851676a912ce31c76d5f75c (diff)
downloadgo-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar
go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.gz
go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.bz2
go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.lz
go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.xz
go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.tar.zst
go-tangerine-a7bae3b2a645653a149b9bcbb9bdc857e27027e2.zip
rpc/http: improve request handling
Diffstat (limited to 'rpc')
-rw-r--r--rpc/http.go277
-rw-r--r--rpc/server.go60
2 files changed, 102 insertions, 235 deletions
diff --git a/rpc/http.go b/rpc/http.go
index d9053b003..af3d29014 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -17,240 +17,23 @@
package rpc
import (
- "bufio"
"bytes"
"encoding/json"
"fmt"
- "io"
"io/ioutil"
- "net"
"net/http"
"net/url"
- "strconv"
"strings"
- "time"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "gopkg.in/fatih/set.v0"
+ "io"
+
+ "github.com/rs/cors"
)
const (
- httpReadDeadLine = 60 * time.Second // wait max httpReadDeadeline for next request
+ maxHTTPRequestContentLength = 1024 * 128
)
-// httpMessageStream is the glue between a HTTP connection which is message based
-// and the RPC codecs that expect json requests to be read from a stream. It will
-// parse HTTP messages and offer the bodies of these requests as a stream through
-// the Read method. This will require full control of the connection and thus need
-// a "hijacked" HTTP connection.
-type httpMessageStream struct {
- conn net.Conn // TCP connection
- rw *bufio.ReadWriter // buffered where HTTP requests/responses are read/written from/to
- currentReq *http.Request // pending request, codec can pass in a too small buffer for a single read we need to keep track of the current requests if it was not read at once
- payloadBytesRead int64 // number of bytes which are read from the current request
- allowedOrigins *set.Set // allowed CORS domains
- origin string // origin of this connection/request
-}
-
-// NewHttpMessageStream will create a new http message stream parser that can be
-// used by the codes in the RPC package. It will take full control of the given
-// connection and thus needs to be hijacked. It will read and write HTTP messages
-// from the passed rwbuf. The allowed origins are the RPC CORS domains the user has supplied.
-func NewHTTPMessageStream(c net.Conn, rwbuf *bufio.ReadWriter, initialReq *http.Request, allowdOrigins []string) *httpMessageStream {
- r := &httpMessageStream{conn: c, rw: rwbuf, currentReq: initialReq, allowedOrigins: set.New()}
- for _, origin := range allowdOrigins {
- r.allowedOrigins.Add(origin)
- }
- return r
-}
-
-// handleOptionsRequest handles the HTTP preflight requests (OPTIONS) that browsers
-// make to enforce CORS rules. Only the POST method is allowed and the origin must
-// be on the rpccorsdomain list the user has specified.
-func (h *httpMessageStream) handleOptionsRequest(req *http.Request) error {
- headers := req.Header
-
- if !strings.EqualFold(req.Method, "OPTIONS") {
- return fmt.Errorf("preflight aborted: %s!=OPTIONS", req.Method)
- }
-
- origin := headers.Get("Origin")
- if origin == "" {
- return fmt.Errorf("preflight aborted: empty origin")
- }
-
- responseHeaders := make(http.Header)
- responseHeaders.Set("Access-Control-Allow-Methods", "POST")
- if h.allowedOrigins.Has(origin) || h.allowedOrigins.Has("*") {
- responseHeaders.Set("Access-Control-Allow-Origin", origin)
- } else {
- glog.V(logger.Info).Infof("origin '%s' not allowed", origin)
- }
- responseHeaders.Set("Access-Control-Allow-Headers", "Content-Type")
- responseHeaders.Set("Date", string(httpTimestamp(time.Now())))
- responseHeaders.Set("Content-Type", "text/plain; charset=utf-8")
- responseHeaders.Set("Content-Length", "0")
- responseHeaders.Set("Vary", "Origin")
-
- defer h.rw.Flush()
-
- if _, err := h.rw.WriteString("HTTP/1.1 200 OK\r\n"); err != nil {
- glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err)
- return err
- }
- if err := responseHeaders.Write(h.rw); err != nil {
- glog.V(logger.Error).Infof("unable to write OPTIONS headers: %v\n", err)
- }
- if _, err := h.rw.WriteString("\r\n"); err != nil {
- glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err)
- }
-
- return nil
-}
-
-// Read will read incoming HTTP requests and reads the body data from these requests
-// as an endless stream of data.
-func (h *httpMessageStream) Read(buf []byte) (n int, err error) {
- h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine))
- for {
- // if the last request was read completely try to read the next request
- if h.currentReq == nil {
- if h.currentReq, err = http.ReadRequest(bufio.NewReader(h.rw)); err != nil {
- return 0, err
- }
- }
-
- // The "options" method is http specific and not interested for the RPC server.
- // Handle it internally and wait for the next request.
- if strings.EqualFold(h.currentReq.Method, "OPTIONS") {
- if err = h.handleOptionsRequest(h.currentReq); err != nil {
- glog.V(logger.Info).Infof("RPC/HTTP OPTIONS error: %v\n", err)
- h.currentReq = nil
- return 0, err
- }
-
- // processed valid request -> reset deadline
- h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine))
- h.currentReq = nil
- continue
- }
-
- if strings.EqualFold(h.currentReq.Method, "GET") || strings.EqualFold(h.currentReq.Method, "POST") {
- n, err := h.currentReq.Body.Read(buf)
- h.payloadBytesRead += int64(n)
-
- // entire payload read, read new request next time
- if err == io.EOF || h.payloadBytesRead >= h.currentReq.ContentLength {
- h.origin = h.currentReq.Header.Get("origin")
- h.payloadBytesRead = 0
- h.currentReq.Body.Close()
- h.currentReq = nil
- err = nil // io.EOF is not an error
- } else if err != nil {
- // unable to read body
- h.currentReq.Body.Close()
- h.currentReq = nil
- h.payloadBytesRead = 0
- }
- // partial read of body
- return n, err
- }
- return 0, fmt.Errorf("unsupported HTTP method '%s'", h.currentReq.Method)
- }
-}
-
-// Write will create a HTTP response with the given payload and send it to the peer.
-func (h *httpMessageStream) Write(payload []byte) (int, error) {
- defer h.rw.Flush()
-
- responseHeaders := make(http.Header)
- responseHeaders.Set("Content-Type", "application/json")
- responseHeaders.Set("Content-Length", strconv.Itoa(len(payload)))
- if h.origin != "" {
- responseHeaders.Set("Access-Control-Allow-Origin", h.origin)
- }
-
- h.rw.WriteString("HTTP/1.1 200 OK\r\n")
- responseHeaders.Write(h.rw)
- h.rw.WriteString("\r\n")
-
- return h.rw.Write(payload)
-}
-
-// Close will close the underlying TCP connection this instance has taken ownership over.
-func (h *httpMessageStream) Close() error {
- h.rw.Flush()
- return h.conn.Close()
-}
-
-// TimeFormat is the time format to use with time.Parse and time.Time.Format when
-// parsing or generating times in HTTP headers. It is like time.RFC1123 but hard
-// codes GMT as the time zone.
-const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"
-
-// httpTimestamp formats the given t as specified in RFC1123.
-func httpTimestamp(t time.Time) []byte {
- const days = "SunMonTueWedThuFriSat"
- const months = "JanFebMarAprMayJunJulAugSepOctNovDec"
-
- b := make([]byte, 0)
- t = t.UTC()
- yy, mm, dd := t.Date()
- hh, mn, ss := t.Clock()
- day := days[3*t.Weekday():]
- mon := months[3*(mm-1):]
-
- return append(b,
- day[0], day[1], day[2], ',', ' ',
- byte('0'+dd/10), byte('0'+dd%10), ' ',
- mon[0], mon[1], mon[2], ' ',
- byte('0'+yy/1000), byte('0'+(yy/100)%10), byte('0'+(yy/10)%10), byte('0'+yy%10), ' ',
- byte('0'+hh/10), byte('0'+hh%10), ':',
- byte('0'+mn/10), byte('0'+mn%10), ':',
- byte('0'+ss/10), byte('0'+ss%10), ' ',
- 'G', 'M', 'T')
-}
-
-// httpConnHijacker is a http.Handler implementation that will hijack the HTTP
-// connection, wraps it in a HttpMessageStream that is then wrapped in a JSON
-// codec which will be served on the rpcServer.
-type httpConnHijacker struct {
- corsdomains []string
- rpcServer *Server
-}
-
-// ServeHTTP will hijack the connection, wraps the captured connection in a
-// HttpMessageStream which is then used as codec.
-func (h *httpConnHijacker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- hj, ok := w.(http.Hijacker)
- if !ok {
- http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
- return
- }
-
- conn, rwbuf, err := hj.Hijack()
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
-
- httpRequestStream := NewHTTPMessageStream(conn, rwbuf, req, h.corsdomains)
-
- codec := NewJSONCodec(httpRequestStream)
- go h.rpcServer.ServeCodec(codec)
-}
-
-// NewHTTPServer creates a new HTTP RPC server around an API provider.
-func NewHTTPServer(cors string, handler *Server) *http.Server {
- return &http.Server{
- Handler: &httpConnHijacker{
- corsdomains: strings.Split(cors, ","),
- rpcServer: handler,
- },
- }
-}
-
// httpClient connects to a geth RPC server over HTTP.
type httpClient struct {
endpoint *url.URL // HTTP-RPC server endpoint
@@ -313,3 +96,55 @@ func (client *httpClient) Close() {
func (client *httpClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
}
+
+// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
+type httpReadWriteNopCloser struct {
+ io.Reader
+ io.Writer
+}
+
+// Close does nothing and returns always nil
+func (t *httpReadWriteNopCloser) Close() error {
+ return nil
+}
+
+// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests,
+// send the request to the given API provider and sends the response back to the caller.
+func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.ContentLength > maxHTTPRequestContentLength {
+ http.Error(w,
+ fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
+ http.StatusRequestEntityTooLarge)
+ return
+ }
+
+ w.Header().Set("content-type", "application/json")
+
+ // create a codec that reads direct from the request body until
+ // EOF and writes the response to w and order the server to process
+ // a single request.
+ codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
+ defer codec.Close()
+ srv.ServeSingleRequest(codec)
+ }
+}
+
+// NewHTTPServer creates a new HTTP RPC server around an API provider.
+func NewHTTPServer(corsString string, srv *Server) *http.Server {
+ var allowedOrigins []string
+ for _, domain := range strings.Split(corsString, ",") {
+ allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
+ }
+
+ c := cors.New(cors.Options{
+ AllowedOrigins: allowedOrigins,
+ AllowedMethods: []string{"POST", "GET"},
+ })
+
+ handler := c.Handler(newJSONHTTPHandler(srv))
+
+ return &http.Server{
+ Handler: handler,
+ }
+}
diff --git a/rpc/server.go b/rpc/server.go
index f42ee2d37..22448f8e3 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -117,14 +117,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil
}
-// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
-// response back using the given codec. It will block until the codec is closed.
-//
-// This server will:
-// 1. allow for asynchronous and parallel request execution
-// 2. supports notifications (pub/sub)
-// 3. supports request batches
-func (s *Server) ServeCodec(codec ServerCodec) {
+// serveRequest will reads requests from the codec, calls the RPC callback and
+// writes the response to the given codec.
+// If singleShot is true it will process a single request, otherwise it will handle
+// requests until the codec returns an error when reading a request (in most cases
+// an EOF). It executes requests in parallel when singleShot is false.
+func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
@@ -132,7 +130,12 @@ func (s *Server) ServeCodec(codec ServerCodec) {
buf = buf[:runtime.Stack(buf, false)]
glog.Errorln(string(buf))
}
- codec.Close()
+
+ s.codecsMu.Lock()
+ s.codecs.Remove(codec)
+ s.codecsMu.Unlock()
+
+ return
}()
ctx, cancel := context.WithCancel(context.Background())
@@ -141,20 +144,22 @@ func (s *Server) ServeCodec(codec ServerCodec) {
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
s.codecsMu.Unlock()
- return
+ return &shutdownError{}
}
s.codecs.Add(codec)
s.codecsMu.Unlock()
+ // test if the server is ordered to stop
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
-
if err != nil {
glog.V(logger.Debug).Infof("%v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
- break
+ return nil
}
+ // check if server is ordered to shutdown and return an error
+ // telling the client that his request failed.
if atomic.LoadInt32(&s.run) != 1 {
err = &shutdownError{}
if batch {
@@ -166,15 +171,42 @@ func (s *Server) ServeCodec(codec ServerCodec) {
} else {
codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
}
- break
+ return nil
}
- if batch {
+ if singleShot && batch {
+ s.execBatch(ctx, codec, reqs)
+ return nil
+ } else if singleShot && !batch {
+ s.exec(ctx, codec, reqs[0])
+ return nil
+ } else if !singleShot && batch {
go s.execBatch(ctx, codec, reqs)
} else {
go s.exec(ctx, codec, reqs[0])
}
}
+
+ return nil
+}
+
+// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
+// response back using the given codec. It will block until the codec is closed or the server is
+// stopped. In either case the codec is closed.
+//
+// This server will:
+// 1. allow for asynchronous and parallel request execution
+// 2. supports notifications (pub/sub)
+// 3. supports request batches
+func (s *Server) ServeCodec(codec ServerCodec) {
+ defer codec.Close()
+ s.serveRequest(codec, false)
+}
+
+// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not
+// close the codec unless a non-recoverable error has occurred.
+func (s *Server) ServeSingleRequest(codec ServerCodec) {
+ s.serveRequest(codec, true)
}
// Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish,