aboutsummaryrefslogtreecommitdiffstats
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/api.go42
-rw-r--r--node/config.go22
-rw-r--r--node/doc.go2
-rw-r--r--node/errors.go18
-rw-r--r--node/node.go57
-rw-r--r--node/service.go6
-rw-r--r--node/utils_test.go2
7 files changed, 111 insertions, 38 deletions
diff --git a/node/api.go b/node/api.go
index 570cb9d98..1b04b7093 100644
--- a/node/api.go
+++ b/node/api.go
@@ -17,6 +17,7 @@
package node
import (
+ "context"
"fmt"
"strings"
"time"
@@ -25,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
"github.com/rcrowley/go-metrics"
)
@@ -73,6 +75,44 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil
}
+// PeerEvents creates an RPC subscription which receives peer events from the
+// node's p2p.Server
+func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
+ // Make sure the server is running, fail otherwise
+ server := api.node.Server()
+ if server == nil {
+ return nil, ErrNodeStopped
+ }
+
+ // Create the subscription
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ events := make(chan *p2p.PeerEvent)
+ sub := server.SubscribeEvents(events)
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case event := <-events:
+ notifier.Notify(rpcSub.ID, event)
+ case <-sub.Err():
+ return
+ case <-rpcSub.Err():
+ return
+ case <-notifier.Closed():
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
// StartRPC starts the HTTP RPC API server.
func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string) (bool, error) {
api.node.lock.Lock()
@@ -163,7 +203,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
}
}
- if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins); err != nil {
+ if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins, api.node.config.WSExposeAll); err != nil {
return false, err
}
return true, nil
diff --git a/node/config.go b/node/config.go
index 61e0008ef..be9e21b4f 100644
--- a/node/config.go
+++ b/node/config.go
@@ -35,7 +35,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
)
-var (
+const (
datadirPrivateKey = "nodekey" // Path within the datadir to the node's private key
datadirDefaultKeyStore = "keystore" // Path within the datadir to the keystore
datadirStaticNodes = "static-nodes.json" // Path within the datadir to the static node list
@@ -128,6 +128,13 @@ type Config struct {
// If the module list is empty, all RPC API endpoints designated public will be
// exposed.
WSModules []string `toml:",omitempty"`
+
+ // WSExposeAll exposes all API modules via the WebSocket RPC interface rather
+ // than just the public ones.
+ //
+ // *WARNING* Only set this if the node is running in a trusted network, exposing
+ // private APIs to untrusted users is a major security risk.
+ WSExposeAll bool `toml:",omitempty"`
}
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
@@ -160,7 +167,7 @@ func (c *Config) NodeDB() string {
if c.DataDir == "" {
return "" // ephemeral
}
- return c.resolvePath("nodes")
+ return c.resolvePath(datadirNodeDatabase)
}
// DefaultIPCEndpoint returns the IPC path used by default.
@@ -316,8 +323,8 @@ func (c *Config) StaticNodes() []*discover.Node {
return c.parsePersistentNodes(c.resolvePath(datadirStaticNodes))
}
-// TrusterNodes returns a list of node enode URLs configured as trusted nodes.
-func (c *Config) TrusterNodes() []*discover.Node {
+// TrustedNodes returns a list of node enode URLs configured as trusted nodes.
+func (c *Config) TrustedNodes() []*discover.Node {
return c.parsePersistentNodes(c.resolvePath(datadirTrustedNodes))
}
@@ -393,11 +400,18 @@ func makeAccountManager(conf *Config) (*accounts.Manager, string, error) {
keystore.NewKeyStore(keydir, scryptN, scryptP),
}
if !conf.NoUSB {
+ // Start a USB hub for Ledger hardware wallets
if ledgerhub, err := usbwallet.NewLedgerHub(); err != nil {
log.Warn(fmt.Sprintf("Failed to start Ledger hub, disabling: %v", err))
} else {
backends = append(backends, ledgerhub)
}
+ // Start a USB hub for Trezor hardware wallets
+ if trezorhub, err := usbwallet.NewTrezorHub(); err != nil {
+ log.Warn(fmt.Sprintf("Failed to start Trezor hub, disabling: %v", err))
+ } else {
+ backends = append(backends, trezorhub)
+ }
}
return accounts.NewManager(backends...), ephemeral, nil
}
diff --git a/node/doc.go b/node/doc.go
index f009e6f85..d9688e0a1 100644
--- a/node/doc.go
+++ b/node/doc.go
@@ -68,7 +68,7 @@ unless its location is changed through the KeyStoreDir configuration option.
Data Directory Sharing Example
-In this exanple, two node instances named A and B are started with the same data
+In this example, two node instances named A and B are started with the same data
directory. Mode instance A opens the database "db", node instance B opens the databases
"db" and "db-2". The following files will be created in the data directory:
diff --git a/node/errors.go b/node/errors.go
index bd5ddeb5d..2e0dadc4d 100644
--- a/node/errors.go
+++ b/node/errors.go
@@ -17,10 +17,28 @@
package node
import (
+ "errors"
"fmt"
"reflect"
+ "syscall"
)
+var (
+ ErrDatadirUsed = errors.New("datadir already used by another process")
+ ErrNodeStopped = errors.New("node not started")
+ ErrNodeRunning = errors.New("node already running")
+ ErrServiceUnknown = errors.New("unknown service")
+
+ datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
+)
+
+func convertFileLockError(err error) error {
+ if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
+ return ErrDatadirUsed
+ }
+ return err
+}
+
// DuplicateServiceError is returned during Node startup if a registered service
// constructor returns a service of the same type that was already started.
type DuplicateServiceError struct {
diff --git a/node/node.go b/node/node.go
index a372b1c25..6f189d8fe 100644
--- a/node/node.go
+++ b/node/node.go
@@ -25,7 +25,6 @@ import (
"reflect"
"strings"
"sync"
- "syscall"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/ethdb"
@@ -34,16 +33,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
- "github.com/syndtr/goleveldb/leveldb/storage"
-)
-
-var (
- ErrDatadirUsed = errors.New("datadir already used")
- ErrNodeStopped = errors.New("node not started")
- ErrNodeRunning = errors.New("node already running")
- ErrServiceUnknown = errors.New("unknown service")
-
- datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
+ "github.com/prometheus/prometheus/util/flock"
)
// Node is a container on which services can be registered.
@@ -52,8 +42,8 @@ type Node struct {
config *Config
accman *accounts.Manager
- ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop
- instanceDirLock storage.Storage // prevents concurrent use of instance directory
+ ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop
+ instanceDirLock flock.Releaser // prevents concurrent use of instance directory
serverConfig p2p.Config
server *p2p.Server // Currently running P2P networking layer
@@ -160,7 +150,7 @@ func (n *Node) Start() error {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
if n.serverConfig.TrustedNodes == nil {
- n.serverConfig.TrustedNodes = n.config.TrusterNodes()
+ n.serverConfig.TrustedNodes = n.config.TrustedNodes()
}
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
@@ -197,10 +187,7 @@ func (n *Node) Start() error {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil {
- if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
- return ErrDatadirUsed
- }
- return err
+ return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
@@ -242,14 +229,13 @@ func (n *Node) openDataDir() error {
if err := os.MkdirAll(instdir, 0700); err != nil {
return err
}
- // Try to open the instance directory as LevelDB storage. This creates a lock file
- // which prevents concurrent use by another instance as well as accidental use of the
- // instance directory as a database.
- storage, err := storage.OpenFile(instdir, true)
+ // Lock the instance directory to prevent concurrent use by another instance as well as
+ // accidental use of the instance directory as a database.
+ release, _, err := flock.New(filepath.Join(instdir, "LOCK"))
if err != nil {
- return err
+ return convertFileLockError(err)
}
- n.instanceDirLock = storage
+ n.instanceDirLock = release
return nil
}
@@ -275,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
- if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil {
+ if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
n.stopHTTP()
n.stopIPC()
n.stopInProc()
@@ -426,7 +412,7 @@ func (n *Node) stopHTTP() {
}
// startWS initializes and starts the websocket RPC endpoint.
-func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error {
+func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed
if endpoint == "" {
return nil
@@ -439,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
- if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
+ if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
@@ -455,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
- log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint))
+ log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
@@ -509,7 +495,9 @@ func (n *Node) Stop() error {
// Release instance directory lock.
if n.instanceDirLock != nil {
- n.instanceDirLock.Close()
+ if err := n.instanceDirLock.Release(); err != nil {
+ log.Error("Can't release datadir lock", "err", err)
+ }
n.instanceDirLock = nil
}
@@ -568,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) {
return rpc.DialInProc(n.inprocHandler), nil
}
+// RPCHandler returns the in-process RPC request handler.
+func (n *Node) RPCHandler() (*rpc.Server, error) {
+ n.lock.RLock()
+ defer n.lock.RUnlock()
+
+ if n.inprocHandler == nil {
+ return nil, ErrNodeStopped
+ }
+ return n.inprocHandler, nil
+}
+
// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.
diff --git a/node/service.go b/node/service.go
index 5e1eb0e64..55062a500 100644
--- a/node/service.go
+++ b/node/service.go
@@ -43,7 +43,11 @@ func (ctx *ServiceContext) OpenDatabase(name string, cache int, handles int) (et
if ctx.config.DataDir == "" {
return ethdb.NewMemDatabase()
}
- return ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles)
+ db, err := ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles)
+ if err != nil {
+ return nil, err
+ }
+ return db, nil
}
// ResolvePath resolves a user path into the data directory if that was relative
diff --git a/node/utils_test.go b/node/utils_test.go
index 7cdfc2b3a..8eddce3ed 100644
--- a/node/utils_test.go
+++ b/node/utils_test.go
@@ -41,12 +41,10 @@ func NewNoopService(*ServiceContext) (Service, error) { return new(NoopService),
type NoopServiceA struct{ NoopService }
type NoopServiceB struct{ NoopService }
type NoopServiceC struct{ NoopService }
-type NoopServiceD struct{ NoopService }
func NewNoopServiceA(*ServiceContext) (Service, error) { return new(NoopServiceA), nil }
func NewNoopServiceB(*ServiceContext) (Service, error) { return new(NoopServiceB), nil }
func NewNoopServiceC(*ServiceContext) (Service, error) { return new(NoopServiceC), nil }
-func NewNoopServiceD(*ServiceContext) (Service, error) { return new(NoopServiceD), nil }
// InstrumentedService is an implementation of Service for which all interface
// methods can be instrumented both return value as well as event hook wise.