diff options
Diffstat (limited to 'node')
-rw-r--r-- | node/api.go | 42 | ||||
-rw-r--r-- | node/config.go | 22 | ||||
-rw-r--r-- | node/doc.go | 2 | ||||
-rw-r--r-- | node/errors.go | 18 | ||||
-rw-r--r-- | node/node.go | 57 | ||||
-rw-r--r-- | node/service.go | 6 | ||||
-rw-r--r-- | node/utils_test.go | 2 |
7 files changed, 111 insertions, 38 deletions
diff --git a/node/api.go b/node/api.go index 570cb9d98..1b04b7093 100644 --- a/node/api.go +++ b/node/api.go @@ -17,6 +17,7 @@ package node import ( + "context" "fmt" "strings" "time" @@ -25,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" "github.com/rcrowley/go-metrics" ) @@ -73,6 +75,44 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) { return true, nil } +// PeerEvents creates an RPC subscription which receives peer events from the +// node's p2p.Server +func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) { + // Make sure the server is running, fail otherwise + server := api.node.Server() + if server == nil { + return nil, ErrNodeStopped + } + + // Create the subscription + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + rpcSub := notifier.CreateSubscription() + + go func() { + events := make(chan *p2p.PeerEvent) + sub := server.SubscribeEvents(events) + defer sub.Unsubscribe() + + for { + select { + case event := <-events: + notifier.Notify(rpcSub.ID, event) + case <-sub.Err(): + return + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + } + } + }() + + return rpcSub, nil +} + // StartRPC starts the HTTP RPC API server. func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string) (bool, error) { api.node.lock.Lock() @@ -163,7 +203,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str } } - if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins); err != nil { + if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins, api.node.config.WSExposeAll); err != nil { return false, err } return true, nil diff --git a/node/config.go b/node/config.go index 61e0008ef..be9e21b4f 100644 --- a/node/config.go +++ b/node/config.go @@ -35,7 +35,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" ) -var ( +const ( datadirPrivateKey = "nodekey" // Path within the datadir to the node's private key datadirDefaultKeyStore = "keystore" // Path within the datadir to the keystore datadirStaticNodes = "static-nodes.json" // Path within the datadir to the static node list @@ -128,6 +128,13 @@ type Config struct { // If the module list is empty, all RPC API endpoints designated public will be // exposed. WSModules []string `toml:",omitempty"` + + // WSExposeAll exposes all API modules via the WebSocket RPC interface rather + // than just the public ones. + // + // *WARNING* Only set this if the node is running in a trusted network, exposing + // private APIs to untrusted users is a major security risk. + WSExposeAll bool `toml:",omitempty"` } // IPCEndpoint resolves an IPC endpoint based on a configured value, taking into @@ -160,7 +167,7 @@ func (c *Config) NodeDB() string { if c.DataDir == "" { return "" // ephemeral } - return c.resolvePath("nodes") + return c.resolvePath(datadirNodeDatabase) } // DefaultIPCEndpoint returns the IPC path used by default. @@ -316,8 +323,8 @@ func (c *Config) StaticNodes() []*discover.Node { return c.parsePersistentNodes(c.resolvePath(datadirStaticNodes)) } -// TrusterNodes returns a list of node enode URLs configured as trusted nodes. -func (c *Config) TrusterNodes() []*discover.Node { +// TrustedNodes returns a list of node enode URLs configured as trusted nodes. +func (c *Config) TrustedNodes() []*discover.Node { return c.parsePersistentNodes(c.resolvePath(datadirTrustedNodes)) } @@ -393,11 +400,18 @@ func makeAccountManager(conf *Config) (*accounts.Manager, string, error) { keystore.NewKeyStore(keydir, scryptN, scryptP), } if !conf.NoUSB { + // Start a USB hub for Ledger hardware wallets if ledgerhub, err := usbwallet.NewLedgerHub(); err != nil { log.Warn(fmt.Sprintf("Failed to start Ledger hub, disabling: %v", err)) } else { backends = append(backends, ledgerhub) } + // Start a USB hub for Trezor hardware wallets + if trezorhub, err := usbwallet.NewTrezorHub(); err != nil { + log.Warn(fmt.Sprintf("Failed to start Trezor hub, disabling: %v", err)) + } else { + backends = append(backends, trezorhub) + } } return accounts.NewManager(backends...), ephemeral, nil } diff --git a/node/doc.go b/node/doc.go index f009e6f85..d9688e0a1 100644 --- a/node/doc.go +++ b/node/doc.go @@ -68,7 +68,7 @@ unless its location is changed through the KeyStoreDir configuration option. Data Directory Sharing Example -In this exanple, two node instances named A and B are started with the same data +In this example, two node instances named A and B are started with the same data directory. Mode instance A opens the database "db", node instance B opens the databases "db" and "db-2". The following files will be created in the data directory: diff --git a/node/errors.go b/node/errors.go index bd5ddeb5d..2e0dadc4d 100644 --- a/node/errors.go +++ b/node/errors.go @@ -17,10 +17,28 @@ package node import ( + "errors" "fmt" "reflect" + "syscall" ) +var ( + ErrDatadirUsed = errors.New("datadir already used by another process") + ErrNodeStopped = errors.New("node not started") + ErrNodeRunning = errors.New("node already running") + ErrServiceUnknown = errors.New("unknown service") + + datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true} +) + +func convertFileLockError(err error) error { + if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] { + return ErrDatadirUsed + } + return err +} + // DuplicateServiceError is returned during Node startup if a registered service // constructor returns a service of the same type that was already started. type DuplicateServiceError struct { diff --git a/node/node.go b/node/node.go index a372b1c25..6f189d8fe 100644 --- a/node/node.go +++ b/node/node.go @@ -25,7 +25,6 @@ import ( "reflect" "strings" "sync" - "syscall" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/ethdb" @@ -34,16 +33,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" - "github.com/syndtr/goleveldb/leveldb/storage" -) - -var ( - ErrDatadirUsed = errors.New("datadir already used") - ErrNodeStopped = errors.New("node not started") - ErrNodeRunning = errors.New("node already running") - ErrServiceUnknown = errors.New("unknown service") - - datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true} + "github.com/prometheus/prometheus/util/flock" ) // Node is a container on which services can be registered. @@ -52,8 +42,8 @@ type Node struct { config *Config accman *accounts.Manager - ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop - instanceDirLock storage.Storage // prevents concurrent use of instance directory + ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop + instanceDirLock flock.Releaser // prevents concurrent use of instance directory serverConfig p2p.Config server *p2p.Server // Currently running P2P networking layer @@ -160,7 +150,7 @@ func (n *Node) Start() error { n.serverConfig.StaticNodes = n.config.StaticNodes() } if n.serverConfig.TrustedNodes == nil { - n.serverConfig.TrustedNodes = n.config.TrusterNodes() + n.serverConfig.TrustedNodes = n.config.TrustedNodes() } if n.serverConfig.NodeDatabase == "" { n.serverConfig.NodeDatabase = n.config.NodeDB() @@ -197,10 +187,7 @@ func (n *Node) Start() error { running.Protocols = append(running.Protocols, service.Protocols()...) } if err := running.Start(); err != nil { - if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] { - return ErrDatadirUsed - } - return err + return convertFileLockError(err) } // Start each of the services started := []reflect.Type{} @@ -242,14 +229,13 @@ func (n *Node) openDataDir() error { if err := os.MkdirAll(instdir, 0700); err != nil { return err } - // Try to open the instance directory as LevelDB storage. This creates a lock file - // which prevents concurrent use by another instance as well as accidental use of the - // instance directory as a database. - storage, err := storage.OpenFile(instdir, true) + // Lock the instance directory to prevent concurrent use by another instance as well as + // accidental use of the instance directory as a database. + release, _, err := flock.New(filepath.Join(instdir, "LOCK")) if err != nil { - return err + return convertFileLockError(err) } - n.instanceDirLock = storage + n.instanceDirLock = release return nil } @@ -275,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { n.stopInProc() return err } - if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil { + if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil { n.stopHTTP() n.stopIPC() n.stopInProc() @@ -426,7 +412,7 @@ func (n *Node) stopHTTP() { } // startWS initializes and starts the websocket RPC endpoint. -func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error { +func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error { // Short circuit if the WS endpoint isn't being exposed if endpoint == "" { return nil @@ -439,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig // Register all the APIs exposed by the services handler := rpc.NewServer() for _, api := range apis { - if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } @@ -455,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig return err } go rpc.NewWSServer(wsOrigins, handler).Serve(listener) - log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint)) + log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr())) // All listeners booted successfully n.wsEndpoint = endpoint @@ -509,7 +495,9 @@ func (n *Node) Stop() error { // Release instance directory lock. if n.instanceDirLock != nil { - n.instanceDirLock.Close() + if err := n.instanceDirLock.Release(); err != nil { + log.Error("Can't release datadir lock", "err", err) + } n.instanceDirLock = nil } @@ -568,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) { return rpc.DialInProc(n.inprocHandler), nil } +// RPCHandler returns the in-process RPC request handler. +func (n *Node) RPCHandler() (*rpc.Server, error) { + n.lock.RLock() + defer n.lock.RUnlock() + + if n.inprocHandler == nil { + return nil, ErrNodeStopped + } + return n.inprocHandler, nil +} + // Server retrieves the currently running P2P network layer. This method is meant // only to inspect fields of the currently running server, life cycle management // should be left to this Node entity. diff --git a/node/service.go b/node/service.go index 5e1eb0e64..55062a500 100644 --- a/node/service.go +++ b/node/service.go @@ -43,7 +43,11 @@ func (ctx *ServiceContext) OpenDatabase(name string, cache int, handles int) (et if ctx.config.DataDir == "" { return ethdb.NewMemDatabase() } - return ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles) + db, err := ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles) + if err != nil { + return nil, err + } + return db, nil } // ResolvePath resolves a user path into the data directory if that was relative diff --git a/node/utils_test.go b/node/utils_test.go index 7cdfc2b3a..8eddce3ed 100644 --- a/node/utils_test.go +++ b/node/utils_test.go @@ -41,12 +41,10 @@ func NewNoopService(*ServiceContext) (Service, error) { return new(NoopService), type NoopServiceA struct{ NoopService } type NoopServiceB struct{ NoopService } type NoopServiceC struct{ NoopService } -type NoopServiceD struct{ NoopService } func NewNoopServiceA(*ServiceContext) (Service, error) { return new(NoopServiceA), nil } func NewNoopServiceB(*ServiceContext) (Service, error) { return new(NoopServiceB), nil } func NewNoopServiceC(*ServiceContext) (Service, error) { return new(NoopServiceC), nil } -func NewNoopServiceD(*ServiceContext) (Service, error) { return new(NoopServiceD), nil } // InstrumentedService is an implementation of Service for which all interface // methods can be instrumented both return value as well as event hook wise. |