aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go463
1 files changed, 165 insertions, 298 deletions
diff --git a/dex/peer.go b/dex/peer.go
index 0a67db688..67a59348d 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -37,7 +37,6 @@ import (
"encoding/hex"
"errors"
"fmt"
- "net"
"sync"
"time"
@@ -105,7 +104,8 @@ const (
handshakeTimeout = 5 * time.Second
- groupNodeNum = 3
+ groupConnNum = 3
+ groupConnTimeout = 3 * time.Minute
)
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
@@ -129,6 +129,17 @@ type peerLabel struct {
round uint64
}
+func (p peerLabel) String() string {
+ var t string
+ switch p.set {
+ case dkgset:
+ t = fmt.Sprintf("DKGSet round: %d", p.round)
+ case notaryset:
+ t = fmt.Sprintf("NotarySet round: %d chain: %d", p.round, p.chainID)
+ }
+ return t
+}
+
type peer struct {
id string
@@ -711,28 +722,27 @@ type peerSet struct {
tab *nodeTable
selfPK string
- srvr p2pServer
- gov governance
- peer2Labels map[string]map[peerLabel]struct{}
- label2Peers map[peerLabel]map[string]struct{}
- history map[uint64]struct{}
- notaryHistory map[uint64]struct{}
- dkgHistory map[uint64]struct{}
+ srvr p2pServer
+ gov governance
+
+ label2Nodes map[peerLabel]map[string]*enode.Node
+ directConn map[peerLabel]struct{}
+ groupConnPeers map[peerLabel]map[string]time.Time
+ allDirectPeers map[string]map[peerLabel]struct{}
}
// newPeerSet creates a new peer set to track the active participants.
func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
return &peerSet{
- peers: make(map[string]*peer),
- gov: gov,
- srvr: srvr,
- tab: tab,
- selfPK: hex.EncodeToString(crypto.FromECDSAPub(&srvr.GetPrivateKey().PublicKey)),
- peer2Labels: make(map[string]map[peerLabel]struct{}),
- label2Peers: make(map[peerLabel]map[string]struct{}),
- history: make(map[uint64]struct{}),
- notaryHistory: make(map[uint64]struct{}),
- dkgHistory: make(map[uint64]struct{}),
+ peers: make(map[string]*peer),
+ gov: gov,
+ srvr: srvr,
+ tab: tab,
+ selfPK: hex.EncodeToString(crypto.FromECDSAPub(&srvr.GetPrivateKey().PublicKey)),
+ label2Nodes: make(map[peerLabel]map[string]*enode.Node),
+ directConn: make(map[peerLabel]struct{}),
+ groupConnPeers: make(map[peerLabel]map[string]time.Time),
+ allDirectPeers: make(map[string]map[peerLabel]struct{}),
}
}
@@ -832,8 +842,8 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.label2Peers[label]))
- for id := range ps.label2Peers[label] {
+ list := make([]*peer, 0, len(ps.label2Nodes[label]))
+ for id := range ps.label2Nodes[label] {
if p, ok := ps.peers[id]; ok {
list = append(list, p)
}
@@ -845,8 +855,8 @@ func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.label2Peers[label]))
- for id := range ps.label2Peers[label] {
+ list := make([]*peer, 0, len(ps.label2Nodes[label]))
+ for id := range ps.label2Nodes[label] {
if p, ok := ps.peers[id]; ok {
if !p.knownVotes.Contains(hash) {
list = append(list, p)
@@ -948,345 +958,178 @@ func (ps *peerSet) Close() {
}
func (ps *peerSet) BuildConnection(round uint64) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
- defer ps.dumpPeerLabel(fmt.Sprintf("BuildConnection: %d", round))
-
- ps.history[round] = struct{}{}
+ dkgLabel := peerLabel{set: dkgset, round: round}
+ if _, ok := ps.label2Nodes[dkgLabel]; !ok {
+ dkgPKs, err := ps.gov.DKGSet(round)
+ if err != nil {
+ log.Error("get DKG set fail", "round", round, "err", err)
+ }
- dkgPKs, err := ps.gov.DKGSet(round)
- if err != nil {
- log.Error("get dkg set fail", "round", round, "err", err)
- }
+ nodes := ps.pksToNodes(dkgPKs)
+ ps.label2Nodes[dkgLabel] = nodes
- // build dkg connection
- _, inDKGSet := dkgPKs[ps.selfPK]
- if inDKGSet {
- delete(dkgPKs, ps.selfPK)
- dkgLabel := peerLabel{set: dkgset, round: round}
- for pk := range dkgPKs {
- ps.addDirectPeer(pk, dkgLabel)
+ if _, exists := nodes[ps.srvr.Self().ID().String()]; exists {
+ ps.buildDirectConn(dkgLabel)
+ } else {
+ ps.buildGroupConn(dkgLabel)
}
}
- var inOneNotarySet bool
- for cid := uint32(0); cid < ps.gov.GetNumChains(round); cid++ {
- notaryPKs, err := ps.gov.NotarySet(round, cid)
- if err != nil {
- log.Error("get notary set fail",
- "round", round, "chain id", cid, "err", err)
- continue
- }
- label := peerLabel{set: notaryset, chainID: cid, round: round}
- // not in notary set, add group
- if _, ok := notaryPKs[ps.selfPK]; !ok {
- var nodes []*enode.Node
- for pk := range notaryPKs {
- node := ps.newNode(pk)
- nodes = append(nodes, node)
- ps.addLabel(node, label)
+ for chainID := uint32(0); chainID < ps.gov.GetNumChains(round); chainID++ {
+ notaryLabel := peerLabel{set: notaryset, chainID: chainID, round: round}
+ if _, ok := ps.label2Nodes[notaryLabel]; !ok {
+ notaryPKs, err := ps.gov.NotarySet(round, chainID)
+ if err != nil {
+ log.Error("get notary set fail",
+ "round", round, "chainID", chainID, "err", err)
+ continue
}
- ps.srvr.AddGroup(notarySetName(cid, round), nodes, groupNodeNum)
- continue
- }
- delete(notaryPKs, ps.selfPK)
- for pk := range notaryPKs {
- ps.addDirectPeer(pk, label)
- }
- inOneNotarySet = true
- }
+ nodes := ps.pksToNodes(notaryPKs)
+ ps.label2Nodes[notaryLabel] = nodes
- // build some connections to DKG nodes
- if !inDKGSet && inOneNotarySet {
- var nodes []*enode.Node
- label := peerLabel{set: dkgset, round: round}
- for pk := range dkgPKs {
- node := ps.newNode(pk)
- nodes = append(nodes, node)
- ps.addLabel(node, label)
+ if _, exists := nodes[ps.srvr.Self().ID().String()]; exists {
+ ps.buildDirectConn(notaryLabel)
+ } else {
+ ps.buildGroupConn(notaryLabel)
+ }
}
- ps.srvr.AddGroup(dkgSetName(round), nodes, groupNodeNum)
}
}
func (ps *peerSet) ForgetConnection(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- defer ps.dumpPeerLabel(fmt.Sprintf("ForgetConnection: %d", round))
- for r := range ps.history {
- if r <= round {
- ps.forgetConnection(round)
- delete(ps.history, r)
+ for label := range ps.directConn {
+ if label.round <= round {
+ ps.forgetDirectConn(label)
}
}
-}
-
-func (ps *peerSet) forgetConnection(round uint64) {
- dkgPKs, err := ps.gov.DKGSet(round)
- if err != nil {
- log.Error("get dkg set fail", "round", round, "err", err)
- }
- _, inDKGSet := dkgPKs[ps.selfPK]
- if inDKGSet {
- delete(dkgPKs, ps.selfPK)
- label := peerLabel{set: dkgset, round: round}
- for id := range dkgPKs {
- ps.removeDirectPeer(id, label)
+ for label := range ps.groupConnPeers {
+ if label.round <= round {
+ ps.forgetGroupConn(label)
}
}
- var inOneNotarySet bool
- for cid := uint32(0); cid < ps.gov.GetNumChains(round); cid++ {
- notaryPKs, err := ps.gov.NotarySet(round, cid)
- if err != nil {
- log.Error("get notary set fail",
- "round", round, "chain id", cid, "err", err)
- continue
+ for label := range ps.label2Nodes {
+ if label.round <= round {
+ delete(ps.label2Nodes, label)
}
-
- label := peerLabel{set: notaryset, chainID: cid, round: round}
-
- // not in notary set, add group
- if _, ok := notaryPKs[ps.selfPK]; !ok {
- var nodes []*enode.Node
- for id := range notaryPKs {
- node := ps.newNode(id)
- nodes = append(nodes, node)
- ps.removeLabel(node, label)
- }
- ps.srvr.RemoveGroup(notarySetName(cid, round))
- continue
- }
-
- delete(notaryPKs, ps.selfPK)
- for pk := range notaryPKs {
- ps.removeDirectPeer(pk, label)
- }
- inOneNotarySet = true
- }
-
- // build some connections to DKG nodes
- if !inDKGSet && inOneNotarySet {
- var nodes []*enode.Node
- label := peerLabel{set: dkgset, round: round}
- for id := range dkgPKs {
- node := ps.newNode(id)
- nodes = append(nodes, node)
- ps.removeLabel(node, label)
- }
- ps.srvr.RemoveGroup(dkgSetName(round))
}
}
-func (ps *peerSet) BuildNotaryConn(round uint64) {
+func (ps *peerSet) EnsureGroupConn() {
ps.lock.Lock()
defer ps.lock.Unlock()
- defer ps.dumpPeerLabel(fmt.Sprintf("BuildNotaryConn: %d", round))
-
- if _, ok := ps.notaryHistory[round]; ok {
- return
- }
-
- ps.notaryHistory[round] = struct{}{}
- for chainID := uint32(0); chainID < ps.gov.GetNumChains(round); chainID++ {
- s, err := ps.gov.NotarySet(round, chainID)
- if err != nil {
- log.Error("get notary set fail",
- "round", round, "chain id", chainID, "err", err)
- continue
- }
-
- // not in notary set, add group
- if _, ok := s[ps.selfPK]; !ok {
- var nodes []*enode.Node
- for id := range s {
- nodes = append(nodes, ps.newNode(id))
+ now := time.Now()
+ for label, peers := range ps.groupConnPeers {
+ // Remove timeout group conn peer.
+ for id, addtime := range peers {
+ if ps.peers[id] == nil && time.Since(addtime) > groupConnTimeout {
+ ps.removeDirectPeer(id, label)
+ delete(ps.groupConnPeers[label], id)
}
- ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum)
- continue
- }
-
- label := peerLabel{
- set: notaryset,
- chainID: chainID,
- round: round,
}
- delete(s, ps.selfPK)
- for pk := range s {
- ps.addDirectPeer(pk, label)
- }
- }
-}
-func (ps *peerSet) dumpPeerLabel(s string) {
- log.Debug(s, "peer num", len(ps.peers))
- for id, labels := range ps.peer2Labels {
- _, ok := ps.peers[id]
- for label := range labels {
- log.Debug(s, "connected", ok, "id", id[:16],
- "round", label.round, "cid", label.chainID, "set", label.set)
+ // Add new group conn peer.
+ for id := range ps.label2Nodes[label] {
+ if len(ps.groupConnPeers[label]) >= groupConnNum {
+ break
+ }
+ ps.groupConnPeers[label][id] = now
+ ps.addDirectPeer(id, label)
}
}
}
-func (ps *peerSet) ForgetNotaryConn(round uint64) {
+func (ps *peerSet) Refresh() {
ps.lock.Lock()
defer ps.lock.Unlock()
- defer ps.dumpPeerLabel(fmt.Sprintf("ForgetNotaryConn: %d", round))
-
- // forget all the rounds before the given round
- for r := range ps.notaryHistory {
- if r <= round {
- ps.forgetNotaryConn(r)
- delete(ps.notaryHistory, r)
+ for id := range ps.allDirectPeers {
+ if ps.peers[id] == nil {
+ if node := ps.tab.GetNode(enode.HexID(id)); node != nil {
+ ps.srvr.AddDirectPeer(node)
+ }
}
}
}
-func (ps *peerSet) forgetNotaryConn(round uint64) {
- for chainID := uint32(0); chainID < ps.gov.GetNumChains(round); chainID++ {
- s, err := ps.gov.NotarySet(round, chainID)
- if err != nil {
- log.Error("get notary set fail",
- "round", round, "chain id", chainID, "err", err)
- continue
- }
- if _, ok := s[ps.selfPK]; !ok {
- ps.srvr.RemoveGroup(notarySetName(chainID, round))
- continue
- }
-
- label := peerLabel{
- set: notaryset,
- chainID: chainID,
- round: round,
- }
- delete(s, ps.selfPK)
- for pk := range s {
- ps.removeDirectPeer(pk, label)
- }
+func (ps *peerSet) buildDirectConn(label peerLabel) {
+ ps.directConn[label] = struct{}{}
+ for id := range ps.label2Nodes[label] {
+ ps.addDirectPeer(id, label)
}
}
-func notarySetName(chainID uint32, round uint64) string {
- return fmt.Sprintf("%d-%d-notaryset", chainID, round)
-}
-
-func dkgSetName(round uint64) string {
- return fmt.Sprintf("%d-dkgset", round)
-}
-
-func (ps *peerSet) BuildDKGConn(round uint64) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
- defer ps.dumpPeerLabel(fmt.Sprintf("BuildDKGConn: %d", round))
- s, err := ps.gov.DKGSet(round)
- if err != nil {
- log.Error("get dkg set fail", "round", round)
- return
- }
-
- if _, ok := s[ps.selfPK]; !ok {
- return
- }
- ps.dkgHistory[round] = struct{}{}
-
- delete(s, ps.selfPK)
- for pk := range s {
- ps.addDirectPeer(pk, peerLabel{
- set: dkgset,
- round: round,
- })
+func (ps *peerSet) forgetDirectConn(label peerLabel) {
+ for id := range ps.label2Nodes[label] {
+ ps.removeDirectPeer(id, label)
}
+ delete(ps.directConn, label)
}
-func (ps *peerSet) ForgetDKGConn(round uint64) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
- defer ps.dumpPeerLabel(fmt.Sprintf("ForgetDKGConn: %d", round))
-
- // forget all the rounds before the given round
- for r := range ps.dkgHistory {
- if r <= round {
- ps.forgetDKGConn(r)
- delete(ps.dkgHistory, r)
+func (ps *peerSet) buildGroupConn(label peerLabel) {
+ peers := make(map[string]time.Time)
+ now := time.Now()
+ for id := range ps.label2Nodes[label] {
+ peers[id] = now
+ ps.addDirectPeer(id, label)
+ if len(peers) >= groupConnNum {
+ break
}
}
+ ps.groupConnPeers[label] = peers
}
-func (ps *peerSet) forgetDKGConn(round uint64) {
- s, err := ps.gov.DKGSet(round)
- if err != nil {
- log.Error("get dkg set fail", "round", round)
- return
+func (ps *peerSet) forgetGroupConn(label peerLabel) {
+ for id := range ps.groupConnPeers[label] {
+ ps.removeDirectPeer(id, label)
}
- if _, ok := s[ps.selfPK]; !ok {
+ delete(ps.groupConnPeers, label)
+}
+
+func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
+ if len(ps.allDirectPeers[id]) > 0 {
+ ps.allDirectPeers[id][label] = struct{}{}
return
}
+ ps.allDirectPeers[id] = map[peerLabel]struct{}{label: {}}
- delete(s, ps.selfPK)
- label := peerLabel{
- set: dkgset,
- round: round,
+ node := ps.tab.GetNode(enode.HexID(id))
+ if node == nil {
+ node = ps.label2Nodes[label][id]
}
- for pk := range s {
- ps.removeDirectPeer(pk, label)
- }
-}
-
-// make sure the ps.lock is held
-func (ps *peerSet) addDirectPeer(pk string, label peerLabel) {
- node := ps.newNode(pk)
- ps.addLabel(node, label)
ps.srvr.AddDirectPeer(node)
}
-// make sure the ps.lock is held
-func (ps *peerSet) removeDirectPeer(pk string, label peerLabel) {
- node := ps.newNode(pk)
- ps.removeLabel(node, label)
- if len(ps.peer2Labels[node.ID().String()]) == 0 {
- ps.srvr.RemoveDirectPeer(node)
+func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
+ if len(ps.allDirectPeers[id]) == 0 {
+ return
}
-}
-
-// make sure the ps.lock is held
-func (ps *peerSet) addLabel(node *enode.Node, label peerLabel) {
- id := node.ID().String()
- if _, ok := ps.peer2Labels[id]; !ok {
- ps.peer2Labels[id] = make(map[peerLabel]struct{})
+ delete(ps.allDirectPeers[id], label)
+ if len(ps.allDirectPeers[id]) == 0 {
+ ps.srvr.RemoveDirectPeer(ps.label2Nodes[label][id])
+ delete(ps.allDirectPeers, id)
}
- if _, ok := ps.label2Peers[label]; !ok {
- ps.label2Peers[label] = make(map[string]struct{})
- }
- ps.peer2Labels[id][label] = struct{}{}
- ps.label2Peers[label][id] = struct{}{}
}
-// make sure the ps.lock is held
-func (ps *peerSet) removeLabel(node *enode.Node, label peerLabel) {
- id := node.ID().String()
-
- delete(ps.peer2Labels[id], label)
- delete(ps.label2Peers[label], id)
- if len(ps.peer2Labels[id]) == 0 {
- delete(ps.peer2Labels, id)
- }
- if len(ps.label2Peers[label]) == 0 {
- delete(ps.label2Peers, label)
+func (ps *peerSet) pksToNodes(pks map[string]struct{}) map[string]*enode.Node {
+ nodes := map[string]*enode.Node{}
+ for pk := range pks {
+ n := ps.newEmptyNode(pk)
+ if n.ID() == ps.srvr.Self().ID() {
+ n = ps.srvr.Self()
+ }
+ nodes[n.ID().String()] = n
}
+ return nodes
}
-// TODO: improve this by not using pk.
-func (ps *peerSet) newNode(pk string) *enode.Node {
- var ip net.IP
- var tcp, udp int
-
+func (ps *peerSet) newEmptyNode(pk string) *enode.Node {
b, err := hex.DecodeString(pk)
if err != nil {
panic(err)
@@ -1296,10 +1139,34 @@ func (ps *peerSet) newNode(pk string) *enode.Node {
if err != nil {
panic(err)
}
+ return enode.NewV4(pubkey, nil, 0, 0)
+}
+
+func (ps *peerSet) Status() {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+ for label := range ps.directConn {
+ l := label.String()
+ for id := range ps.label2Nodes[label] {
+ _, ok := ps.peers[id]
+ log.Debug("direct conn", "label", l, "id", id, "connected", ok)
+ }
+ }
+
+ for label, peers := range ps.groupConnPeers {
+ l := label.String()
+ for id := range peers {
+ _, ok := ps.peers[id]
+ log.Debug("group conn", "label", l, "id", id, "connected", ok)
+ }
+ }
- node := ps.tab.GetNode(enode.PubkeyToIDV4(pubkey))
- if node != nil {
- return node
+ connected := 0
+ for id := range ps.allDirectPeers {
+ if _, ok := ps.peers[id]; ok {
+ connected++
+ }
}
- return enode.NewV4(pubkey, ip, tcp, udp)
+ log.Debug("all direct peers",
+ "connected", connected, "all", len(ps.allDirectPeers))
}