From 2e98631c5e5724bea1a29b877929b4911bf50b86 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 15 Oct 2018 10:56:04 +0200 Subject: rpc: fix client shutdown hang when Close races with Unsubscribe (#17894) Fixes #17837 --- rpc/client.go | 59 ++++++++++-------------------------------------- rpc/client_test.go | 24 ++++++++++++++++++++ rpc/stdio.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 47 deletions(-) create mode 100644 rpc/stdio.go diff --git a/rpc/client.go b/rpc/client.go index d96189a2d..6254c95ff 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -25,7 +25,6 @@ import ( "fmt" "net" "net/url" - "os" "reflect" "strconv" "strings" @@ -118,7 +117,8 @@ type Client struct { // for dispatch close chan struct{} - didQuit chan struct{} // closed when client quits + closing chan struct{} // closed when client is quitting + didClose chan struct{} // closed when client quits reconnected chan net.Conn // where write/reconnect sends the new connection readErr chan error // errors from read readResp chan []*jsonrpcMessage // valid messages from read @@ -181,45 +181,6 @@ func DialContext(ctx context.Context, rawurl string) (*Client, error) { } } -type StdIOConn struct{} - -func (io StdIOConn) Read(b []byte) (n int, err error) { - return os.Stdin.Read(b) -} - -func (io StdIOConn) Write(b []byte) (n int, err error) { - return os.Stdout.Write(b) -} - -func (io StdIOConn) Close() error { - return nil -} - -func (io StdIOConn) LocalAddr() net.Addr { - return &net.UnixAddr{Name: "stdio", Net: "stdio"} -} - -func (io StdIOConn) RemoteAddr() net.Addr { - return &net.UnixAddr{Name: "stdio", Net: "stdio"} -} - -func (io StdIOConn) SetDeadline(t time.Time) error { - return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} -} - -func (io StdIOConn) SetReadDeadline(t time.Time) error { - return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} -} - -func (io StdIOConn) SetWriteDeadline(t time.Time) error { - return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} -} -func DialStdIO(ctx context.Context) (*Client, error) { - return newClient(ctx, func(_ context.Context) (net.Conn, error) { - return StdIOConn{}, nil - }) -} - func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) { conn, err := connectFunc(initctx) if err != nil { @@ -231,7 +192,8 @@ func newClient(initctx context.Context, connectFunc func(context.Context) (net.C isHTTP: isHTTP, connectFunc: connectFunc, close: make(chan struct{}), - didQuit: make(chan struct{}), + closing: make(chan struct{}), + didClose: make(chan struct{}), reconnected: make(chan net.Conn), readErr: make(chan error), readResp: make(chan []*jsonrpcMessage), @@ -268,8 +230,8 @@ func (c *Client) Close() { } select { case c.close <- struct{}{}: - <-c.didQuit - case <-c.didQuit: + <-c.didClose + case <-c.didClose: } } @@ -469,7 +431,9 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error // This can happen if the client is overloaded or unable to keep up with // subscription notifications. return ctx.Err() - case <-c.didQuit: + case <-c.closing: + return ErrClientQuit + case <-c.didClose: return ErrClientQuit } } @@ -504,7 +468,7 @@ func (c *Client) reconnect(ctx context.Context) error { case c.reconnected <- newconn: c.writeConn = newconn return nil - case <-c.didQuit: + case <-c.didClose: newconn.Close() return ErrClientQuit } @@ -522,8 +486,9 @@ func (c *Client) dispatch(conn net.Conn) { requestOpLock = c.requestOp // nil while the send lock is held reading = true // if true, a read loop is running ) - defer close(c.didQuit) + defer close(c.didClose) defer func() { + close(c.closing) c.closeRequestOps(ErrClientQuit) conn.Close() if reading { diff --git a/rpc/client_test.go b/rpc/client_test.go index 4f354d389..a8195c0af 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -323,6 +323,30 @@ func TestClientSubscribeClose(t *testing.T) { } } +// This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the +// client hangs during shutdown when Unsubscribe races with Client.Close. +func TestClientCloseUnsubscribeRace(t *testing.T) { + service := &NotificationTestService{} + server := newTestServer("eth", service) + defer server.Stop() + + for i := 0; i < 20; i++ { + client := DialInProc(server) + nc := make(chan int) + sub, err := client.EthSubscribe(context.Background(), nc, "someSubscription", 3, 1) + if err != nil { + t.Fatal(err) + } + go client.Close() + go sub.Unsubscribe() + select { + case <-sub.Err(): + case <-time.After(5 * time.Second): + t.Fatal("subscription not closed within timeout") + } + } +} + // This test checks that Client doesn't lock up when a single subscriber // doesn't read subscription events. func TestClientNotificationStorm(t *testing.T) { diff --git a/rpc/stdio.go b/rpc/stdio.go new file mode 100644 index 000000000..ea552cca2 --- /dev/null +++ b/rpc/stdio.go @@ -0,0 +1,66 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "errors" + "net" + "os" + "time" +) + +// DialStdIO creates a client on stdin/stdout. +func DialStdIO(ctx context.Context) (*Client, error) { + return newClient(ctx, func(_ context.Context) (net.Conn, error) { + return stdioConn{}, nil + }) +} + +type stdioConn struct{} + +func (io stdioConn) Read(b []byte) (n int, err error) { + return os.Stdin.Read(b) +} + +func (io stdioConn) Write(b []byte) (n int, err error) { + return os.Stdout.Write(b) +} + +func (io stdioConn) Close() error { + return nil +} + +func (io stdioConn) LocalAddr() net.Addr { + return &net.UnixAddr{Name: "stdio", Net: "stdio"} +} + +func (io stdioConn) RemoteAddr() net.Addr { + return &net.UnixAddr{Name: "stdio", Net: "stdio"} +} + +func (io stdioConn) SetDeadline(t time.Time) error { + return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} +} + +func (io stdioConn) SetReadDeadline(t time.Time) error { + return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} +} + +func (io stdioConn) SetWriteDeadline(t time.Time) error { + return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} +} -- cgit v1.2.3