diff options
Diffstat (limited to 'dex/peer.go')
-rw-r--r-- | dex/peer.go | 463 |
1 files changed, 165 insertions, 298 deletions
diff --git a/dex/peer.go b/dex/peer.go index 0a67db688..67a59348d 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -37,7 +37,6 @@ import ( "encoding/hex" "errors" "fmt" - "net" "sync" "time" @@ -105,7 +104,8 @@ const ( handshakeTimeout = 5 * time.Second - groupNodeNum = 3 + groupConnNum = 3 + groupConnTimeout = 3 * time.Minute ) // PeerInfo represents a short summary of the Ethereum sub-protocol metadata known @@ -129,6 +129,17 @@ type peerLabel struct { round uint64 } +func (p peerLabel) String() string { + var t string + switch p.set { + case dkgset: + t = fmt.Sprintf("DKGSet round: %d", p.round) + case notaryset: + t = fmt.Sprintf("NotarySet round: %d chain: %d", p.round, p.chainID) + } + return t +} + type peer struct { id string @@ -711,28 +722,27 @@ type peerSet struct { tab *nodeTable selfPK string - srvr p2pServer - gov governance - peer2Labels map[string]map[peerLabel]struct{} - label2Peers map[peerLabel]map[string]struct{} - history map[uint64]struct{} - notaryHistory map[uint64]struct{} - dkgHistory map[uint64]struct{} + srvr p2pServer + gov governance + + label2Nodes map[peerLabel]map[string]*enode.Node + directConn map[peerLabel]struct{} + groupConnPeers map[peerLabel]map[string]time.Time + allDirectPeers map[string]map[peerLabel]struct{} } // newPeerSet creates a new peer set to track the active participants. func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet { return &peerSet{ - peers: make(map[string]*peer), - gov: gov, - srvr: srvr, - tab: tab, - selfPK: hex.EncodeToString(crypto.FromECDSAPub(&srvr.GetPrivateKey().PublicKey)), - peer2Labels: make(map[string]map[peerLabel]struct{}), - label2Peers: make(map[peerLabel]map[string]struct{}), - history: make(map[uint64]struct{}), - notaryHistory: make(map[uint64]struct{}), - dkgHistory: make(map[uint64]struct{}), + peers: make(map[string]*peer), + gov: gov, + srvr: srvr, + tab: tab, + selfPK: hex.EncodeToString(crypto.FromECDSAPub(&srvr.GetPrivateKey().PublicKey)), + label2Nodes: make(map[peerLabel]map[string]*enode.Node), + directConn: make(map[peerLabel]struct{}), + groupConnPeers: make(map[peerLabel]map[string]time.Time), + allDirectPeers: make(map[string]map[peerLabel]struct{}), } } @@ -832,8 +842,8 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.label2Peers[label])) - for id := range ps.label2Peers[label] { + list := make([]*peer, 0, len(ps.label2Nodes[label])) + for id := range ps.label2Nodes[label] { if p, ok := ps.peers[id]; ok { list = append(list, p) } @@ -845,8 +855,8 @@ func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.label2Peers[label])) - for id := range ps.label2Peers[label] { + list := make([]*peer, 0, len(ps.label2Nodes[label])) + for id := range ps.label2Nodes[label] { if p, ok := ps.peers[id]; ok { if !p.knownVotes.Contains(hash) { list = append(list, p) @@ -948,345 +958,178 @@ func (ps *peerSet) Close() { } func (ps *peerSet) BuildConnection(round uint64) { - ps.lock.Lock() - defer ps.lock.Unlock() - defer ps.dumpPeerLabel(fmt.Sprintf("BuildConnection: %d", round)) - - ps.history[round] = struct{}{} + dkgLabel := peerLabel{set: dkgset, round: round} + if _, ok := ps.label2Nodes[dkgLabel]; !ok { + dkgPKs, err := ps.gov.DKGSet(round) + if err != nil { + log.Error("get DKG set fail", "round", round, "err", err) + } - dkgPKs, err := ps.gov.DKGSet(round) - if err != nil { - log.Error("get dkg set fail", "round", round, "err", err) - } + nodes := ps.pksToNodes(dkgPKs) + ps.label2Nodes[dkgLabel] = nodes - // build dkg connection - _, inDKGSet := dkgPKs[ps.selfPK] - if inDKGSet { - delete(dkgPKs, ps.selfPK) - dkgLabel := peerLabel{set: dkgset, round: round} - for pk := range dkgPKs { - ps.addDirectPeer(pk, dkgLabel) + if _, exists := nodes[ps.srvr.Self().ID().String()]; exists { + ps.buildDirectConn(dkgLabel) + } else { + ps.buildGroupConn(dkgLabel) } } - var inOneNotarySet bool - for cid := uint32(0); cid < ps.gov.GetNumChains(round); cid++ { - notaryPKs, err := ps.gov.NotarySet(round, cid) - if err != nil { - log.Error("get notary set fail", - "round", round, "chain id", cid, "err", err) - continue - } - label := peerLabel{set: notaryset, chainID: cid, round: round} - // not in notary set, add group - if _, ok := notaryPKs[ps.selfPK]; !ok { - var nodes []*enode.Node - for pk := range notaryPKs { - node := ps.newNode(pk) - nodes = append(nodes, node) - ps.addLabel(node, label) + for chainID := uint32(0); chainID < ps.gov.GetNumChains(round); chainID++ { + notaryLabel := peerLabel{set: notaryset, chainID: chainID, round: round} + if _, ok := ps.label2Nodes[notaryLabel]; !ok { + notaryPKs, err := ps.gov.NotarySet(round, chainID) + if err != nil { + log.Error("get notary set fail", + "round", round, "chainID", chainID, "err", err) + continue } - ps.srvr.AddGroup(notarySetName(cid, round), nodes, groupNodeNum) - continue - } - delete(notaryPKs, ps.selfPK) - for pk := range notaryPKs { - ps.addDirectPeer(pk, label) - } - inOneNotarySet = true - } + nodes := ps.pksToNodes(notaryPKs) + ps.label2Nodes[notaryLabel] = nodes - // build some connections to DKG nodes - if !inDKGSet && inOneNotarySet { - var nodes []*enode.Node - label := peerLabel{set: dkgset, round: round} - for pk := range dkgPKs { - node := ps.newNode(pk) - nodes = append(nodes, node) - ps.addLabel(node, label) + if _, exists := nodes[ps.srvr.Self().ID().String()]; exists { + ps.buildDirectConn(notaryLabel) + } else { + ps.buildGroupConn(notaryLabel) + } } - ps.srvr.AddGroup(dkgSetName(round), nodes, groupNodeNum) } } func (ps *peerSet) ForgetConnection(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() - defer ps.dumpPeerLabel(fmt.Sprintf("ForgetConnection: %d", round)) - for r := range ps.history { - if r <= round { - ps.forgetConnection(round) - delete(ps.history, r) + for label := range ps.directConn { + if label.round <= round { + ps.forgetDirectConn(label) } } -} - -func (ps *peerSet) forgetConnection(round uint64) { - dkgPKs, err := ps.gov.DKGSet(round) - if err != nil { - log.Error("get dkg set fail", "round", round, "err", err) - } - _, inDKGSet := dkgPKs[ps.selfPK] - if inDKGSet { - delete(dkgPKs, ps.selfPK) - label := peerLabel{set: dkgset, round: round} - for id := range dkgPKs { - ps.removeDirectPeer(id, label) + for label := range ps.groupConnPeers { + if label.round <= round { + ps.forgetGroupConn(label) } } - var inOneNotarySet bool - for cid := uint32(0); cid < ps.gov.GetNumChains(round); cid++ { - notaryPKs, err := ps.gov.NotarySet(round, cid) - if err != nil { - log.Error("get notary set fail", - "round", round, "chain id", cid, "err", err) - continue + for label := range ps.label2Nodes { + if label.round <= round { + delete(ps.label2Nodes, label) } - - label := peerLabel{set: notaryset, chainID: cid, round: round} - - // not in notary set, add group - if _, ok := notaryPKs[ps.selfPK]; !ok { - var nodes []*enode.Node - for id := range notaryPKs { - node := ps.newNode(id) - nodes = append(nodes, node) - ps.removeLabel(node, label) - } - ps.srvr.RemoveGroup(notarySetName(cid, round)) - continue - } - - delete(notaryPKs, ps.selfPK) - for pk := range notaryPKs { - ps.removeDirectPeer(pk, label) - } - inOneNotarySet = true - } - - // build some connections to DKG nodes - if !inDKGSet && inOneNotarySet { - var nodes []*enode.Node - label := peerLabel{set: dkgset, round: round} - for id := range dkgPKs { - node := ps.newNode(id) - nodes = append(nodes, node) - ps.removeLabel(node, label) - } - ps.srvr.RemoveGroup(dkgSetName(round)) } } -func (ps *peerSet) BuildNotaryConn(round uint64) { +func (ps *peerSet) EnsureGroupConn() { ps.lock.Lock() defer ps.lock.Unlock() - defer ps.dumpPeerLabel(fmt.Sprintf("BuildNotaryConn: %d", round)) - - if _, ok := ps.notaryHistory[round]; ok { - return - } - - ps.notaryHistory[round] = struct{}{} - for chainID := uint32(0); chainID < ps.gov.GetNumChains(round); chainID++ { - s, err := ps.gov.NotarySet(round, chainID) - if err != nil { - log.Error("get notary set fail", - "round", round, "chain id", chainID, "err", err) - continue - } - - // not in notary set, add group - if _, ok := s[ps.selfPK]; !ok { - var nodes []*enode.Node - for id := range s { - nodes = append(nodes, ps.newNode(id)) + now := time.Now() + for label, peers := range ps.groupConnPeers { + // Remove timeout group conn peer. + for id, addtime := range peers { + if ps.peers[id] == nil && time.Since(addtime) > groupConnTimeout { + ps.removeDirectPeer(id, label) + delete(ps.groupConnPeers[label], id) } - ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum) - continue - } - - label := peerLabel{ - set: notaryset, - chainID: chainID, - round: round, } - delete(s, ps.selfPK) - for pk := range s { - ps.addDirectPeer(pk, label) - } - } -} -func (ps *peerSet) dumpPeerLabel(s string) { - log.Debug(s, "peer num", len(ps.peers)) - for id, labels := range ps.peer2Labels { - _, ok := ps.peers[id] - for label := range labels { - log.Debug(s, "connected", ok, "id", id[:16], - "round", label.round, "cid", label.chainID, "set", label.set) + // Add new group conn peer. + for id := range ps.label2Nodes[label] { + if len(ps.groupConnPeers[label]) >= groupConnNum { + break + } + ps.groupConnPeers[label][id] = now + ps.addDirectPeer(id, label) } } } -func (ps *peerSet) ForgetNotaryConn(round uint64) { +func (ps *peerSet) Refresh() { ps.lock.Lock() defer ps.lock.Unlock() - defer ps.dumpPeerLabel(fmt.Sprintf("ForgetNotaryConn: %d", round)) - - // forget all the rounds before the given round - for r := range ps.notaryHistory { - if r <= round { - ps.forgetNotaryConn(r) - delete(ps.notaryHistory, r) + for id := range ps.allDirectPeers { + if ps.peers[id] == nil { + if node := ps.tab.GetNode(enode.HexID(id)); node != nil { + ps.srvr.AddDirectPeer(node) + } } } } -func (ps *peerSet) forgetNotaryConn(round uint64) { - for chainID := uint32(0); chainID < ps.gov.GetNumChains(round); chainID++ { - s, err := ps.gov.NotarySet(round, chainID) - if err != nil { - log.Error("get notary set fail", - "round", round, "chain id", chainID, "err", err) - continue - } - if _, ok := s[ps.selfPK]; !ok { - ps.srvr.RemoveGroup(notarySetName(chainID, round)) - continue - } - - label := peerLabel{ - set: notaryset, - chainID: chainID, - round: round, - } - delete(s, ps.selfPK) - for pk := range s { - ps.removeDirectPeer(pk, label) - } +func (ps *peerSet) buildDirectConn(label peerLabel) { + ps.directConn[label] = struct{}{} + for id := range ps.label2Nodes[label] { + ps.addDirectPeer(id, label) } } -func notarySetName(chainID uint32, round uint64) string { - return fmt.Sprintf("%d-%d-notaryset", chainID, round) -} - -func dkgSetName(round uint64) string { - return fmt.Sprintf("%d-dkgset", round) -} - -func (ps *peerSet) BuildDKGConn(round uint64) { - ps.lock.Lock() - defer ps.lock.Unlock() - defer ps.dumpPeerLabel(fmt.Sprintf("BuildDKGConn: %d", round)) - s, err := ps.gov.DKGSet(round) - if err != nil { - log.Error("get dkg set fail", "round", round) - return - } - - if _, ok := s[ps.selfPK]; !ok { - return - } - ps.dkgHistory[round] = struct{}{} - - delete(s, ps.selfPK) - for pk := range s { - ps.addDirectPeer(pk, peerLabel{ - set: dkgset, - round: round, - }) +func (ps *peerSet) forgetDirectConn(label peerLabel) { + for id := range ps.label2Nodes[label] { + ps.removeDirectPeer(id, label) } + delete(ps.directConn, label) } -func (ps *peerSet) ForgetDKGConn(round uint64) { - ps.lock.Lock() - defer ps.lock.Unlock() - defer ps.dumpPeerLabel(fmt.Sprintf("ForgetDKGConn: %d", round)) - - // forget all the rounds before the given round - for r := range ps.dkgHistory { - if r <= round { - ps.forgetDKGConn(r) - delete(ps.dkgHistory, r) +func (ps *peerSet) buildGroupConn(label peerLabel) { + peers := make(map[string]time.Time) + now := time.Now() + for id := range ps.label2Nodes[label] { + peers[id] = now + ps.addDirectPeer(id, label) + if len(peers) >= groupConnNum { + break } } + ps.groupConnPeers[label] = peers } -func (ps *peerSet) forgetDKGConn(round uint64) { - s, err := ps.gov.DKGSet(round) - if err != nil { - log.Error("get dkg set fail", "round", round) - return +func (ps *peerSet) forgetGroupConn(label peerLabel) { + for id := range ps.groupConnPeers[label] { + ps.removeDirectPeer(id, label) } - if _, ok := s[ps.selfPK]; !ok { + delete(ps.groupConnPeers, label) +} + +func (ps *peerSet) addDirectPeer(id string, label peerLabel) { + if len(ps.allDirectPeers[id]) > 0 { + ps.allDirectPeers[id][label] = struct{}{} return } + ps.allDirectPeers[id] = map[peerLabel]struct{}{label: {}} - delete(s, ps.selfPK) - label := peerLabel{ - set: dkgset, - round: round, + node := ps.tab.GetNode(enode.HexID(id)) + if node == nil { + node = ps.label2Nodes[label][id] } - for pk := range s { - ps.removeDirectPeer(pk, label) - } -} - -// make sure the ps.lock is held -func (ps *peerSet) addDirectPeer(pk string, label peerLabel) { - node := ps.newNode(pk) - ps.addLabel(node, label) ps.srvr.AddDirectPeer(node) } -// make sure the ps.lock is held -func (ps *peerSet) removeDirectPeer(pk string, label peerLabel) { - node := ps.newNode(pk) - ps.removeLabel(node, label) - if len(ps.peer2Labels[node.ID().String()]) == 0 { - ps.srvr.RemoveDirectPeer(node) +func (ps *peerSet) removeDirectPeer(id string, label peerLabel) { + if len(ps.allDirectPeers[id]) == 0 { + return } -} - -// make sure the ps.lock is held -func (ps *peerSet) addLabel(node *enode.Node, label peerLabel) { - id := node.ID().String() - if _, ok := ps.peer2Labels[id]; !ok { - ps.peer2Labels[id] = make(map[peerLabel]struct{}) + delete(ps.allDirectPeers[id], label) + if len(ps.allDirectPeers[id]) == 0 { + ps.srvr.RemoveDirectPeer(ps.label2Nodes[label][id]) + delete(ps.allDirectPeers, id) } - if _, ok := ps.label2Peers[label]; !ok { - ps.label2Peers[label] = make(map[string]struct{}) - } - ps.peer2Labels[id][label] = struct{}{} - ps.label2Peers[label][id] = struct{}{} } -// make sure the ps.lock is held -func (ps *peerSet) removeLabel(node *enode.Node, label peerLabel) { - id := node.ID().String() - - delete(ps.peer2Labels[id], label) - delete(ps.label2Peers[label], id) - if len(ps.peer2Labels[id]) == 0 { - delete(ps.peer2Labels, id) - } - if len(ps.label2Peers[label]) == 0 { - delete(ps.label2Peers, label) +func (ps *peerSet) pksToNodes(pks map[string]struct{}) map[string]*enode.Node { + nodes := map[string]*enode.Node{} + for pk := range pks { + n := ps.newEmptyNode(pk) + if n.ID() == ps.srvr.Self().ID() { + n = ps.srvr.Self() + } + nodes[n.ID().String()] = n } + return nodes } -// TODO: improve this by not using pk. -func (ps *peerSet) newNode(pk string) *enode.Node { - var ip net.IP - var tcp, udp int - +func (ps *peerSet) newEmptyNode(pk string) *enode.Node { b, err := hex.DecodeString(pk) if err != nil { panic(err) @@ -1296,10 +1139,34 @@ func (ps *peerSet) newNode(pk string) *enode.Node { if err != nil { panic(err) } + return enode.NewV4(pubkey, nil, 0, 0) +} + +func (ps *peerSet) Status() { + ps.lock.Lock() + defer ps.lock.Unlock() + for label := range ps.directConn { + l := label.String() + for id := range ps.label2Nodes[label] { + _, ok := ps.peers[id] + log.Debug("direct conn", "label", l, "id", id, "connected", ok) + } + } + + for label, peers := range ps.groupConnPeers { + l := label.String() + for id := range peers { + _, ok := ps.peers[id] + log.Debug("group conn", "label", l, "id", id, "connected", ok) + } + } - node := ps.tab.GetNode(enode.PubkeyToIDV4(pubkey)) - if node != nil { - return node + connected := 0 + for id := range ps.allDirectPeers { + if _, ok := ps.peers[id]; ok { + connected++ + } } - return enode.NewV4(pubkey, ip, tcp, udp) + log.Debug("all direct peers", + "connected", connected, "all", len(ps.allDirectPeers)) } |