aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/comms
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/comms')
-rw-r--r--rpc/comms/http.go185
-rw-r--r--rpc/comms/http_net.go182
2 files changed, 162 insertions, 205 deletions
diff --git a/rpc/comms/http.go b/rpc/comms/http.go
index 108ba0c5f..c165aa27e 100644
--- a/rpc/comms/http.go
+++ b/rpc/comms/http.go
@@ -17,11 +17,16 @@
package comms
import (
+ "encoding/json"
"fmt"
+ "net"
"net/http"
"strings"
+ "sync"
+ "time"
"bytes"
+ "io"
"io/ioutil"
"github.com/ethereum/go-ethereum/logger"
@@ -31,10 +36,15 @@ import (
"github.com/rs/cors"
)
+const (
+ serverIdleTimeout = 10 * time.Second // idle keep-alive connections
+ serverReadTimeout = 15 * time.Second // per-request read timeout
+ serverWriteTimeout = 15 * time.Second // per-request read timeout
+)
+
var (
- // main HTTP rpc listener
- httpListener *stoppableTCPListener
- listenerStoppedError = fmt.Errorf("Listener has stopped")
+ httpServerMu sync.Mutex
+ httpServer *stopServer
)
type HttpConfig struct {
@@ -43,42 +53,171 @@ type HttpConfig struct {
CorsDomain string
}
+// stopServer augments http.Server with idle connection tracking.
+// Idle keep-alive connections are shut down when Close is called.
+type stopServer struct {
+ *http.Server
+ l net.Listener
+ // connection tracking state
+ mu sync.Mutex
+ shutdown bool // true when Stop has returned
+ idle map[net.Conn]struct{}
+}
+
+type handler struct {
+ codec codec.Codec
+ api shared.EthereumApi
+}
+
+// StartHTTP starts listening for RPC requests sent via HTTP.
func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error {
- if httpListener != nil {
- if fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) != httpListener.Addr().String() {
- return fmt.Errorf("RPC service already running on %s ", httpListener.Addr().String())
+ httpServerMu.Lock()
+ defer httpServerMu.Unlock()
+
+ addr := fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)
+ if httpServer != nil {
+ if addr != httpServer.Addr {
+ return fmt.Errorf("RPC service already running on %s ", httpServer.Addr)
}
return nil // RPC service already running on given host/port
}
-
- l, err := newStoppableTCPListener(fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort))
+ // Set up the request handler, wrapping it with CORS headers if configured.
+ handler := http.Handler(&handler{codec, api})
+ if len(cfg.CorsDomain) > 0 {
+ opts := cors.Options{
+ AllowedMethods: []string{"POST"},
+ AllowedOrigins: strings.Split(cfg.CorsDomain, " "),
+ }
+ handler = cors.New(opts).Handler(handler)
+ }
+ // Start the server.
+ s, err := listenHTTP(addr, handler)
if err != nil {
glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err)
return err
}
- httpListener = l
+ httpServer = s
+ return nil
+}
- var handler http.Handler
- if len(cfg.CorsDomain) > 0 {
- var opts cors.Options
- opts.AllowedMethods = []string{"POST"}
- opts.AllowedOrigins = strings.Split(cfg.CorsDomain, " ")
+func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
- c := cors.New(opts)
- handler = newStoppableHandler(c.Handler(gethHttpHandler(codec, api)), l.stop)
- } else {
- handler = newStoppableHandler(gethHttpHandler(codec, api), l.stop)
+ // Limit request size to resist DoS
+ if req.ContentLength > maxHttpSizeReqLength {
+ err := fmt.Errorf("Request too large")
+ response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
+ sendJSON(w, &response)
+ return
}
- go http.Serve(l, handler)
+ defer req.Body.Close()
+ payload, err := ioutil.ReadAll(req.Body)
+ if err != nil {
+ err := fmt.Errorf("Could not read request body")
+ response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
+ sendJSON(w, &response)
+ return
+ }
- return nil
+ c := h.codec.New(nil)
+ var rpcReq shared.Request
+ if err = c.Decode(payload, &rpcReq); err == nil {
+ reply, err := h.api.Execute(&rpcReq)
+ res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
+ sendJSON(w, &res)
+ return
+ }
+
+ var reqBatch []shared.Request
+ if err = c.Decode(payload, &reqBatch); err == nil {
+ resBatch := make([]*interface{}, len(reqBatch))
+ resCount := 0
+ for i, rpcReq := range reqBatch {
+ reply, err := h.api.Execute(&rpcReq)
+ if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal
+ resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
+ resCount += 1
+ }
+ }
+ // make response omitting nil entries
+ sendJSON(w, resBatch[:resCount])
+ return
+ }
+
+ // invalid request
+ err = fmt.Errorf("Could not decode request")
+ res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err)
+ sendJSON(w, res)
}
+func sendJSON(w io.Writer, v interface{}) {
+ if glog.V(logger.Detail) {
+ if payload, err := json.MarshalIndent(v, "", "\t"); err == nil {
+ glog.Infof("Sending payload: %s", payload)
+ }
+ }
+ if err := json.NewEncoder(w).Encode(v); err != nil {
+ glog.V(logger.Error).Infoln("Error sending JSON:", err)
+ }
+}
+
+// Stop closes all active HTTP connections and shuts down the server.
func StopHttp() {
- if httpListener != nil {
- httpListener.Stop()
- httpListener = nil
+ httpServerMu.Lock()
+ defer httpServerMu.Unlock()
+ if httpServer != nil {
+ httpServer.Close()
+ httpServer = nil
+ }
+}
+
+func listenHTTP(addr string, h http.Handler) (*stopServer, error) {
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+ s := &stopServer{l: l, idle: make(map[net.Conn]struct{})}
+ s.Server = &http.Server{
+ Addr: addr,
+ Handler: h,
+ ReadTimeout: serverReadTimeout,
+ WriteTimeout: serverWriteTimeout,
+ ConnState: s.connState,
+ }
+ go s.Serve(l)
+ return s, nil
+}
+
+func (s *stopServer) connState(c net.Conn, state http.ConnState) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ // Close c immediately if we're past shutdown.
+ if s.shutdown {
+ if state != http.StateClosed {
+ c.Close()
+ }
+ return
+ }
+ if state == http.StateIdle {
+ s.idle[c] = struct{}{}
+ } else {
+ delete(s.idle, c)
+ }
+}
+
+func (s *stopServer) Close() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ // Shut down the acceptor. No new connections can be created.
+ s.l.Close()
+ // Drop all idle connections. Non-idle connections will be
+ // closed by connState as soon as they become idle.
+ s.shutdown = true
+ for c := range s.idle {
+ glog.V(logger.Detail).Infof("closing idle connection %v", c.RemoteAddr())
+ c.Close()
+ delete(s.idle, c)
}
}
diff --git a/rpc/comms/http_net.go b/rpc/comms/http_net.go
deleted file mode 100644
index dba2029d4..000000000
--- a/rpc/comms/http_net.go
+++ /dev/null
@@ -1,182 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package comms
-
-import (
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "time"
-
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/rpc/codec"
- "github.com/ethereum/go-ethereum/rpc/shared"
-)
-
-// When https://github.com/golang/go/issues/4674 is implemented this could be replaced
-type stoppableTCPListener struct {
- *net.TCPListener
- stop chan struct{} // closed when the listener must stop
-}
-
-func newStoppableTCPListener(addr string) (*stoppableTCPListener, error) {
- wl, err := net.Listen("tcp", addr)
- if err != nil {
- return nil, err
- }
-
- if tcpl, ok := wl.(*net.TCPListener); ok {
- stop := make(chan struct{})
- return &stoppableTCPListener{tcpl, stop}, nil
- }
-
- return nil, fmt.Errorf("Unable to create TCP listener for RPC service")
-}
-
-// Stop the listener and all accepted and still active connections.
-func (self *stoppableTCPListener) Stop() {
- close(self.stop)
-}
-
-func (self *stoppableTCPListener) Accept() (net.Conn, error) {
- for {
- self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second)))
- c, err := self.TCPListener.AcceptTCP()
-
- select {
- case <-self.stop:
- if c != nil { // accept timeout
- c.Close()
- }
- self.TCPListener.Close()
- return nil, listenerStoppedError
- default:
- }
-
- if err != nil {
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() && netErr.Temporary() {
- continue // regular timeout
- }
- }
-
- return &closableConnection{c, self.stop}, err
- }
-}
-
-type closableConnection struct {
- *net.TCPConn
- closed chan struct{}
-}
-
-func (self *closableConnection) Read(b []byte) (n int, err error) {
- select {
- case <-self.closed:
- self.TCPConn.Close()
- return 0, io.EOF
- default:
- return self.TCPConn.Read(b)
- }
-}
-
-// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an
-// error indicating that the service was stopped. This will only happen for connections which are
-// kept open (HTTP keep-alive) when the RPC service was shutdown.
-func newStoppableHandler(h http.Handler, stop chan struct{}) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- select {
- case <-stop:
- w.Header().Set("Content-Type", "application/json")
- err := fmt.Errorf("RPC service stopped")
- response := shared.NewRpcResponse(-1, shared.JsonRpcVersion, nil, err)
- httpSend(w, response)
- default:
- h.ServeHTTP(w, r)
- }
- })
-}
-
-func httpSend(writer io.Writer, v interface{}) (n int, err error) {
- var payload []byte
- payload, err = json.MarshalIndent(v, "", "\t")
- if err != nil {
- glog.V(logger.Error).Infoln("Error marshalling JSON", err)
- return 0, err
- }
- glog.V(logger.Detail).Infof("Sending payload: %s", payload)
-
- return writer.Write(payload)
-}
-
-func gethHttpHandler(codec codec.Codec, a shared.EthereumApi) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- w.Header().Set("Content-Type", "application/json")
-
- // Limit request size to resist DoS
- if req.ContentLength > maxHttpSizeReqLength {
- err := fmt.Errorf("Request too large")
- response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
- httpSend(w, &response)
- return
- }
-
- defer req.Body.Close()
- payload, err := ioutil.ReadAll(req.Body)
- if err != nil {
- err := fmt.Errorf("Could not read request body")
- response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
- httpSend(w, &response)
- return
- }
-
- c := codec.New(nil)
- var rpcReq shared.Request
- if err = c.Decode(payload, &rpcReq); err == nil {
- reply, err := a.Execute(&rpcReq)
- res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
- httpSend(w, &res)
- return
- }
-
- var reqBatch []shared.Request
- if err = c.Decode(payload, &reqBatch); err == nil {
- resBatch := make([]*interface{}, len(reqBatch))
- resCount := 0
-
- for i, rpcReq := range reqBatch {
- reply, err := a.Execute(&rpcReq)
- if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal
- resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
- resCount += 1
- }
- }
-
- // make response omitting nil entries
- resBatch = resBatch[:resCount]
- httpSend(w, resBatch)
- return
- }
-
- // invalid request
- err = fmt.Errorf("Could not decode request")
- res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err)
- httpSend(w, res)
- })
-}