aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/dial.go86
-rw-r--r--p2p/dial_test.go217
-rw-r--r--p2p/discover/table.go12
-rw-r--r--p2p/discover/table_test.go14
-rw-r--r--p2p/discover/table_util_test.go4
-rw-r--r--p2p/peer.go2
-rw-r--r--p2p/server.go271
-rw-r--r--p2p/server_test.go192
8 files changed, 591 insertions, 207 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index 9b24ed96a..909bed863 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -64,6 +64,12 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) {
return t.Dialer.Dial("tcp", addr.String())
}
+type dialGroup struct {
+ name string
+ nodes map[enode.ID]*enode.Node
+ num uint64
+}
+
// dialstate schedules dials and discovery lookups.
// it get's a chance to compute new tasks on every iteration
// of the main loop in Server.run.
@@ -78,6 +84,8 @@ type dialstate struct {
lookupBuf []*enode.Node // current discovery lookup results
randomNodes []*enode.Node // filled from Table
static map[enode.ID]*dialTask
+ direct map[enode.ID]*dialTask
+ group map[string]*dialGroup
hist *dialHistory
start time.Time // time when the dialer was first used
@@ -85,6 +93,7 @@ type dialstate struct {
}
type discoverTable interface {
+ Self() *enode.Node
Close()
Resolve(*enode.Node) *enode.Node
LookupRandom() []*enode.Node
@@ -133,6 +142,8 @@ func newDialState(self enode.ID, static []*enode.Node, bootnodes []*enode.Node,
self: self,
netrestrict: netrestrict,
static: make(map[enode.ID]*dialTask),
+ direct: make(map[enode.ID]*dialTask),
+ group: make(map[string]*dialGroup),
dialing: make(map[enode.ID]connFlag),
bootnodes: make([]*enode.Node, len(bootnodes)),
randomNodes: make([]*enode.Node, maxdyn/2),
@@ -159,6 +170,23 @@ func (s *dialstate) removeStatic(n *enode.Node) {
s.hist.remove(n.ID())
}
+func (s *dialstate) addDirect(n *enode.Node) {
+ s.direct[n.ID()] = &dialTask{flags: directDialedConn, dest: n}
+}
+
+func (s *dialstate) removeDirect(n *enode.Node) {
+ delete(s.direct, n.ID())
+ s.hist.remove(n.ID())
+}
+
+func (s *dialstate) addGroup(g *dialGroup) {
+ s.group[g.name] = g
+}
+
+func (s *dialstate) removeGroup(g *dialGroup) {
+ delete(s.group, g.name)
+}
+
func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task {
if s.start.IsZero() {
s.start = now
@@ -196,13 +224,69 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti
err := s.checkDial(t.dest, peers)
switch err {
case errNotWhitelisted, errSelf:
- log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err)
+ log.Warn("Removing static dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err)
delete(s.static, t.dest.ID())
case nil:
s.dialing[id] = t.flags
newtasks = append(newtasks, t)
}
}
+
+ for id, t := range s.direct {
+ err := s.checkDial(t.dest, peers)
+ switch err {
+ case errNotWhitelisted, errSelf:
+ log.Warn("Removing direct dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err)
+ delete(s.direct, t.dest.ID())
+ case nil:
+ s.dialing[id] = t.flags
+ newtasks = append(newtasks, t)
+ }
+ }
+
+ // compute connected
+ connected := map[string]map[enode.ID]struct{}{}
+ for _, g := range s.group {
+ connected[g.name] = map[enode.ID]struct{}{}
+ }
+
+ for id := range peers {
+ for _, g := range s.group {
+ if _, ok := g.nodes[id]; ok {
+ connected[g.name][id] = struct{}{}
+ }
+ }
+ }
+
+ for id := range s.dialing {
+ for _, g := range s.group {
+ if _, ok := g.nodes[id]; ok {
+ connected[g.name][id] = struct{}{}
+ }
+ }
+ }
+
+ groupNodes := map[enode.ID]*enode.Node{}
+ for _, g := range s.group {
+ for _, n := range g.nodes {
+ if uint64(len(connected[g.name])) >= g.num {
+ break
+ }
+ err := s.checkDial(n, peers)
+ switch err {
+ case errNotWhitelisted, errSelf:
+ log.Warn("Removing group dial candidate", "id", n.ID(), "addr", &net.TCPAddr{IP: n.IP(), Port: n.TCP()}, "err", err)
+ delete(g.nodes, n.ID())
+ case nil:
+ groupNodes[n.ID()] = n
+ connected[g.name][n.ID()] = struct{}{}
+ }
+ }
+ }
+ for _, n := range groupNodes {
+ addDial(groupDialedConn, n)
+ }
+
// If we don't have any peers whatsoever, try to dial a random bootnode. This
// scenario is useful for the testnet (and private networks) where the discovery
// table might be full of mostly bad peers, making it hard to find good ones.
diff --git a/p2p/dial_test.go b/p2p/dial_test.go
index 3805ff690..f9122de6f 100644
--- a/p2p/dial_test.go
+++ b/p2p/dial_test.go
@@ -525,6 +525,223 @@ func TestDialStateStaticDial(t *testing.T) {
})
}
+func TestDialStateDirectDial(t *testing.T) {
+ wantDirect := []*enode.Node{
+ newNode(uintID(1), nil),
+ newNode(uintID(2), nil),
+ newNode(uintID(3), nil),
+ newNode(uintID(4), nil),
+ newNode(uintID(5), nil),
+ }
+ init := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil)
+ for _, node := range wantDirect {
+ init.addDirect(node)
+ }
+
+ runDialTest(t, dialtest{
+ init: init,
+ rounds: []round{
+ // Direct dials are launched for the nodes that
+ // aren't yet connected.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ },
+ new: []task{
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(3), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)},
+ },
+ },
+ // No new tasks are launched in this round because all direct
+ // nodes are either connected or still being dialed.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}},
+ },
+ done: []task{
+ &dialTask{flags: staticDialedConn, dest: newNode(uintID(3), nil)},
+ },
+ },
+ // No new dial tasks are launched because all direct
+ // nodes are now connected.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ done: []task{
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)},
+ },
+ new: []task{
+ &waitExpireTask{Duration: 14 * time.Second},
+ },
+ },
+ // Wait a round for dial history to expire, no new tasks should spawn.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ },
+ // If a direct node is dropped, it should be immediately redialed,
+ // irrespective whether it was originally static or dynamic.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ new: []task{
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(2), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)},
+ },
+ },
+ },
+ })
+}
+
+func TestDialStateGroupDial(t *testing.T) {
+ groups := []*dialGroup{
+ &dialGroup{
+ name: "g1",
+ nodes: map[enode.ID]*enode.Node{
+ uintID(1): newNode(uintID(1), nil),
+ uintID(2): newNode(uintID(2), nil),
+ },
+ num: 2,
+ },
+ &dialGroup{
+ name: "g2",
+ nodes: map[enode.ID]*enode.Node{
+ uintID(2): newNode(uintID(2), nil),
+ uintID(3): newNode(uintID(3), nil),
+ uintID(4): newNode(uintID(4), nil),
+ uintID(5): newNode(uintID(5), nil),
+ uintID(6): newNode(uintID(6), nil),
+ },
+ num: 2,
+ },
+ }
+
+ type groupTest struct {
+ peers []*Peer
+ dialing map[enode.ID]connFlag
+ ceiling map[string]uint64
+ }
+
+ tests := []groupTest{
+ {
+ peers: nil,
+ dialing: map[enode.ID]connFlag{},
+ ceiling: map[string]uint64{"g1": 2, "g2": 4},
+ },
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}},
+ },
+ dialing: map[enode.ID]connFlag{
+ uintID(1): staticDialedConn,
+ },
+ ceiling: map[string]uint64{"g1": 2, "g2": 2},
+ },
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ dialing: map[enode.ID]connFlag{
+ uintID(2): staticDialedConn,
+ },
+ ceiling: map[string]uint64{"g1": 2, "g2": 4},
+ },
+ {
+ peers: nil,
+ dialing: map[enode.ID]connFlag{
+ uintID(1): staticDialedConn,
+ uintID(2): staticDialedConn,
+ uintID(3): staticDialedConn,
+ },
+ ceiling: map[string]uint64{"g1": 2, "g2": 4},
+ },
+ }
+
+ pm := func(ps []*Peer) map[enode.ID]*Peer {
+ m := make(map[enode.ID]*Peer)
+ for _, p := range ps {
+ m[p.rw.node.ID()] = p
+ }
+ return m
+ }
+
+ run := func(i int, tt groupTest) {
+ d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil)
+ d.dialing = make(map[enode.ID]connFlag)
+ for k, v := range tt.dialing {
+ d.dialing[k] = v
+ }
+
+ for _, g := range groups {
+ d.addGroup(g)
+ }
+ peermap := pm(tt.peers)
+ new := d.newTasks(len(tt.dialing), peermap, time.Now())
+
+ cnt := map[string]uint64{}
+ for id := range peermap {
+ for _, g := range groups {
+ if _, ok := g.nodes[id]; ok {
+ cnt[g.name]++
+ }
+ }
+ }
+
+ for id := range tt.dialing {
+ for _, g := range groups {
+ if _, ok := g.nodes[id]; ok {
+ cnt[g.name]++
+ }
+ }
+ }
+
+ for _, task := range new {
+ id := task.(*dialTask).dest.ID()
+ for _, g := range groups {
+ if _, ok := g.nodes[id]; ok {
+ cnt[g.name]++
+ }
+ }
+ }
+
+ for _, g := range groups {
+ if cnt[g.name] < g.num {
+ t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)",
+ i, g.name, cnt[g.name], g.num)
+ }
+ if cnt[g.name] > tt.ceiling[g.name] {
+ t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)",
+ i, g.name, cnt[g.name], tt.ceiling[g.name])
+ }
+ }
+ }
+
+ for i, tt := range tests {
+ run(i, tt)
+ }
+}
+
// This test checks that static peers will be redialed immediately if they were re-added to a static list.
func TestDialStaticAfterReset(t *testing.T) {
wantStatic := []*enode.Node{
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 25ea7b0b2..e8a219871 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -127,7 +127,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error
return tab, nil
}
-func (tab *Table) self() *enode.Node {
+func (tab *Table) Self() *enode.Node {
return tab.net.self()
}
@@ -258,7 +258,7 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
- asked[tab.self().ID()] = true
+ asked[tab.Self().ID()] = true
for {
tab.mutex.Lock()
@@ -419,7 +419,7 @@ func (tab *Table) doRefresh(done chan struct{}) {
// Run self lookup to discover new neighbor nodes.
// We can only do this if we have a secp256k1 identity.
var key ecdsa.PublicKey
- if err := tab.self().Load((*enode.Secp256k1)(&key)); err == nil {
+ if err := tab.Self().Load((*enode.Secp256k1)(&key)); err == nil {
tab.lookup(encodePubkey(&key), false)
}
@@ -544,7 +544,7 @@ func (tab *Table) len() (n int) {
// bucket returns the bucket for the given node ID hash.
func (tab *Table) bucket(id enode.ID) *bucket {
- d := enode.LogDist(tab.self().ID(), id)
+ d := enode.LogDist(tab.Self().ID(), id)
if d <= bucketMinDistance {
return tab.buckets[0]
}
@@ -557,7 +557,7 @@ func (tab *Table) bucket(id enode.ID) *bucket {
//
// The caller must not hold tab.mutex.
func (tab *Table) addSeenNode(n *node) {
- if n.ID() == tab.self().ID() {
+ if n.ID() == tab.Self().ID() {
return
}
@@ -599,7 +599,7 @@ func (tab *Table) addVerifiedNode(n *node) {
if !tab.isInitDone() {
return
}
- if n.ID() == tab.self().ID() {
+ if n.ID() == tab.Self().ID() {
return
}
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index 2321e743f..8947a074e 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -147,7 +147,7 @@ func TestTable_IPLimit(t *testing.T) {
defer tab.Close()
for i := 0; i < tableIPLimit+1; i++ {
- n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
+ n := nodeAtDistance(tab.Self().ID(), i, net.IP{172, 0, 1, byte(i)})
tab.addSeenNode(n)
}
if tab.len() > tableIPLimit {
@@ -165,7 +165,7 @@ func TestTable_BucketIPLimit(t *testing.T) {
d := 3
for i := 0; i < bucketIPLimit+1; i++ {
- n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)})
+ n := nodeAtDistance(tab.Self().ID(), d, net.IP{172, 0, 1, byte(i)})
tab.addSeenNode(n)
}
if tab.len() > bucketIPLimit {
@@ -264,7 +264,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
for i := 0; i < len(buf); i++ {
ld := cfg.Rand.Intn(len(tab.buckets))
- fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
+ fillTable(tab, []*node{nodeAtDistance(tab.Self().ID(), ld, intIP(ld))})
}
gotN := tab.ReadRandomNodes(buf)
if gotN != tab.len() {
@@ -312,8 +312,8 @@ func TestTable_addVerifiedNode(t *testing.T) {
defer tab.Close()
// Insert two nodes.
- n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
- n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
+ n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1})
+ n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2})
tab.addSeenNode(n1)
tab.addSeenNode(n2)
@@ -344,8 +344,8 @@ func TestTable_addSeenNode(t *testing.T) {
defer tab.Close()
// Insert two nodes.
- n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
- n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
+ n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1})
+ n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2})
tab.addSeenNode(n1)
tab.addSeenNode(n2)
diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go
index 20238aabc..7e149b22c 100644
--- a/p2p/discover/table_util_test.go
+++ b/p2p/discover/table_util_test.go
@@ -75,10 +75,10 @@ func intIP(i int) net.IP {
// fillBucket inserts nodes into the given bucket until it is full.
func fillBucket(tab *Table, n *node) (last *node) {
- ld := enode.LogDist(tab.self().ID(), n.ID())
+ ld := enode.LogDist(tab.Self().ID(), n.ID())
b := tab.bucket(n.ID())
for len(b.entries) < bucketSize {
- b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld)))
+ b.entries = append(b.entries, nodeAtDistance(tab.Self().ID(), ld, intIP(ld)))
}
return b.entries[bucketSize-1]
}
diff --git a/p2p/peer.go b/p2p/peer.go
index 3c75d7dd5..2c357fdc9 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -434,7 +434,6 @@ type PeerInfo struct {
RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection
Inbound bool `json:"inbound"`
Trusted bool `json:"trusted"`
- Notary bool `json:"notary"`
Static bool `json:"static"`
} `json:"network"`
Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields
@@ -459,7 +458,6 @@ func (p *Peer) Info() *PeerInfo {
info.Network.RemoteAddress = p.RemoteAddr().String()
info.Network.Inbound = p.rw.is(inboundConn)
info.Network.Trusted = p.rw.is(trustedConn)
- info.Network.Notary = p.rw.is(notaryConn)
info.Network.Static = p.rw.is(staticDialedConn)
// Gather all the running protocol infos
diff --git a/p2p/server.go b/p2p/server.go
index c3b3a00d4..a58673342 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -33,13 +33,13 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/discv5"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/p2p/nat"
"github.com/dexon-foundation/dexon/p2p/netutil"
"github.com/dexon-foundation/dexon/rlp"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
const (
@@ -178,10 +178,12 @@ type Server struct {
quit chan struct{}
addstatic chan *enode.Node
removestatic chan *enode.Node
+ adddirect chan *enode.Node
+ removedirect chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
- addnotary chan *enode.Node
- removenotary chan *enode.Node
+ addgroup chan *dialGroup
+ removegroup chan *dialGroup
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -203,9 +205,10 @@ type connFlag int32
const (
dynDialedConn connFlag = 1 << iota
staticDialedConn
+ directDialedConn
+ groupDialedConn
inboundConn
trustedConn
- notaryConn
)
// conn wraps a network connection with information gathered
@@ -254,12 +257,15 @@ func (f connFlag) String() string {
if f&staticDialedConn != 0 {
s += "-staticdial"
}
+ if f&directDialedConn != 0 {
+ s += "-directdial"
+ }
+ if f&groupDialedConn != 0 {
+ s += "-groupdial"
+ }
if f&inboundConn != 0 {
s += "-inbound"
}
- if f&notaryConn != 0 {
- s += "-notary"
- }
if s != "" {
s = s[1:]
}
@@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) {
}
}
-// AddTrustedPeer adds the given node to a reserved whitelist which allows the
-// node to always connect, even if the slot are full.
-func (srv *Server) AddTrustedPeer(node *enode.Node) {
+// AddDirectPeer connects to the given node and maintains the connection until the
+// server is shut down. If the connection fails for any reason, the server will
+// attempt to reconnect the peer.
+func (srv *Server) AddDirectPeer(node *enode.Node) {
select {
- case srv.addtrusted <- node:
+ case srv.adddirect <- node:
case <-srv.quit:
}
}
-// RemoveTrustedPeer removes the given node from the trusted peer set.
-func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
+// RemoveDirectPeer disconnects from the given node
+func (srv *Server) RemoveDirectPeer(node *enode.Node) {
select {
- case srv.removetrusted <- node:
+ case srv.removedirect <- node:
case <-srv.quit:
}
}
-// AddNotaryPeer connects to the given node and maintains the connection until the
-// server is shut down. If the connection fails for any reason, the server will
-// attempt to reconnect the peer.
-// AddNotaryPeer also adds the given node to a reserved whitelist which allows the
+func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) {
+ m := map[enode.ID]*enode.Node{}
+ for _, node := range nodes {
+ m[node.ID()] = node
+ }
+ g := &dialGroup{name: name, nodes: m, num: num}
+ select {
+ case srv.addgroup <- g:
+ case <-srv.quit:
+ }
+}
+
+func (srv *Server) RemoveGroup(name string) {
+ g := &dialGroup{name: name}
+ select {
+ case srv.removegroup <- g:
+ case <-srv.quit:
+ }
+}
+
+// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
-func (srv *Server) AddNotaryPeer(node *discover.Node) {
+func (srv *Server) AddTrustedPeer(node *enode.Node) {
select {
- case srv.addnotary <- node:
+ case srv.addtrusted <- node:
case <-srv.quit:
}
}
-// RemoveNotaryPeer disconnects from the given node.
-// RemoveNotaryPeer also removes the given node from the notary peer set.
-func (srv *Server) RemoveNotaryPeer(node *discover.Node) {
+// RemoveTrustedPeer removes the given node from the trusted peer set.
+func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
select {
- case srv.removenotary <- node:
+ case srv.removetrusted <- node:
case <-srv.quit:
}
}
@@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node {
return ln.Node()
}
+func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node {
+ // If the node is running but discovery is off, manually assemble the node infos.
+ if ntab == nil {
+ addr := srv.tcpAddr(listener)
+ return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0)
+ }
+ // Otherwise return the discovery node.
+ return ntab.Self()
+}
+
+func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr {
+ addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}}
+ if listener == nil {
+ return addr // Inbound connections disabled, use zero address.
+ }
+ // Otherwise inject the listener address too.
+ if a, ok := listener.Addr().(*net.TCPAddr); ok {
+ addr = *a
+ }
+ if srv.NAT != nil {
+ if ip, err := srv.NAT.ExternalIP(); err == nil {
+ addr.IP = ip
+ }
+ }
+ if addr.IP.IsUnspecified() {
+ addr.IP = net.IP{127, 0, 0, 1}
+ }
+ return addr
+}
+
// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
@@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)
srv.removestatic = make(chan *enode.Node)
+ srv.adddirect = make(chan *enode.Node)
+ srv.removedirect = make(chan *enode.Node)
+ srv.addgroup = make(chan *dialGroup)
+ srv.removegroup = make(chan *dialGroup)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
- srv.addnotary = make(chan *enode.Node)
- srv.removenotary = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
@@ -628,6 +683,10 @@ type dialer interface {
taskDone(task, time.Time)
addStatic(*enode.Node)
removeStatic(*enode.Node)
+ addDirect(*enode.Node)
+ removeDirect(*enode.Node)
+ addGroup(*dialGroup)
+ removeGroup(*dialGroup)
}
func (srv *Server) run(dialstate dialer) {
@@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) {
defer srv.nodedb.Close()
var (
- peers = make(map[enode.ID]*Peer)
- inboundCount = 0
- trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
- notary = make(map[enode.ID]bool)
- taskdone = make(chan task, maxActiveDialTasks)
- runningTasks []task
- queuedTasks []task // tasks that can't run yet
+ peers = make(map[enode.ID]*Peer)
+ inboundCount = 0
+ trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ peerflags = make(map[enode.ID]connFlag)
+ groupRefCount = make(map[enode.ID]int32)
+ groups = make(map[string]*dialGroup)
+ taskdone = make(chan task, maxActiveDialTasks)
+ runningTasks []task
+ queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
@@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) {
}
}
+ // remember and maintain the connection flags locally
+ setConnFlags := func(id enode.ID, f connFlag, val bool) {
+ if p, ok := peers[id]; ok {
+ p.rw.set(f, val)
+ }
+ if val {
+ peerflags[id] |= f
+ } else {
+ peerflags[id] &= ^f
+ }
+ if peerflags[id] == 0 {
+ delete(peerflags, id)
+ }
+ }
+
+ // Put trusted nodes into a map to speed up checks.
+ // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
+ for _, n := range srv.TrustedNodes {
+ setConnFlags(n.ID(), trustedConn, true)
+ }
+
+ canDisconnect := func(p *Peer) bool {
+ f, ok := peerflags[p.ID()]
+ if ok && f != 0 {
+ return false
+ }
+ return !p.rw.is(dynDialedConn | inboundConn)
+ }
+
+ removeGroup := func(g *dialGroup) {
+ if gg, ok := groups[g.name]; ok {
+ for id := range gg.nodes {
+ groupRefCount[id]--
+ if groupRefCount[id] == 0 {
+ setConnFlags(id, groupDialedConn, false)
+ delete(groupRefCount, id)
+ }
+ }
+ }
+ }
+
+ addGroup := func(g *dialGroup) {
+ if _, ok := groups[g.name]; ok {
+ removeGroup(groups[g.name])
+ }
+ for id := range g.nodes {
+ groupRefCount[id]++
+ if groupRefCount[id] > 0 {
+ setConnFlags(id, groupDialedConn, true)
+ }
+ }
+ groups[g.name] = g
+ }
+
running:
for {
scheduleTasks()
@@ -693,63 +808,59 @@ running:
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, true)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, false)
dialstate.removeStatic(n)
- if p, ok := peers[n.ID()]; ok {
+ if p, ok := peers[n.ID()]; ok && canDisconnect(p) {
p.Disconnect(DiscRequested)
}
+ case n := <-srv.adddirect:
+ // This channel is used by AddDirectPeer to add to the
+ // ephemeral direct peer list. Add it to the dialer,
+ // it will keep the node connected.
+ srv.log.Trace("Adding direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, true)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, true)
+ }
+ dialstate.addDirect(n)
+ case n := <-srv.removedirect:
+ // This channel is used by RemoveDirectPeer to send a
+ // disconnect request to a peer and begin the
+ // stop keeping the node connected.
+ srv.log.Trace("Removing direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, false)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, false)
+ if !p.rw.is(trustedConn | groupDialedConn) {
+ p.Disconnect(DiscRequested)
+ }
+ }
+ dialstate.removeDirect(n)
+ case g := <-srv.addgroup:
+ srv.log.Trace("Adding group", "group", g)
+ addGroup(g)
+ dialstate.addGroup(g)
+ case g := <-srv.removegroup:
+ srv.log.Trace("Removing group", "group", g)
+ removeGroup(g)
+ dialstate.removeGroup(g)
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
- trusted[n.ID()] = true
- // Mark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, true)
- }
+ setConnFlags(n.ID(), trustedConn, true)
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
- if _, ok := trusted[n.ID()]; ok {
- delete(trusted, n.ID())
- }
- // Unmark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, false)
- }
- case n := <-srv.addnotary:
- // This channel is used by AddNotaryPeer to add to the
- // ephemeral notary peer list. Add it to the dialer,
- // it will keep the node connected.
- srv.log.Trace("Adding notary node", "node", n)
- notary[n.ID] = true
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, true)
- }
- dialstate.addStatic(n)
- case n := <-srv.removenotary:
- // This channel is used by RemoveNotaryPeer to send a
- // disconnect request to a peer and begin the
- // stop keeping the node connected.
- srv.log.Trace("Removing notary node", "node", n)
- if _, ok := notary[n.ID]; ok {
- delete(notary, n.ID)
- }
-
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, false)
- }
-
- dialstate.removeStatic(n)
- if p, ok := peers[n.ID]; ok {
- p.Disconnect(DiscRequested)
- }
+ setConnFlags(n.ID(), trustedConn, false)
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -764,15 +875,9 @@ running:
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
- if trusted[c.node.ID()] {
- // Ensure that the trusted flag is set before checking against MaxPeers.
- c.flags |= trustedConn
+ if f, ok := peerflags[c.node.ID()]; ok {
+ c.flags |= f
}
-
- if notary[c.id] {
- c.flags |= notaryConn
- }
-
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
@@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
diff --git a/p2p/server_test.go b/p2p/server_test.go
index b46240722..8bd113791 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -27,7 +27,6 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/p2p/enr"
"golang.org/x/crypto/sha3"
@@ -173,14 +172,10 @@ func TestServerDial(t *testing.T) {
}
// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
- // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags.
// Particularly for race conditions on changing the flag state.
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted prematurely: %v", peer)
}
- if peer := srv.Peers()[0]; peer.Info().Network.Notary {
- t.Errorf("peer is notary prematurely: %v", peer)
- }
done := make(chan bool)
go func() {
srv.AddTrustedPeer(node)
@@ -192,14 +187,6 @@ func TestServerDial(t *testing.T) {
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
}
- srv.AddNotaryPeer(node)
- if peer := srv.Peers()[0]; !peer.Info().Network.Notary {
- t.Errorf("peer is not notary after AddNotaryPeer: %v", peer)
- }
- srv.RemoveNotaryPeer(node)
- if peer := srv.Peers()[0]; peer.Info().Network.Notary {
- t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer)
- }
done <- true
}()
// Trigger potential race conditions
@@ -216,70 +203,89 @@ func TestServerDial(t *testing.T) {
}
}
-// TestNotaryPeer checks that the node is added to and remove from static when
-// AddNotaryPeer and RemoveNotaryPeer is called.
-func TestNotaryPeer(t *testing.T) {
- var (
- returned = make(chan struct{})
- add, remove = make(chan *discover.Node), make(chan *discover.Node)
- tg = taskgen{
- newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
- return []task{}
- },
- doneFunc: func(t task) {},
- addFunc: func(n *discover.Node) {
- add <- n
- },
- removeFunc: func(n *discover.Node) {
- remove <- n
- },
- }
- )
-
+func TestServerPeerConnFlag(t *testing.T) {
srv := &Server{
- Config: Config{MaxPeers: 10},
- quit: make(chan struct{}),
- ntab: fakeTable{},
- addnotary: make(chan *discover.Node),
- removenotary: make(chan *discover.Node),
- running: true,
- log: log.New(),
+ Config: Config{
+ PrivateKey: newkey(),
+ MaxPeers: 10,
+ NoDial: true,
+ },
}
- srv.loopWG.Add(1)
- go func() {
- srv.run(tg)
- close(returned)
- }()
+ if err := srv.Start(); err != nil {
+ t.Fatalf("could not start: %v", err)
+ }
+ defer srv.Stop()
- notaryID := randomID()
- go srv.AddNotaryPeer(&discover.Node{ID: notaryID})
+ // inject a peer
+ key := newkey()
+ id := enode.PubkeyToIDV4(&key.PublicKey)
+ node := newNode(id, nil)
+ fd, _ := net.Pipe()
+ c := &conn{
+ node: node,
+ fd: fd,
+ transport: newTestTransport(&key.PublicKey, fd),
+ flags: inboundConn,
+ cont: make(chan error),
+ }
+ if err := srv.checkpoint(c, srv.addpeer); err != nil {
+ t.Fatalf("could not add conn: %v", err)
+ }
- select {
- case n := <-add:
- if n.ID != notaryID {
- t.Errorf("node ID mismatched: got %s, want %s",
- n.ID.String(), notaryID.String())
- }
- case <-time.After(1 * time.Second):
- t.Error("add static is not called within one second")
+ srv.AddTrustedPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn))
}
- go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
- select {
- case n := <-remove:
- if n.ID != notaryID {
- t.Errorf("node ID mismatched: got %s, want %s",
- n.ID.String(), notaryID.String())
- }
- case <-time.After(1 * time.Second):
- t.Error("remove static is not called within one second")
+ srv.AddDirectPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn | directDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn | directDialedConn))
}
- srv.Stop()
- select {
- case <-returned:
- case <-time.After(500 * time.Millisecond):
- t.Error("Server.run did not return within 500ms")
+ srv.AddGroup("g1", []*enode.Node{node}, 1)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn))
+ }
+
+ srv.AddGroup("g2", []*enode.Node{node}, 1)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn))
+ }
+
+ srv.RemoveTrustedPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | directDialedConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | directDialedConn | directDialedConn))
+ }
+
+ srv.RemoveDirectPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | directDialedConn))
+ }
+
+ srv.RemoveGroup("g1")
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | directDialedConn))
+ }
+
+ srv.RemoveGroup("g2")
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != inboundConn {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, inboundConn)
}
}
@@ -407,9 +413,6 @@ func TestServerManyTasks(t *testing.T) {
type taskgen struct {
newFunc func(running int, peers map[enode.ID]*Peer) []task
doneFunc func(task)
-
- addFunc func(*discover.Node)
- removeFunc func(*discover.Node)
}
func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task {
@@ -418,11 +421,17 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time)
func (tg taskgen) taskDone(t task, now time.Time) {
tg.doneFunc(t)
}
-func (tg taskgen) addStatic(n *enode.Node) {
- tg.addFunc(n)
+func (tg taskgen) addStatic(*enode.Node) {
+}
+func (tg taskgen) removeStatic(*enode.Node) {
}
-func (tg taskgen) removeStatic(n *enode.Node) {
- tg.removeFunc(n)
+func (tg taskgen) addDirect(*enode.Node) {
+}
+func (tg taskgen) removeDirect(*enode.Node) {
+}
+func (tg taskgen) addGroup(*dialGroup) {
+}
+func (tg taskgen) removeGroup(*dialGroup) {
}
type testTask struct {
@@ -436,11 +445,10 @@ func (t *testTask) Do(srv *Server) {
// This test checks that connections are disconnected
// just after the encryption handshake when the server is
-// at capacity. Trusted and Notary connections should still be accepted.
+// at capacity. Trusted connections should still be accepted.
func TestServerAtCap(t *testing.T) {
trustedNode := newkey()
trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
- notaryID := randomID()
srv := &Server{
Config: Config{
PrivateKey: newkey(),
@@ -453,7 +461,6 @@ func TestServerAtCap(t *testing.T) {
t.Fatalf("could not start: %v", err)
}
defer srv.Stop()
- srv.AddNotaryPeer(&discover.Node{ID: notaryID})
newconn := func(id enode.ID) *conn {
fd, _ := net.Pipe()
@@ -484,15 +491,6 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag")
}
- // Try inserting a notary connection.
- c = newconn(notaryID)
- if err := srv.checkpoint(c, srv.posthandshake); err != nil {
- t.Error("unexpected error for notary conn @posthandshake:", err)
- }
- if !c.is(notaryConn) {
- t.Error("Server did not set notary flag")
- }
-
// Remove from trusted set and try again
srv.RemoveTrustedPeer(newNode(trustedID, nil))
c = newconn(trustedID)
@@ -509,24 +507,6 @@ func TestServerAtCap(t *testing.T) {
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
}
-
- // Remove from notary set and try again
- srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
- c = newconn(notaryID)
- if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
- t.Error("wrong error for insert:", err)
- }
-
- // Add anotherID to notary set and try again
- anotherNotaryID := randomID()
- srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID})
- c = newconn(anotherNotaryID)
- if err := srv.checkpoint(c, srv.posthandshake); err != nil {
- t.Error("unexpected error for notary conn @posthandshake:", err)
- }
- if !c.is(notaryConn) {
- t.Error("Server did not set notary flag")
- }
}
func TestServerPeerLimits(t *testing.T) {