diff options
Diffstat (limited to 'p2p/simulations/network.go')
-rw-r--r-- | p2p/simulations/network.go | 158 |
1 files changed, 114 insertions, 44 deletions
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 86f7dc9be..2049a5108 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -136,7 +136,7 @@ func (net *Network) Config() *NetworkConfig { // StartAll starts all nodes in the network func (net *Network) StartAll() error { for _, node := range net.Nodes { - if node.Up { + if node.Up() { continue } if err := net.Start(node.ID()); err != nil { @@ -149,7 +149,7 @@ func (net *Network) StartAll() error { // StopAll stops all nodes in the network func (net *Network) StopAll() error { for _, node := range net.Nodes { - if !node.Up { + if !node.Up() { continue } if err := net.Stop(node.ID()); err != nil { @@ -174,7 +174,7 @@ func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) if node == nil { return fmt.Errorf("node %v does not exist", id) } - if node.Up { + if node.Up() { return fmt.Errorf("node %v already up", id) } log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name()) @@ -182,10 +182,10 @@ func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) log.Warn("Node startup failed", "id", id, "err", err) return err } - node.Up = true + node.SetUp(true) log.Info("Started node", "id", id) - - net.events.Send(NewEvent(node)) + ev := NewEvent(node) + net.events.Send(ev) // subscribe to peer events client, err := node.Client() @@ -210,12 +210,14 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub // assume the node is now down net.lock.Lock() defer net.lock.Unlock() + node := net.getNode(id) if node == nil { return } - node.Up = false - net.events.Send(NewEvent(node)) + node.SetUp(false) + ev := NewEvent(node) + net.events.Send(ev) }() for { select { @@ -251,34 +253,57 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub // Stop stops the node with the given ID func (net *Network) Stop(id enode.ID) error { - net.lock.Lock() - node := net.getNode(id) - if node == nil { - return fmt.Errorf("node %v does not exist", id) - } - if !node.Up { - return fmt.Errorf("node %v already down", id) + // IMPORTANT: node.Stop() must NOT be called under net.lock as + // node.Reachable() closure has a reference to the network and + // calls net.InitConn() what also locks the network. => DEADLOCK + // That holds until the following ticket is not resolved: + + var err error + + node, err := func() (*Node, error) { + net.lock.Lock() + defer net.lock.Unlock() + + node := net.getNode(id) + if node == nil { + return nil, fmt.Errorf("node %v does not exist", id) + } + if !node.Up() { + return nil, fmt.Errorf("node %v already down", id) + } + node.SetUp(false) + return node, nil + }() + if err != nil { + return err } - node.Up = false - net.lock.Unlock() - err := node.Stop() + err = node.Stop() // must be called without net.lock + + net.lock.Lock() + defer net.lock.Unlock() + if err != nil { - net.lock.Lock() - node.Up = true - net.lock.Unlock() + node.SetUp(true) return err } log.Info("Stopped node", "id", id, "err", err) - net.events.Send(ControlEvent(node)) + ev := ControlEvent(node) + net.events.Send(ev) return nil } // Connect connects two nodes together by calling the "admin_addPeer" RPC // method on the "one" node so that it connects to the "other" node func (net *Network) Connect(oneID, otherID enode.ID) error { + net.lock.Lock() + defer net.lock.Unlock() + return net.connect(oneID, otherID) +} + +func (net *Network) connect(oneID, otherID enode.ID) error { log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID) - conn, err := net.InitConn(oneID, otherID) + conn, err := net.initConn(oneID, otherID) if err != nil { return err } @@ -376,6 +401,14 @@ func (net *Network) GetNode(id enode.ID) *Node { return net.getNode(id) } +func (net *Network) getNode(id enode.ID) *Node { + i, found := net.nodeMap[id] + if !found { + return nil + } + return net.Nodes[i] +} + // GetNode gets the node with the given name, returning nil if the node does // not exist func (net *Network) GetNodeByName(name string) *Node { @@ -398,28 +431,29 @@ func (net *Network) GetNodes() (nodes []*Node) { net.lock.RLock() defer net.lock.RUnlock() - nodes = append(nodes, net.Nodes...) - return nodes + return net.getNodes() } -func (net *Network) getNode(id enode.ID) *Node { - i, found := net.nodeMap[id] - if !found { - return nil - } - return net.Nodes[i] +func (net *Network) getNodes() (nodes []*Node) { + nodes = append(nodes, net.Nodes...) + return nodes } // GetRandomUpNode returns a random node on the network, which is running. func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node { net.lock.RLock() defer net.lock.RUnlock() + return net.getRandomUpNode(excludeIDs...) +} + +// GetRandomUpNode returns a random node on the network, which is running. +func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node { return net.getRandomNode(net.getUpNodeIDs(), excludeIDs) } func (net *Network) getUpNodeIDs() (ids []enode.ID) { for _, node := range net.Nodes { - if node.Up { + if node.Up() { ids = append(ids, node.ID()) } } @@ -434,8 +468,8 @@ func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node { } func (net *Network) getDownNodeIDs() (ids []enode.ID) { - for _, node := range net.GetNodes() { - if !node.Up { + for _, node := range net.getNodes() { + if !node.Up() { ids = append(ids, node.ID()) } } @@ -449,7 +483,7 @@ func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node { if l == 0 { return nil } - return net.GetNode(filtered[rand.Intn(l)]) + return net.getNode(filtered[rand.Intn(l)]) } func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID { @@ -527,6 +561,10 @@ func (net *Network) getConn(oneID, otherID enode.ID) *Conn { func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) { net.lock.Lock() defer net.lock.Unlock() + return net.initConn(oneID, otherID) +} + +func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) { if oneID == otherID { return nil, fmt.Errorf("refusing to connect to self %v", oneID) } @@ -584,8 +622,21 @@ type Node struct { // Config if the config used to created the node Config *adapters.NodeConfig `json:"config"` - // Up tracks whether or not the node is running - Up bool `json:"up"` + // up tracks whether or not the node is running + up bool + upMu sync.RWMutex +} + +func (n *Node) Up() bool { + n.upMu.RLock() + defer n.upMu.RUnlock() + return n.up +} + +func (n *Node) SetUp(up bool) { + n.upMu.Lock() + defer n.upMu.Unlock() + n.up = up } // ID returns the ID of the node @@ -619,10 +670,29 @@ func (n *Node) MarshalJSON() ([]byte, error) { }{ Info: n.NodeInfo(), Config: n.Config, - Up: n.Up, + Up: n.Up(), }) } +// UnmarshalJSON implements json.Unmarshaler interface so that we don't lose +// Node.up status. IMPORTANT: The implementation is incomplete; we lose +// p2p.NodeInfo. +func (n *Node) UnmarshalJSON(raw []byte) error { + // TODO: How should we turn back NodeInfo into n.Node? + // Ticket: https://github.com/ethersphere/go-ethereum/issues/1177 + node := struct { + Config *adapters.NodeConfig `json:"config,omitempty"` + Up bool `json:"up"` + }{} + if err := json.Unmarshal(raw, &node); err != nil { + return err + } + + n.SetUp(node.Up) + n.Config = node.Config + return nil +} + // Conn represents a connection between two nodes in the network type Conn struct { // One is the node which initiated the connection @@ -642,10 +712,10 @@ type Conn struct { // nodesUp returns whether both nodes are currently up func (c *Conn) nodesUp() error { - if !c.one.Up { + if !c.one.Up() { return fmt.Errorf("one %v is not up", c.One) } - if !c.other.Up { + if !c.other.Up() { return fmt.Errorf("other %v is not up", c.Other) } return nil @@ -717,7 +787,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn } for i, node := range net.Nodes { snap.Nodes[i] = NodeSnapshot{Node: *node} - if !node.Up { + if !node.Up() { continue } snapshots, err := node.Snapshots() @@ -772,7 +842,7 @@ func (net *Network) Load(snap *Snapshot) error { if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil { return err } - if !n.Node.Up { + if !n.Node.Up() { continue } if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil { @@ -844,7 +914,7 @@ func (net *Network) Load(snap *Snapshot) error { // Start connecting. for _, conn := range snap.Conns { - if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up { + if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() { //in this case, at least one of the nodes of a connection is not up, //so it would result in the snapshot `Load` to fail continue @@ -898,7 +968,7 @@ func (net *Network) executeControlEvent(event *Event) { } func (net *Network) executeNodeEvent(e *Event) error { - if !e.Node.Up { + if !e.Node.Up() { return net.Stop(e.Node.ID()) } |