diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/dial.go | 86 | ||||
-rw-r--r-- | p2p/dial_test.go | 217 | ||||
-rw-r--r-- | p2p/discover/table.go | 12 | ||||
-rw-r--r-- | p2p/discover/table_test.go | 14 | ||||
-rw-r--r-- | p2p/discover/table_util_test.go | 4 | ||||
-rw-r--r-- | p2p/peer.go | 2 | ||||
-rw-r--r-- | p2p/server.go | 271 | ||||
-rw-r--r-- | p2p/server_test.go | 192 |
8 files changed, 591 insertions, 207 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index 9b24ed96a..909bed863 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -64,6 +64,12 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) { return t.Dialer.Dial("tcp", addr.String()) } +type dialGroup struct { + name string + nodes map[enode.ID]*enode.Node + num uint64 +} + // dialstate schedules dials and discovery lookups. // it get's a chance to compute new tasks on every iteration // of the main loop in Server.run. @@ -78,6 +84,8 @@ type dialstate struct { lookupBuf []*enode.Node // current discovery lookup results randomNodes []*enode.Node // filled from Table static map[enode.ID]*dialTask + direct map[enode.ID]*dialTask + group map[string]*dialGroup hist *dialHistory start time.Time // time when the dialer was first used @@ -85,6 +93,7 @@ type dialstate struct { } type discoverTable interface { + Self() *enode.Node Close() Resolve(*enode.Node) *enode.Node LookupRandom() []*enode.Node @@ -133,6 +142,8 @@ func newDialState(self enode.ID, static []*enode.Node, bootnodes []*enode.Node, self: self, netrestrict: netrestrict, static: make(map[enode.ID]*dialTask), + direct: make(map[enode.ID]*dialTask), + group: make(map[string]*dialGroup), dialing: make(map[enode.ID]connFlag), bootnodes: make([]*enode.Node, len(bootnodes)), randomNodes: make([]*enode.Node, maxdyn/2), @@ -159,6 +170,23 @@ func (s *dialstate) removeStatic(n *enode.Node) { s.hist.remove(n.ID()) } +func (s *dialstate) addDirect(n *enode.Node) { + s.direct[n.ID()] = &dialTask{flags: directDialedConn, dest: n} +} + +func (s *dialstate) removeDirect(n *enode.Node) { + delete(s.direct, n.ID()) + s.hist.remove(n.ID()) +} + +func (s *dialstate) addGroup(g *dialGroup) { + s.group[g.name] = g +} + +func (s *dialstate) removeGroup(g *dialGroup) { + delete(s.group, g.name) +} + func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task { if s.start.IsZero() { s.start = now @@ -196,13 +224,69 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti err := s.checkDial(t.dest, peers) switch err { case errNotWhitelisted, errSelf: - log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) + log.Warn("Removing static dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) delete(s.static, t.dest.ID()) case nil: s.dialing[id] = t.flags newtasks = append(newtasks, t) } } + + for id, t := range s.direct { + err := s.checkDial(t.dest, peers) + switch err { + case errNotWhitelisted, errSelf: + log.Warn("Removing direct dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) + delete(s.direct, t.dest.ID()) + case nil: + s.dialing[id] = t.flags + newtasks = append(newtasks, t) + } + } + + // compute connected + connected := map[string]map[enode.ID]struct{}{} + for _, g := range s.group { + connected[g.name] = map[enode.ID]struct{}{} + } + + for id := range peers { + for _, g := range s.group { + if _, ok := g.nodes[id]; ok { + connected[g.name][id] = struct{}{} + } + } + } + + for id := range s.dialing { + for _, g := range s.group { + if _, ok := g.nodes[id]; ok { + connected[g.name][id] = struct{}{} + } + } + } + + groupNodes := map[enode.ID]*enode.Node{} + for _, g := range s.group { + for _, n := range g.nodes { + if uint64(len(connected[g.name])) >= g.num { + break + } + err := s.checkDial(n, peers) + switch err { + case errNotWhitelisted, errSelf: + log.Warn("Removing group dial candidate", "id", n.ID(), "addr", &net.TCPAddr{IP: n.IP(), Port: n.TCP()}, "err", err) + delete(g.nodes, n.ID()) + case nil: + groupNodes[n.ID()] = n + connected[g.name][n.ID()] = struct{}{} + } + } + } + for _, n := range groupNodes { + addDial(groupDialedConn, n) + } + // If we don't have any peers whatsoever, try to dial a random bootnode. This // scenario is useful for the testnet (and private networks) where the discovery // table might be full of mostly bad peers, making it hard to find good ones. diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 3805ff690..f9122de6f 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -525,6 +525,223 @@ func TestDialStateStaticDial(t *testing.T) { }) } +func TestDialStateDirectDial(t *testing.T) { + wantDirect := []*enode.Node{ + newNode(uintID(1), nil), + newNode(uintID(2), nil), + newNode(uintID(3), nil), + newNode(uintID(4), nil), + newNode(uintID(5), nil), + } + init := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + for _, node := range wantDirect { + init.addDirect(node) + } + + runDialTest(t, dialtest{ + init: init, + rounds: []round{ + // Direct dials are launched for the nodes that + // aren't yet connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(3), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + }, + // No new tasks are launched in this round because all direct + // nodes are either connected or still being dialed. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + }, + done: []task{ + &dialTask{flags: staticDialedConn, dest: newNode(uintID(3), nil)}, + }, + }, + // No new dial tasks are launched because all direct + // nodes are now connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + done: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + new: []task{ + &waitExpireTask{Duration: 14 * time.Second}, + }, + }, + // Wait a round for dial history to expire, no new tasks should spawn. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + }, + // If a direct node is dropped, it should be immediately redialed, + // irrespective whether it was originally static or dynamic. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(2), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + }, + }, + }, + }) +} + +func TestDialStateGroupDial(t *testing.T) { + groups := []*dialGroup{ + &dialGroup{ + name: "g1", + nodes: map[enode.ID]*enode.Node{ + uintID(1): newNode(uintID(1), nil), + uintID(2): newNode(uintID(2), nil), + }, + num: 2, + }, + &dialGroup{ + name: "g2", + nodes: map[enode.ID]*enode.Node{ + uintID(2): newNode(uintID(2), nil), + uintID(3): newNode(uintID(3), nil), + uintID(4): newNode(uintID(4), nil), + uintID(5): newNode(uintID(5), nil), + uintID(6): newNode(uintID(6), nil), + }, + num: 2, + }, + } + + type groupTest struct { + peers []*Peer + dialing map[enode.ID]connFlag + ceiling map[string]uint64 + } + + tests := []groupTest{ + { + peers: nil, + dialing: map[enode.ID]connFlag{}, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 2}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(2): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: nil, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + uintID(2): staticDialedConn, + uintID(3): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + } + + pm := func(ps []*Peer) map[enode.ID]*Peer { + m := make(map[enode.ID]*Peer) + for _, p := range ps { + m[p.rw.node.ID()] = p + } + return m + } + + run := func(i int, tt groupTest) { + d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + d.dialing = make(map[enode.ID]connFlag) + for k, v := range tt.dialing { + d.dialing[k] = v + } + + for _, g := range groups { + d.addGroup(g) + } + peermap := pm(tt.peers) + new := d.newTasks(len(tt.dialing), peermap, time.Now()) + + cnt := map[string]uint64{} + for id := range peermap { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for id := range tt.dialing { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, task := range new { + id := task.(*dialTask).dest.ID() + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, g := range groups { + if cnt[g.name] < g.num { + t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)", + i, g.name, cnt[g.name], g.num) + } + if cnt[g.name] > tt.ceiling[g.name] { + t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)", + i, g.name, cnt[g.name], tt.ceiling[g.name]) + } + } + } + + for i, tt := range tests { + run(i, tt) + } +} + // This test checks that static peers will be redialed immediately if they were re-added to a static list. func TestDialStaticAfterReset(t *testing.T) { wantStatic := []*enode.Node{ diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 25ea7b0b2..e8a219871 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -127,7 +127,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error return tab, nil } -func (tab *Table) self() *enode.Node { +func (tab *Table) Self() *enode.Node { return tab.net.self() } @@ -258,7 +258,7 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node { ) // don't query further if we hit ourself. // unlikely to happen often in practice. - asked[tab.self().ID()] = true + asked[tab.Self().ID()] = true for { tab.mutex.Lock() @@ -419,7 +419,7 @@ func (tab *Table) doRefresh(done chan struct{}) { // Run self lookup to discover new neighbor nodes. // We can only do this if we have a secp256k1 identity. var key ecdsa.PublicKey - if err := tab.self().Load((*enode.Secp256k1)(&key)); err == nil { + if err := tab.Self().Load((*enode.Secp256k1)(&key)); err == nil { tab.lookup(encodePubkey(&key), false) } @@ -544,7 +544,7 @@ func (tab *Table) len() (n int) { // bucket returns the bucket for the given node ID hash. func (tab *Table) bucket(id enode.ID) *bucket { - d := enode.LogDist(tab.self().ID(), id) + d := enode.LogDist(tab.Self().ID(), id) if d <= bucketMinDistance { return tab.buckets[0] } @@ -557,7 +557,7 @@ func (tab *Table) bucket(id enode.ID) *bucket { // // The caller must not hold tab.mutex. func (tab *Table) addSeenNode(n *node) { - if n.ID() == tab.self().ID() { + if n.ID() == tab.Self().ID() { return } @@ -599,7 +599,7 @@ func (tab *Table) addVerifiedNode(n *node) { if !tab.isInitDone() { return } - if n.ID() == tab.self().ID() { + if n.ID() == tab.Self().ID() { return } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 2321e743f..8947a074e 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -147,7 +147,7 @@ func TestTable_IPLimit(t *testing.T) { defer tab.Close() for i := 0; i < tableIPLimit+1; i++ { - n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)}) + n := nodeAtDistance(tab.Self().ID(), i, net.IP{172, 0, 1, byte(i)}) tab.addSeenNode(n) } if tab.len() > tableIPLimit { @@ -165,7 +165,7 @@ func TestTable_BucketIPLimit(t *testing.T) { d := 3 for i := 0; i < bucketIPLimit+1; i++ { - n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)}) + n := nodeAtDistance(tab.Self().ID(), d, net.IP{172, 0, 1, byte(i)}) tab.addSeenNode(n) } if tab.len() > bucketIPLimit { @@ -264,7 +264,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) { for i := 0; i < len(buf); i++ { ld := cfg.Rand.Intn(len(tab.buckets)) - fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))}) + fillTable(tab, []*node{nodeAtDistance(tab.Self().ID(), ld, intIP(ld))}) } gotN := tab.ReadRandomNodes(buf) if gotN != tab.len() { @@ -312,8 +312,8 @@ func TestTable_addVerifiedNode(t *testing.T) { defer tab.Close() // Insert two nodes. - n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) - n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addSeenNode(n1) tab.addSeenNode(n2) @@ -344,8 +344,8 @@ func TestTable_addSeenNode(t *testing.T) { defer tab.Close() // Insert two nodes. - n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) - n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addSeenNode(n1) tab.addSeenNode(n2) diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 20238aabc..7e149b22c 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -75,10 +75,10 @@ func intIP(i int) net.IP { // fillBucket inserts nodes into the given bucket until it is full. func fillBucket(tab *Table, n *node) (last *node) { - ld := enode.LogDist(tab.self().ID(), n.ID()) + ld := enode.LogDist(tab.Self().ID(), n.ID()) b := tab.bucket(n.ID()) for len(b.entries) < bucketSize { - b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld))) + b.entries = append(b.entries, nodeAtDistance(tab.Self().ID(), ld, intIP(ld))) } return b.entries[bucketSize-1] } diff --git a/p2p/peer.go b/p2p/peer.go index 3c75d7dd5..2c357fdc9 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -434,7 +434,6 @@ type PeerInfo struct { RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection Inbound bool `json:"inbound"` Trusted bool `json:"trusted"` - Notary bool `json:"notary"` Static bool `json:"static"` } `json:"network"` Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields @@ -459,7 +458,6 @@ func (p *Peer) Info() *PeerInfo { info.Network.RemoteAddress = p.RemoteAddr().String() info.Network.Inbound = p.rw.is(inboundConn) info.Network.Trusted = p.rw.is(trustedConn) - info.Network.Notary = p.rw.is(notaryConn) info.Network.Static = p.rw.is(staticDialedConn) // Gather all the running protocol infos diff --git a/p2p/server.go b/p2p/server.go index c3b3a00d4..a58673342 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -33,13 +33,13 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/discv5" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/p2p/nat" "github.com/dexon-foundation/dexon/p2p/netutil" "github.com/dexon-foundation/dexon/rlp" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( @@ -178,10 +178,12 @@ type Server struct { quit chan struct{} addstatic chan *enode.Node removestatic chan *enode.Node + adddirect chan *enode.Node + removedirect chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node - addnotary chan *enode.Node - removenotary chan *enode.Node + addgroup chan *dialGroup + removegroup chan *dialGroup posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -203,9 +205,10 @@ type connFlag int32 const ( dynDialedConn connFlag = 1 << iota staticDialedConn + directDialedConn + groupDialedConn inboundConn trustedConn - notaryConn ) // conn wraps a network connection with information gathered @@ -254,12 +257,15 @@ func (f connFlag) String() string { if f&staticDialedConn != 0 { s += "-staticdial" } + if f&directDialedConn != 0 { + s += "-directdial" + } + if f&groupDialedConn != 0 { + s += "-groupdial" + } if f&inboundConn != 0 { s += "-inbound" } - if f¬aryConn != 0 { - s += "-notary" - } if s != "" { s = s[1:] } @@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) { } } -// AddTrustedPeer adds the given node to a reserved whitelist which allows the -// node to always connect, even if the slot are full. -func (srv *Server) AddTrustedPeer(node *enode.Node) { +// AddDirectPeer connects to the given node and maintains the connection until the +// server is shut down. If the connection fails for any reason, the server will +// attempt to reconnect the peer. +func (srv *Server) AddDirectPeer(node *enode.Node) { select { - case srv.addtrusted <- node: + case srv.adddirect <- node: case <-srv.quit: } } -// RemoveTrustedPeer removes the given node from the trusted peer set. -func (srv *Server) RemoveTrustedPeer(node *enode.Node) { +// RemoveDirectPeer disconnects from the given node +func (srv *Server) RemoveDirectPeer(node *enode.Node) { select { - case srv.removetrusted <- node: + case srv.removedirect <- node: case <-srv.quit: } } -// AddNotaryPeer connects to the given node and maintains the connection until the -// server is shut down. If the connection fails for any reason, the server will -// attempt to reconnect the peer. -// AddNotaryPeer also adds the given node to a reserved whitelist which allows the +func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) { + m := map[enode.ID]*enode.Node{} + for _, node := range nodes { + m[node.ID()] = node + } + g := &dialGroup{name: name, nodes: m, num: num} + select { + case srv.addgroup <- g: + case <-srv.quit: + } +} + +func (srv *Server) RemoveGroup(name string) { + g := &dialGroup{name: name} + select { + case srv.removegroup <- g: + case <-srv.quit: + } +} + +// AddTrustedPeer adds the given node to a reserved whitelist which allows the // node to always connect, even if the slot are full. -func (srv *Server) AddNotaryPeer(node *discover.Node) { +func (srv *Server) AddTrustedPeer(node *enode.Node) { select { - case srv.addnotary <- node: + case srv.addtrusted <- node: case <-srv.quit: } } -// RemoveNotaryPeer disconnects from the given node. -// RemoveNotaryPeer also removes the given node from the notary peer set. -func (srv *Server) RemoveNotaryPeer(node *discover.Node) { +// RemoveTrustedPeer removes the given node from the trusted peer set. +func (srv *Server) RemoveTrustedPeer(node *enode.Node) { select { - case srv.removenotary <- node: + case srv.removetrusted <- node: case <-srv.quit: } } @@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node { return ln.Node() } +func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node { + // If the node is running but discovery is off, manually assemble the node infos. + if ntab == nil { + addr := srv.tcpAddr(listener) + return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0) + } + // Otherwise return the discovery node. + return ntab.Self() +} + +func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr { + addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}} + if listener == nil { + return addr // Inbound connections disabled, use zero address. + } + // Otherwise inject the listener address too. + if a, ok := listener.Addr().(*net.TCPAddr); ok { + addr = *a + } + if srv.NAT != nil { + if ip, err := srv.NAT.ExternalIP(); err == nil { + addr.IP = ip + } + } + if addr.IP.IsUnspecified() { + addr.IP = net.IP{127, 0, 0, 1} + } + return addr +} + // Stop terminates the server and all active peer connections. // It blocks until all active connections have been closed. func (srv *Server) Stop() { @@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) { srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *enode.Node) srv.removestatic = make(chan *enode.Node) + srv.adddirect = make(chan *enode.Node) + srv.removedirect = make(chan *enode.Node) + srv.addgroup = make(chan *dialGroup) + srv.removegroup = make(chan *dialGroup) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) - srv.addnotary = make(chan *enode.Node) - srv.removenotary = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) @@ -628,6 +683,10 @@ type dialer interface { taskDone(task, time.Time) addStatic(*enode.Node) removeStatic(*enode.Node) + addDirect(*enode.Node) + removeDirect(*enode.Node) + addGroup(*dialGroup) + removeGroup(*dialGroup) } func (srv *Server) run(dialstate dialer) { @@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) { defer srv.nodedb.Close() var ( - peers = make(map[enode.ID]*Peer) - inboundCount = 0 - trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) - notary = make(map[enode.ID]bool) - taskdone = make(chan task, maxActiveDialTasks) - runningTasks []task - queuedTasks []task // tasks that can't run yet + peers = make(map[enode.ID]*Peer) + inboundCount = 0 + trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + peerflags = make(map[enode.ID]connFlag) + groupRefCount = make(map[enode.ID]int32) + groups = make(map[string]*dialGroup) + taskdone = make(chan task, maxActiveDialTasks) + runningTasks []task + queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. @@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) { } } + // remember and maintain the connection flags locally + setConnFlags := func(id enode.ID, f connFlag, val bool) { + if p, ok := peers[id]; ok { + p.rw.set(f, val) + } + if val { + peerflags[id] |= f + } else { + peerflags[id] &= ^f + } + if peerflags[id] == 0 { + delete(peerflags, id) + } + } + + // Put trusted nodes into a map to speed up checks. + // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. + for _, n := range srv.TrustedNodes { + setConnFlags(n.ID(), trustedConn, true) + } + + canDisconnect := func(p *Peer) bool { + f, ok := peerflags[p.ID()] + if ok && f != 0 { + return false + } + return !p.rw.is(dynDialedConn | inboundConn) + } + + removeGroup := func(g *dialGroup) { + if gg, ok := groups[g.name]; ok { + for id := range gg.nodes { + groupRefCount[id]-- + if groupRefCount[id] == 0 { + setConnFlags(id, groupDialedConn, false) + delete(groupRefCount, id) + } + } + } + } + + addGroup := func(g *dialGroup) { + if _, ok := groups[g.name]; ok { + removeGroup(groups[g.name]) + } + for id := range g.nodes { + groupRefCount[id]++ + if groupRefCount[id] > 0 { + setConnFlags(id, groupDialedConn, true) + } + } + groups[g.name] = g + } + running: for { scheduleTasks() @@ -693,63 +808,59 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, true) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, false) dialstate.removeStatic(n) - if p, ok := peers[n.ID()]; ok { + if p, ok := peers[n.ID()]; ok && canDisconnect(p) { p.Disconnect(DiscRequested) } + case n := <-srv.adddirect: + // This channel is used by AddDirectPeer to add to the + // ephemeral direct peer list. Add it to the dialer, + // it will keep the node connected. + srv.log.Trace("Adding direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, true) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, true) + } + dialstate.addDirect(n) + case n := <-srv.removedirect: + // This channel is used by RemoveDirectPeer to send a + // disconnect request to a peer and begin the + // stop keeping the node connected. + srv.log.Trace("Removing direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, false) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, false) + if !p.rw.is(trustedConn | groupDialedConn) { + p.Disconnect(DiscRequested) + } + } + dialstate.removeDirect(n) + case g := <-srv.addgroup: + srv.log.Trace("Adding group", "group", g) + addGroup(g) + dialstate.addGroup(g) + case g := <-srv.removegroup: + srv.log.Trace("Removing group", "group", g) + removeGroup(g) + dialstate.removeGroup(g) case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add an enode // to the trusted node set. srv.log.Trace("Adding trusted node", "node", n) - trusted[n.ID()] = true - // Mark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, true) - } + setConnFlags(n.ID(), trustedConn, true) case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove an enode // from the trusted node set. srv.log.Trace("Removing trusted node", "node", n) - if _, ok := trusted[n.ID()]; ok { - delete(trusted, n.ID()) - } - // Unmark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, false) - } - case n := <-srv.addnotary: - // This channel is used by AddNotaryPeer to add to the - // ephemeral notary peer list. Add it to the dialer, - // it will keep the node connected. - srv.log.Trace("Adding notary node", "node", n) - notary[n.ID] = true - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, true) - } - dialstate.addStatic(n) - case n := <-srv.removenotary: - // This channel is used by RemoveNotaryPeer to send a - // disconnect request to a peer and begin the - // stop keeping the node connected. - srv.log.Trace("Removing notary node", "node", n) - if _, ok := notary[n.ID]; ok { - delete(notary, n.ID) - } - - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, false) - } - - dialstate.removeStatic(n) - if p, ok := peers[n.ID]; ok { - p.Disconnect(DiscRequested) - } + setConnFlags(n.ID(), trustedConn, false) case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -764,15 +875,9 @@ running: case c := <-srv.posthandshake: // A connection has passed the encryption handshake so // the remote identity is known (but hasn't been verified yet). - if trusted[c.node.ID()] { - // Ensure that the trusted flag is set before checking against MaxPeers. - c.flags |= trustedConn + if f, ok := peerflags[c.node.ID()]; ok { + c.flags |= f } - - if notary[c.id] { - c.flags |= notaryConn - } - // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): @@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected diff --git a/p2p/server_test.go b/p2p/server_test.go index b46240722..8bd113791 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,7 +27,6 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "golang.org/x/crypto/sha3" @@ -173,14 +172,10 @@ func TestServerDial(t *testing.T) { } // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags - // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags. // Particularly for race conditions on changing the flag state. if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted prematurely: %v", peer) } - if peer := srv.Peers()[0]; peer.Info().Network.Notary { - t.Errorf("peer is notary prematurely: %v", peer) - } done := make(chan bool) go func() { srv.AddTrustedPeer(node) @@ -192,14 +187,6 @@ func TestServerDial(t *testing.T) { t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) } - srv.AddNotaryPeer(node) - if peer := srv.Peers()[0]; !peer.Info().Network.Notary { - t.Errorf("peer is not notary after AddNotaryPeer: %v", peer) - } - srv.RemoveNotaryPeer(node) - if peer := srv.Peers()[0]; peer.Info().Network.Notary { - t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer) - } done <- true }() // Trigger potential race conditions @@ -216,70 +203,89 @@ func TestServerDial(t *testing.T) { } } -// TestNotaryPeer checks that the node is added to and remove from static when -// AddNotaryPeer and RemoveNotaryPeer is called. -func TestNotaryPeer(t *testing.T) { - var ( - returned = make(chan struct{}) - add, remove = make(chan *discover.Node), make(chan *discover.Node) - tg = taskgen{ - newFunc: func(running int, peers map[discover.NodeID]*Peer) []task { - return []task{} - }, - doneFunc: func(t task) {}, - addFunc: func(n *discover.Node) { - add <- n - }, - removeFunc: func(n *discover.Node) { - remove <- n - }, - } - ) - +func TestServerPeerConnFlag(t *testing.T) { srv := &Server{ - Config: Config{MaxPeers: 10}, - quit: make(chan struct{}), - ntab: fakeTable{}, - addnotary: make(chan *discover.Node), - removenotary: make(chan *discover.Node), - running: true, - log: log.New(), + Config: Config{ + PrivateKey: newkey(), + MaxPeers: 10, + NoDial: true, + }, } - srv.loopWG.Add(1) - go func() { - srv.run(tg) - close(returned) - }() + if err := srv.Start(); err != nil { + t.Fatalf("could not start: %v", err) + } + defer srv.Stop() - notaryID := randomID() - go srv.AddNotaryPeer(&discover.Node{ID: notaryID}) + // inject a peer + key := newkey() + id := enode.PubkeyToIDV4(&key.PublicKey) + node := newNode(id, nil) + fd, _ := net.Pipe() + c := &conn{ + node: node, + fd: fd, + transport: newTestTransport(&key.PublicKey, fd), + flags: inboundConn, + cont: make(chan error), + } + if err := srv.checkpoint(c, srv.addpeer); err != nil { + t.Fatalf("could not add conn: %v", err) + } - select { - case n := <-add: - if n.ID != notaryID { - t.Errorf("node ID mismatched: got %s, want %s", - n.ID.String(), notaryID.String()) - } - case <-time.After(1 * time.Second): - t.Error("add static is not called within one second") + srv.AddTrustedPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn)) } - go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) - select { - case n := <-remove: - if n.ID != notaryID { - t.Errorf("node ID mismatched: got %s, want %s", - n.ID.String(), notaryID.String()) - } - case <-time.After(1 * time.Second): - t.Error("remove static is not called within one second") + srv.AddDirectPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn)) } - srv.Stop() - select { - case <-returned: - case <-time.After(500 * time.Millisecond): - t.Error("Server.run did not return within 500ms") + srv.AddGroup("g1", []*enode.Node{node}, 1) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) + } + + srv.AddGroup("g2", []*enode.Node{node}, 1) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) + } + + srv.RemoveTrustedPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn | directDialedConn)) + } + + srv.RemoveDirectPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn)) + } + + srv.RemoveGroup("g1") + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn)) + } + + srv.RemoveGroup("g2") + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != inboundConn { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, inboundConn) } } @@ -407,9 +413,6 @@ func TestServerManyTasks(t *testing.T) { type taskgen struct { newFunc func(running int, peers map[enode.ID]*Peer) []task doneFunc func(task) - - addFunc func(*discover.Node) - removeFunc func(*discover.Node) } func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task { @@ -418,11 +421,17 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) func (tg taskgen) taskDone(t task, now time.Time) { tg.doneFunc(t) } -func (tg taskgen) addStatic(n *enode.Node) { - tg.addFunc(n) +func (tg taskgen) addStatic(*enode.Node) { +} +func (tg taskgen) removeStatic(*enode.Node) { } -func (tg taskgen) removeStatic(n *enode.Node) { - tg.removeFunc(n) +func (tg taskgen) addDirect(*enode.Node) { +} +func (tg taskgen) removeDirect(*enode.Node) { +} +func (tg taskgen) addGroup(*dialGroup) { +} +func (tg taskgen) removeGroup(*dialGroup) { } type testTask struct { @@ -436,11 +445,10 @@ func (t *testTask) Do(srv *Server) { // This test checks that connections are disconnected // just after the encryption handshake when the server is -// at capacity. Trusted and Notary connections should still be accepted. +// at capacity. Trusted connections should still be accepted. func TestServerAtCap(t *testing.T) { trustedNode := newkey() trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey) - notaryID := randomID() srv := &Server{ Config: Config{ PrivateKey: newkey(), @@ -453,7 +461,6 @@ func TestServerAtCap(t *testing.T) { t.Fatalf("could not start: %v", err) } defer srv.Stop() - srv.AddNotaryPeer(&discover.Node{ID: notaryID}) newconn := func(id enode.ID) *conn { fd, _ := net.Pipe() @@ -484,15 +491,6 @@ func TestServerAtCap(t *testing.T) { t.Error("Server did not set trusted flag") } - // Try inserting a notary connection. - c = newconn(notaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != nil { - t.Error("unexpected error for notary conn @posthandshake:", err) - } - if !c.is(notaryConn) { - t.Error("Server did not set notary flag") - } - // Remove from trusted set and try again srv.RemoveTrustedPeer(newNode(trustedID, nil)) c = newconn(trustedID) @@ -509,24 +507,6 @@ func TestServerAtCap(t *testing.T) { if !c.is(trustedConn) { t.Error("Server did not set trusted flag") } - - // Remove from notary set and try again - srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) - c = newconn(notaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { - t.Error("wrong error for insert:", err) - } - - // Add anotherID to notary set and try again - anotherNotaryID := randomID() - srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID}) - c = newconn(anotherNotaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != nil { - t.Error("unexpected error for notary conn @posthandshake:", err) - } - if !c.is(notaryConn) { - t.Error("Server did not set notary flag") - } } func TestServerPeerLimits(t *testing.T) { |