aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/comms/http.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/comms/http.go')
-rw-r--r--rpc/comms/http.go197
1 files changed, 169 insertions, 28 deletions
diff --git a/rpc/comms/http.go b/rpc/comms/http.go
index 3fb429e65..c08b744a1 100644
--- a/rpc/comms/http.go
+++ b/rpc/comms/http.go
@@ -1,29 +1,35 @@
// Copyright 2015 The go-ethereum Authors
-// This file is part of go-ethereum.
+// This file is part of the go-ethereum library.
//
-// go-ethereum is free software: you can redistribute it and/or modify
+// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
-// go-ethereum is distributed in the hope that it will be useful,
+// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
-// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package comms
import (
+ "encoding/json"
"fmt"
+ "net"
"net/http"
"strings"
+ "sync"
+ "time"
"bytes"
+ "io"
"io/ioutil"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc/codec"
@@ -31,10 +37,15 @@ import (
"github.com/rs/cors"
)
+const (
+ serverIdleTimeout = 10 * time.Second // idle keep-alive connections
+ serverReadTimeout = 15 * time.Second // per-request read timeout
+ serverWriteTimeout = 15 * time.Second // per-request read timeout
+)
+
var (
- // main HTTP rpc listener
- httpListener *stoppableTCPListener
- listenerStoppedError = fmt.Errorf("Listener has stopped")
+ httpServerMu sync.Mutex
+ httpServer *stopServer
)
type HttpConfig struct {
@@ -43,42 +54,172 @@ type HttpConfig struct {
CorsDomain string
}
+// stopServer augments http.Server with idle connection tracking.
+// Idle keep-alive connections are shut down when Close is called.
+type stopServer struct {
+ *http.Server
+ l net.Listener
+ // connection tracking state
+ mu sync.Mutex
+ shutdown bool // true when Stop has returned
+ idle map[net.Conn]struct{}
+}
+
+type handler struct {
+ codec codec.Codec
+ api shared.EthereumApi
+}
+
+// StartHTTP starts listening for RPC requests sent via HTTP.
func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error {
- if httpListener != nil {
- if fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) != httpListener.Addr().String() {
- return fmt.Errorf("RPC service already running on %s ", httpListener.Addr().String())
+ httpServerMu.Lock()
+ defer httpServerMu.Unlock()
+
+ addr := fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)
+ if httpServer != nil {
+ if addr != httpServer.Addr {
+ return fmt.Errorf("RPC service already running on %s ", httpServer.Addr)
}
return nil // RPC service already running on given host/port
}
-
- l, err := newStoppableTCPListener(fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort))
+ // Set up the request handler, wrapping it with CORS headers if configured.
+ handler := http.Handler(&handler{codec, api})
+ if len(cfg.CorsDomain) > 0 {
+ opts := cors.Options{
+ AllowedMethods: []string{"POST"},
+ AllowedOrigins: strings.Split(cfg.CorsDomain, " "),
+ }
+ handler = cors.New(opts).Handler(handler)
+ }
+ // Start the server.
+ s, err := listenHTTP(addr, handler)
if err != nil {
glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err)
return err
}
- httpListener = l
+ httpServer = s
+ return nil
+}
- var handler http.Handler
- if len(cfg.CorsDomain) > 0 {
- var opts cors.Options
- opts.AllowedMethods = []string{"POST"}
- opts.AllowedOrigins = strings.Split(cfg.CorsDomain, " ")
+func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
- c := cors.New(opts)
- handler = newStoppableHandler(c.Handler(gethHttpHandler(codec, api)), l.stop)
- } else {
- handler = newStoppableHandler(gethHttpHandler(codec, api), l.stop)
+ // Limit request size to resist DoS
+ if req.ContentLength > maxHttpSizeReqLength {
+ err := fmt.Errorf("Request too large")
+ response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
+ sendJSON(w, &response)
+ return
}
- go http.Serve(l, handler)
+ defer req.Body.Close()
+ payload, err := ioutil.ReadAll(req.Body)
+ if err != nil {
+ err := fmt.Errorf("Could not read request body")
+ response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
+ sendJSON(w, &response)
+ return
+ }
- return nil
+ c := h.codec.New(nil)
+ var rpcReq shared.Request
+ if err = c.Decode(payload, &rpcReq); err == nil {
+ reply, err := h.api.Execute(&rpcReq)
+ res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
+ sendJSON(w, &res)
+ return
+ }
+
+ var reqBatch []shared.Request
+ if err = c.Decode(payload, &reqBatch); err == nil {
+ resBatch := make([]*interface{}, len(reqBatch))
+ resCount := 0
+ for i, rpcReq := range reqBatch {
+ reply, err := h.api.Execute(&rpcReq)
+ if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal
+ resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
+ resCount += 1
+ }
+ }
+ // make response omitting nil entries
+ sendJSON(w, resBatch[:resCount])
+ return
+ }
+
+ // invalid request
+ err = fmt.Errorf("Could not decode request")
+ res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err)
+ sendJSON(w, res)
}
+func sendJSON(w io.Writer, v interface{}) {
+ if glog.V(logger.Detail) {
+ if payload, err := json.MarshalIndent(v, "", "\t"); err == nil {
+ glog.Infof("Sending payload: %s", payload)
+ }
+ }
+ if err := json.NewEncoder(w).Encode(v); err != nil {
+ glog.V(logger.Error).Infoln("Error sending JSON:", err)
+ }
+}
+
+// Stop closes all active HTTP connections and shuts down the server.
func StopHttp() {
- if httpListener != nil {
- httpListener.Stop()
- httpListener = nil
+ httpServerMu.Lock()
+ defer httpServerMu.Unlock()
+ if httpServer != nil {
+ httpServer.Close()
+ httpServer = nil
+ }
+}
+
+func listenHTTP(addr string, h http.Handler) (*stopServer, error) {
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+ l = fdtrack.WrapListener("rpc", l)
+ s := &stopServer{l: l, idle: make(map[net.Conn]struct{})}
+ s.Server = &http.Server{
+ Addr: addr,
+ Handler: h,
+ ReadTimeout: serverReadTimeout,
+ WriteTimeout: serverWriteTimeout,
+ ConnState: s.connState,
+ }
+ go s.Serve(l)
+ return s, nil
+}
+
+func (s *stopServer) connState(c net.Conn, state http.ConnState) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ // Close c immediately if we're past shutdown.
+ if s.shutdown {
+ if state != http.StateClosed {
+ c.Close()
+ }
+ return
+ }
+ if state == http.StateIdle {
+ s.idle[c] = struct{}{}
+ } else {
+ delete(s.idle, c)
+ }
+}
+
+func (s *stopServer) Close() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ // Shut down the acceptor. No new connections can be created.
+ s.l.Close()
+ // Drop all idle connections. Non-idle connections will be
+ // closed by connState as soon as they become idle.
+ s.shutdown = true
+ for c := range s.idle {
+ glog.V(logger.Detail).Infof("closing idle connection %v", c.RemoteAddr())
+ c.Close()
+ delete(s.idle, c)
}
}