aboutsummaryrefslogtreecommitdiffstats
path: root/node/node.go
diff options
context:
space:
mode:
authorLewis Marshall <lewis@lmars.net>2017-12-01 19:49:04 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-12-01 19:49:04 +0800
commit54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d (patch)
tree07bd996822874272ef163bedb56a2ade537cf658 /node/node.go
parent73067fd24f39cb7d2cdf63a99f6fdac661f7a8bf (diff)
downloaddexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.gz
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.bz2
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.lz
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.xz
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.zst
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.zip
p2p/simulations: various stability fixes (#15198)
p2p/simulations: introduce dialBan - Refactor simulations/network connection getters to support avoiding simultaneous dials between two peers If two peers dial simultaneously, the connection will be dropped to help avoid that, we essentially lock the connection object with a timestamp which serves as a ban on dialing for a period of time (dialBanTimeout). - The connection getter InitConn can be wrapped and passed to the nodes via adapters.NodeConfig#Reachable field and then used by the respective services when they initiate connections. This massively stablise the emerging connectivity when running with hundreds of nodes bootstrapping a network. p2p: add Inbound public method to p2p.Peer p2p/simulations: Add server id to logs to support debugging in-memory network simulations when multiple peers are logging. p2p: SetupConn now returns error. The dialer checks the error and only calls resolve if the actual TCP dial fails.
Diffstat (limited to 'node/node.go')
-rw-r--r--node/node.go33
1 files changed, 20 insertions, 13 deletions
diff --git a/node/node.go b/node/node.go
index 6f189d8fe..ff7258033 100644
--- a/node/node.go
+++ b/node/node.go
@@ -69,6 +69,8 @@ type Node struct {
stop chan struct{} // Channel to wait for termination notifications
lock sync.RWMutex
+
+ log log.Logger
}
// New creates a new P2P node, ready for protocol registration.
@@ -101,6 +103,9 @@ func New(conf *Config) (*Node, error) {
if err != nil {
return nil, err
}
+ if conf.Logger == nil {
+ conf.Logger = log.New()
+ }
// Note: any interaction with Config that would create/touch files
// in the data directory or instance directory is delayed until Start.
return &Node{
@@ -112,6 +117,7 @@ func New(conf *Config) (*Node, error) {
httpEndpoint: conf.HTTPEndpoint(),
wsEndpoint: conf.WSEndpoint(),
eventmux: new(event.TypeMux),
+ log: conf.Logger,
}, nil
}
@@ -146,6 +152,7 @@ func (n *Node) Start() error {
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
+ n.serverConfig.Logger = n.log
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
@@ -156,7 +163,7 @@ func (n *Node) Start() error {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
- log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
+ n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
// Otherwise copy and specialize the P2P configuration
services := make(map[reflect.Type]Service)
@@ -280,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
}
n.inprocHandler = handler
return nil
@@ -306,7 +313,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
}
// All APIs registered, start the IPC listener
var (
@@ -317,7 +324,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
return err
}
go func() {
- log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))
+ n.log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))
for {
conn, err := listener.Accept()
@@ -330,7 +337,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
return
}
// Not closed, just some error; report and continue
- log.Error(fmt.Sprintf("IPC accept failed: %v", err))
+ n.log.Error(fmt.Sprintf("IPC accept failed: %v", err))
continue
}
go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
@@ -349,7 +356,7 @@ func (n *Node) stopIPC() {
n.ipcListener.Close()
n.ipcListener = nil
- log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
+ n.log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
}
if n.ipcHandler != nil {
n.ipcHandler.Stop()
@@ -375,7 +382,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
}
}
// All APIs registered, start the HTTP listener
@@ -387,7 +394,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
return err
}
go rpc.NewHTTPServer(cors, handler).Serve(listener)
- log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))
+ n.log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))
// All listeners booted successfully
n.httpEndpoint = endpoint
@@ -403,7 +410,7 @@ func (n *Node) stopHTTP() {
n.httpListener.Close()
n.httpListener = nil
- log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
+ n.log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
}
if n.httpHandler != nil {
n.httpHandler.Stop()
@@ -429,7 +436,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
}
}
// All APIs registered, start the HTTP listener
@@ -441,7 +448,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
- log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
+ n.log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
@@ -457,7 +464,7 @@ func (n *Node) stopWS() {
n.wsListener.Close()
n.wsListener = nil
- log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
+ n.log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
}
if n.wsHandler != nil {
n.wsHandler.Stop()
@@ -496,7 +503,7 @@ func (n *Node) Stop() error {
// Release instance directory lock.
if n.instanceDirLock != nil {
if err := n.instanceDirLock.Release(); err != nil {
- log.Error("Can't release datadir lock", "err", err)
+ n.log.Error("Can't release datadir lock", "err", err)
}
n.instanceDirLock = nil
}