From 8335eac4a488716c396f114cbe7522919b97e224 Mon Sep 17 00:00:00 2001 From: Sonic Date: Tue, 25 Sep 2018 20:37:11 +0800 Subject: dex: redesign p2p network topology - Let p2p server support direct connection and group connection. - Introduce node meta table to maintain IP of all nodes in node set, in memory and let nodes in the network can sync this table. - Let peerSet able to manage direct connections to notary set and dkg set. The mechanism to refresh the network topology when configuration round change is not done yet. --- p2p/dial_test.go | 217 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) (limited to 'p2p/dial_test.go') diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 3805ff690..f9122de6f 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -525,6 +525,223 @@ func TestDialStateStaticDial(t *testing.T) { }) } +func TestDialStateDirectDial(t *testing.T) { + wantDirect := []*enode.Node{ + newNode(uintID(1), nil), + newNode(uintID(2), nil), + newNode(uintID(3), nil), + newNode(uintID(4), nil), + newNode(uintID(5), nil), + } + init := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + for _, node := range wantDirect { + init.addDirect(node) + } + + runDialTest(t, dialtest{ + init: init, + rounds: []round{ + // Direct dials are launched for the nodes that + // aren't yet connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(3), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + }, + // No new tasks are launched in this round because all direct + // nodes are either connected or still being dialed. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + }, + done: []task{ + &dialTask{flags: staticDialedConn, dest: newNode(uintID(3), nil)}, + }, + }, + // No new dial tasks are launched because all direct + // nodes are now connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + done: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + new: []task{ + &waitExpireTask{Duration: 14 * time.Second}, + }, + }, + // Wait a round for dial history to expire, no new tasks should spawn. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + }, + // If a direct node is dropped, it should be immediately redialed, + // irrespective whether it was originally static or dynamic. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(2), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + }, + }, + }, + }) +} + +func TestDialStateGroupDial(t *testing.T) { + groups := []*dialGroup{ + &dialGroup{ + name: "g1", + nodes: map[enode.ID]*enode.Node{ + uintID(1): newNode(uintID(1), nil), + uintID(2): newNode(uintID(2), nil), + }, + num: 2, + }, + &dialGroup{ + name: "g2", + nodes: map[enode.ID]*enode.Node{ + uintID(2): newNode(uintID(2), nil), + uintID(3): newNode(uintID(3), nil), + uintID(4): newNode(uintID(4), nil), + uintID(5): newNode(uintID(5), nil), + uintID(6): newNode(uintID(6), nil), + }, + num: 2, + }, + } + + type groupTest struct { + peers []*Peer + dialing map[enode.ID]connFlag + ceiling map[string]uint64 + } + + tests := []groupTest{ + { + peers: nil, + dialing: map[enode.ID]connFlag{}, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 2}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(2): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: nil, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + uintID(2): staticDialedConn, + uintID(3): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + } + + pm := func(ps []*Peer) map[enode.ID]*Peer { + m := make(map[enode.ID]*Peer) + for _, p := range ps { + m[p.rw.node.ID()] = p + } + return m + } + + run := func(i int, tt groupTest) { + d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + d.dialing = make(map[enode.ID]connFlag) + for k, v := range tt.dialing { + d.dialing[k] = v + } + + for _, g := range groups { + d.addGroup(g) + } + peermap := pm(tt.peers) + new := d.newTasks(len(tt.dialing), peermap, time.Now()) + + cnt := map[string]uint64{} + for id := range peermap { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for id := range tt.dialing { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, task := range new { + id := task.(*dialTask).dest.ID() + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, g := range groups { + if cnt[g.name] < g.num { + t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)", + i, g.name, cnt[g.name], g.num) + } + if cnt[g.name] > tt.ceiling[g.name] { + t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)", + i, g.name, cnt[g.name], tt.ceiling[g.name]) + } + } + } + + for i, tt := range tests { + run(i, tt) + } +} + // This test checks that static peers will be redialed immediately if they were re-added to a static list. func TestDialStaticAfterReset(t *testing.T) { wantStatic := []*enode.Node{ -- cgit v1.2.3