aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--node/config.go3
-rw-r--r--node/node.go33
-rw-r--r--p2p/dial.go25
-rw-r--r--p2p/peer.go5
-rw-r--r--p2p/server.go84
-rw-r--r--p2p/server_test.go10
-rw-r--r--p2p/simulations/adapters/docker.go2
-rw-r--r--p2p/simulations/adapters/exec.go1
-rw-r--r--p2p/simulations/adapters/inproc.go4
-rw-r--r--p2p/simulations/adapters/types.go6
-rw-r--r--p2p/simulations/network.go67
11 files changed, 171 insertions, 69 deletions
diff --git a/node/config.go b/node/config.go
index 1ee02d896..7a0c1688e 100644
--- a/node/config.go
+++ b/node/config.go
@@ -135,6 +135,9 @@ type Config struct {
// *WARNING* Only set this if the node is running in a trusted network, exposing
// private APIs to untrusted users is a major security risk.
WSExposeAll bool `toml:",omitempty"`
+
+ // Logger is a custom logger to use with the p2p.Server.
+ Logger log.Logger
}
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
diff --git a/node/node.go b/node/node.go
index 6f189d8fe..ff7258033 100644
--- a/node/node.go
+++ b/node/node.go
@@ -69,6 +69,8 @@ type Node struct {
stop chan struct{} // Channel to wait for termination notifications
lock sync.RWMutex
+
+ log log.Logger
}
// New creates a new P2P node, ready for protocol registration.
@@ -101,6 +103,9 @@ func New(conf *Config) (*Node, error) {
if err != nil {
return nil, err
}
+ if conf.Logger == nil {
+ conf.Logger = log.New()
+ }
// Note: any interaction with Config that would create/touch files
// in the data directory or instance directory is delayed until Start.
return &Node{
@@ -112,6 +117,7 @@ func New(conf *Config) (*Node, error) {
httpEndpoint: conf.HTTPEndpoint(),
wsEndpoint: conf.WSEndpoint(),
eventmux: new(event.TypeMux),
+ log: conf.Logger,
}, nil
}
@@ -146,6 +152,7 @@ func (n *Node) Start() error {
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
+ n.serverConfig.Logger = n.log
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
@@ -156,7 +163,7 @@ func (n *Node) Start() error {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
- log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
+ n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
// Otherwise copy and specialize the P2P configuration
services := make(map[reflect.Type]Service)
@@ -280,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
}
n.inprocHandler = handler
return nil
@@ -306,7 +313,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
}
// All APIs registered, start the IPC listener
var (
@@ -317,7 +324,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
return err
}
go func() {
- log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))
+ n.log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))
for {
conn, err := listener.Accept()
@@ -330,7 +337,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
return
}
// Not closed, just some error; report and continue
- log.Error(fmt.Sprintf("IPC accept failed: %v", err))
+ n.log.Error(fmt.Sprintf("IPC accept failed: %v", err))
continue
}
go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
@@ -349,7 +356,7 @@ func (n *Node) stopIPC() {
n.ipcListener.Close()
n.ipcListener = nil
- log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
+ n.log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
}
if n.ipcHandler != nil {
n.ipcHandler.Stop()
@@ -375,7 +382,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
}
}
// All APIs registered, start the HTTP listener
@@ -387,7 +394,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
return err
}
go rpc.NewHTTPServer(cors, handler).Serve(listener)
- log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))
+ n.log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))
// All listeners booted successfully
n.httpEndpoint = endpoint
@@ -403,7 +410,7 @@ func (n *Node) stopHTTP() {
n.httpListener.Close()
n.httpListener = nil
- log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
+ n.log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
}
if n.httpHandler != nil {
n.httpHandler.Stop()
@@ -429,7 +436,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
- log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
+ n.log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
}
}
// All APIs registered, start the HTTP listener
@@ -441,7 +448,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
- log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
+ n.log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
@@ -457,7 +464,7 @@ func (n *Node) stopWS() {
n.wsListener.Close()
n.wsListener = nil
- log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
+ n.log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
}
if n.wsHandler != nil {
n.wsHandler.Stop()
@@ -496,7 +503,7 @@ func (n *Node) Stop() error {
// Release instance directory lock.
if n.instanceDirLock != nil {
if err := n.instanceDirLock.Release(); err != nil {
- log.Error("Can't release datadir lock", "err", err)
+ n.log.Error("Can't release datadir lock", "err", err)
}
n.instanceDirLock = nil
}
diff --git a/p2p/dial.go b/p2p/dial.go
index 2d9e3a0ed..8ca3dc5a1 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) {
return
}
}
- success := t.dial(srv, t.dest)
- // Try resolving the ID of static nodes if dialing failed.
- if !success && t.flags&staticDialedConn != 0 {
- if t.resolve(srv) {
- t.dial(srv, t.dest)
+ err := t.dial(srv, t.dest)
+ if err != nil {
+ log.Trace("Dial error", "task", t, "err", err)
+ // Try resolving the ID of static nodes if dialing failed.
+ if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
+ if t.resolve(srv) {
+ t.dial(srv, t.dest)
+ }
}
}
}
@@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool {
return true
}
+type dialError struct {
+ error
+}
+
// dial performs the actual connection attempt.
-func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
+func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
fd, err := srv.Dialer.Dial(dest)
if err != nil {
- log.Trace("Dial error", "task", t, "err", err)
- return false
+ return &dialError{err}
}
mfd := newMeteredConn(fd, false)
- srv.SetupConn(mfd, t.flags, dest)
- return true
+ return srv.SetupConn(mfd, t.flags, dest)
}
func (t *dialTask) String() string {
diff --git a/p2p/peer.go b/p2p/peer.go
index 1d2b726e8..bad1c8c8b 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -160,6 +160,11 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
}
+// Inbound returns true if the peer is an inbound connection
+func (p *Peer) Inbound() bool {
+ return p.rw.flags&inboundConn != 0
+}
+
func newPeer(conn *conn, protocols []Protocol) *Peer {
protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
diff --git a/p2p/server.go b/p2p/server.go
index d1d578401..922df55ba 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -139,6 +139,9 @@ type Config struct {
// If EnableMsgEvents is set then the server will emit PeerEvents
// whenever a message is sent to or received from a peer
EnableMsgEvents bool
+
+ // Logger is a custom logger to use with the p2p.Server.
+ Logger log.Logger
}
// Server manages all peer connections.
@@ -172,6 +175,7 @@ type Server struct {
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
+ log log.Logger
}
type peerOpFunc func(map[discover.NodeID]*Peer)
@@ -359,7 +363,11 @@ func (srv *Server) Start() (err error) {
return errors.New("server already running")
}
srv.running = true
- log.Info("Starting P2P networking")
+ srv.log = srv.Config.Logger
+ if srv.log == nil {
+ srv.log = log.New()
+ }
+ srv.log.Info("Starting P2P networking")
// static fields
if srv.PrivateKey == nil {
@@ -421,7 +429,7 @@ func (srv *Server) Start() (err error) {
}
}
if srv.NoDial && srv.ListenAddr == "" {
- log.Warn("P2P server will be useless, neither dialing nor listening")
+ srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
srv.loopWG.Add(1)
@@ -489,7 +497,7 @@ func (srv *Server) run(dialstate dialer) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
- log.Trace("New dial task", "task", t)
+ srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
@@ -517,13 +525,13 @@ running:
// This channel is used by AddPeer to add to the
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
- log.Debug("Adding static node", "node", n)
+ srv.log.Debug("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
- log.Debug("Removing static node", "node", n)
+ srv.log.Debug("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
@@ -536,7 +544,7 @@ running:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
- log.Trace("Dial task done", "task", t)
+ srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
@@ -565,7 +573,7 @@ running:
p.events = &srv.peerFeed
}
name := truncateName(c.name)
- log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
+ srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
go srv.runPeer(p)
}
@@ -585,7 +593,7 @@ running:
}
}
- log.Trace("P2P networking is spinning down")
+ srv.log.Trace("P2P networking is spinning down")
// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
@@ -639,7 +647,7 @@ type tempError interface {
// inbound connections.
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
- log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
+ srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
// This channel acts as a semaphore limiting
// active inbound connections that are lingering pre-handshake.
@@ -664,10 +672,10 @@ func (srv *Server) listenLoop() {
for {
fd, err = srv.listener.Accept()
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
- log.Debug("Temporary read error", "err", err)
+ srv.log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
- log.Debug("Read error", "err", err)
+ srv.log.Debug("Read error", "err", err)
return
}
break
@@ -676,7 +684,7 @@ func (srv *Server) listenLoop() {
// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil {
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
- log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
+ srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
fd.Close()
slots <- struct{}{}
continue
@@ -684,7 +692,7 @@ func (srv *Server) listenLoop() {
}
fd = newMeteredConn(fd, true)
- log.Trace("Accepted connection", "addr", fd.RemoteAddr())
+ srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
// Spawn the handler. It will give the slot back when the connection
// has been established.
@@ -698,55 +706,65 @@ func (srv *Server) listenLoop() {
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
-func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
+func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
+ self := srv.Self()
+ if self == nil {
+ return errors.New("shutdown")
+ }
+ c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
+ err := srv.setupConn(c, flags, dialDest)
+ if err != nil {
+ c.close(err)
+ srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
+ }
+ return err
+}
+
+func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
- c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
if !running {
- c.close(errServerStopped)
- return
+ return errServerStopped
}
// Run the encryption handshake.
var err error
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
- log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
- c.close(err)
- return
+ srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
+ return err
}
- clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
+ clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
// For dialed connections, check that the remote public key matches.
if dialDest != nil && c.id != dialDest.ID {
- c.close(DiscUnexpectedIdentity)
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
- return
+ return DiscUnexpectedIdentity
}
- if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ err = srv.checkpoint(c, srv.posthandshake)
+ if err != nil {
clog.Trace("Rejected peer before protocol handshake", "err", err)
- c.close(err)
- return
+ return err
}
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
if err != nil {
clog.Trace("Failed proto handshake", "err", err)
- c.close(err)
- return
+ return err
}
if phs.ID != c.id {
clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
- c.close(DiscUnexpectedIdentity)
- return
+ return DiscUnexpectedIdentity
}
c.caps, c.name = phs.Caps, phs.Name
- if err := srv.checkpoint(c, srv.addpeer); err != nil {
+ err = srv.checkpoint(c, srv.addpeer)
+ if err != nil {
clog.Trace("Rejected peer", "err", err)
- c.close(err)
- return
+ return err
}
// If the checks completed successfully, runPeer has now been
// launched by run.
+ clog.Trace("connection set up", "inbound", dialDest == nil)
+ return nil
}
func truncateName(s string) string {
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 11dd83e5d..10c36528e 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
)
@@ -206,6 +207,7 @@ func TestServerTaskScheduling(t *testing.T) {
quit: make(chan struct{}),
ntab: fakeTable{},
running: true,
+ log: log.New(),
}
srv.loopWG.Add(1)
go func() {
@@ -246,7 +248,12 @@ func TestServerManyTasks(t *testing.T) {
}
var (
- srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true}
+ srv = &Server{
+ quit: make(chan struct{}),
+ ntab: fakeTable{},
+ running: true,
+ log: log.New(),
+ }
done = make(chan *testTask)
start, end = 0, 0
)
@@ -428,6 +435,7 @@ func TestServerSetupConn(t *testing.T) {
Protocols: []Protocol{discard},
},
newTransport: func(fd net.Conn) transport { return test.tt },
+ log: log.New(),
}
if !test.dontstart {
if err := srv.Start(); err != nil {
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go
index 022314b3d..8ef5629fb 100644
--- a/p2p/simulations/adapters/docker.go
+++ b/p2p/simulations/adapters/docker.go
@@ -28,6 +28,7 @@ import (
"strings"
"github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
)
@@ -94,6 +95,7 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
conf.Stack.P2P.NoDiscovery = true
conf.Stack.P2P.NAT = nil
conf.Stack.NoUSB = true
+ conf.Stack.Logger = log.New("node.id", config.ID.String())
node := &DockerNode{
ExecNode: ExecNode{
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
index bdb92cc1d..a566fb27d 100644
--- a/p2p/simulations/adapters/exec.go
+++ b/p2p/simulations/adapters/exec.go
@@ -359,6 +359,7 @@ func execP2PNode() {
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
}
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
+ conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
// use explicit IP address in ListenAddr so that Enode URL is usable
externalIP := func() string {
diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go
index c97188def..48d7c1730 100644
--- a/p2p/simulations/adapters/inproc.go
+++ b/p2p/simulations/adapters/inproc.go
@@ -24,6 +24,7 @@ import (
"sync"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -82,7 +83,8 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
Dialer: s,
EnableMsgEvents: true,
},
- NoUSB: true,
+ NoUSB: true,
+ Logger: log.New("node.id", id.String()),
})
if err != nil {
return nil, err
diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go
index ed6cfc504..5b4b47fe2 100644
--- a/p2p/simulations/adapters/types.go
+++ b/p2p/simulations/adapters/types.go
@@ -83,6 +83,9 @@ type NodeConfig struct {
// stack to encrypt communications
PrivateKey *ecdsa.PrivateKey
+ // Enable peer events for Msgs
+ EnableMsgEvents bool
+
// Name is a human friendly name for the node like "node01"
Name string
@@ -91,6 +94,9 @@ type NodeConfig struct {
// contained in SimAdapter.services, for other nodes it should be
// services registered by calling the RegisterService function)
Services []string
+
+ // function to sanction or prevent suggesting a peer
+ Reachable func(id discover.NodeID) bool
}
// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
index 06890ffcf..fd8777673 100644
--- a/p2p/simulations/network.go
+++ b/p2p/simulations/network.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
@@ -30,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
+var dialBanTimeout = 200 * time.Millisecond
+
// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
ID string `json:"id"`
@@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
conf.PrivateKey = c.PrivateKey
}
id := conf.ID
+ if conf.Reachable == nil {
+ conf.Reachable = func(otherID discover.NodeID) bool {
+ _, err := self.InitConn(conf.ID, otherID)
+ return err == nil
+ }
+ }
// assign a name to the node if not set
if conf.Name == "" {
@@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error {
// method on the "one" node so that it connects to the "other" node
func (self *Network) Connect(oneID, otherID discover.NodeID) error {
log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
- conn, err := self.GetOrCreateConn(oneID, otherID)
+ conn, err := self.InitConn(oneID, otherID)
if err != nil {
return err
}
- if conn.Up {
- return fmt.Errorf("%v and %v already connected", oneID, otherID)
- }
- if err := conn.nodesUp(); err != nil {
- return err
- }
client, err := conn.one.Client()
if err != nil {
return err
@@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error {
// DidDisconnect tracks the fact that the "one" node disconnected from the
// "other" node
func (self *Network) DidDisconnect(one, other discover.NodeID) error {
- conn, err := self.GetOrCreateConn(one, other)
- if err != nil {
+ conn := self.GetConn(one, other)
+ if conn == nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
if !conn.Up {
return fmt.Errorf("%v and %v already disconnected", one, other)
}
conn.Up = false
+ conn.initiated = time.Now().Add(-dialBanTimeout)
self.events.Send(NewEvent(conn))
return nil
}
@@ -396,10 +400,13 @@ func (self *Network) getNodeByName(name string) *Node {
}
// GetNodes returns the existing nodes
-func (self *Network) GetNodes() []*Node {
+func (self *Network) GetNodes() (nodes []*Node) {
self.lock.Lock()
defer self.lock.Unlock()
- return self.Nodes
+ for _, node := range self.Nodes {
+ nodes = append(nodes, node)
+ }
+ return nodes
}
// GetConn returns the connection which exists between "one" and "other"
@@ -415,6 +422,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
self.lock.Lock()
defer self.lock.Unlock()
+ return self.getOrCreateConn(oneID, otherID)
+}
+
+func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
if conn := self.getConn(oneID, otherID); conn != nil {
return conn, nil
}
@@ -448,6 +459,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
return self.Conns[i]
}
+// InitConn(one, other) retrieves the connectiton model for the connection between
+// peers one and other, or creates a new one if it does not exist
+// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
+// it checks if the connection is already up, and if the nodes are running
+// NOTE:
+// it also checks whether there has been recent attempt to connect the peers
+// this is cheating as the simulation is used as an oracle and know about
+// remote peers attempt to connect to a node which will then not initiate the connection
+func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if oneID == otherID {
+ return nil, fmt.Errorf("refusing to connect to self %v", oneID)
+ }
+ conn, err := self.getOrCreateConn(oneID, otherID)
+ if err != nil {
+ return nil, err
+ }
+ if time.Now().Sub(conn.initiated) < dialBanTimeout {
+ return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
+ }
+ if conn.Up {
+ return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
+ }
+ err = conn.nodesUp()
+ if err != nil {
+ return nil, fmt.Errorf("nodes not up: %v", err)
+ }
+ conn.initiated = time.Now()
+ return conn, nil
+}
+
// Shutdown stops all nodes in the network and closes the quit channel
func (self *Network) Shutdown() {
for _, node := range self.Nodes {
@@ -516,6 +559,8 @@ type Conn struct {
// Up tracks whether or not the connection is active
Up bool `json:"up"`
+ // Registers when the connection was grabbed to dial
+ initiated time.Time
one *Node
other *Node