aboutsummaryrefslogtreecommitdiffstats
path: root/node/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'node/node.go')
-rw-r--r--node/node.go57
1 files changed, 28 insertions, 29 deletions
diff --git a/node/node.go b/node/node.go
index a372b1c25..6f189d8fe 100644
--- a/node/node.go
+++ b/node/node.go
@@ -25,7 +25,6 @@ import (
"reflect"
"strings"
"sync"
- "syscall"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/ethdb"
@@ -34,16 +33,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
- "github.com/syndtr/goleveldb/leveldb/storage"
-)
-
-var (
- ErrDatadirUsed = errors.New("datadir already used")
- ErrNodeStopped = errors.New("node not started")
- ErrNodeRunning = errors.New("node already running")
- ErrServiceUnknown = errors.New("unknown service")
-
- datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
+ "github.com/prometheus/prometheus/util/flock"
)
// Node is a container on which services can be registered.
@@ -52,8 +42,8 @@ type Node struct {
config *Config
accman *accounts.Manager
- ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop
- instanceDirLock storage.Storage // prevents concurrent use of instance directory
+ ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop
+ instanceDirLock flock.Releaser // prevents concurrent use of instance directory
serverConfig p2p.Config
server *p2p.Server // Currently running P2P networking layer
@@ -160,7 +150,7 @@ func (n *Node) Start() error {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
if n.serverConfig.TrustedNodes == nil {
- n.serverConfig.TrustedNodes = n.config.TrusterNodes()
+ n.serverConfig.TrustedNodes = n.config.TrustedNodes()
}
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
@@ -197,10 +187,7 @@ func (n *Node) Start() error {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil {
- if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
- return ErrDatadirUsed
- }
- return err
+ return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
@@ -242,14 +229,13 @@ func (n *Node) openDataDir() error {
if err := os.MkdirAll(instdir, 0700); err != nil {
return err
}
- // Try to open the instance directory as LevelDB storage. This creates a lock file
- // which prevents concurrent use by another instance as well as accidental use of the
- // instance directory as a database.
- storage, err := storage.OpenFile(instdir, true)
+ // Lock the instance directory to prevent concurrent use by another instance as well as
+ // accidental use of the instance directory as a database.
+ release, _, err := flock.New(filepath.Join(instdir, "LOCK"))
if err != nil {
- return err
+ return convertFileLockError(err)
}
- n.instanceDirLock = storage
+ n.instanceDirLock = release
return nil
}
@@ -275,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
- if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil {
+ if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
n.stopHTTP()
n.stopIPC()
n.stopInProc()
@@ -426,7 +412,7 @@ func (n *Node) stopHTTP() {
}
// startWS initializes and starts the websocket RPC endpoint.
-func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error {
+func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed
if endpoint == "" {
return nil
@@ -439,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
- if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
+ if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
@@ -455,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
- log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint))
+ log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
@@ -509,7 +495,9 @@ func (n *Node) Stop() error {
// Release instance directory lock.
if n.instanceDirLock != nil {
- n.instanceDirLock.Close()
+ if err := n.instanceDirLock.Release(); err != nil {
+ log.Error("Can't release datadir lock", "err", err)
+ }
n.instanceDirLock = nil
}
@@ -568,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) {
return rpc.DialInProc(n.inprocHandler), nil
}
+// RPCHandler returns the in-process RPC request handler.
+func (n *Node) RPCHandler() (*rpc.Server, error) {
+ n.lock.RLock()
+ defer n.lock.RUnlock()
+
+ if n.inprocHandler == nil {
+ return nil, ErrNodeStopped
+ }
+ return n.inprocHandler, nil
+}
+
// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.