aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/http.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-07-12 23:47:15 +0800
committerFelix Lange <fjl@twurst.com>2016-07-23 05:21:27 +0800
commit91b769042857f542b2792b23ec407e1c9bd4fe8d (patch)
treef6730b3e85a7ac5ca98f9a716505349958fcacd3 /rpc/http.go
parentbb01bea4e276dad359815c682a2dee730737f4dc (diff)
downloadgo-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.gz
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.bz2
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.lz
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.xz
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.zst
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.zip
rpc: add new client, use it everywhere
The new client implementation supports concurrent requests, subscriptions and replaces the various ad hoc RPC clients throughout go-ethereum.
Diffstat (limited to 'rpc/http.go')
-rw-r--r--rpc/http.go163
1 files changed, 98 insertions, 65 deletions
diff --git a/rpc/http.go b/rpc/http.go
index 9283ce0ec..afcdd4bd6 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -22,71 +22,108 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
- "net/url"
"strings"
+ "sync"
+ "time"
"github.com/rs/cors"
+ "golang.org/x/net/context"
)
const (
maxHTTPRequestContentLength = 1024 * 128
)
-// httpClient connects to a geth RPC server over HTTP.
-type httpClient struct {
- endpoint *url.URL // HTTP-RPC server endpoint
- httpClient http.Client // reuse connection
- lastRes []byte // HTTP requests are synchronous, store last response
+var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0")
+
+type httpConn struct {
+ client *http.Client
+ req *http.Request
+ closeOnce sync.Once
+ closed chan struct{}
+}
+
+// httpConn is treated specially by Client.
+func (hc *httpConn) LocalAddr() net.Addr { return nullAddr }
+func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr }
+func (hc *httpConn) SetReadDeadline(time.Time) error { return nil }
+func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil }
+func (hc *httpConn) SetDeadline(time.Time) error { return nil }
+func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") }
+
+func (hc *httpConn) Read(b []byte) (int, error) {
+ <-hc.closed
+ return 0, io.EOF
+}
+
+func (hc *httpConn) Close() error {
+ hc.closeOnce.Do(func() { close(hc.closed) })
+ return nil
}
-// NewHTTPClient create a new RPC clients that connection to a geth RPC server
-// over HTTP.
-func NewHTTPClient(endpoint string) (Client, error) {
- url, err := url.Parse(endpoint)
+// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP.
+func DialHTTP(endpoint string) (*Client, error) {
+ req, err := http.NewRequest("POST", endpoint, nil)
if err != nil {
return nil, err
}
- return &httpClient{endpoint: url}, nil
-}
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
-// Send will serialize the given msg to JSON and sends it to the RPC server.
-// Since HTTP is synchronous the response is stored until Recv is called.
-func (client *httpClient) Send(msg interface{}) error {
- var body []byte
- var err error
+ initctx := context.Background()
+ return newClient(initctx, func(context.Context) (net.Conn, error) {
+ return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil
+ })
+}
- client.lastRes = nil
- if body, err = json.Marshal(msg); err != nil {
+func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
+ hc := c.writeConn.(*httpConn)
+ respBody, err := hc.doRequest(ctx, msg)
+ if err != nil {
return err
}
+ defer respBody.Close()
+ var respmsg jsonrpcMessage
+ if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
+ return err
+ }
+ op.resp <- &respmsg
+ return nil
+}
- resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body))
+func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
+ hc := c.writeConn.(*httpConn)
+ respBody, err := hc.doRequest(ctx, msgs)
if err != nil {
return err
}
-
- defer resp.Body.Close()
- if resp.StatusCode == http.StatusOK {
- client.lastRes, err = ioutil.ReadAll(resp.Body)
+ defer respBody.Close()
+ var respmsgs []jsonrpcMessage
+ if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
-
- return fmt.Errorf("request failed: %s", resp.Status)
-}
-
-// Recv will try to deserialize the last received response into the given msg.
-func (client *httpClient) Recv(msg interface{}) error {
- return json.Unmarshal(client.lastRes, &msg)
+ for _, respmsg := range respmsgs {
+ op.resp <- &respmsg
+ }
+ return nil
}
-// Close is not necessary for httpClient
-func (client *httpClient) Close() {
-}
+func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
+ body, err := json.Marshal(msg)
+ if err != nil {
+ return nil, err
+ }
+ client, req := requestWithContext(hc.client, hc.req, ctx)
+ req.Body = ioutil.NopCloser(bytes.NewReader(body))
+ req.ContentLength = int64(len(body))
-// SupportedModules will return the collection of offered RPC modules.
-func (client *httpClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(client)
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ return resp.Body, nil
}
// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
@@ -100,43 +137,39 @@ func (t *httpReadWriteNopCloser) Close() error {
return nil
}
-// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests,
-// send the request to the given API provider and sends the response back to the caller.
-func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if r.ContentLength > maxHTTPRequestContentLength {
- http.Error(w,
- fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
- http.StatusRequestEntityTooLarge)
- return
- }
-
- w.Header().Set("content-type", "application/json")
-
- // create a codec that reads direct from the request body until
- // EOF and writes the response to w and order the server to process
- // a single request.
- codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
- defer codec.Close()
- srv.ServeSingleRequest(codec, OptionMethodInvocation)
+// NewHTTPServer creates a new HTTP RPC server around an API provider.
+//
+// Deprecated: Server implements http.Handler
+func NewHTTPServer(corsString string, srv *Server) *http.Server {
+ return &http.Server{Handler: newCorsHandler(srv, corsString)}
+}
+
+// ServeHTTP serves JSON-RPC requests over HTTP.
+func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.ContentLength > maxHTTPRequestContentLength {
+ http.Error(w,
+ fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
+ http.StatusRequestEntityTooLarge)
+ return
}
+ w.Header().Set("content-type", "application/json")
+
+ // create a codec that reads direct from the request body until
+ // EOF and writes the response to w and order the server to process
+ // a single request.
+ codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
+ defer codec.Close()
+ srv.ServeSingleRequest(codec, OptionMethodInvocation)
}
-// NewHTTPServer creates a new HTTP RPC server around an API provider.
-func NewHTTPServer(corsString string, srv *Server) *http.Server {
+func newCorsHandler(srv *Server, corsString string) http.Handler {
var allowedOrigins []string
for _, domain := range strings.Split(corsString, ",") {
allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
}
-
c := cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowedMethods: []string{"POST", "GET"},
})
-
- handler := c.Handler(newJSONHTTPHandler(srv))
-
- return &http.Server{
- Handler: handler,
- }
+ return c.Handler(srv)
}