From 9feec51e2dd754819e5c730ac5985d28d57adb48 Mon Sep 17 00:00:00 2001 From: Lewis Marshall Date: Mon, 25 Sep 2017 09:08:07 +0100 Subject: p2p: add network simulation framework (#14982) This commit introduces a network simulation framework which can be used to run simulated networks of devp2p nodes. The intention is to use this for testing protocols, performing benchmarks and visualising emergent network behaviour. --- rpc/client.go | 64 ++++++++++++------------------------------------------ rpc/client_test.go | 32 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 50 deletions(-) (limited to 'rpc') diff --git a/rpc/client.go b/rpc/client.go index f02366a39..8aa84ec98 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -349,85 +349,49 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { return err } -// ShhSubscribe calls the "shh_subscribe" method with the given arguments, -// registering a subscription. Server notifications for the subscription are -// sent to the given channel. The element type of the channel must match the -// expected type of content returned by the subscription. -// -// The context argument cancels the RPC request that sets up the subscription but has no -// effect on the subscription after ShhSubscribe has returned. -// -// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications -// before considering the subscriber dead. The subscription Err channel will receive -// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure -// that the channel usually has at least one reader to prevent this issue. -func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - // Check type of channel first. - chanVal := reflect.ValueOf(channel) - if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { - panic("first argument to ShhSubscribe must be a writable channel") - } - if chanVal.IsNil() { - panic("channel given to ShhSubscribe must not be nil") - } - if c.isHTTP { - return nil, ErrNotificationsUnsupported - } - - msg, err := c.newMessage("shh"+subscribeMethodSuffix, args...) - if err != nil { - return nil, err - } - op := &requestOp{ - ids: []json.RawMessage{msg.ID}, - resp: make(chan *jsonrpcMessage), - sub: newClientSubscription(c, "shh", chanVal), - } +// EthSubscribe registers a subscripion under the "eth" namespace. +func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "eth", channel, args...) +} - // Send the subscription request. - // The arrival and validity of the response is signaled on sub.quit. - if err := c.send(ctx, op, msg); err != nil { - return nil, err - } - if _, err := op.wait(ctx); err != nil { - return nil, err - } - return op.sub, nil +// ShhSubscribe registers a subscripion under the "shh" namespace. +func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "shh", channel, args...) } -// EthSubscribe calls the "eth_subscribe" method with the given arguments, +// Subscribe calls the "_subscribe" method with the given arguments, // registering a subscription. Server notifications for the subscription are // sent to the given channel. The element type of the channel must match the // expected type of content returned by the subscription. // // The context argument cancels the RPC request that sets up the subscription but has no -// effect on the subscription after EthSubscribe has returned. +// effect on the subscription after Subscribe has returned. // // Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications // before considering the subscriber dead. The subscription Err channel will receive // ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure // that the channel usually has at least one reader to prevent this issue. -func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { +func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { // Check type of channel first. chanVal := reflect.ValueOf(channel) if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { - panic("first argument to EthSubscribe must be a writable channel") + panic("first argument to Subscribe must be a writable channel") } if chanVal.IsNil() { - panic("channel given to EthSubscribe must not be nil") + panic("channel given to Subscribe must not be nil") } if c.isHTTP { return nil, ErrNotificationsUnsupported } - msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...) + msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...) if err != nil { return nil, err } op := &requestOp{ ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage), - sub: newClientSubscription(c, "eth", chanVal), + sub: newClientSubscription(c, namespace, chanVal), } // Send the subscription request. diff --git a/rpc/client_test.go b/rpc/client_test.go index 10d74670b..4f354d389 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -251,6 +251,38 @@ func TestClientSubscribe(t *testing.T) { } } +func TestClientSubscribeCustomNamespace(t *testing.T) { + namespace := "custom" + server := newTestServer(namespace, new(NotificationTestService)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + nc := make(chan int) + count := 10 + sub, err := client.Subscribe(context.Background(), namespace, nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + for i := 0; i < count; i++ { + if val := <-nc; val != i { + t.Fatalf("value mismatch: got %d, want %d", val, i) + } + } + + sub.Unsubscribe() + select { + case v := <-nc: + t.Fatal("received value after unsubscribe:", v) + case err := <-sub.Err(): + if err != nil { + t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err) + } + case <-time.After(1 * time.Second): + t.Fatalf("subscription not closed within 1s after unsubscribe") + } +} + // In this test, the connection drops while EthSubscribe is // waiting for a response. func TestClientSubscribeClose(t *testing.T) { -- cgit v1.2.3