aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/simulations
diff options
context:
space:
mode:
authorLewis Marshall <lewis@lmars.net>2017-12-01 19:49:04 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-12-01 19:49:04 +0800
commit54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d (patch)
tree07bd996822874272ef163bedb56a2ade537cf658 /p2p/simulations
parent73067fd24f39cb7d2cdf63a99f6fdac661f7a8bf (diff)
downloaddexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.gz
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.bz2
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.lz
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.xz
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.zst
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.zip
p2p/simulations: various stability fixes (#15198)
p2p/simulations: introduce dialBan - Refactor simulations/network connection getters to support avoiding simultaneous dials between two peers If two peers dial simultaneously, the connection will be dropped to help avoid that, we essentially lock the connection object with a timestamp which serves as a ban on dialing for a period of time (dialBanTimeout). - The connection getter InitConn can be wrapped and passed to the nodes via adapters.NodeConfig#Reachable field and then used by the respective services when they initiate connections. This massively stablise the emerging connectivity when running with hundreds of nodes bootstrapping a network. p2p: add Inbound public method to p2p.Peer p2p/simulations: Add server id to logs to support debugging in-memory network simulations when multiple peers are logging. p2p: SetupConn now returns error. The dialer checks the error and only calls resolve if the actual TCP dial fails.
Diffstat (limited to 'p2p/simulations')
-rw-r--r--p2p/simulations/adapters/docker.go2
-rw-r--r--p2p/simulations/adapters/exec.go1
-rw-r--r--p2p/simulations/adapters/inproc.go4
-rw-r--r--p2p/simulations/adapters/types.go6
-rw-r--r--p2p/simulations/network.go67
5 files changed, 68 insertions, 12 deletions
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go
index 022314b3d..8ef5629fb 100644
--- a/p2p/simulations/adapters/docker.go
+++ b/p2p/simulations/adapters/docker.go
@@ -28,6 +28,7 @@ import (
"strings"
"github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
)
@@ -94,6 +95,7 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
conf.Stack.P2P.NoDiscovery = true
conf.Stack.P2P.NAT = nil
conf.Stack.NoUSB = true
+ conf.Stack.Logger = log.New("node.id", config.ID.String())
node := &DockerNode{
ExecNode: ExecNode{
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
index bdb92cc1d..a566fb27d 100644
--- a/p2p/simulations/adapters/exec.go
+++ b/p2p/simulations/adapters/exec.go
@@ -359,6 +359,7 @@ func execP2PNode() {
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
}
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
+ conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
// use explicit IP address in ListenAddr so that Enode URL is usable
externalIP := func() string {
diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go
index c97188def..48d7c1730 100644
--- a/p2p/simulations/adapters/inproc.go
+++ b/p2p/simulations/adapters/inproc.go
@@ -24,6 +24,7 @@ import (
"sync"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -82,7 +83,8 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
Dialer: s,
EnableMsgEvents: true,
},
- NoUSB: true,
+ NoUSB: true,
+ Logger: log.New("node.id", id.String()),
})
if err != nil {
return nil, err
diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go
index ed6cfc504..5b4b47fe2 100644
--- a/p2p/simulations/adapters/types.go
+++ b/p2p/simulations/adapters/types.go
@@ -83,6 +83,9 @@ type NodeConfig struct {
// stack to encrypt communications
PrivateKey *ecdsa.PrivateKey
+ // Enable peer events for Msgs
+ EnableMsgEvents bool
+
// Name is a human friendly name for the node like "node01"
Name string
@@ -91,6 +94,9 @@ type NodeConfig struct {
// contained in SimAdapter.services, for other nodes it should be
// services registered by calling the RegisterService function)
Services []string
+
+ // function to sanction or prevent suggesting a peer
+ Reachable func(id discover.NodeID) bool
}
// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
index 06890ffcf..fd8777673 100644
--- a/p2p/simulations/network.go
+++ b/p2p/simulations/network.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
@@ -30,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
+var dialBanTimeout = 200 * time.Millisecond
+
// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
ID string `json:"id"`
@@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
conf.PrivateKey = c.PrivateKey
}
id := conf.ID
+ if conf.Reachable == nil {
+ conf.Reachable = func(otherID discover.NodeID) bool {
+ _, err := self.InitConn(conf.ID, otherID)
+ return err == nil
+ }
+ }
// assign a name to the node if not set
if conf.Name == "" {
@@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error {
// method on the "one" node so that it connects to the "other" node
func (self *Network) Connect(oneID, otherID discover.NodeID) error {
log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
- conn, err := self.GetOrCreateConn(oneID, otherID)
+ conn, err := self.InitConn(oneID, otherID)
if err != nil {
return err
}
- if conn.Up {
- return fmt.Errorf("%v and %v already connected", oneID, otherID)
- }
- if err := conn.nodesUp(); err != nil {
- return err
- }
client, err := conn.one.Client()
if err != nil {
return err
@@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error {
// DidDisconnect tracks the fact that the "one" node disconnected from the
// "other" node
func (self *Network) DidDisconnect(one, other discover.NodeID) error {
- conn, err := self.GetOrCreateConn(one, other)
- if err != nil {
+ conn := self.GetConn(one, other)
+ if conn == nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
if !conn.Up {
return fmt.Errorf("%v and %v already disconnected", one, other)
}
conn.Up = false
+ conn.initiated = time.Now().Add(-dialBanTimeout)
self.events.Send(NewEvent(conn))
return nil
}
@@ -396,10 +400,13 @@ func (self *Network) getNodeByName(name string) *Node {
}
// GetNodes returns the existing nodes
-func (self *Network) GetNodes() []*Node {
+func (self *Network) GetNodes() (nodes []*Node) {
self.lock.Lock()
defer self.lock.Unlock()
- return self.Nodes
+ for _, node := range self.Nodes {
+ nodes = append(nodes, node)
+ }
+ return nodes
}
// GetConn returns the connection which exists between "one" and "other"
@@ -415,6 +422,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
self.lock.Lock()
defer self.lock.Unlock()
+ return self.getOrCreateConn(oneID, otherID)
+}
+
+func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
if conn := self.getConn(oneID, otherID); conn != nil {
return conn, nil
}
@@ -448,6 +459,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
return self.Conns[i]
}
+// InitConn(one, other) retrieves the connectiton model for the connection between
+// peers one and other, or creates a new one if it does not exist
+// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
+// it checks if the connection is already up, and if the nodes are running
+// NOTE:
+// it also checks whether there has been recent attempt to connect the peers
+// this is cheating as the simulation is used as an oracle and know about
+// remote peers attempt to connect to a node which will then not initiate the connection
+func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if oneID == otherID {
+ return nil, fmt.Errorf("refusing to connect to self %v", oneID)
+ }
+ conn, err := self.getOrCreateConn(oneID, otherID)
+ if err != nil {
+ return nil, err
+ }
+ if time.Now().Sub(conn.initiated) < dialBanTimeout {
+ return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
+ }
+ if conn.Up {
+ return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
+ }
+ err = conn.nodesUp()
+ if err != nil {
+ return nil, fmt.Errorf("nodes not up: %v", err)
+ }
+ conn.initiated = time.Now()
+ return conn, nil
+}
+
// Shutdown stops all nodes in the network and closes the quit channel
func (self *Network) Shutdown() {
for _, node := range self.Nodes {
@@ -516,6 +559,8 @@ type Conn struct {
// Up tracks whether or not the connection is active
Up bool `json:"up"`
+ // Registers when the connection was grabbed to dial
+ initiated time.Time
one *Node
other *Node