diff options
Diffstat (limited to 'node/node.go')
-rw-r--r-- | node/node.go | 57 |
1 files changed, 28 insertions, 29 deletions
diff --git a/node/node.go b/node/node.go index a372b1c25..6f189d8fe 100644 --- a/node/node.go +++ b/node/node.go @@ -25,7 +25,6 @@ import ( "reflect" "strings" "sync" - "syscall" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/ethdb" @@ -34,16 +33,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" - "github.com/syndtr/goleveldb/leveldb/storage" -) - -var ( - ErrDatadirUsed = errors.New("datadir already used") - ErrNodeStopped = errors.New("node not started") - ErrNodeRunning = errors.New("node already running") - ErrServiceUnknown = errors.New("unknown service") - - datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true} + "github.com/prometheus/prometheus/util/flock" ) // Node is a container on which services can be registered. @@ -52,8 +42,8 @@ type Node struct { config *Config accman *accounts.Manager - ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop - instanceDirLock storage.Storage // prevents concurrent use of instance directory + ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop + instanceDirLock flock.Releaser // prevents concurrent use of instance directory serverConfig p2p.Config server *p2p.Server // Currently running P2P networking layer @@ -160,7 +150,7 @@ func (n *Node) Start() error { n.serverConfig.StaticNodes = n.config.StaticNodes() } if n.serverConfig.TrustedNodes == nil { - n.serverConfig.TrustedNodes = n.config.TrusterNodes() + n.serverConfig.TrustedNodes = n.config.TrustedNodes() } if n.serverConfig.NodeDatabase == "" { n.serverConfig.NodeDatabase = n.config.NodeDB() @@ -197,10 +187,7 @@ func (n *Node) Start() error { running.Protocols = append(running.Protocols, service.Protocols()...) } if err := running.Start(); err != nil { - if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] { - return ErrDatadirUsed - } - return err + return convertFileLockError(err) } // Start each of the services started := []reflect.Type{} @@ -242,14 +229,13 @@ func (n *Node) openDataDir() error { if err := os.MkdirAll(instdir, 0700); err != nil { return err } - // Try to open the instance directory as LevelDB storage. This creates a lock file - // which prevents concurrent use by another instance as well as accidental use of the - // instance directory as a database. - storage, err := storage.OpenFile(instdir, true) + // Lock the instance directory to prevent concurrent use by another instance as well as + // accidental use of the instance directory as a database. + release, _, err := flock.New(filepath.Join(instdir, "LOCK")) if err != nil { - return err + return convertFileLockError(err) } - n.instanceDirLock = storage + n.instanceDirLock = release return nil } @@ -275,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { n.stopInProc() return err } - if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil { + if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil { n.stopHTTP() n.stopIPC() n.stopInProc() @@ -426,7 +412,7 @@ func (n *Node) stopHTTP() { } // startWS initializes and starts the websocket RPC endpoint. -func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error { +func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error { // Short circuit if the WS endpoint isn't being exposed if endpoint == "" { return nil @@ -439,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig // Register all the APIs exposed by the services handler := rpc.NewServer() for _, api := range apis { - if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } @@ -455,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig return err } go rpc.NewWSServer(wsOrigins, handler).Serve(listener) - log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint)) + log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr())) // All listeners booted successfully n.wsEndpoint = endpoint @@ -509,7 +495,9 @@ func (n *Node) Stop() error { // Release instance directory lock. if n.instanceDirLock != nil { - n.instanceDirLock.Close() + if err := n.instanceDirLock.Release(); err != nil { + log.Error("Can't release datadir lock", "err", err) + } n.instanceDirLock = nil } @@ -568,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) { return rpc.DialInProc(n.inprocHandler), nil } +// RPCHandler returns the in-process RPC request handler. +func (n *Node) RPCHandler() (*rpc.Server, error) { + n.lock.RLock() + defer n.lock.RUnlock() + + if n.inprocHandler == nil { + return nil, ErrNodeStopped + } + return n.inprocHandler, nil +} + // Server retrieves the currently running P2P network layer. This method is meant // only to inspect fields of the currently running server, life cycle management // should be left to this Node entity. |