diff options
author | Felix Lange <fjl@twurst.com> | 2016-07-12 23:47:15 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2016-07-23 05:21:27 +0800 |
commit | 91b769042857f542b2792b23ec407e1c9bd4fe8d (patch) | |
tree | f6730b3e85a7ac5ca98f9a716505349958fcacd3 /rpc/http.go | |
parent | bb01bea4e276dad359815c682a2dee730737f4dc (diff) | |
download | go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.gz go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.bz2 go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.lz go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.xz go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.zst go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.zip |
rpc: add new client, use it everywhere
The new client implementation supports concurrent requests,
subscriptions and replaces the various ad hoc RPC clients
throughout go-ethereum.
Diffstat (limited to 'rpc/http.go')
-rw-r--r-- | rpc/http.go | 163 |
1 files changed, 98 insertions, 65 deletions
diff --git a/rpc/http.go b/rpc/http.go index 9283ce0ec..afcdd4bd6 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -22,71 +22,108 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" - "net/url" "strings" + "sync" + "time" "github.com/rs/cors" + "golang.org/x/net/context" ) const ( maxHTTPRequestContentLength = 1024 * 128 ) -// httpClient connects to a geth RPC server over HTTP. -type httpClient struct { - endpoint *url.URL // HTTP-RPC server endpoint - httpClient http.Client // reuse connection - lastRes []byte // HTTP requests are synchronous, store last response +var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0") + +type httpConn struct { + client *http.Client + req *http.Request + closeOnce sync.Once + closed chan struct{} +} + +// httpConn is treated specially by Client. +func (hc *httpConn) LocalAddr() net.Addr { return nullAddr } +func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr } +func (hc *httpConn) SetReadDeadline(time.Time) error { return nil } +func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil } +func (hc *httpConn) SetDeadline(time.Time) error { return nil } +func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") } + +func (hc *httpConn) Read(b []byte) (int, error) { + <-hc.closed + return 0, io.EOF +} + +func (hc *httpConn) Close() error { + hc.closeOnce.Do(func() { close(hc.closed) }) + return nil } -// NewHTTPClient create a new RPC clients that connection to a geth RPC server -// over HTTP. -func NewHTTPClient(endpoint string) (Client, error) { - url, err := url.Parse(endpoint) +// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP. +func DialHTTP(endpoint string) (*Client, error) { + req, err := http.NewRequest("POST", endpoint, nil) if err != nil { return nil, err } - return &httpClient{endpoint: url}, nil -} + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") -// Send will serialize the given msg to JSON and sends it to the RPC server. -// Since HTTP is synchronous the response is stored until Recv is called. -func (client *httpClient) Send(msg interface{}) error { - var body []byte - var err error + initctx := context.Background() + return newClient(initctx, func(context.Context) (net.Conn, error) { + return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil + }) +} - client.lastRes = nil - if body, err = json.Marshal(msg); err != nil { +func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msg) + if err != nil { return err } + defer respBody.Close() + var respmsg jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { + return err + } + op.resp <- &respmsg + return nil +} - resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body)) +func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msgs) if err != nil { return err } - - defer resp.Body.Close() - if resp.StatusCode == http.StatusOK { - client.lastRes, err = ioutil.ReadAll(resp.Body) + defer respBody.Close() + var respmsgs []jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } - - return fmt.Errorf("request failed: %s", resp.Status) -} - -// Recv will try to deserialize the last received response into the given msg. -func (client *httpClient) Recv(msg interface{}) error { - return json.Unmarshal(client.lastRes, &msg) + for _, respmsg := range respmsgs { + op.resp <- &respmsg + } + return nil } -// Close is not necessary for httpClient -func (client *httpClient) Close() { -} +func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { + body, err := json.Marshal(msg) + if err != nil { + return nil, err + } + client, req := requestWithContext(hc.client, hc.req, ctx) + req.Body = ioutil.NopCloser(bytes.NewReader(body)) + req.ContentLength = int64(len(body)) -// SupportedModules will return the collection of offered RPC modules. -func (client *httpClient) SupportedModules() (map[string]string, error) { - return SupportedModules(client) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + return resp.Body, nil } // httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method. @@ -100,43 +137,39 @@ func (t *httpReadWriteNopCloser) Close() error { return nil } -// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests, -// send the request to the given API provider and sends the response back to the caller. -func newJSONHTTPHandler(srv *Server) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.ContentLength > maxHTTPRequestContentLength { - http.Error(w, - fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), - http.StatusRequestEntityTooLarge) - return - } - - w.Header().Set("content-type", "application/json") - - // create a codec that reads direct from the request body until - // EOF and writes the response to w and order the server to process - // a single request. - codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) - defer codec.Close() - srv.ServeSingleRequest(codec, OptionMethodInvocation) +// NewHTTPServer creates a new HTTP RPC server around an API provider. +// +// Deprecated: Server implements http.Handler +func NewHTTPServer(corsString string, srv *Server) *http.Server { + return &http.Server{Handler: newCorsHandler(srv, corsString)} +} + +// ServeHTTP serves JSON-RPC requests over HTTP. +func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > maxHTTPRequestContentLength { + http.Error(w, + fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), + http.StatusRequestEntityTooLarge) + return } + w.Header().Set("content-type", "application/json") + + // create a codec that reads direct from the request body until + // EOF and writes the response to w and order the server to process + // a single request. + codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) + defer codec.Close() + srv.ServeSingleRequest(codec, OptionMethodInvocation) } -// NewHTTPServer creates a new HTTP RPC server around an API provider. -func NewHTTPServer(corsString string, srv *Server) *http.Server { +func newCorsHandler(srv *Server, corsString string) http.Handler { var allowedOrigins []string for _, domain := range strings.Split(corsString, ",") { allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain)) } - c := cors.New(cors.Options{ AllowedOrigins: allowedOrigins, AllowedMethods: []string{"POST", "GET"}, }) - - handler := c.Handler(newJSONHTTPHandler(srv)) - - return &http.Server{ - Handler: handler, - } + return c.Handler(srv) } |