diff options
author | Péter Szilágyi <peterke@gmail.com> | 2016-02-13 20:53:48 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2016-02-13 20:53:48 +0800 |
commit | 770b29fd80b8f25be31c2db5af6904cc5d0688ef (patch) | |
tree | 7b9da6bc13f6e881dd3a4ae231e9c69b6d9bbfeb /rpc | |
parent | b05e472c076d30035233d6a8b5fb3360b236e3ff (diff) | |
parent | df75dbfd6804923b1c8a8388b67523072d59f155 (diff) | |
download | go-tangerine-770b29fd80b8f25be31c2db5af6904cc5d0688ef.tar go-tangerine-770b29fd80b8f25be31c2db5af6904cc5d0688ef.tar.gz go-tangerine-770b29fd80b8f25be31c2db5af6904cc5d0688ef.tar.bz2 go-tangerine-770b29fd80b8f25be31c2db5af6904cc5d0688ef.tar.lz go-tangerine-770b29fd80b8f25be31c2db5af6904cc5d0688ef.tar.xz go-tangerine-770b29fd80b8f25be31c2db5af6904cc5d0688ef.tar.zst go-tangerine-770b29fd80b8f25be31c2db5af6904cc5d0688ef.zip |
Merge pull request #2175 from karalabe/refactor-http-rpc
cmd, common, node, rpc: move HTTP RPC into node, drop singleton aspect
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/http.go | 63 | ||||
-rw-r--r-- | rpc/inproc.go | 111 | ||||
-rw-r--r-- | rpc/ipc.go | 2 | ||||
-rw-r--r-- | rpc/server.go | 4 | ||||
-rw-r--r-- | rpc/utils.go | 3 | ||||
-rw-r--r-- | rpc/websocket.go | 78 |
6 files changed, 135 insertions, 126 deletions
diff --git a/rpc/http.go b/rpc/http.go index c5eb41af1..d9053b003 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -20,7 +20,6 @@ import ( "bufio" "bytes" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -29,7 +28,6 @@ import ( "net/url" "strconv" "strings" - "sync" "time" "github.com/ethereum/go-ethereum/logger" @@ -41,12 +39,6 @@ const ( httpReadDeadLine = 60 * time.Second // wait max httpReadDeadeline for next request ) -var ( - httpServerMu sync.Mutex // prevent concurrent access to the httpListener and httpServer - httpListener net.Listener // listener for the http server - httpRPCServer *Server // the node can only start 1 HTTP RPC server instance -) - // httpMessageStream is the glue between a HTTP connection which is message based // and the RPC codecs that expect json requests to be read from a stream. It will // parse HTTP messages and offer the bodies of these requests as a stream through @@ -249,53 +241,14 @@ func (h *httpConnHijacker) ServeHTTP(w http.ResponseWriter, req *http.Request) { go h.rpcServer.ServeCodec(codec) } -// StartHTTP will start the JSONRPC HTTP RPC interface when its not yet running. -func StartHTTP(address string, port int, corsdomains []string, apis []API) error { - httpServerMu.Lock() - defer httpServerMu.Unlock() - - if httpRPCServer != nil { - return fmt.Errorf("HTTP RPC interface already started on %s", httpListener.Addr()) - } - - rpcServer := NewServer() - - for _, api := range apis { - if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - } - - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) - if err != nil { - return err +// NewHTTPServer creates a new HTTP RPC server around an API provider. +func NewHTTPServer(cors string, handler *Server) *http.Server { + return &http.Server{ + Handler: &httpConnHijacker{ + corsdomains: strings.Split(cors, ","), + rpcServer: handler, + }, } - - httpServer := http.Server{Handler: &httpConnHijacker{corsdomains, rpcServer}} - go httpServer.Serve(listener) - - httpListener = listener - httpRPCServer = rpcServer - - return nil -} - -// StopHTTP will stop the running HTTP interface. If it is not running an error will be returned. -func StopHTTP() error { - httpServerMu.Lock() - defer httpServerMu.Unlock() - - if httpRPCServer == nil { - return errors.New("HTTP RPC interface not started") - } - - httpListener.Close() - httpRPCServer.Stop() - - httpRPCServer = nil - httpListener = nil - - return nil } // httpClient connects to a geth RPC server over HTTP. @@ -306,7 +259,7 @@ type httpClient struct { // NewHTTPClient create a new RPC clients that connection to a geth RPC server // over HTTP. -func NewHTTPClient(endpoint string) (*httpClient, error) { +func NewHTTPClient(endpoint string) (Client, error) { url, err := url.Parse(endpoint) if err != nil { return nil, err diff --git a/rpc/inproc.go b/rpc/inproc.go new file mode 100644 index 000000000..e138ba2c3 --- /dev/null +++ b/rpc/inproc.go @@ -0,0 +1,111 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc + +import "encoding/json" + +// NewInProcRPCClient creates an in-process buffer stream attachment to a given +// RPC server. +func NewInProcRPCClient(handler *Server) Client { + buffer := &inprocBuffer{ + requests: make(chan []byte, 16), + responses: make(chan []byte, 16), + } + client := &inProcClient{ + server: handler, + buffer: buffer, + } + go handler.ServeCodec(NewJSONCodec(client.buffer)) + return client +} + +// inProcClient is an in-process buffer stream attached to an RPC server. +type inProcClient struct { + server *Server + buffer *inprocBuffer +} + +// Close tears down the request channel of the in-proc client. +func (c *inProcClient) Close() { + c.buffer.Close() +} + +// Send marshals a message into a json format and injects in into the client +// request channel. +func (c *inProcClient) Send(msg interface{}) error { + d, err := json.Marshal(msg) + if err != nil { + return err + } + c.buffer.requests <- d + return nil +} + +// Recv reads a message from the response channel and tries to parse it into the +// given msg interface. +func (c *inProcClient) Recv(msg interface{}) error { + data := <-c.buffer.responses + return json.Unmarshal(data, &msg) +} + +// Returns the collection of modules the RPC server offers. +func (c *inProcClient) SupportedModules() (map[string]string, error) { + return SupportedModules(c) +} + +// inprocBuffer represents the connection between the RPC server and console +type inprocBuffer struct { + readBuf []byte // store remaining request bytes after a partial read + requests chan []byte // list with raw serialized requests + responses chan []byte // list with raw serialized responses +} + +// Read will read the next request in json format. +func (b *inprocBuffer) Read(p []byte) (int, error) { + // last read didn't read entire request, return remaining bytes + if len(b.readBuf) > 0 { + n := copy(p, b.readBuf) + if n < len(b.readBuf) { + b.readBuf = b.readBuf[:n] + } else { + b.readBuf = b.readBuf[:0] + } + return n, nil + } + // read next request + req := <-b.requests + n := copy(p, req) + if n < len(req) { + // inprocBuffer too small, store remaining chunk for next read + b.readBuf = req[n:] + } + return n, nil +} + +// Write sends the given buffer to the backend. +func (b *inprocBuffer) Write(p []byte) (n int, err error) { + b.responses <- p + return len(p), nil +} + +// Close cleans up obtained resources. +func (b *inprocBuffer) Close() error { + close(b.requests) + close(b.responses) + + return nil +} diff --git a/rpc/ipc.go b/rpc/ipc.go index b87bfcbd7..05d8909ca 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -38,7 +38,7 @@ type ipcClient struct { // NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded. // On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a // named pipe. -func NewIPCClient(endpoint string) (*ipcClient, error) { +func NewIPCClient(endpoint string) (Client, error) { conn, err := newIPCConnection(endpoint) if err != nil { return nil, err diff --git a/rpc/server.go b/rpc/server.go index 5b88d843a..f42ee2d37 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -33,8 +33,8 @@ import ( const ( stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped - DefaultIpcApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" - DefaultHttpRpcApis = "eth,net,web3" + DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" + DefaultHTTPApis = "eth,net,web3" ) // NewServer will create a new server instance with no registered handlers. diff --git a/rpc/utils.go b/rpc/utils.go index 39acf8196..fa114284d 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -20,13 +20,12 @@ import ( "crypto/rand" "encoding/hex" "errors" + "fmt" "math/big" "reflect" "unicode" "unicode/utf8" - "fmt" - "golang.org/x/net/context" ) diff --git a/rpc/websocket.go b/rpc/websocket.go index b5bcbf4f6..92615494e 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -17,13 +17,11 @@ package rpc import ( - "errors" "fmt" - "net" "net/http" - "sync" - "os" + "strings" + "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -31,12 +29,6 @@ import ( "gopkg.in/fatih/set.v0" ) -var ( - wsServerMu sync.Mutex - wsRPCServer *Server - wsListener net.Listener -) - // wsReaderWriterCloser reads and write payloads from and to a websocket connection. type wsReaderWriterCloser struct { c *websocket.Conn @@ -57,14 +49,6 @@ func (rw *wsReaderWriterCloser) Close() error { return rw.c.Close() } -// wsHandler accepts a websocket connection and handles incoming RPC requests. -// Will return when the websocket connection is closed, either by the client or -// server. -func wsHandler(conn *websocket.Conn) { - rwc := &wsReaderWriterCloser{conn} - wsRPCServer.ServeCodec(NewJSONCodec(rwc)) -} - // wsHandshakeValidator returns a handler that verifies the origin during the // websocket upgrade process. When a '*' is specified as an allowed origins all // connections are accepted. @@ -103,54 +87,16 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http return f } -// StartWS will start a websocket RPC server on the given address and port. -func StartWS(address string, port int, corsdomains []string, apis []API) error { - wsServerMu.Lock() - defer wsServerMu.Unlock() - - if wsRPCServer != nil { - return fmt.Errorf("WS RPC interface already started on %s", wsListener.Addr()) - } - - rpcServer := NewServer() - for _, api := range apis { - if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { - return err - } +// NewWSServer creates a new websocket RPC server around an API provider. +func NewWSServer(cors string, handler *Server) *http.Server { + return &http.Server{ + Handler: websocket.Server{ + Handshake: wsHandshakeValidator(strings.Split(cors, ",")), + Handler: func(conn *websocket.Conn) { + handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn})) + }, + }, } - - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) - if err != nil { - return err - } - - wsServer := websocket.Server{Handshake: wsHandshakeValidator(corsdomains), Handler: wsHandler} - wsHTTPServer := http.Server{Handler: wsServer} - - go wsHTTPServer.Serve(listener) - - wsListener = listener - wsRPCServer = rpcServer - - return nil -} - -// StopWS stops the running websocket RPC server. -func StopWS() error { - wsServerMu.Lock() - defer wsServerMu.Unlock() - - if wsRPCServer == nil { - return errors.New("HTTP RPC interface not started") - } - - wsListener.Close() - wsRPCServer.Stop() - - wsRPCServer = nil - wsListener = nil - - return nil } // wsClient represents a RPC client that communicates over websockets with a @@ -163,7 +109,7 @@ type wsClient struct { // NewWSClientj creates a new RPC client that communicates with a RPC server // that is listening on the given endpoint using JSON encoding. -func NewWSClient(endpoint string) (*wsClient, error) { +func NewWSClient(endpoint string) (Client, error) { return &wsClient{endpoint: endpoint}, nil } |