diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/client.go | 2 | ||||
-rw-r--r-- | rpc/doc.go | 2 | ||||
-rw-r--r-- | rpc/endpoints.go | 4 | ||||
-rw-r--r-- | rpc/http.go | 56 | ||||
-rw-r--r-- | rpc/server.go | 17 | ||||
-rw-r--r-- | rpc/types.go | 4 | ||||
-rw-r--r-- | rpc/websocket.go | 10 |
7 files changed, 70 insertions, 25 deletions
diff --git a/rpc/client.go b/rpc/client.go index 1c88cfab8..a2ef2ed6b 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -304,7 +304,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str return err } - // dispatch has accepted the request and will close the channel it when it quits. + // dispatch has accepted the request and will close the channel when it quits. switch resp, err := op.wait(ctx); { case err != nil: return err diff --git a/rpc/doc.go b/rpc/doc.go index 78aa92f89..9a6c4abbc 100644 --- a/rpc/doc.go +++ b/rpc/doc.go @@ -58,7 +58,7 @@ An example server which uses the JSON codec: return a + b } - func (s *CalculatorService Div(a, b int) (int, error) { + func (s *CalculatorService) Div(a, b int) (int, error) { if b == 0 { return 0, errors.New("divide by zero") } diff --git a/rpc/endpoints.go b/rpc/endpoints.go index 692c62d3a..8ca6d4eb0 100644 --- a/rpc/endpoints.go +++ b/rpc/endpoints.go @@ -23,7 +23,7 @@ import ( ) // StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules -func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) { +func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string, timeouts HTTPTimeouts) (net.Listener, *Server, error) { // Generate the whitelist based on the allowed modules whitelist := make(map[string]bool) for _, module := range modules { @@ -47,7 +47,7 @@ func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []str if listener, err = net.Listen("tcp", endpoint); err != nil { return nil, nil, err } - go NewHTTPServer(cors, vhosts, handler).Serve(listener) + go NewHTTPServer(cors, vhosts, timeouts, handler).Serve(listener) return listener, handler, err } diff --git a/rpc/http.go b/rpc/http.go index 6388d6896..f3bd1f29c 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -31,6 +31,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/log" "github.com/rs/cors" ) @@ -66,6 +67,38 @@ func (hc *httpConn) Close() error { return nil } +// HTTPTimeouts represents the configuration params for the HTTP RPC server. +type HTTPTimeouts struct { + // ReadTimeout is the maximum duration for reading the entire + // request, including the body. + // + // Because ReadTimeout does not let Handlers make per-request + // decisions on each request body's acceptable deadline or + // upload rate, most users will prefer to use + // ReadHeaderTimeout. It is valid to use them both. + ReadTimeout time.Duration + + // WriteTimeout is the maximum duration before timing out + // writes of the response. It is reset whenever a new + // request's header is read. Like ReadTimeout, it does not + // let Handlers make decisions on a per-request basis. + WriteTimeout time.Duration + + // IdleTimeout is the maximum amount of time to wait for the + // next request when keep-alives are enabled. If IdleTimeout + // is zero, the value of ReadTimeout is used. If both are + // zero, ReadHeaderTimeout is used. + IdleTimeout time.Duration +} + +// DefaultHTTPTimeouts represents the default timeout values used if further +// configuration is not provided. +var DefaultHTTPTimeouts = HTTPTimeouts{ + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 120 * time.Second, +} + // DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP // using the provided HTTP Client. func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { @@ -161,15 +194,30 @@ func (t *httpReadWriteNopCloser) Close() error { // NewHTTPServer creates a new HTTP RPC server around an API provider. // // Deprecated: Server implements http.Handler -func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server { +func NewHTTPServer(cors []string, vhosts []string, timeouts HTTPTimeouts, srv *Server) *http.Server { // Wrap the CORS-handler within a host-handler handler := newCorsHandler(srv, cors) handler = newVHostHandler(vhosts, handler) + + // Make sure timeout values are meaningful + if timeouts.ReadTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", DefaultHTTPTimeouts.ReadTimeout) + timeouts.ReadTimeout = DefaultHTTPTimeouts.ReadTimeout + } + if timeouts.WriteTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", DefaultHTTPTimeouts.WriteTimeout) + timeouts.WriteTimeout = DefaultHTTPTimeouts.WriteTimeout + } + if timeouts.IdleTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", DefaultHTTPTimeouts.IdleTimeout) + timeouts.IdleTimeout = DefaultHTTPTimeouts.IdleTimeout + } + // Bundle and start the HTTP server return &http.Server{ Handler: handler, - ReadTimeout: 5 * time.Second, - WriteTimeout: 10 * time.Second, - IdleTimeout: 120 * time.Second, + ReadTimeout: timeouts.ReadTimeout, + WriteTimeout: timeouts.WriteTimeout, + IdleTimeout: timeouts.IdleTimeout, } } diff --git a/rpc/server.go b/rpc/server.go index 8925419fe..214e1d3ed 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -25,8 +25,8 @@ import ( "sync" "sync/atomic" + mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/log" - "gopkg.in/fatih/set.v0" ) const MetadataApi = "rpc" @@ -46,7 +46,7 @@ const ( func NewServer() *Server { server := &Server{ services: make(serviceRegistry), - codecs: set.New(), + codecs: mapset.NewSet(), run: 1, } @@ -94,11 +94,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) - // already a previous service register under given sname, merge methods/subscriptions + if len(methods) == 0 && len(subscriptions) == 0 { + return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) + } + + // already a previous service register under given name, merge methods/subscriptions if regsvc, present := s.services[name]; present { - if len(methods) == 0 && len(subscriptions) == 0 { - return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) - } for _, m := range methods { regsvc.callbacks[formatName(m.method.Name)] = m } @@ -111,10 +112,6 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { svc.name = name svc.callbacks, svc.subscriptions = methods, subscriptions - if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { - return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) - } - s.services[svc.name] = svc return nil } diff --git a/rpc/types.go b/rpc/types.go index f2375604e..4252c3602 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -23,8 +23,8 @@ import ( "strings" "sync" + mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common/hexutil" - "gopkg.in/fatih/set.v0" ) // API describes the set of methods offered over the RPC interface @@ -73,7 +73,7 @@ type Server struct { run int32 codecsMu sync.Mutex - codecs *set.Set + codecs mapset.Set } // rpcRequest represents a raw incoming RPC request diff --git a/rpc/websocket.go b/rpc/websocket.go index a6e1cec28..e7a86ddae 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -29,9 +29,9 @@ import ( "strings" "time" + mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/log" "golang.org/x/net/websocket" - "gopkg.in/fatih/set.v0" ) // websocketJSONCodec is a custom JSON codec with payload size enforcement and @@ -84,7 +84,7 @@ func NewWSServer(allowedOrigins []string, srv *Server) *http.Server { // websocket upgrade process. When a '*' is specified as an allowed origins all // connections are accepted. func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http.Request) error { - origins := set.New() + origins := mapset.NewSet() allowAllOrigins := false for _, origin := range allowedOrigins { @@ -97,18 +97,18 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http } // allow localhost if no allowedOrigins are specified. - if len(origins.List()) == 0 { + if len(origins.ToSlice()) == 0 { origins.Add("http://localhost") if hostname, err := os.Hostname(); err == nil { origins.Add("http://" + strings.ToLower(hostname)) } } - log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v\n", origins.List())) + log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v\n", origins.ToSlice())) f := func(cfg *websocket.Config, req *http.Request) error { origin := strings.ToLower(req.Header.Get("Origin")) - if allowAllOrigins || origins.Has(origin) { + if allowAllOrigins || origins.Contains(origin) { return nil } log.Warn(fmt.Sprintf("origin '%s' not allowed on WS-RPC interface\n", origin)) |