aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/simulations/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/simulations/network.go')
-rw-r--r--p2p/simulations/network.go155
1 files changed, 107 insertions, 48 deletions
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
index a0e621b88..2049a5108 100644
--- a/p2p/simulations/network.go
+++ b/p2p/simulations/network.go
@@ -136,7 +136,7 @@ func (net *Network) Config() *NetworkConfig {
// StartAll starts all nodes in the network
func (net *Network) StartAll() error {
for _, node := range net.Nodes {
- if node.Up {
+ if node.Up() {
continue
}
if err := net.Start(node.ID()); err != nil {
@@ -149,7 +149,7 @@ func (net *Network) StartAll() error {
// StopAll stops all nodes in the network
func (net *Network) StopAll() error {
for _, node := range net.Nodes {
- if !node.Up {
+ if !node.Up() {
continue
}
if err := net.Stop(node.ID()); err != nil {
@@ -168,27 +168,23 @@ func (net *Network) Start(id enode.ID) error {
// snapshots
func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
net.lock.Lock()
+ defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
- net.lock.Unlock()
return fmt.Errorf("node %v does not exist", id)
}
- if node.Up {
- net.lock.Unlock()
+ if node.Up() {
return fmt.Errorf("node %v already up", id)
}
log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
if err := node.Start(snapshots); err != nil {
- net.lock.Unlock()
log.Warn("Node startup failed", "id", id, "err", err)
return err
}
- node.Up = true
+ node.SetUp(true)
log.Info("Started node", "id", id)
ev := NewEvent(node)
- net.lock.Unlock()
-
net.events.Send(ev)
// subscribe to peer events
@@ -219,7 +215,7 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
if node == nil {
return
}
- node.Up = false
+ node.SetUp(false)
ev := NewEvent(node)
net.events.Send(ev)
}()
@@ -257,30 +253,42 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
// Stop stops the node with the given ID
func (net *Network) Stop(id enode.ID) error {
- net.lock.Lock()
- node := net.getNode(id)
- if node == nil {
- net.lock.Unlock()
- return fmt.Errorf("node %v does not exist", id)
- }
- if !node.Up {
- net.lock.Unlock()
- return fmt.Errorf("node %v already down", id)
+ // IMPORTANT: node.Stop() must NOT be called while holding net.lock, because
+ // the node.Reachable() closure holds a reference to the network and calls
+ // net.InitConn(), which also locks the network. => DEADLOCK
+ // This holds until the following ticket is resolved:
+
+ var err error
+
+ node, err := func() (*Node, error) {
+ net.lock.Lock()
+ defer net.lock.Unlock()
+
+ node := net.getNode(id)
+ if node == nil {
+ return nil, fmt.Errorf("node %v does not exist", id)
+ }
+ if !node.Up() {
+ return nil, fmt.Errorf("node %v already down", id)
+ }
+ node.SetUp(false)
+ return node, nil
+ }()
+ if err != nil {
+ return err
}
- node.Up = false
- net.lock.Unlock()
- err := node.Stop()
+ err = node.Stop() // must be called without net.lock
+
+ net.lock.Lock()
+ defer net.lock.Unlock()
+
if err != nil {
- net.lock.Lock()
- node.Up = true
- net.lock.Unlock()
+ node.SetUp(true)
return err
}
log.Info("Stopped node", "id", id, "err", err)
- net.lock.Lock()
ev := ControlEvent(node)
- net.lock.Unlock()
net.events.Send(ev)
return nil
}
@@ -288,8 +296,14 @@ func (net *Network) Stop(id enode.ID) error {
// Connect connects two nodes together by calling the "admin_addPeer" RPC
// method on the "one" node so that it connects to the "other" node
func (net *Network) Connect(oneID, otherID enode.ID) error {
+ net.lock.Lock()
+ defer net.lock.Unlock()
+ return net.connect(oneID, otherID)
+}
+
+func (net *Network) connect(oneID, otherID enode.ID) error {
log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)
- conn, err := net.InitConn(oneID, otherID)
+ conn, err := net.initConn(oneID, otherID)
if err != nil {
return err
}
@@ -387,6 +401,14 @@ func (net *Network) GetNode(id enode.ID) *Node {
return net.getNode(id)
}
+func (net *Network) getNode(id enode.ID) *Node {
+ i, found := net.nodeMap[id]
+ if !found {
+ return nil
+ }
+ return net.Nodes[i]
+}
+
// GetNodeByName gets the node with the given name, returning nil if the node
// does not exist
func (net *Network) GetNodeByName(name string) *Node {
@@ -409,28 +431,29 @@ func (net *Network) GetNodes() (nodes []*Node) {
net.lock.RLock()
defer net.lock.RUnlock()
- nodes = append(nodes, net.Nodes...)
- return nodes
+ return net.getNodes()
}
-func (net *Network) getNode(id enode.ID) *Node {
- i, found := net.nodeMap[id]
- if !found {
- return nil
- }
- return net.Nodes[i]
+func (net *Network) getNodes() (nodes []*Node) {
+ nodes = append(nodes, net.Nodes...)
+ return nodes
}
// GetRandomUpNode returns a random node on the network, which is running.
func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {
net.lock.RLock()
defer net.lock.RUnlock()
+ return net.getRandomUpNode(excludeIDs...)
+}
+
+// getRandomUpNode returns a random node on the network, which is running. It does not acquire net.lock itself.
+func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node {
return net.getRandomNode(net.getUpNodeIDs(), excludeIDs)
}
func (net *Network) getUpNodeIDs() (ids []enode.ID) {
for _, node := range net.Nodes {
- if node.Up {
+ if node.Up() {
ids = append(ids, node.ID())
}
}
@@ -445,8 +468,8 @@ func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {
}
func (net *Network) getDownNodeIDs() (ids []enode.ID) {
- for _, node := range net.GetNodes() {
- if !node.Up {
+ for _, node := range net.getNodes() {
+ if !node.Up() {
ids = append(ids, node.ID())
}
}
@@ -538,6 +561,10 @@ func (net *Network) getConn(oneID, otherID enode.ID) *Conn {
func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
net.lock.Lock()
defer net.lock.Unlock()
+ return net.initConn(oneID, otherID)
+}
+
+func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) {
if oneID == otherID {
return nil, fmt.Errorf("refusing to connect to self %v", oneID)
}
@@ -595,8 +622,21 @@ type Node struct {
// Config is the config used to create the node
Config *adapters.NodeConfig `json:"config"`
- // Up tracks whether or not the node is running
- Up bool `json:"up"`
+ // up tracks whether or not the node is running
+ up bool
+ upMu sync.RWMutex
+}
+
+func (n *Node) Up() bool {
+ n.upMu.RLock()
+ defer n.upMu.RUnlock()
+ return n.up
+}
+
+func (n *Node) SetUp(up bool) {
+ n.upMu.Lock()
+ defer n.upMu.Unlock()
+ n.up = up
}
// ID returns the ID of the node
@@ -630,10 +670,29 @@ func (n *Node) MarshalJSON() ([]byte, error) {
}{
Info: n.NodeInfo(),
Config: n.Config,
- Up: n.Up,
+ Up: n.Up(),
})
}
+// UnmarshalJSON implements the json.Unmarshaler interface so that we don't
+// lose the Node.up status. IMPORTANT: The implementation is incomplete; we
+// lose p2p.NodeInfo.
+func (n *Node) UnmarshalJSON(raw []byte) error {
+ // TODO: How should we turn back NodeInfo into n.Node?
+ // Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
+ node := struct {
+ Config *adapters.NodeConfig `json:"config,omitempty"`
+ Up bool `json:"up"`
+ }{}
+ if err := json.Unmarshal(raw, &node); err != nil {
+ return err
+ }
+
+ n.SetUp(node.Up)
+ n.Config = node.Config
+ return nil
+}
+
// Conn represents a connection between two nodes in the network
type Conn struct {
// One is the node which initiated the connection
@@ -653,10 +712,10 @@ type Conn struct {
// nodesUp returns whether both nodes are currently up
func (c *Conn) nodesUp() error {
- if !c.one.Up {
+ if !c.one.Up() {
return fmt.Errorf("one %v is not up", c.One)
}
- if !c.other.Up {
+ if !c.other.Up() {
return fmt.Errorf("other %v is not up", c.Other)
}
return nil
@@ -728,7 +787,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
}
for i, node := range net.Nodes {
snap.Nodes[i] = NodeSnapshot{Node: *node}
- if !node.Up {
+ if !node.Up() {
continue
}
snapshots, err := node.Snapshots()
@@ -783,7 +842,7 @@ func (net *Network) Load(snap *Snapshot) error {
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
return err
}
- if !n.Node.Up {
+ if !n.Node.Up() {
continue
}
if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
@@ -855,7 +914,7 @@ func (net *Network) Load(snap *Snapshot) error {
// Start connecting.
for _, conn := range snap.Conns {
- if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
+ if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() {
//in this case, at least one of the nodes of a connection is not up,
//so connecting them would cause the snapshot `Load` to fail
continue
@@ -909,7 +968,7 @@ func (net *Network) executeControlEvent(event *Event) {
}
func (net *Network) executeNodeEvent(e *Event) error {
- if !e.Node.Up {
+ if !e.Node.Up() {
return net.Stop(e.Node.ID())
}