diff options
author | Bas van Kervel <bas@ethdev.com> | 2015-12-16 17:58:01 +0800 |
---|---|---|
committer | Jeffrey Wilcke <geffobscura@gmail.com> | 2016-01-26 20:51:50 +0800 |
commit | 19b2640e89465c1c57f1bbea0274d52d97151f60 (patch) | |
tree | 980e063693dae7fa6105646821ee6755b176b6e2 /rpc/http.go | |
parent | f2ab351e8d3b0a4e569ce56f6a4f17725ca5ba65 (diff) | |
download | go-tangerine-19b2640e89465c1c57f1bbea0274d52d97151f60.tar go-tangerine-19b2640e89465c1c57f1bbea0274d52d97151f60.tar.gz go-tangerine-19b2640e89465c1c57f1bbea0274d52d97151f60.tar.bz2 go-tangerine-19b2640e89465c1c57f1bbea0274d52d97151f60.tar.lz go-tangerine-19b2640e89465c1c57f1bbea0274d52d97151f60.tar.xz go-tangerine-19b2640e89465c1c57f1bbea0274d52d97151f60.tar.zst go-tangerine-19b2640e89465c1c57f1bbea0274d52d97151f60.zip |
rpc: migrated the RPC insterface to a new reflection based RPC layer
Diffstat (limited to 'rpc/http.go')
-rw-r--r-- | rpc/http.go | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/rpc/http.go b/rpc/http.go new file mode 100644 index 000000000..e4b25bed8 --- /dev/null +++ b/rpc/http.go @@ -0,0 +1,368 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc + +import ( + "bufio" + "fmt" + "io" + "net" + "net/http" + "strconv" + "strings" + "time" + + "errors" + "sync" + + "bytes" + "encoding/json" + "io/ioutil" + "net/url" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "gopkg.in/fatih/set.v0" +) + +const ( + httpReadDeadLine = 60 * time.Second // wait max httpReadDeadeline for next request +) + +var ( + httpServerMu sync.Mutex // prevent concurrent access to the httpListener and httpServer + httpListener net.Listener // listener for the http server + httpRPCServer *Server // the node can only start 1 HTTP RPC server instance +) + +// httpMessageStream is the glue between a HTTP connection which is message based +// and the RPC codecs that expect json requests to be read from a stream. It will +// parse HTTP messages and offer the bodies of these requests as a stream through +// the Read method. This will require full control of the connection and thus need +// a "hijacked" HTTP connection. +type httpMessageStream struct { + conn net.Conn // TCP connection + rw *bufio.ReadWriter // buffered where HTTP requests/responses are read/written from/to + currentReq *http.Request // pending request, codec can pass in a too small buffer for a single read + // we need to keep track of the current requests if it was not read at once + payloadBytesRead int64 // number of bytes which are read from the current request + allowedOrigins *set.Set // allowed CORS domains + origin string // origin of this connection/request +} + +// NewHttpMessageStream will create a new http message stream parser that can be +// used by the codes in the RPC package. It will take full control of the given +// connection and thus needs to be hijacked. It will read and write HTTP messages +// from the passed rwbuf. The allowed origins are the RPC CORS domains the user has supplied. +func NewHTTPMessageStream(c net.Conn, rwbuf *bufio.ReadWriter, initialReq *http.Request, allowdOrigins []string) *httpMessageStream { + r := &httpMessageStream{conn: c, rw: rwbuf, currentReq: initialReq, allowedOrigins: set.New()} + for _, origin := range allowdOrigins { + r.allowedOrigins.Add(origin) + } + return r +} + +// handleOptionsRequest handles the HTTP preflight requests (OPTIONS) that browsers +// make to enforce CORS rules. Only the POST method is allowed and the origin must +// be on the rpccorsdomain list the user has specified. +func (h *httpMessageStream) handleOptionsRequest(req *http.Request) error { + headers := req.Header + + if !strings.EqualFold(req.Method, "OPTIONS") { + return fmt.Errorf("preflight aborted: %s!=OPTIONS", req.Method) + } + + origin := headers.Get("Origin") + if origin == "" { + return fmt.Errorf("preflight aborted: empty origin") + } + + responseHeaders := make(http.Header) + responseHeaders.Set("Access-Control-Allow-Methods", "POST") + if h.allowedOrigins.Has(origin) || h.allowedOrigins.Has("*") { + responseHeaders.Set("Access-Control-Allow-Origin", origin) + } else { + glog.V(logger.Info).Infof("origin '%s' not allowed", origin) + } + responseHeaders.Set("Access-Control-Allow-Headers", "Content-Type") + responseHeaders.Set("Date", string(httpTimestamp(time.Now()))) + responseHeaders.Set("Content-Type", "text/plain; charset=utf-8") + responseHeaders.Set("Content-Length", "0") + responseHeaders.Set("Vary", "Origin") + + defer h.rw.Flush() + + if _, err := h.rw.WriteString("HTTP/1.1 200 OK\r\n"); err != nil { + glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err) + return err + } + if err := responseHeaders.Write(h.rw); err != nil { + glog.V(logger.Error).Infof("unable to write OPTIONS headers: %v\n", err) + } + if _, err := h.rw.WriteString("\r\n"); err != nil { + glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err) + } + + return nil +} + +// Read will read incoming HTTP requests and reads the body data from these requests +// as an endless stream of data. +func (h *httpMessageStream) Read(buf []byte) (n int, err error) { + h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine)) + for { + // if the last request was read completely try to read the next request + if h.currentReq == nil { + if h.currentReq, err = http.ReadRequest(bufio.NewReader(h.rw)); err != nil { + return 0, err + } + } + + // The "options" method is http specific and not interested for the RPC server. + // Handle it internally and wait for the next request. + if strings.EqualFold(h.currentReq.Method, "OPTIONS") { + if err = h.handleOptionsRequest(h.currentReq); err != nil { + glog.V(logger.Info).Infof("RPC/HTTP OPTIONS error: %v\n", err) + h.currentReq = nil + return 0, err + } + + // processed valid request -> reset deadline + h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine)) + h.currentReq = nil + continue + } + + if strings.EqualFold(h.currentReq.Method, "POST") { + n, err := h.currentReq.Body.Read(buf) + h.payloadBytesRead += int64(n) + + // entire payload read, read new request next time + if err == io.EOF || h.payloadBytesRead >= h.currentReq.ContentLength { + h.origin = h.currentReq.Header.Get("origin") + h.payloadBytesRead = 0 + h.currentReq.Body.Close() + h.currentReq = nil + err = nil // io.EOF is not an error + } else if err != nil { + // unable to read body + h.currentReq.Body.Close() + h.currentReq = nil + h.payloadBytesRead = 0 + } + + // partial read of body + return n, err + } + + h.currentReq = nil + return 0, fmt.Errorf("unsupported HTTP method '%s'", h.currentReq.Method) + } +} + +// Write will create a HTTP response with the given payload and send it to the peer. +func (h *httpMessageStream) Write(payload []byte) (int, error) { + defer h.rw.Flush() + + responseHeaders := make(http.Header) + responseHeaders.Set("Content-Type", "application/json") + responseHeaders.Set("Content-Length", strconv.Itoa(len(payload))) + if h.origin != "" { + responseHeaders.Set("Access-Control-Allow-Origin", h.origin) + } + + h.rw.WriteString("HTTP/1.1 200 OK\r\n") + responseHeaders.Write(h.rw) + h.rw.WriteString("\r\n") + + return h.rw.Write(payload) +} + +// Close will close the underlying TCP connection this instance has taken ownership over. +func (h *httpMessageStream) Close() error { + h.rw.Flush() + return h.conn.Close() +} + +// TimeFormat is the time format to use with time.Parse and time.Time.Format when +// parsing or generating times in HTTP headers. It is like time.RFC1123 but hard +// codes GMT as the time zone. +const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT" + +// httpTimestamp formats the given t as specified in RFC1123. +func httpTimestamp(t time.Time) []byte { + const days = "SunMonTueWedThuFriSat" + const months = "JanFebMarAprMayJunJulAugSepOctNovDec" + + b := make([]byte, 0) + t = t.UTC() + yy, mm, dd := t.Date() + hh, mn, ss := t.Clock() + day := days[3 * t.Weekday():] + mon := months[3 * (mm - 1):] + + return append(b, + day[0], day[1], day[2], ',', ' ', + byte('0' + dd / 10), byte('0' + dd % 10), ' ', + mon[0], mon[1], mon[2], ' ', + byte('0' + yy / 1000), byte('0' + (yy / 100) % 10), byte('0' + (yy / 10) % 10), byte('0' + yy % 10), ' ', + byte('0' + hh / 10), byte('0' + hh % 10), ':', + byte('0' + mn / 10), byte('0' + mn % 10), ':', + byte('0' + ss / 10), byte('0' + ss % 10), ' ', + 'G', 'M', 'T') +} + +// httpConnHijacker is a http.Handler implementation that will hijack the HTTP +// connection, wraps it in a HttpMessageStream that is then wrapped in a JSON +// codec which will be served on the rpcServer. +type httpConnHijacker struct { + corsdomains []string + rpcServer *Server +} + +// ServeHTTP will hijack the connection, wraps the captured connection in a +// HttpMessageStream which is then used as codec. +func (h *httpConnHijacker) ServeHTTP(w http.ResponseWriter, req *http.Request) { + hj, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError) + return + } + + conn, rwbuf, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + httpRequestStream := NewHTTPMessageStream(conn, rwbuf, req, h.corsdomains) + + codec := NewJSONCodec(httpRequestStream) + go h.rpcServer.ServeCodec(codec) +} + +// StartHTTP will start the JSONRPC HTTP RPC interface when its not yet running. +func StartHTTP(address string, port int, corsdomains []string, apis []API) error { + httpServerMu.Lock() + defer httpServerMu.Unlock() + + if httpRPCServer != nil { + return fmt.Errorf("HTTP RPC interface already started on %s", httpListener.Addr()) + } + + rpcServer := NewServer() + + for _, api := range apis { + if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + } + + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) + if err != nil { + return err + } + + httpServer := http.Server{Handler: &httpConnHijacker{corsdomains, rpcServer}} + go httpServer.Serve(listener) + + httpListener = listener + httpRPCServer = rpcServer + + return nil +} + +// StopHTTP will stop the running HTTP interface. If it is not running an error will be returned. +func StopHTTP() error { + httpServerMu.Lock() + defer httpServerMu.Unlock() + + if httpRPCServer == nil { + return errors.New("HTTP RPC interface not started") + } + + httpListener.Close() + httpRPCServer.Stop() + + httpRPCServer = nil + httpListener = nil + + return nil +} + +// httpClient connects to a geth RPC server over HTTP. +type httpClient struct { + endpoint *url.URL // HTTP-RPC server endpoint + lastRes []byte // HTTP requests are synchronous, store last response +} + +// NewHTTPClient create a new RPC clients that connection to a geth RPC server +// over HTTP. +func NewHTTPClient(endpoint string) (*httpClient, error) { + url, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + return &httpClient{endpoint: url}, nil +} + +// Send will serialize the given msg to JSON and sends it to the RPC server. +// Since HTTP is synchronous the response is stored until Recv is called. +func (client *httpClient) Send(msg interface{}) error { + var body []byte + var err error + + client.lastRes = nil + + if body, err = json.Marshal(msg); err != nil { + return err + } + + httpReq, err := http.NewRequest("POST", client.endpoint.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + httpReq.Header.Set("Content-Type", "application/json") + + httpClient := http.Client{} + resp, err := httpClient.Do(httpReq) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + client.lastRes, err = ioutil.ReadAll(resp.Body) + return err + } + + return fmt.Errorf("unable to handle request") +} + +// Recv will try to deserialize the last received response into the given msg. +func (client *httpClient) Recv(msg interface{}) error { + return json.Unmarshal(client.lastRes, &msg) +} + +// Close is not necessary for httpClient +func (client *httpClient) Close() { +} + +// SupportedModules will return the collection of offered RPC modules. +func (client *httpClient) SupportedModules() (map[string]string, error) { + return SupportedModules(client) +} |