diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 120 |
1 files changed, 58 insertions, 62 deletions
diff --git a/dex/handler.go b/dex/handler.go index 20df41709..8971ad500 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -87,6 +87,9 @@ const ( maxPullVotePeers = 1 pullVoteRateLimit = 10 * time.Second + + maxAgreementResultBroadcast = 3 + maxFinalizedBlockBroadcast = 3 ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -888,19 +891,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkAgreement(rlpHash(agreement)) pm.receiveCh <- &agreement - case msg.Code == RandomnessMsg: - if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { - break - } - // Broadcast this to all peer - var randomnesses []*coreTypes.BlockRandomnessResult - if err := msg.Decode(&randomnesses); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - for _, randomness := range randomnesses { - p.MarkRandomness(rlpHash(randomness)) - pm.receiveCh <- randomness - } case msg.Code == DKGPrivateShareMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -949,20 +939,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&pos); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + if block := pm.cache.finalizedBlock(pos); block != nil { + log.Debug("Push finalized block as votes", "block", block) + return p.SendCoreBlocks([]*coreTypes.Block{block}) + } votes := pm.cache.votes(pos) log.Debug("Push votes", "votes", votes) return p.SendVotes(votes) - case msg.Code == PullRandomnessMsg: - if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { - break - } - var hashes coreCommon.Hashes - if err := msg.Decode(&hashes); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - randomnesses := pm.cache.randomness(hashes) - log.Debug("Push randomness", "randomness", randomnesses) - return p.SendRandomnesses(randomnesses) case msg.Code == GetGovStateMsg: var hash common.Hash if err := msg.Decode(&hash); err != nil { @@ -1098,6 +1081,31 @@ func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) { } } +// BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers. +func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) { + if len(block.Finalization.Randomness) == 0 { + log.Warn("Ignore broadcast finalized block without randomness", "block", block) + return + } + pm.cache.addFinalizedBlock(block) + + // send to notary nodes first (direct) + label := peerLabel{ + set: notaryset, + round: block.Position.Round, + } + peers := pm.peers.PeersWithLabel(label) + count := maxFinalizedBlockBroadcast + for _, peer := range peers { + if count <= 0 { + break + } else { + count-- + peer.AsyncSendCoreBlocks([]*coreTypes.Block{block}) + } + } +} + // BroadcastCoreBlock broadcasts the core block to all its peers. func (pm *ProtocolManager) BroadcastCoreBlock(block *coreTypes.Block) { pm.cache.addBlock(block) @@ -1122,39 +1130,32 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { func (pm *ProtocolManager) BroadcastAgreementResult( agreement *coreTypes.AgreementResult) { - // send to dkg nodes first (direct) - label := peerLabel{ - set: dkgset, - round: agreement.Position.Round, - } - for _, peer := range pm.peers.PeersWithLabel(label) { - if !peer.knownAgreements.Contains(rlpHash(agreement)) { - peer.AsyncSendAgreement(agreement) - } + block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash}) + if len(block) != 0 { + block[0].Finalization.Height = agreement.FinalizationHeight + block[0].Finalization.Randomness = agreement.Randomness + pm.cache.addFinalizedBlock(block[0]) } - for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { - peer.AsyncSendAgreement(agreement) - } -} - -func (pm *ProtocolManager) BroadcastRandomnessResult( - randomness *coreTypes.BlockRandomnessResult) { - pm.cache.addRandomness(randomness) // send to notary nodes first (direct) label := peerLabel{ set: notaryset, - round: randomness.Position.Round, + round: agreement.Position.Round, } - randomnesses := []*coreTypes.BlockRandomnessResult{randomness} - for _, peer := range pm.peers.PeersWithLabel(label) { - if !peer.knownRandomnesses.Contains(rlpHash(randomness)) { - peer.AsyncSendRandomnesses(randomnesses) + peers := pm.peers.PeersWithLabel(label) + count := maxAgreementResultBroadcast + agrHash := rlpHash(agreement) + for _, peer := range peers { + if count <= 0 { + peer.MarkAgreement(agrHash) + } else if !peer.knownAgreements.Contains(agrHash) { + count-- + peer.AsyncSendAgreement(agreement) } } - for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) { - peer.AsyncSendRandomnesses(randomnesses) + for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { + peer.AsyncSendAgreement(agreement) } } @@ -1177,7 +1178,7 @@ func (pm *ProtocolManager) SendDKGPrivateShare( func (pm *ProtocolManager) BroadcastDKGPrivateShare( privateShare *dkgTypes.PrivateShare) { - label := peerLabel{set: dkgset, round: privateShare.Round} + label := peerLabel{set: notaryset, round: privateShare.Round} for _, peer := range pm.peers.PeersWithLabel(label) { if !peer.knownDKGPrivateShares.Contains(rlpHash(privateShare)) { peer.AsyncSendDKGPrivateShare(privateShare) @@ -1187,7 +1188,7 @@ func (pm *ProtocolManager) BroadcastDKGPrivateShare( func (pm *ProtocolManager) BroadcastDKGPartialSignature( psig *dkgTypes.PartialSignature) { - label := peerLabel{set: dkgset, round: psig.Round} + label := peerLabel{set: notaryset, round: psig.Round} for _, peer := range pm.peers.PeersWithLabel(label) { peer.AsyncSendDKGPartialSignature(psig) } @@ -1218,17 +1219,6 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } -func (pm *ProtocolManager) BroadcastPullRandomness( - hashes coreCommon.Hashes) { - // TODO(jimmy-dexon): pull from dkg set only. - for idx, peer := range pm.peers.Peers() { - if idx >= maxPullPeers { - break - } - peer.AsyncSendPullRandomness(hashes) - } -} - func (pm *ProtocolManager) txBroadcastLoop() { queueSizeMax := common.StorageSize(100 * 1024) // 100 KB currentSize := common.StorageSize(0) @@ -1321,9 +1311,15 @@ func (pm *ProtocolManager) peerSetLoop() { for i := round; i <= dexCore.DKGDelayRound; i++ { pm.peers.BuildConnection(i) } + round = dexCore.DKGDelayRound } else { pm.peers.BuildConnection(round) } + CRSRound := pm.gov.CRSRound() + if CRSRound > round { + pm.peers.BuildConnection(CRSRound) + round = CRSRound + } for { select { @@ -1340,7 +1336,7 @@ func (pm *ProtocolManager) peerSetLoop() { } log.Debug("ProtocolManager: new round", "round", newRound) - if newRound == round { + if newRound <= round { break } |