From 0c63646ca8b06bb527737cd6e2a7fe58f169efff Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Wed, 27 Mar 2019 20:47:32 +0800 Subject: core: merge notarySet and DKGSet (#265) * vendor: sync to latest core * core: merge notarySet and dkgSet * dex: optimize network traffic for finalized block --- dex/peer.go | 101 ++++++++++-------------------------------------------------- 1 file changed, 17 insertions(+), 84 deletions(-) (limited to 'dex/peer.go') diff --git a/dex/peer.go b/dex/peer.go index 0fa1ac61d..0d23e630f 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -67,7 +67,6 @@ const ( maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) maxKnownAgreements = 10240 - maxKnownRandomnesses = 10240 maxKnownDKGPrivateShares = 1024 // this related to DKG Size // maxQueuedTxs is the maximum number of transaction lists to queue up before @@ -90,7 +89,6 @@ const ( maxQueuedCoreBlocks = 16 maxQueuedVotes = 128 maxQueuedAgreements = 16 - maxQueuedRandomnesses = 16 maxQueuedDKGPrivateShare = 16 maxQueuedDKGParitialSignature = 16 maxQueuedPullBlocks = 128 @@ -114,8 +112,7 @@ type PeerInfo struct { type setType uint32 const ( - dkgset = iota - notaryset + notaryset = iota ) type peerLabel struct { @@ -126,8 +123,6 @@ type peerLabel struct { func (p peerLabel) String() string { var t string switch p.set { - case dkgset: - t = fmt.Sprintf("DKGSet round: %d", p.round) case notaryset: t = fmt.Sprintf("NotarySet round: %d", p.round) } @@ -150,7 +145,6 @@ type peer struct { knownRecords mapset.Set // Set of node record known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer knownAgreements mapset.Set - knownRandomnesses mapset.Set knownDKGPrivateShares mapset.Set queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer @@ -159,7 +153,6 @@ type peer struct { queuedCoreBlocks chan []*coreTypes.Block queuedVotes chan []*coreTypes.Vote queuedAgreements chan *coreTypes.AgreementResult - queuedRandomnesses chan []*coreTypes.BlockRandomnessResult queuedDKGPrivateShares chan *dkgTypes.PrivateShare queuedDKGPartialSignatures chan *dkgTypes.PartialSignature queuedPullBlocks chan coreCommon.Hashes @@ -178,7 +171,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { knownRecords: mapset.NewSet(), knownBlocks: mapset.NewSet(), knownAgreements: mapset.NewSet(), - knownRandomnesses: mapset.NewSet(), knownDKGPrivateShares: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedRecords: make(chan []*enr.Record, maxQueuedRecords), @@ -187,7 +179,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { queuedCoreBlocks: make(chan []*coreTypes.Block, maxQueuedCoreBlocks), queuedVotes: make(chan []*coreTypes.Vote, maxQueuedVotes), queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements), - queuedRandomnesses: make(chan []*coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare), queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature), queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks), @@ -252,11 +243,6 @@ func (p *peer) broadcast() { return } p.Log().Trace("Broadcast agreement") - case randomnesses := <-p.queuedRandomnesses: - if err := p.SendRandomnesses(randomnesses); err != nil { - return - } - p.Log().Trace("Broadcast randomnesses", "count", len(randomnesses)) case privateShare := <-p.queuedDKGPrivateShares: if err := p.SendDKGPrivateShare(privateShare); err != nil { return @@ -277,11 +263,6 @@ func (p *peer) broadcast() { return } p.Log().Trace("Pulling Votes", "position", pos) - case hashes := <-p.queuedPullRandomness: - if err := p.SendPullRandomness(hashes); err != nil { - return - } - p.Log().Trace("Pulling Randomnesses", "hashes", hashes) case <-p.term: return case <-time.After(100 * time.Millisecond): @@ -366,13 +347,6 @@ func (p *peer) MarkAgreement(hash common.Hash) { p.knownAgreements.Add(hash) } -func (p *peer) MarkRandomness(hash common.Hash) { - for p.knownRandomnesses.Cardinality() >= maxKnownRandomnesses { - p.knownRandomnesses.Pop() - } - p.knownRandomnesses.Add(hash) -} - func (p *peer) MarkDKGPrivateShares(hash common.Hash) { for p.knownDKGPrivateShares.Cardinality() >= maxKnownDKGPrivateShares { p.knownDKGPrivateShares.Pop() @@ -513,24 +487,6 @@ func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { } } -func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) error { - for _, randomness := range randomnesses { - p.knownRandomnesses.Add(rlpHash(randomness)) - } - return p.logSend(p2p.Send(p.rw, RandomnessMsg, randomnesses), RandomnessMsg) -} - -func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) { - select { - case p.queuedRandomnesses <- randomnesses: - for _, randomness := range randomnesses { - p.knownRandomnesses.Add(rlpHash(randomness)) - } - default: - p.Log().Debug("Dropping randomness result") - } -} - func (p *peer) SendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) error { p.knownDKGPrivateShares.Add(rlpHash(privateShare)) return p.logSend(p2p.Send(p.rw, DKGPrivateShareMsg, privateShare), DKGPrivateShareMsg) @@ -581,18 +537,6 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { } } -func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error { - return p.logSend(p2p.Send(p.rw, PullRandomnessMsg, hashes), PullRandomnessMsg) -} - -func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) { - select { - case p.queuedPullRandomness <- hashes: - default: - p.Log().Debug("Dropping Pull Randomness") - } -} - // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(flag uint8, headers []*types.HeaderWithGovState) error { return p.logSend(p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}), BlockHeadersMsg) @@ -871,38 +815,43 @@ func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer { return list } -// PeersWithoutNodeRecord retrieves a list of peers that do not have a -// given record in their set of known hashes. -func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutLabel(label peerLabel) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.knownRecords.Contains(hash) { + length := len(ps.peers) - len(ps.label2Nodes[label]) + if length <= 0 { + return []*peer{} + } + list := make([]*peer, 0, len(ps.peers)-len(ps.label2Nodes[label])) + peersWithLabel := ps.label2Nodes[label] + for id, p := range ps.peers { + if _, exist := peersWithLabel[id]; !exist { list = append(list, p) } } return list } -func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { +// PeersWithoutNodeRecord retrieves a list of peers that do not have a +// given record in their set of known hashes. +func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownAgreements.Contains(hash) { + if !p.knownRecords.Contains(hash) { list = append(list, p) } } return list } -func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownRandomnesses.Contains(hash) { + if !p.knownAgreements.Contains(hash) { list = append(list, p) } } @@ -956,23 +905,6 @@ func (ps *peerSet) BuildConnection(round uint64) { log.Info("Build connection", "round", round) - dkgLabel := peerLabel{set: dkgset, round: round} - if _, ok := ps.label2Nodes[dkgLabel]; !ok { - dkgPKs, err := ps.gov.DKGSet(round) - if err != nil { - log.Error("get DKG set fail", "round", round, "err", err) - } - - nodes := ps.pksToNodes(dkgPKs) - ps.label2Nodes[dkgLabel] = nodes - - if _, exists := nodes[ps.srvr.Self().ID().String()]; exists { - ps.buildDirectConn(dkgLabel) - } else { - ps.buildGroupConn(dkgLabel) - } - } - notaryLabel := peerLabel{set: notaryset, round: round} if _, ok := ps.label2Nodes[notaryLabel]; !ok { notaryPKs, err := ps.gov.NotarySet(round) @@ -990,6 +922,7 @@ func (ps *peerSet) BuildConnection(round uint64) { ps.buildGroupConn(notaryLabel) } } + } func (ps *peerSet) ForgetConnection(round uint64) { -- cgit v1.2.3