aboutsummaryrefslogtreecommitdiffstats
path: root/node
diff options
context:
space:
mode:
authorLewis Marshall <lewis@lmars.net>2017-09-25 16:08:07 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-09-25 16:08:07 +0800
commit9feec51e2dd754819e5c730ac5985d28d57adb48 (patch)
tree32b07b659cf7d0b4c1a7da67b5c49daf7a10a9d3 /node
parent673007d7aed1d2678ea3277eceb7b55dc29cf092 (diff)
downloaddexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.gz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.bz2
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.lz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.xz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.zst
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.zip
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which can be used to run simulated networks of devp2p nodes. The intention is to use this for testing protocols, performing benchmarks and visualising emergent network behaviour.
Diffstat (limited to 'node')
-rw-r--r--node/api.go42
-rw-r--r--node/config.go7
-rw-r--r--node/node.go19
3 files changed, 63 insertions, 5 deletions
diff --git a/node/api.go b/node/api.go
index 570cb9d98..1b04b7093 100644
--- a/node/api.go
+++ b/node/api.go
@@ -17,6 +17,7 @@
package node
import (
+ "context"
"fmt"
"strings"
"time"
@@ -25,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
"github.com/rcrowley/go-metrics"
)
@@ -73,6 +75,44 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil
}
+// PeerEvents creates an RPC subscription which receives peer events from the
+// node's p2p.Server
+func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
+ // Make sure the server is running, fail otherwise
+ server := api.node.Server()
+ if server == nil {
+ return nil, ErrNodeStopped
+ }
+
+ // Create the subscription
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ events := make(chan *p2p.PeerEvent)
+ sub := server.SubscribeEvents(events)
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case event := <-events:
+ notifier.Notify(rpcSub.ID, event)
+ case <-sub.Err():
+ return
+ case <-rpcSub.Err():
+ return
+ case <-notifier.Closed():
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
// StartRPC starts the HTTP RPC API server.
func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string) (bool, error) {
api.node.lock.Lock()
@@ -163,7 +203,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
}
}
- if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins); err != nil {
+ if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins, api.node.config.WSExposeAll); err != nil {
return false, err
}
return true, nil
diff --git a/node/config.go b/node/config.go
index b9b5e5b92..be9e21b4f 100644
--- a/node/config.go
+++ b/node/config.go
@@ -128,6 +128,13 @@ type Config struct {
// If the module list is empty, all RPC API endpoints designated public will be
// exposed.
WSModules []string `toml:",omitempty"`
+
+ // WSExposeAll exposes all API modules via the WebSocket RPC interface rather
+ // than just the public ones.
+ //
+ // *WARNING* Only set this if the node is running in a trusted network, exposing
+ // private APIs to untrusted users is a major security risk.
+ WSExposeAll bool `toml:",omitempty"`
}
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
diff --git a/node/node.go b/node/node.go
index 86cfb29ba..6f189d8fe 100644
--- a/node/node.go
+++ b/node/node.go
@@ -261,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
- if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil {
+ if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
n.stopHTTP()
n.stopIPC()
n.stopInProc()
@@ -412,7 +412,7 @@ func (n *Node) stopHTTP() {
}
// startWS initializes and starts the websocket RPC endpoint.
-func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error {
+func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed
if endpoint == "" {
return nil
@@ -425,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
- if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
+ if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
@@ -441,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
- log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint))
+ log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
@@ -556,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) {
return rpc.DialInProc(n.inprocHandler), nil
}
+// RPCHandler returns the in-process RPC request handler.
+func (n *Node) RPCHandler() (*rpc.Server, error) {
+ n.lock.RLock()
+ defer n.lock.RUnlock()
+
+ if n.inprocHandler == nil {
+ return nil, ErrNodeStopped
+ }
+ return n.inprocHandler, nil
+}
+
// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.