aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorLewis Marshall <lewis@lmars.net>2017-09-25 16:08:07 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-09-25 16:08:07 +0800
commit9feec51e2dd754819e5c730ac5985d28d57adb48 (patch)
tree32b07b659cf7d0b4c1a7da67b5c49daf7a10a9d3 /rpc
parent673007d7aed1d2678ea3277eceb7b55dc29cf092 (diff)
downloaddexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.gz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.bz2
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.lz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.xz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.zst
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.zip
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which can be used to run simulated networks of devp2p nodes. The intention is to use this for testing protocols, performing benchmarks and visualising emergent network behaviour.
Diffstat (limited to 'rpc')
-rw-r--r--rpc/client.go64
-rw-r--r--rpc/client_test.go32
2 files changed, 46 insertions, 50 deletions
diff --git a/rpc/client.go b/rpc/client.go
index f02366a39..8aa84ec98 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -349,85 +349,49 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
return err
}
-// ShhSubscribe calls the "shh_subscribe" method with the given arguments,
-// registering a subscription. Server notifications for the subscription are
-// sent to the given channel. The element type of the channel must match the
-// expected type of content returned by the subscription.
-//
-// The context argument cancels the RPC request that sets up the subscription but has no
-// effect on the subscription after ShhSubscribe has returned.
-//
-// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
-// before considering the subscriber dead. The subscription Err channel will receive
-// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
-// that the channel usually has at least one reader to prevent this issue.
-func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
- // Check type of channel first.
- chanVal := reflect.ValueOf(channel)
- if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
- panic("first argument to ShhSubscribe must be a writable channel")
- }
- if chanVal.IsNil() {
- panic("channel given to ShhSubscribe must not be nil")
- }
- if c.isHTTP {
- return nil, ErrNotificationsUnsupported
- }
-
- msg, err := c.newMessage("shh"+subscribeMethodSuffix, args...)
- if err != nil {
- return nil, err
- }
- op := &requestOp{
- ids: []json.RawMessage{msg.ID},
- resp: make(chan *jsonrpcMessage),
- sub: newClientSubscription(c, "shh", chanVal),
- }
+// EthSubscribe registers a subscripion under the "eth" namespace.
+func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ return c.Subscribe(ctx, "eth", channel, args...)
+}
- // Send the subscription request.
- // The arrival and validity of the response is signaled on sub.quit.
- if err := c.send(ctx, op, msg); err != nil {
- return nil, err
- }
- if _, err := op.wait(ctx); err != nil {
- return nil, err
- }
- return op.sub, nil
+// ShhSubscribe registers a subscripion under the "shh" namespace.
+func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ return c.Subscribe(ctx, "shh", channel, args...)
}
-// EthSubscribe calls the "eth_subscribe" method with the given arguments,
+// Subscribe calls the "<namespace>_subscribe" method with the given arguments,
// registering a subscription. Server notifications for the subscription are
// sent to the given channel. The element type of the channel must match the
// expected type of content returned by the subscription.
//
// The context argument cancels the RPC request that sets up the subscription but has no
-// effect on the subscription after EthSubscribe has returned.
+// effect on the subscription after Subscribe has returned.
//
// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
-func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
- panic("first argument to EthSubscribe must be a writable channel")
+ panic("first argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
- panic("channel given to EthSubscribe must not be nil")
+ panic("channel given to Subscribe must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
- msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...)
+ msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
- sub: newClientSubscription(c, "eth", chanVal),
+ sub: newClientSubscription(c, namespace, chanVal),
}
// Send the subscription request.
diff --git a/rpc/client_test.go b/rpc/client_test.go
index 10d74670b..4f354d389 100644
--- a/rpc/client_test.go
+++ b/rpc/client_test.go
@@ -251,6 +251,38 @@ func TestClientSubscribe(t *testing.T) {
}
}
+func TestClientSubscribeCustomNamespace(t *testing.T) {
+ namespace := "custom"
+ server := newTestServer(namespace, new(NotificationTestService))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ nc := make(chan int)
+ count := 10
+ sub, err := client.Subscribe(context.Background(), namespace, nc, "someSubscription", count, 0)
+ if err != nil {
+ t.Fatal("can't subscribe:", err)
+ }
+ for i := 0; i < count; i++ {
+ if val := <-nc; val != i {
+ t.Fatalf("value mismatch: got %d, want %d", val, i)
+ }
+ }
+
+ sub.Unsubscribe()
+ select {
+ case v := <-nc:
+ t.Fatal("received value after unsubscribe:", v)
+ case err := <-sub.Err():
+ if err != nil {
+ t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
+ }
+ case <-time.After(1 * time.Second):
+ t.Fatalf("subscription not closed within 1s after unsubscribe")
+ }
+}
+
// In this test, the connection drops while EthSubscribe is
// waiting for a response.
func TestClientSubscribeClose(t *testing.T) {