aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/client.go2
-rw-r--r--rpc/doc.go2
-rw-r--r--rpc/endpoints.go4
-rw-r--r--rpc/http.go56
-rw-r--r--rpc/server.go17
-rw-r--r--rpc/types.go4
-rw-r--r--rpc/websocket.go10
7 files changed, 70 insertions, 25 deletions
diff --git a/rpc/client.go b/rpc/client.go
index 1c88cfab8..a2ef2ed6b 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -304,7 +304,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
return err
}
- // dispatch has accepted the request and will close the channel it when it quits.
+ // dispatch has accepted the request and will close the channel when it quits.
switch resp, err := op.wait(ctx); {
case err != nil:
return err
diff --git a/rpc/doc.go b/rpc/doc.go
index 78aa92f89..9a6c4abbc 100644
--- a/rpc/doc.go
+++ b/rpc/doc.go
@@ -58,7 +58,7 @@ An example server which uses the JSON codec:
return a + b
}
- func (s *CalculatorService Div(a, b int) (int, error) {
+ func (s *CalculatorService) Div(a, b int) (int, error) {
if b == 0 {
return 0, errors.New("divide by zero")
}
diff --git a/rpc/endpoints.go b/rpc/endpoints.go
index 692c62d3a..8ca6d4eb0 100644
--- a/rpc/endpoints.go
+++ b/rpc/endpoints.go
@@ -23,7 +23,7 @@ import (
)
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules
-func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {
+func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string, timeouts HTTPTimeouts) (net.Listener, *Server, error) {
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
@@ -47,7 +47,7 @@ func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []str
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
- go NewHTTPServer(cors, vhosts, handler).Serve(listener)
+ go NewHTTPServer(cors, vhosts, timeouts, handler).Serve(listener)
return listener, handler, err
}
diff --git a/rpc/http.go b/rpc/http.go
index 6388d6896..f3bd1f29c 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -31,6 +31,7 @@ import (
"sync"
"time"
+ "github.com/ethereum/go-ethereum/log"
"github.com/rs/cors"
)
@@ -66,6 +67,38 @@ func (hc *httpConn) Close() error {
return nil
}
+// HTTPTimeouts represents the configuration params for the HTTP RPC server.
+type HTTPTimeouts struct {
+ // ReadTimeout is the maximum duration for reading the entire
+ // request, including the body.
+ //
+ // Because ReadTimeout does not let Handlers make per-request
+ // decisions on each request body's acceptable deadline or
+ // upload rate, most users will prefer to use
+ // ReadHeaderTimeout. It is valid to use them both.
+ ReadTimeout time.Duration
+
+ // WriteTimeout is the maximum duration before timing out
+ // writes of the response. It is reset whenever a new
+ // request's header is read. Like ReadTimeout, it does not
+ // let Handlers make decisions on a per-request basis.
+ WriteTimeout time.Duration
+
+ // IdleTimeout is the maximum amount of time to wait for the
+ // next request when keep-alives are enabled. If IdleTimeout
+ // is zero, the value of ReadTimeout is used. If both are
+ // zero, ReadHeaderTimeout is used.
+ IdleTimeout time.Duration
+}
+
+// DefaultHTTPTimeouts represents the default timeout values used if further
+// configuration is not provided.
+var DefaultHTTPTimeouts = HTTPTimeouts{
+ ReadTimeout: 30 * time.Second,
+ WriteTimeout: 30 * time.Second,
+ IdleTimeout: 120 * time.Second,
+}
+
// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP
// using the provided HTTP Client.
func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) {
@@ -161,15 +194,30 @@ func (t *httpReadWriteNopCloser) Close() error {
// NewHTTPServer creates a new HTTP RPC server around an API provider.
//
// Deprecated: Server implements http.Handler
-func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {
+func NewHTTPServer(cors []string, vhosts []string, timeouts HTTPTimeouts, srv *Server) *http.Server {
// Wrap the CORS-handler within a host-handler
handler := newCorsHandler(srv, cors)
handler = newVHostHandler(vhosts, handler)
+
+ // Make sure timeout values are meaningful
+ if timeouts.ReadTimeout < time.Second {
+ log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", DefaultHTTPTimeouts.ReadTimeout)
+ timeouts.ReadTimeout = DefaultHTTPTimeouts.ReadTimeout
+ }
+ if timeouts.WriteTimeout < time.Second {
+ log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", DefaultHTTPTimeouts.WriteTimeout)
+ timeouts.WriteTimeout = DefaultHTTPTimeouts.WriteTimeout
+ }
+ if timeouts.IdleTimeout < time.Second {
+ log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", DefaultHTTPTimeouts.IdleTimeout)
+ timeouts.IdleTimeout = DefaultHTTPTimeouts.IdleTimeout
+ }
+ // Bundle and start the HTTP server
return &http.Server{
Handler: handler,
- ReadTimeout: 5 * time.Second,
- WriteTimeout: 10 * time.Second,
- IdleTimeout: 120 * time.Second,
+ ReadTimeout: timeouts.ReadTimeout,
+ WriteTimeout: timeouts.WriteTimeout,
+ IdleTimeout: timeouts.IdleTimeout,
}
}
diff --git a/rpc/server.go b/rpc/server.go
index 8925419fe..214e1d3ed 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -25,8 +25,8 @@ import (
"sync"
"sync/atomic"
+ mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/log"
- "gopkg.in/fatih/set.v0"
)
const MetadataApi = "rpc"
@@ -46,7 +46,7 @@ const (
func NewServer() *Server {
server := &Server{
services: make(serviceRegistry),
- codecs: set.New(),
+ codecs: mapset.NewSet(),
run: 1,
}
@@ -94,11 +94,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
- // already a previous service register under given sname, merge methods/subscriptions
+ if len(methods) == 0 && len(subscriptions) == 0 {
+ return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
+ }
+
+ // already a previous service register under given name, merge methods/subscriptions
if regsvc, present := s.services[name]; present {
- if len(methods) == 0 && len(subscriptions) == 0 {
- return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
- }
for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m
}
@@ -111,10 +112,6 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
svc.name = name
svc.callbacks, svc.subscriptions = methods, subscriptions
- if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
- return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
- }
-
s.services[svc.name] = svc
return nil
}
diff --git a/rpc/types.go b/rpc/types.go
index f2375604e..4252c3602 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -23,8 +23,8 @@ import (
"strings"
"sync"
+ mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common/hexutil"
- "gopkg.in/fatih/set.v0"
)
// API describes the set of methods offered over the RPC interface
@@ -73,7 +73,7 @@ type Server struct {
run int32
codecsMu sync.Mutex
- codecs *set.Set
+ codecs mapset.Set
}
// rpcRequest represents a raw incoming RPC request
diff --git a/rpc/websocket.go b/rpc/websocket.go
index a6e1cec28..e7a86ddae 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -29,9 +29,9 @@ import (
"strings"
"time"
+ mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/net/websocket"
- "gopkg.in/fatih/set.v0"
)
// websocketJSONCodec is a custom JSON codec with payload size enforcement and
@@ -84,7 +84,7 @@ func NewWSServer(allowedOrigins []string, srv *Server) *http.Server {
// websocket upgrade process. When a '*' is specified as an allowed origins all
// connections are accepted.
func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http.Request) error {
- origins := set.New()
+ origins := mapset.NewSet()
allowAllOrigins := false
for _, origin := range allowedOrigins {
@@ -97,18 +97,18 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http
}
// allow localhost if no allowedOrigins are specified.
- if len(origins.List()) == 0 {
+ if len(origins.ToSlice()) == 0 {
origins.Add("http://localhost")
if hostname, err := os.Hostname(); err == nil {
origins.Add("http://" + strings.ToLower(hostname))
}
}
- log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v\n", origins.List()))
+ log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v\n", origins.ToSlice()))
f := func(cfg *websocket.Config, req *http.Request) error {
origin := strings.ToLower(req.Header.Get("Origin"))
- if allowAllOrigins || origins.Has(origin) {
+ if allowAllOrigins || origins.Contains(origin) {
return nil
}
log.Warn(fmt.Sprintf("origin '%s' not allowed on WS-RPC interface\n", origin))