diff options
Diffstat (limited to 'p2p/simulations')
-rw-r--r-- | p2p/simulations/connect.go | 43 | ||||
-rw-r--r-- | p2p/simulations/events.go | 2 | ||||
-rw-r--r-- | p2p/simulations/http_test.go | 18 | ||||
-rw-r--r-- | p2p/simulations/mocker_test.go | 9 | ||||
-rw-r--r-- | p2p/simulations/network.go | 155 | ||||
-rw-r--r-- | p2p/simulations/network_test.go | 135 |
6 files changed, 290 insertions, 72 deletions
diff --git a/p2p/simulations/connect.go b/p2p/simulations/connect.go index bb7e7999a..ede96b34c 100644 --- a/p2p/simulations/connect.go +++ b/p2p/simulations/connect.go @@ -32,6 +32,9 @@ var ( // It is useful when constructing a chain network topology // when Network adds and removes nodes dynamically. func (net *Network) ConnectToLastNode(id enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + ids := net.getUpNodeIDs() l := len(ids) if l < 2 { @@ -41,29 +44,35 @@ func (net *Network) ConnectToLastNode(id enode.ID) (err error) { if last == id { last = ids[l-2] } - return net.connect(last, id) + return net.connectNotConnected(last, id) } // ConnectToRandomNode connects the node with provided NodeID // to a random node that is up. func (net *Network) ConnectToRandomNode(id enode.ID) (err error) { - selected := net.GetRandomUpNode(id) + net.lock.Lock() + defer net.lock.Unlock() + + selected := net.getRandomUpNode(id) if selected == nil { return ErrNodeNotFound } - return net.connect(selected.ID(), id) + return net.connectNotConnected(selected.ID(), id) } // ConnectNodesFull connects all nodes one to another. // It provides a complete connectivity in the network // which should be rarely needed. func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + if ids == nil { ids = net.getUpNodeIDs() } for i, lid := range ids { for _, rid := range ids[i+1:] { - if err = net.connect(lid, rid); err != nil { + if err = net.connectNotConnected(lid, rid); err != nil { return err } } @@ -74,12 +83,19 @@ func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) { // ConnectNodesChain connects all nodes in a chain topology. // If ids argument is nil, all nodes that are up will be connected. func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + + return net.connectNodesChain(ids) +} + +func (net *Network) connectNodesChain(ids []enode.ID) (err error) { if ids == nil { ids = net.getUpNodeIDs() } l := len(ids) for i := 0; i < l-1; i++ { - if err := net.connect(ids[i], ids[i+1]); err != nil { + if err := net.connectNotConnected(ids[i], ids[i+1]); err != nil { return err } } @@ -89,6 +105,9 @@ func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) { // ConnectNodesRing connects all nodes in a ring topology. // If ids argument is nil, all nodes that are up will be connected. func (net *Network) ConnectNodesRing(ids []enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + if ids == nil { ids = net.getUpNodeIDs() } @@ -96,15 +115,18 @@ func (net *Network) ConnectNodesRing(ids []enode.ID) (err error) { if l < 2 { return nil } - if err := net.ConnectNodesChain(ids); err != nil { + if err := net.connectNodesChain(ids); err != nil { return err } - return net.connect(ids[l-1], ids[0]) + return net.connectNotConnected(ids[l-1], ids[0]) } // ConnectNodesStar connects all nodes into a star topology // If ids argument is nil, all nodes that are up will be connected. func (net *Network) ConnectNodesStar(ids []enode.ID, center enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + if ids == nil { ids = net.getUpNodeIDs() } @@ -112,16 +134,15 @@ func (net *Network) ConnectNodesStar(ids []enode.ID, center enode.ID) (err error if center == id { continue } - if err := net.connect(center, id); err != nil { + if err := net.connectNotConnected(center, id); err != nil { return err } } return nil } -// connect connects two nodes but ignores already connected error. -func (net *Network) connect(oneID, otherID enode.ID) error { - return ignoreAlreadyConnectedErr(net.Connect(oneID, otherID)) +func (net *Network) connectNotConnected(oneID, otherID enode.ID) error { + return ignoreAlreadyConnectedErr(net.connect(oneID, otherID)) } func ignoreAlreadyConnectedErr(err error) error { diff --git a/p2p/simulations/events.go b/p2p/simulations/events.go index 9b2a990e0..984c2e088 100644 --- a/p2p/simulations/events.go +++ b/p2p/simulations/events.go @@ -100,7 +100,7 @@ func ControlEvent(v interface{}) *Event { func (e *Event) String() string { switch e.Type { case EventTypeNode: - return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up) + return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up()) case EventTypeConn: return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up) case EventTypeMsg: diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go index c0a5acb3d..ed43c0ed7 100644 --- a/p2p/simulations/http_test.go +++ b/p2p/simulations/http_test.go @@ -421,14 +421,15 @@ type expectEvents struct { } func (t *expectEvents) nodeEvent(id string, up bool) *Event { + node := Node{ + Config: &adapters.NodeConfig{ + ID: enode.HexID(id), + }, + up: up, + } return &Event{ Type: EventTypeNode, - Node: &Node{ - Config: &adapters.NodeConfig{ - ID: enode.HexID(id), - }, - Up: up, - }, + Node: &node, } } @@ -480,6 +481,7 @@ loop: } func (t *expectEvents) expect(events ...*Event) { + t.Helper() timeout := time.After(10 * time.Second) i := 0 for { @@ -501,8 +503,8 @@ func (t *expectEvents) expect(events ...*Event) { if event.Node.ID() != expected.Node.ID() { t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString()) } - if event.Node.Up != expected.Node.Up { - t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up) + if event.Node.Up() != expected.Node.Up() { + t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up(), event.Node.Up()) } case EventTypeConn: diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go index 192be1732..069040257 100644 --- a/p2p/simulations/mocker_test.go +++ b/p2p/simulations/mocker_test.go @@ -90,15 +90,12 @@ func TestMocker(t *testing.T) { for { select { case event := <-events: - //if the event is a node Up event only - if event.Node != nil && event.Node.Up { + if isNodeUp(event) { //add the correspondent node ID to the map nodemap[event.Node.Config.ID] = true //this means all nodes got a nodeUp event, so we can continue the test if len(nodemap) == nodeCount { nodesComplete = true - //wait for 3s as the mocker will need time to connect the nodes - //time.Sleep( 3 *time.Second) } } else if event.Conn != nil && nodesComplete { connCount += 1 @@ -169,3 +166,7 @@ func TestMocker(t *testing.T) { t.Fatalf("Expected empty list of nodes, got: %d", len(nodesInfo)) } } + +func isNodeUp(event *Event) bool { + return event.Node != nil && event.Node.Up() +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index a0e621b88..2049a5108 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -136,7 +136,7 @@ func (net *Network) Config() *NetworkConfig { // StartAll starts all nodes in the network func (net *Network) StartAll() error { for _, node := range net.Nodes { - if node.Up { + if node.Up() { continue } if err := net.Start(node.ID()); err != nil { @@ -149,7 +149,7 @@ func (net *Network) StartAll() error { // StopAll stops all nodes in the network func (net *Network) StopAll() error { for _, node := range net.Nodes { - if !node.Up { + if !node.Up() { continue } if err := net.Stop(node.ID()); err != nil { @@ -168,27 +168,23 @@ func (net *Network) Start(id enode.ID) error { // snapshots func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error { net.lock.Lock() + defer net.lock.Unlock() node := net.getNode(id) if node == nil { - net.lock.Unlock() return fmt.Errorf("node %v does not exist", id) } - if node.Up { - net.lock.Unlock() + if node.Up() { return fmt.Errorf("node %v already up", id) } log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name()) if err := node.Start(snapshots); err != nil { - net.lock.Unlock() log.Warn("Node startup failed", "id", id, "err", err) return err } - node.Up = true + node.SetUp(true) log.Info("Started node", "id", id) ev := NewEvent(node) - net.lock.Unlock() - net.events.Send(ev) // subscribe to peer events @@ -219,7 +215,7 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub if node == nil { return } - node.Up = false + node.SetUp(false) ev := NewEvent(node) net.events.Send(ev) }() @@ -257,30 +253,42 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub // Stop stops the node with the given ID func (net *Network) Stop(id enode.ID) error { - net.lock.Lock() - node := net.getNode(id) - if node == nil { - net.lock.Unlock() - return fmt.Errorf("node %v does not exist", id) - } - if !node.Up { - net.lock.Unlock() - return fmt.Errorf("node %v already down", id) + // IMPORTANT: node.Stop() must NOT be called under net.lock as + // node.Reachable() closure has a reference to the network and + // calls net.InitConn() what also locks the network. => DEADLOCK + // That holds until the following ticket is not resolved: + + var err error + + node, err := func() (*Node, error) { + net.lock.Lock() + defer net.lock.Unlock() + + node := net.getNode(id) + if node == nil { + return nil, fmt.Errorf("node %v does not exist", id) + } + if !node.Up() { + return nil, fmt.Errorf("node %v already down", id) + } + node.SetUp(false) + return node, nil + }() + if err != nil { + return err } - node.Up = false - net.lock.Unlock() - err := node.Stop() + err = node.Stop() // must be called without net.lock + + net.lock.Lock() + defer net.lock.Unlock() + if err != nil { - net.lock.Lock() - node.Up = true - net.lock.Unlock() + node.SetUp(true) return err } log.Info("Stopped node", "id", id, "err", err) - net.lock.Lock() ev := ControlEvent(node) - net.lock.Unlock() net.events.Send(ev) return nil } @@ -288,8 +296,14 @@ func (net *Network) Stop(id enode.ID) error { // Connect connects two nodes together by calling the "admin_addPeer" RPC // method on the "one" node so that it connects to the "other" node func (net *Network) Connect(oneID, otherID enode.ID) error { + net.lock.Lock() + defer net.lock.Unlock() + return net.connect(oneID, otherID) +} + +func (net *Network) connect(oneID, otherID enode.ID) error { log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID) - conn, err := net.InitConn(oneID, otherID) + conn, err := net.initConn(oneID, otherID) if err != nil { return err } @@ -387,6 +401,14 @@ func (net *Network) GetNode(id enode.ID) *Node { return net.getNode(id) } +func (net *Network) getNode(id enode.ID) *Node { + i, found := net.nodeMap[id] + if !found { + return nil + } + return net.Nodes[i] +} + // GetNode gets the node with the given name, returning nil if the node does // not exist func (net *Network) GetNodeByName(name string) *Node { @@ -409,28 +431,29 @@ func (net *Network) GetNodes() (nodes []*Node) { net.lock.RLock() defer net.lock.RUnlock() - nodes = append(nodes, net.Nodes...) - return nodes + return net.getNodes() } -func (net *Network) getNode(id enode.ID) *Node { - i, found := net.nodeMap[id] - if !found { - return nil - } - return net.Nodes[i] +func (net *Network) getNodes() (nodes []*Node) { + nodes = append(nodes, net.Nodes...) + return nodes } // GetRandomUpNode returns a random node on the network, which is running. func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node { net.lock.RLock() defer net.lock.RUnlock() + return net.getRandomUpNode(excludeIDs...) +} + +// GetRandomUpNode returns a random node on the network, which is running. +func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node { return net.getRandomNode(net.getUpNodeIDs(), excludeIDs) } func (net *Network) getUpNodeIDs() (ids []enode.ID) { for _, node := range net.Nodes { - if node.Up { + if node.Up() { ids = append(ids, node.ID()) } } @@ -445,8 +468,8 @@ func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node { } func (net *Network) getDownNodeIDs() (ids []enode.ID) { - for _, node := range net.GetNodes() { - if !node.Up { + for _, node := range net.getNodes() { + if !node.Up() { ids = append(ids, node.ID()) } } @@ -538,6 +561,10 @@ func (net *Network) getConn(oneID, otherID enode.ID) *Conn { func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) { net.lock.Lock() defer net.lock.Unlock() + return net.initConn(oneID, otherID) +} + +func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) { if oneID == otherID { return nil, fmt.Errorf("refusing to connect to self %v", oneID) } @@ -595,8 +622,21 @@ type Node struct { // Config if the config used to created the node Config *adapters.NodeConfig `json:"config"` - // Up tracks whether or not the node is running - Up bool `json:"up"` + // up tracks whether or not the node is running + up bool + upMu sync.RWMutex +} + +func (n *Node) Up() bool { + n.upMu.RLock() + defer n.upMu.RUnlock() + return n.up +} + +func (n *Node) SetUp(up bool) { + n.upMu.Lock() + defer n.upMu.Unlock() + n.up = up } // ID returns the ID of the node @@ -630,10 +670,29 @@ func (n *Node) MarshalJSON() ([]byte, error) { }{ Info: n.NodeInfo(), Config: n.Config, - Up: n.Up, + Up: n.Up(), }) } +// UnmarshalJSON implements json.Unmarshaler interface so that we don't lose +// Node.up status. IMPORTANT: The implementation is incomplete; we lose +// p2p.NodeInfo. +func (n *Node) UnmarshalJSON(raw []byte) error { + // TODO: How should we turn back NodeInfo into n.Node? + // Ticket: https://github.com/ethersphere/go-ethereum/issues/1177 + node := struct { + Config *adapters.NodeConfig `json:"config,omitempty"` + Up bool `json:"up"` + }{} + if err := json.Unmarshal(raw, &node); err != nil { + return err + } + + n.SetUp(node.Up) + n.Config = node.Config + return nil +} + // Conn represents a connection between two nodes in the network type Conn struct { // One is the node which initiated the connection @@ -653,10 +712,10 @@ type Conn struct { // nodesUp returns whether both nodes are currently up func (c *Conn) nodesUp() error { - if !c.one.Up { + if !c.one.Up() { return fmt.Errorf("one %v is not up", c.One) } - if !c.other.Up { + if !c.other.Up() { return fmt.Errorf("other %v is not up", c.Other) } return nil @@ -728,7 +787,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn } for i, node := range net.Nodes { snap.Nodes[i] = NodeSnapshot{Node: *node} - if !node.Up { + if !node.Up() { continue } snapshots, err := node.Snapshots() @@ -783,7 +842,7 @@ func (net *Network) Load(snap *Snapshot) error { if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil { return err } - if !n.Node.Up { + if !n.Node.Up() { continue } if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil { @@ -855,7 +914,7 @@ func (net *Network) Load(snap *Snapshot) error { // Start connecting. for _, conn := range snap.Conns { - if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up { + if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() { //in this case, at least one of the nodes of a connection is not up, //so it would result in the snapshot `Load` to fail continue @@ -909,7 +968,7 @@ func (net *Network) executeControlEvent(event *Event) { } func (net *Network) executeNodeEvent(e *Event) error { - if !e.Node.Up { + if !e.Node.Up() { return net.Stop(e.Node.ID()) } diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index b7852addb..8b644ffb0 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "reflect" "strconv" "strings" "testing" @@ -485,3 +486,137 @@ func benchmarkMinimalServiceTmp(b *testing.B) { } } } + +func TestNode_UnmarshalJSON(t *testing.T) { + t.Run( + "test unmarshal of Node up field", + func(t *testing.T) { + runNodeUnmarshalJSON(t, casesNodeUnmarshalJSONUpField()) + }, + ) + t.Run( + "test unmarshal of Node Config field", + func(t *testing.T) { + runNodeUnmarshalJSON(t, casesNodeUnmarshalJSONConfigField()) + }, + ) +} + +func runNodeUnmarshalJSON(t *testing.T, tests []nodeUnmarshalTestCase) { + t.Helper() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var got Node + if err := got.UnmarshalJSON([]byte(tt.marshaled)); err != nil { + expectErrorMessageToContain(t, err, tt.wantErr) + } + expectNodeEquality(t, got, tt.want) + }) + } +} + +type nodeUnmarshalTestCase struct { + name string + marshaled string + want Node + wantErr string +} + +func expectErrorMessageToContain(t *testing.T, got error, want string) { + t.Helper() + if got == nil && want == "" { + return + } + + if got == nil && want != "" { + t.Errorf("error was expected, got: nil, want: %v", want) + return + } + + if !strings.Contains(got.Error(), want) { + t.Errorf( + "unexpected error message, got %v, want: %v", + want, + got, + ) + } +} + +func expectNodeEquality(t *testing.T, got Node, want Node) { + t.Helper() + if !reflect.DeepEqual(got, want) { + t.Errorf("Node.UnmarshalJSON() = %v, want %v", got, want) + } +} + +func casesNodeUnmarshalJSONUpField() []nodeUnmarshalTestCase { + return []nodeUnmarshalTestCase{ + { + name: "empty json", + marshaled: "{}", + want: Node{ + up: false, + }, + }, + { + name: "a stopped node", + marshaled: "{\"up\": false}", + want: Node{ + up: false, + }, + }, + { + name: "a running node", + marshaled: "{\"up\": true}", + want: Node{ + up: true, + }, + }, + { + name: "invalid JSON value on valid key", + marshaled: "{\"up\": foo}", + wantErr: "invalid character", + }, + { + name: "invalid JSON key and value", + marshaled: "{foo: bar}", + wantErr: "invalid character", + }, + { + name: "bool value expected but got something else (string)", + marshaled: "{\"up\": \"true\"}", + wantErr: "cannot unmarshal string into Go struct", + }, + } +} + +func casesNodeUnmarshalJSONConfigField() []nodeUnmarshalTestCase { + // Don't do a big fuss around testing, as adapters.NodeConfig should + // handle it's own serialization. Just do a sanity check. + return []nodeUnmarshalTestCase{ + { + name: "Config field is omitted", + marshaled: "{}", + want: Node{ + Config: nil, + }, + }, + { + name: "Config field is nil", + marshaled: "{\"config\": nil}", + want: Node{ + Config: nil, + }, + }, + { + name: "a non default Config field", + marshaled: "{\"config\":{\"name\":\"node_ecdd0\",\"port\":44665}}", + want: Node{ + Config: &adapters.NodeConfig{ + Name: "node_ecdd0", + Port: 44665, + }, + }, + }, + } +} |