diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-25 20:37:11 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:49 +0800 |
commit | 8335eac4a488716c396f114cbe7522919b97e224 (patch) | |
tree | c680248b9a7346e63ddde403689c2a484a43c4b4 /p2p | |
parent | 6f442cd7793daad014aa0d55b3b7320392c22f02 (diff) | |
download | go-tangerine-8335eac4a488716c396f114cbe7522919b97e224.tar go-tangerine-8335eac4a488716c396f114cbe7522919b97e224.tar.gz go-tangerine-8335eac4a488716c396f114cbe7522919b97e224.tar.bz2 go-tangerine-8335eac4a488716c396f114cbe7522919b97e224.tar.lz go-tangerine-8335eac4a488716c396f114cbe7522919b97e224.tar.xz go-tangerine-8335eac4a488716c396f114cbe7522919b97e224.tar.zst go-tangerine-8335eac4a488716c396f114cbe7522919b97e224.zip |
dex: redesign p2p network topology
- Let p2p server support direct connection and group connection.
- Introduce node meta table to maintain IP of all nodes in node set,
in memory and let nodes in the network can sync this table.
- Let peerSet able to manage direct connections to notary set and dkg set.
The mechanism to refresh the network topology when configuration round
change is not done yet.
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/dial.go | 86 | ||||
-rw-r--r-- | p2p/dial_test.go | 217 | ||||
-rw-r--r-- | p2p/discover/table.go | 12 | ||||
-rw-r--r-- | p2p/discover/table_test.go | 14 | ||||
-rw-r--r-- | p2p/discover/table_util_test.go | 4 | ||||
-rw-r--r-- | p2p/peer.go | 2 | ||||
-rw-r--r-- | p2p/server.go | 271 | ||||
-rw-r--r-- | p2p/server_test.go | 192 |
8 files changed, 591 insertions, 207 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index 9b24ed96a..909bed863 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -64,6 +64,12 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) { return t.Dialer.Dial("tcp", addr.String()) } +type dialGroup struct { + name string + nodes map[enode.ID]*enode.Node + num uint64 +} + // dialstate schedules dials and discovery lookups. // it get's a chance to compute new tasks on every iteration // of the main loop in Server.run. @@ -78,6 +84,8 @@ type dialstate struct { lookupBuf []*enode.Node // current discovery lookup results randomNodes []*enode.Node // filled from Table static map[enode.ID]*dialTask + direct map[enode.ID]*dialTask + group map[string]*dialGroup hist *dialHistory start time.Time // time when the dialer was first used @@ -85,6 +93,7 @@ type dialstate struct { } type discoverTable interface { + Self() *enode.Node Close() Resolve(*enode.Node) *enode.Node LookupRandom() []*enode.Node @@ -133,6 +142,8 @@ func newDialState(self enode.ID, static []*enode.Node, bootnodes []*enode.Node, self: self, netrestrict: netrestrict, static: make(map[enode.ID]*dialTask), + direct: make(map[enode.ID]*dialTask), + group: make(map[string]*dialGroup), dialing: make(map[enode.ID]connFlag), bootnodes: make([]*enode.Node, len(bootnodes)), randomNodes: make([]*enode.Node, maxdyn/2), @@ -159,6 +170,23 @@ func (s *dialstate) removeStatic(n *enode.Node) { s.hist.remove(n.ID()) } +func (s *dialstate) addDirect(n *enode.Node) { + s.direct[n.ID()] = &dialTask{flags: directDialedConn, dest: n} +} + +func (s *dialstate) removeDirect(n *enode.Node) { + delete(s.direct, n.ID()) + s.hist.remove(n.ID()) +} + +func (s *dialstate) addGroup(g *dialGroup) { + s.group[g.name] = g +} + +func (s *dialstate) removeGroup(g *dialGroup) { + delete(s.group, g.name) +} + func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task { if s.start.IsZero() { s.start = now @@ -196,13 +224,69 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti err := s.checkDial(t.dest, peers) switch err { case errNotWhitelisted, errSelf: - log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) + log.Warn("Removing static dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) delete(s.static, t.dest.ID()) case nil: s.dialing[id] = t.flags newtasks = append(newtasks, t) } } + + for id, t := range s.direct { + err := s.checkDial(t.dest, peers) + switch err { + case errNotWhitelisted, errSelf: + log.Warn("Removing direct dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) + delete(s.direct, t.dest.ID()) + case nil: + s.dialing[id] = t.flags + newtasks = append(newtasks, t) + } + } + + // compute connected + connected := map[string]map[enode.ID]struct{}{} + for _, g := range s.group { + connected[g.name] = map[enode.ID]struct{}{} + } + + for id := range peers { + for _, g := range s.group { + if _, ok := g.nodes[id]; ok { + connected[g.name][id] = struct{}{} + } + } + } + + for id := range s.dialing { + for _, g := range s.group { + if _, ok := g.nodes[id]; ok { + connected[g.name][id] = struct{}{} + } + } + } + + groupNodes := map[enode.ID]*enode.Node{} + for _, g := range s.group { + for _, n := range g.nodes { + if uint64(len(connected[g.name])) >= g.num { + break + } + err := s.checkDial(n, peers) + switch err { + case errNotWhitelisted, errSelf: + log.Warn("Removing group dial candidate", "id", n.ID(), "addr", &net.TCPAddr{IP: n.IP(), Port: n.TCP()}, "err", err) + delete(g.nodes, n.ID()) + case nil: + groupNodes[n.ID()] = n + connected[g.name][n.ID()] = struct{}{} + } + } + } + for _, n := range groupNodes { + addDial(groupDialedConn, n) + } + // If we don't have any peers whatsoever, try to dial a random bootnode. This // scenario is useful for the testnet (and private networks) where the discovery // table might be full of mostly bad peers, making it hard to find good ones. diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 3805ff690..f9122de6f 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -525,6 +525,223 @@ func TestDialStateStaticDial(t *testing.T) { }) } +func TestDialStateDirectDial(t *testing.T) { + wantDirect := []*enode.Node{ + newNode(uintID(1), nil), + newNode(uintID(2), nil), + newNode(uintID(3), nil), + newNode(uintID(4), nil), + newNode(uintID(5), nil), + } + init := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + for _, node := range wantDirect { + init.addDirect(node) + } + + runDialTest(t, dialtest{ + init: init, + rounds: []round{ + // Direct dials are launched for the nodes that + // aren't yet connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(3), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + }, + // No new tasks are launched in this round because all direct + // nodes are either connected or still being dialed. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + }, + done: []task{ + &dialTask{flags: staticDialedConn, dest: newNode(uintID(3), nil)}, + }, + }, + // No new dial tasks are launched because all direct + // nodes are now connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + done: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + new: []task{ + &waitExpireTask{Duration: 14 * time.Second}, + }, + }, + // Wait a round for dial history to expire, no new tasks should spawn. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + }, + // If a direct node is dropped, it should be immediately redialed, + // irrespective whether it was originally static or dynamic. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(2), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + }, + }, + }, + }) +} + +func TestDialStateGroupDial(t *testing.T) { + groups := []*dialGroup{ + &dialGroup{ + name: "g1", + nodes: map[enode.ID]*enode.Node{ + uintID(1): newNode(uintID(1), nil), + uintID(2): newNode(uintID(2), nil), + }, + num: 2, + }, + &dialGroup{ + name: "g2", + nodes: map[enode.ID]*enode.Node{ + uintID(2): newNode(uintID(2), nil), + uintID(3): newNode(uintID(3), nil), + uintID(4): newNode(uintID(4), nil), + uintID(5): newNode(uintID(5), nil), + uintID(6): newNode(uintID(6), nil), + }, + num: 2, + }, + } + + type groupTest struct { + peers []*Peer + dialing map[enode.ID]connFlag + ceiling map[string]uint64 + } + + tests := []groupTest{ + { + peers: nil, + dialing: map[enode.ID]connFlag{}, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 2}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(2): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: nil, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + uintID(2): staticDialedConn, + uintID(3): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + } + + pm := func(ps []*Peer) map[enode.ID]*Peer { + m := make(map[enode.ID]*Peer) + for _, p := range ps { + m[p.rw.node.ID()] = p + } + return m + } + + run := func(i int, tt groupTest) { + d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + d.dialing = make(map[enode.ID]connFlag) + for k, v := range tt.dialing { + d.dialing[k] = v + } + + for _, g := range groups { + d.addGroup(g) + } + peermap := pm(tt.peers) + new := d.newTasks(len(tt.dialing), peermap, time.Now()) + + cnt := map[string]uint64{} + for id := range peermap { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for id := range tt.dialing { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, task := range new { + id := task.(*dialTask).dest.ID() + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, g := range groups { + if cnt[g.name] < g.num { + t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)", + i, g.name, cnt[g.name], g.num) + } + if cnt[g.name] > tt.ceiling[g.name] { + t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)", + i, g.name, cnt[g.name], tt.ceiling[g.name]) + } + } + } + + for i, tt := range tests { + run(i, tt) + } +} + // This test checks that static peers will be redialed immediately if they were re-added to a static list. func TestDialStaticAfterReset(t *testing.T) { wantStatic := []*enode.Node{ diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 25ea7b0b2..e8a219871 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -127,7 +127,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error return tab, nil } -func (tab *Table) self() *enode.Node { +func (tab *Table) Self() *enode.Node { return tab.net.self() } @@ -258,7 +258,7 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node { ) // don't query further if we hit ourself. // unlikely to happen often in practice. - asked[tab.self().ID()] = true + asked[tab.Self().ID()] = true for { tab.mutex.Lock() @@ -419,7 +419,7 @@ func (tab *Table) doRefresh(done chan struct{}) { // Run self lookup to discover new neighbor nodes. // We can only do this if we have a secp256k1 identity. var key ecdsa.PublicKey - if err := tab.self().Load((*enode.Secp256k1)(&key)); err == nil { + if err := tab.Self().Load((*enode.Secp256k1)(&key)); err == nil { tab.lookup(encodePubkey(&key), false) } @@ -544,7 +544,7 @@ func (tab *Table) len() (n int) { // bucket returns the bucket for the given node ID hash. func (tab *Table) bucket(id enode.ID) *bucket { - d := enode.LogDist(tab.self().ID(), id) + d := enode.LogDist(tab.Self().ID(), id) if d <= bucketMinDistance { return tab.buckets[0] } @@ -557,7 +557,7 @@ func (tab *Table) bucket(id enode.ID) *bucket { // // The caller must not hold tab.mutex. func (tab *Table) addSeenNode(n *node) { - if n.ID() == tab.self().ID() { + if n.ID() == tab.Self().ID() { return } @@ -599,7 +599,7 @@ func (tab *Table) addVerifiedNode(n *node) { if !tab.isInitDone() { return } - if n.ID() == tab.self().ID() { + if n.ID() == tab.Self().ID() { return } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 2321e743f..8947a074e 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -147,7 +147,7 @@ func TestTable_IPLimit(t *testing.T) { defer tab.Close() for i := 0; i < tableIPLimit+1; i++ { - n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)}) + n := nodeAtDistance(tab.Self().ID(), i, net.IP{172, 0, 1, byte(i)}) tab.addSeenNode(n) } if tab.len() > tableIPLimit { @@ -165,7 +165,7 @@ func TestTable_BucketIPLimit(t *testing.T) { d := 3 for i := 0; i < bucketIPLimit+1; i++ { - n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)}) + n := nodeAtDistance(tab.Self().ID(), d, net.IP{172, 0, 1, byte(i)}) tab.addSeenNode(n) } if tab.len() > bucketIPLimit { @@ -264,7 +264,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) { for i := 0; i < len(buf); i++ { ld := cfg.Rand.Intn(len(tab.buckets)) - fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))}) + fillTable(tab, []*node{nodeAtDistance(tab.Self().ID(), ld, intIP(ld))}) } gotN := tab.ReadRandomNodes(buf) if gotN != tab.len() { @@ -312,8 +312,8 @@ func TestTable_addVerifiedNode(t *testing.T) { defer tab.Close() // Insert two nodes. - n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) - n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addSeenNode(n1) tab.addSeenNode(n2) @@ -344,8 +344,8 @@ func TestTable_addSeenNode(t *testing.T) { defer tab.Close() // Insert two nodes. - n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) - n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addSeenNode(n1) tab.addSeenNode(n2) diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 20238aabc..7e149b22c 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -75,10 +75,10 @@ func intIP(i int) net.IP { // fillBucket inserts nodes into the given bucket until it is full. func fillBucket(tab *Table, n *node) (last *node) { - ld := enode.LogDist(tab.self().ID(), n.ID()) + ld := enode.LogDist(tab.Self().ID(), n.ID()) b := tab.bucket(n.ID()) for len(b.entries) < bucketSize { - b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld))) + b.entries = append(b.entries, nodeAtDistance(tab.Self().ID(), ld, intIP(ld))) } return b.entries[bucketSize-1] } diff --git a/p2p/peer.go b/p2p/peer.go index 3c75d7dd5..2c357fdc9 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -434,7 +434,6 @@ type PeerInfo struct { RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection Inbound bool `json:"inbound"` Trusted bool `json:"trusted"` - Notary bool `json:"notary"` Static bool `json:"static"` } `json:"network"` Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields @@ -459,7 +458,6 @@ func (p *Peer) Info() *PeerInfo { info.Network.RemoteAddress = p.RemoteAddr().String() info.Network.Inbound = p.rw.is(inboundConn) info.Network.Trusted = p.rw.is(trustedConn) - info.Network.Notary = p.rw.is(notaryConn) info.Network.Static = p.rw.is(staticDialedConn) // Gather all the running protocol infos diff --git a/p2p/server.go b/p2p/server.go index c3b3a00d4..a58673342 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -33,13 +33,13 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/discv5" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/p2p/nat" "github.com/dexon-foundation/dexon/p2p/netutil" "github.com/dexon-foundation/dexon/rlp" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( @@ -178,10 +178,12 @@ type Server struct { quit chan struct{} addstatic chan *enode.Node removestatic chan *enode.Node + adddirect chan *enode.Node + removedirect chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node - addnotary chan *enode.Node - removenotary chan *enode.Node + addgroup chan *dialGroup + removegroup chan *dialGroup posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -203,9 +205,10 @@ type connFlag int32 const ( dynDialedConn connFlag = 1 << iota staticDialedConn + directDialedConn + groupDialedConn inboundConn trustedConn - notaryConn ) // conn wraps a network connection with information gathered @@ -254,12 +257,15 @@ func (f connFlag) String() string { if f&staticDialedConn != 0 { s += "-staticdial" } + if f&directDialedConn != 0 { + s += "-directdial" + } + if f&groupDialedConn != 0 { + s += "-groupdial" + } if f&inboundConn != 0 { s += "-inbound" } - if f¬aryConn != 0 { - s += "-notary" - } if s != "" { s = s[1:] } @@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) { } } -// AddTrustedPeer adds the given node to a reserved whitelist which allows the -// node to always connect, even if the slot are full. -func (srv *Server) AddTrustedPeer(node *enode.Node) { +// AddDirectPeer connects to the given node and maintains the connection until the +// server is shut down. If the connection fails for any reason, the server will +// attempt to reconnect the peer. +func (srv *Server) AddDirectPeer(node *enode.Node) { select { - case srv.addtrusted <- node: + case srv.adddirect <- node: case <-srv.quit: } } -// RemoveTrustedPeer removes the given node from the trusted peer set. -func (srv *Server) RemoveTrustedPeer(node *enode.Node) { +// RemoveDirectPeer disconnects from the given node +func (srv *Server) RemoveDirectPeer(node *enode.Node) { select { - case srv.removetrusted <- node: + case srv.removedirect <- node: case <-srv.quit: } } -// AddNotaryPeer connects to the given node and maintains the connection until the -// server is shut down. If the connection fails for any reason, the server will -// attempt to reconnect the peer. -// AddNotaryPeer also adds the given node to a reserved whitelist which allows the +func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) { + m := map[enode.ID]*enode.Node{} + for _, node := range nodes { + m[node.ID()] = node + } + g := &dialGroup{name: name, nodes: m, num: num} + select { + case srv.addgroup <- g: + case <-srv.quit: + } +} + +func (srv *Server) RemoveGroup(name string) { + g := &dialGroup{name: name} + select { + case srv.removegroup <- g: + case <-srv.quit: + } +} + +// AddTrustedPeer adds the given node to a reserved whitelist which allows the // node to always connect, even if the slot are full. -func (srv *Server) AddNotaryPeer(node *discover.Node) { +func (srv *Server) AddTrustedPeer(node *enode.Node) { select { - case srv.addnotary <- node: + case srv.addtrusted <- node: case <-srv.quit: } } -// RemoveNotaryPeer disconnects from the given node. -// RemoveNotaryPeer also removes the given node from the notary peer set. -func (srv *Server) RemoveNotaryPeer(node *discover.Node) { +// RemoveTrustedPeer removes the given node from the trusted peer set. +func (srv *Server) RemoveTrustedPeer(node *enode.Node) { select { - case srv.removenotary <- node: + case srv.removetrusted <- node: case <-srv.quit: } } @@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node { return ln.Node() } +func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node { + // If the node is running but discovery is off, manually assemble the node infos. + if ntab == nil { + addr := srv.tcpAddr(listener) + return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0) + } + // Otherwise return the discovery node. + return ntab.Self() +} + +func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr { + addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}} + if listener == nil { + return addr // Inbound connections disabled, use zero address. + } + // Otherwise inject the listener address too. + if a, ok := listener.Addr().(*net.TCPAddr); ok { + addr = *a + } + if srv.NAT != nil { + if ip, err := srv.NAT.ExternalIP(); err == nil { + addr.IP = ip + } + } + if addr.IP.IsUnspecified() { + addr.IP = net.IP{127, 0, 0, 1} + } + return addr +} + // Stop terminates the server and all active peer connections. // It blocks until all active connections have been closed. func (srv *Server) Stop() { @@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) { srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *enode.Node) srv.removestatic = make(chan *enode.Node) + srv.adddirect = make(chan *enode.Node) + srv.removedirect = make(chan *enode.Node) + srv.addgroup = make(chan *dialGroup) + srv.removegroup = make(chan *dialGroup) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) - srv.addnotary = make(chan *enode.Node) - srv.removenotary = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) @@ -628,6 +683,10 @@ type dialer interface { taskDone(task, time.Time) addStatic(*enode.Node) removeStatic(*enode.Node) + addDirect(*enode.Node) + removeDirect(*enode.Node) + addGroup(*dialGroup) + removeGroup(*dialGroup) } func (srv *Server) run(dialstate dialer) { @@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) { defer srv.nodedb.Close() var ( - peers = make(map[enode.ID]*Peer) - inboundCount = 0 - trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) - notary = make(map[enode.ID]bool) - taskdone = make(chan task, maxActiveDialTasks) - runningTasks []task - queuedTasks []task // tasks that can't run yet + peers = make(map[enode.ID]*Peer) + inboundCount = 0 + trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + peerflags = make(map[enode.ID]connFlag) + groupRefCount = make(map[enode.ID]int32) + groups = make(map[string]*dialGroup) + taskdone = make(chan task, maxActiveDialTasks) + runningTasks []task + queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. @@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) { } } + // remember and maintain the connection flags locally + setConnFlags := func(id enode.ID, f connFlag, val bool) { + if p, ok := peers[id]; ok { + p.rw.set(f, val) + } + if val { + peerflags[id] |= f + } else { + peerflags[id] &= ^f + } + if peerflags[id] == 0 { + delete(peerflags, id) + } + } + + // Put trusted nodes into a map to speed up checks. + // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. + for _, n := range srv.TrustedNodes { + setConnFlags(n.ID(), trustedConn, true) + } + + canDisconnect := func(p *Peer) bool { + f, ok := peerflags[p.ID()] + if ok && f != 0 { + return false + } + return !p.rw.is(dynDialedConn | inboundConn) + } + + removeGroup := func(g *dialGroup) { + if gg, ok := groups[g.name]; ok { + for id := range gg.nodes { + groupRefCount[id]-- + if groupRefCount[id] == 0 { + setConnFlags(id, groupDialedConn, false) + delete(groupRefCount, id) + } + } + } + } + + addGroup := func(g *dialGroup) { + if _, ok := groups[g.name]; ok { + removeGroup(groups[g.name]) + } + for id := range g.nodes { + groupRefCount[id]++ + if groupRefCount[id] > 0 { + setConnFlags(id, groupDialedConn, true) + } + } + groups[g.name] = g + } + running: for { scheduleTasks() @@ -693,63 +808,59 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, true) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, false) dialstate.removeStatic(n) - if p, ok := peers[n.ID()]; ok { + if p, ok := peers[n.ID()]; ok && canDisconnect(p) { p.Disconnect(DiscRequested) } + case n := <-srv.adddirect: + // This channel is used by AddDirectPeer to add to the + // ephemeral direct peer list. Add it to the dialer, + // it will keep the node connected. + srv.log.Trace("Adding direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, true) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, true) + } + dialstate.addDirect(n) + case n := <-srv.removedirect: + // This channel is used by RemoveDirectPeer to send a + // disconnect request to a peer and begin the + // stop keeping the node connected. + srv.log.Trace("Removing direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, false) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, false) + if !p.rw.is(trustedConn | groupDialedConn) { + p.Disconnect(DiscRequested) + } + } + dialstate.removeDirect(n) + case g := <-srv.addgroup: + srv.log.Trace("Adding group", "group", g) + addGroup(g) + dialstate.addGroup(g) + case g := <-srv.removegroup: + srv.log.Trace("Removing group", "group", g) + removeGroup(g) + dialstate.removeGroup(g) case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add an enode // to the trusted node set. srv.log.Trace("Adding trusted node", "node", n) - trusted[n.ID()] = true - // Mark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, true) - } + setConnFlags(n.ID(), trustedConn, true) case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove an enode // from the trusted node set. srv.log.Trace("Removing trusted node", "node", n) - if _, ok := trusted[n.ID()]; ok { - delete(trusted, n.ID()) - } - // Unmark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, false) - } - case n := <-srv.addnotary: - // This channel is used by AddNotaryPeer to add to the - // ephemeral notary peer list. Add it to the dialer, - // it will keep the node connected. - srv.log.Trace("Adding notary node", "node", n) - notary[n.ID] = true - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, true) - } - dialstate.addStatic(n) - case n := <-srv.removenotary: - // This channel is used by RemoveNotaryPeer to send a - // disconnect request to a peer and begin the - // stop keeping the node connected. - srv.log.Trace("Removing notary node", "node", n) - if _, ok := notary[n.ID]; ok { - delete(notary, n.ID) - } - - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, false) - } - - dialstate.removeStatic(n) - if p, ok := peers[n.ID]; ok { - p.Disconnect(DiscRequested) - } + setConnFlags(n.ID(), trustedConn, false) case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -764,15 +875,9 @@ running: case c := <-srv.posthandshake: // A connection has passed the encryption handshake so // the remote identity is known (but hasn't been verified yet). - if trusted[c.node.ID()] { - // Ensure that the trusted flag is set before checking against MaxPeers. - c.flags |= trustedConn + if f, ok := peerflags[c.node.ID()]; ok { + c.flags |= f } - - if notary[c.id] { - c.flags |= notaryConn - } - // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): @@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected diff --git a/p2p/server_test.go b/p2p/server_test.go index b46240722..8bd113791 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,7 +27,6 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "golang.org/x/crypto/sha3" @@ -173,14 +172,10 @@ func TestServerDial(t *testing.T) { } // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags - // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags. // Particularly for race conditions on changing the flag state. if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted prematurely: %v", peer) } - if peer := srv.Peers()[0]; peer.Info().Network.Notary { - t.Errorf("peer is notary prematurely: %v", peer) - } done := make(chan bool) go func() { srv.AddTrustedPeer(node) @@ -192,14 +187,6 @@ func TestServerDial(t *testing.T) { t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) } - srv.AddNotaryPeer(node) - if peer := srv.Peers()[0]; !peer.Info().Network.Notary { - t.Errorf("peer is not notary after AddNotaryPeer: %v", peer) - } - srv.RemoveNotaryPeer(node) - if peer := srv.Peers()[0]; peer.Info().Network.Notary { - t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer) - } done <- true }() // Trigger potential race conditions @@ -216,70 +203,89 @@ func TestServerDial(t *testing.T) { } } -// TestNotaryPeer checks that the node is added to and remove from static when -// AddNotaryPeer and RemoveNotaryPeer is called. -func TestNotaryPeer(t *testing.T) { - var ( - returned = make(chan struct{}) - add, remove = make(chan *discover.Node), make(chan *discover.Node) - tg = taskgen{ - newFunc: func(running int, peers map[discover.NodeID]*Peer) []task { - return []task{} - }, - doneFunc: func(t task) {}, - addFunc: func(n *discover.Node) { - add <- n - }, - removeFunc: func(n *discover.Node) { - remove <- n - }, - } - ) - +func TestServerPeerConnFlag(t *testing.T) { srv := &Server{ - Config: Config{MaxPeers: 10}, - quit: make(chan struct{}), - ntab: fakeTable{}, - addnotary: make(chan *discover.Node), - removenotary: make(chan *discover.Node), - running: true, - log: log.New(), + Config: Config{ + PrivateKey: newkey(), + MaxPeers: 10, + NoDial: true, + }, } - srv.loopWG.Add(1) - go func() { - srv.run(tg) - close(returned) - }() + if err := srv.Start(); err != nil { + t.Fatalf("could not start: %v", err) + } + defer srv.Stop() - notaryID := randomID() - go srv.AddNotaryPeer(&discover.Node{ID: notaryID}) + // inject a peer + key := newkey() + id := enode.PubkeyToIDV4(&key.PublicKey) + node := newNode(id, nil) + fd, _ := net.Pipe() + c := &conn{ + node: node, + fd: fd, + transport: newTestTransport(&key.PublicKey, fd), + flags: inboundConn, + cont: make(chan error), + } + if err := srv.checkpoint(c, srv.addpeer); err != nil { + t.Fatalf("could not add conn: %v", err) + } - select { - case n := <-add: - if n.ID != notaryID { - t.Errorf("node ID mismatched: got %s, want %s", - n.ID.String(), notaryID.String()) - } - case <-time.After(1 * time.Second): - t.Error("add static is not called within one second") + srv.AddTrustedPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn)) } - go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) - select { - case n := <-remove: - if n.ID != notaryID { - t.Errorf("node ID mismatched: got %s, want %s", - n.ID.String(), notaryID.String()) - } - case <-time.After(1 * time.Second): - t.Error("remove static is not called within one second") + srv.AddDirectPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn)) } - srv.Stop() - select { - case <-returned: - case <-time.After(500 * time.Millisecond): - t.Error("Server.run did not return within 500ms") + srv.AddGroup("g1", []*enode.Node{node}, 1) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) + } + + srv.AddGroup("g2", []*enode.Node{node}, 1) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) + } + + srv.RemoveTrustedPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn | directDialedConn)) + } + + srv.RemoveDirectPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn)) + } + + srv.RemoveGroup("g1") + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn)) + } + + srv.RemoveGroup("g2") + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != inboundConn { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, inboundConn) } } @@ -407,9 +413,6 @@ func TestServerManyTasks(t *testing.T) { type taskgen struct { newFunc func(running int, peers map[enode.ID]*Peer) []task doneFunc func(task) - - addFunc func(*discover.Node) - removeFunc func(*discover.Node) } func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task { @@ -418,11 +421,17 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) func (tg taskgen) taskDone(t task, now time.Time) { tg.doneFunc(t) } -func (tg taskgen) addStatic(n *enode.Node) { - tg.addFunc(n) +func (tg taskgen) addStatic(*enode.Node) { +} +func (tg taskgen) removeStatic(*enode.Node) { } -func (tg taskgen) removeStatic(n *enode.Node) { - tg.removeFunc(n) +func (tg taskgen) addDirect(*enode.Node) { +} +func (tg taskgen) removeDirect(*enode.Node) { +} +func (tg taskgen) addGroup(*dialGroup) { +} +func (tg taskgen) removeGroup(*dialGroup) { } type testTask struct { @@ -436,11 +445,10 @@ func (t *testTask) Do(srv *Server) { // This test checks that connections are disconnected // just after the encryption handshake when the server is -// at capacity. Trusted and Notary connections should still be accepted. +// at capacity. Trusted connections should still be accepted. func TestServerAtCap(t *testing.T) { trustedNode := newkey() trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey) - notaryID := randomID() srv := &Server{ Config: Config{ PrivateKey: newkey(), @@ -453,7 +461,6 @@ func TestServerAtCap(t *testing.T) { t.Fatalf("could not start: %v", err) } defer srv.Stop() - srv.AddNotaryPeer(&discover.Node{ID: notaryID}) newconn := func(id enode.ID) *conn { fd, _ := net.Pipe() @@ -484,15 +491,6 @@ func TestServerAtCap(t *testing.T) { t.Error("Server did not set trusted flag") } - // Try inserting a notary connection. - c = newconn(notaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != nil { - t.Error("unexpected error for notary conn @posthandshake:", err) - } - if !c.is(notaryConn) { - t.Error("Server did not set notary flag") - } - // Remove from trusted set and try again srv.RemoveTrustedPeer(newNode(trustedID, nil)) c = newconn(trustedID) @@ -509,24 +507,6 @@ func TestServerAtCap(t *testing.T) { if !c.is(trustedConn) { t.Error("Server did not set trusted flag") } - - // Remove from notary set and try again - srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) - c = newconn(notaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { - t.Error("wrong error for insert:", err) - } - - // Add anotherID to notary set and try again - anotherNotaryID := randomID() - srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID}) - c = newconn(anotherNotaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != nil { - t.Error("unexpected error for notary conn @posthandshake:", err) - } - if !c.is(notaryConn) { - t.Error("Server did not set notary flag") - } } func TestServerPeerLimits(t *testing.T) { |