aboutsummaryrefslogtreecommitdiffstats
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/api.go42
-rw-r--r--node/config.go7
-rw-r--r--node/node.go19
3 files changed, 63 insertions, 5 deletions
diff --git a/node/api.go b/node/api.go
index 570cb9d98..1b04b7093 100644
--- a/node/api.go
+++ b/node/api.go
@@ -17,6 +17,7 @@
package node
import (
+ "context"
"fmt"
"strings"
"time"
@@ -25,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
"github.com/rcrowley/go-metrics"
)
@@ -73,6 +75,44 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil
}
+// PeerEvents creates an RPC subscription which receives peer events from the
+// node's p2p.Server
+func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
+ // Make sure the server is running, fail otherwise
+ server := api.node.Server()
+ if server == nil {
+ return nil, ErrNodeStopped
+ }
+
+ // Create the subscription
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ events := make(chan *p2p.PeerEvent)
+ sub := server.SubscribeEvents(events)
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case event := <-events:
+ notifier.Notify(rpcSub.ID, event)
+ case <-sub.Err():
+ return
+ case <-rpcSub.Err():
+ return
+ case <-notifier.Closed():
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
// StartRPC starts the HTTP RPC API server.
func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string) (bool, error) {
api.node.lock.Lock()
@@ -163,7 +203,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
}
}
- if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins); err != nil {
+ if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins, api.node.config.WSExposeAll); err != nil {
return false, err
}
return true, nil
diff --git a/node/config.go b/node/config.go
index b9b5e5b92..be9e21b4f 100644
--- a/node/config.go
+++ b/node/config.go
@@ -128,6 +128,13 @@ type Config struct {
// If the module list is empty, all RPC API endpoints designated public will be
// exposed.
WSModules []string `toml:",omitempty"`
+
+ // WSExposeAll exposes all API modules via the WebSocket RPC interface rather
+ // than just the public ones.
+ //
+ // *WARNING* Only set this if the node is running in a trusted network, exposing
+ // private APIs to untrusted users is a major security risk.
+ WSExposeAll bool `toml:",omitempty"`
}
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
diff --git a/node/node.go b/node/node.go
index 86cfb29ba..6f189d8fe 100644
--- a/node/node.go
+++ b/node/node.go
@@ -261,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
- if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil {
+ if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
n.stopHTTP()
n.stopIPC()
n.stopInProc()
@@ -412,7 +412,7 @@ func (n *Node) stopHTTP() {
}
// startWS initializes and starts the websocket RPC endpoint.
-func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error {
+func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed
if endpoint == "" {
return nil
@@ -425,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
- if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
+ if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
@@ -441,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
- log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint))
+ log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
@@ -556,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) {
return rpc.DialInProc(n.inprocHandler), nil
}
+// RPCHandler returns the in-process RPC request handler.
+func (n *Node) RPCHandler() (*rpc.Server, error) {
+ n.lock.RLock()
+ defer n.lock.RUnlock()
+
+ if n.inprocHandler == nil {
+ return nil, ErrNodeStopped
+ }
+ return n.inprocHandler, nil
+}
+
// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.