From 7783bc4ba52bfc534d5b4d91e78abb2ddad7d078 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Tue, 26 Mar 2019 11:59:24 +0800 Subject: core: bring back agreement result (#515) * core: bring back agreement result * add logger * Fix * fixup --- core/agreement-mgr.go | 24 ++++++---- core/agreement.go | 102 ++++++++++++++++++++++++++++++++--------- core/blockchain.go | 65 ++++++++++++++++++++------ core/consensus.go | 48 +++++++++++-------- core/syncer/agreement.go | 43 +++++++++++++---- core/test/network.go | 13 ++++-- core/types/block-randomness.go | 11 ++++- 7 files changed, 230 insertions(+), 76 deletions(-) (limited to 'core') diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index bc6923a..9086e51 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -274,6 +274,9 @@ func (mgr *agreementMgr) processAgreementResult( } if result.Position == aID && !mgr.baModule.confirmed() { mgr.logger.Info("Syncing BA", "position", result.Position) + if result.Position.Round >= DKGDelayRound { + return mgr.baModule.processAgreementResult(result) + } for key := range result.Votes { if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err @@ -285,21 +288,26 @@ func (mgr *agreementMgr) processAgreementResult( if err != nil { return err } - mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA", - "hash", result.BlockHash) - mgr.network.PullBlocks(common.Hashes{result.BlockHash}) - mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) - crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger) - for key := range result.Votes { - if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { - return err + if result.Position.Round < DKGDelayRound { + mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA", + "hash", result.BlockHash) + mgr.network.PullBlocks(common.Hashes{result.BlockHash}) + for key := range result.Votes { + if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { + return err + } } } + mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) + crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger) leader, err := mgr.cache.GetLeaderNode(result.Position) if err != nil { return err } mgr.baModule.restart(nIDs, result.Position, leader, crs) + if result.Position.Round >= DKGDelayRound { + return mgr.baModule.processAgreementResult(result) + } } return nil } diff --git a/core/agreement.go b/core/agreement.go index 5e7b7de..d53440b 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -117,20 +117,21 @@ type agreementData struct { // agreement is the agreement protocal describe in the Crypto Shuffle Algorithm. type agreement struct { - state agreementState - data *agreementData - aID *atomic.Value - doneChan chan struct{} - notarySet map[types.NodeID]struct{} - hasVoteFast bool - hasOutput bool - lock sync.RWMutex - pendingBlock []pendingBlock - pendingVote []pendingVote - candidateBlock map[common.Hash]*types.Block - fastForward chan uint64 - signer *utils.Signer - logger common.Logger + state agreementState + data *agreementData + aID *atomic.Value + doneChan chan struct{} + notarySet map[types.NodeID]struct{} + hasVoteFast bool + hasOutput bool + lock sync.RWMutex + pendingBlock []pendingBlock + pendingVote []pendingVote + pendingAgreementResult map[types.Position]*types.AgreementResult + candidateBlock map[common.Hash]*types.Block + fastForward chan uint64 + signer *utils.Signer + logger common.Logger } // newAgreement creates a agreement instance. @@ -146,11 +147,12 @@ func newAgreement( ID: ID, leader: leader, }, - aID: &atomic.Value{}, - candidateBlock: make(map[common.Hash]*types.Block), - fastForward: make(chan uint64, 1), - signer: signer, - logger: logger, + aID: &atomic.Value{}, + pendingAgreementResult: make(map[types.Position]*types.AgreementResult), + candidateBlock: make(map[common.Hash]*types.Block), + fastForward: make(chan uint64, 1), + signer: signer, + logger: logger, } agreement.stop() return agreement @@ -205,6 +207,22 @@ func (a *agreement) restart( return } + var result *types.AgreementResult + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingAgreementResult := make( + map[types.Position]*types.AgreementResult) + for pos, agr := range a.pendingAgreementResult { + if pos.Newer(aID) { + newPendingAgreementResult[pos] = agr + } else if pos == aID { + result = agr + } + } + a.pendingAgreementResult = newPendingAgreementResult + }() + expireTime := time.Now().Add(-10 * time.Second) replayBlock := make([]*types.Block, 0) func() { @@ -215,7 +233,11 @@ func (a *agreement) restart( if aID.Newer(pending.block.Position) { continue } else if pending.block.Position == aID { - replayBlock = append(replayBlock, pending.block) + if result == nil || + result.Position.Round < DKGDelayRound || + result.BlockHash == pending.block.Hash { + replayBlock = append(replayBlock, pending.block) + } } else if pending.receivedTime.After(expireTime) { newPendingBlock = append(newPendingBlock, pending) } @@ -232,7 +254,9 @@ func (a *agreement) restart( if aID.Newer(pending.vote.Position) { continue } else if pending.vote.Position == aID { - replayVote = append(replayVote, pending.vote) + if result == nil || result.Position.Round < DKGDelayRound { + replayVote = append(replayVote, pending.vote) + } } else if pending.receivedTime.After(expireTime) { newPendingVote = append(newPendingVote, pending) } @@ -247,6 +271,13 @@ func (a *agreement) restart( } } + if result != nil { + if err := a.processAgreementResult(result); err != nil { + a.logger.Error("Failed to process agreement result when retarting", + "result", result) + } + } + for _, vote := range replayVote { if err := a.processVote(vote); err != nil { a.logger.Error("Failed to process vote when restarting agreement", @@ -526,6 +557,8 @@ func (a *agreement) processFinalizedBlock(block *types.Block) { } a.addCandidateBlockNoLock(block) a.hasOutput = true + a.data.lock.Lock() + defer a.data.lock.Unlock() a.data.recv.ConfirmBlock(block.Hash, nil) if a.doneChan != nil { close(a.doneChan) @@ -533,6 +566,33 @@ func (a *agreement) processFinalizedBlock(block *types.Block) { } } +func (a *agreement) processAgreementResult(result *types.AgreementResult) error { + a.lock.Lock() + defer a.lock.Unlock() + aID := a.agreementID() + if result.Position.Older(aID) { + return nil + } else if result.Position.Newer(aID) { + a.pendingAgreementResult[result.Position] = result + return nil + } + if a.hasOutput { + return nil + } + a.data.lock.Lock() + defer a.data.lock.Unlock() + if _, exist := a.findCandidateBlockNoLock(result.BlockHash); !exist { + a.data.recv.PullBlocks(common.Hashes{result.BlockHash}) + } + a.hasOutput = true + a.data.recv.ConfirmBlock(result.BlockHash, nil) + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } + return nil +} + func (a *agreement) done() <-chan struct{} { a.lock.Lock() defer a.lock.Unlock() diff --git a/core/blockchain.go b/core/blockchain.go index 798a080..283d22e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -127,18 +127,19 @@ type tsigVerifierGetter interface { } type blockChain struct { - lock sync.RWMutex - ID types.NodeID - lastConfirmed *types.Block - lastDelivered *types.Block - signer *utils.Signer - vGetter tsigVerifierGetter - app Application - logger common.Logger - configs []blockChainConfig - pendingBlocks pendingBlockRecords - confirmedBlocks types.BlocksByPosition - dMoment time.Time + lock sync.RWMutex + ID types.NodeID + lastConfirmed *types.Block + lastDelivered *types.Block + signer *utils.Signer + vGetter tsigVerifierGetter + app Application + logger common.Logger + pendingRandomnesses map[types.Position]*types.AgreementResult + configs []blockChainConfig + pendingBlocks pendingBlockRecords + confirmedBlocks types.BlocksByPosition + dMoment time.Time } func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, @@ -153,6 +154,8 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, app: app, logger: logger, dMoment: dMoment, + pendingRandomnesses: make( + map[types.Position]*types.AgreementResult), } } @@ -309,7 +312,9 @@ func (bc *blockChain) addEmptyBlock(position types.Position) ( // addBlock should be called when the block is confirmed by BA, we won't perform // sanity check against this block, it's ok to add block with skipping height. func (bc *blockChain) addBlock(b *types.Block) error { - if b.Position.Round >= DKGDelayRound && len(b.Finalization.Randomness) == 0 { + if b.Position.Round >= DKGDelayRound && + len(b.Finalization.Randomness) == 0 && + !bc.setRandomnessFromPending(b) { return ErrMissingRandomness } bc.lock.Lock() @@ -327,6 +332,7 @@ func (bc *blockChain) addBlock(b *types.Block) error { } else if b.IsGenesis() { confirmed = true } + delete(bc.pendingRandomnesses, b.Position) if !confirmed { return bc.addPendingBlockRecord(pendingBlockRecord{b.Position, b}) } @@ -606,3 +612,36 @@ func (bc *blockChain) confirmBlock(b *types.Block) { bc.confirmedBlocks = append(bc.confirmedBlocks, b) bc.purgeConfig() } + +func (bc *blockChain) setRandomnessFromPending(b *types.Block) bool { + if r, exist := bc.pendingRandomnesses[b.Position]; exist { + if !r.BlockHash.Equal(b.Hash) { + panic(fmt.Errorf("mismathed randomness: %s %s", b, r)) + } + b.Finalization.Randomness = r.Randomness + delete(bc.pendingRandomnesses, b.Position) + return true + } + return false +} + +func (bc *blockChain) processAgreementResult(result *types.AgreementResult) error { + if result.Position.Round < DKGDelayRound { + return nil + } + ok, err := bc.verifyRandomness( + result.BlockHash, result.Position.Round, result.Randomness) + if err != nil { + return err + } + if !ok { + return ErrIncorrectAgreementResult + } + bc.lock.RLock() + defer bc.lock.RUnlock() + if !result.Position.Newer(bc.lastConfirmed.Position) { + return nil + } + bc.pendingRandomnesses[result.Position] = result + return nil +} diff --git a/core/consensus.go b/core/consensus.go index 5a331d2..5106f18 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -227,6 +227,9 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } + // It's a workaround, the height for application is one-based. + block.Finalization.Height = block.Position.Height + 1 + if len(votes) == 0 && len(block.Finalization.Randomness) == 0 { recv.consensus.logger.Error("No votes to recover randomness", "block", block) @@ -258,24 +261,31 @@ func (recv *consensusBAReceiver) ConfirmBlock( block.Finalization.Randomness = rand.Signature[:] } } - // It's a workaround, the height for application is one-based. - block.Finalization.Height = block.Position.Height + 1 if recv.isNotary { - if block.Position.Round < DKGDelayRound { - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - FinalizationHeight: block.Finalization.Height, - IsEmptyBlock: isEmptyBlockConfirmed, + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + FinalizationHeight: block.Finalization.Height, + IsEmptyBlock: isEmptyBlockConfirmed, + Randomness: block.Finalization.Randomness, + } + recv.consensus.logger.Debug("Broadcast AgreementResult", + "result", result) + recv.consensus.network.BroadcastAgreementResult(result) + if block.IsEmpty() { + if err := + recv.consensus.bcModule.processAgreementResult( + result); err != nil { + recv.consensus.logger.Warn( + "Failed to process agreement result", + "result", result) } - recv.consensus.logger.Debug("Propose AgreementResult", - "result", result) - recv.consensus.msgChan <- result - } else { + } + if block.Position.Round >= DKGDelayRound { recv.consensus.logger.Debug( - "Propose AgreementResult as finalized block", + "Broadcast finalized block", "block", block) recv.consensus.network.BroadcastBlock(block) } @@ -354,7 +364,7 @@ CleanChannelLoop: if block.Position.Height > changeNotaryHeight && block.Position.Round <= currentRound { panic(fmt.Errorf( - "round not switch when confirmig: %s, %d, should switch at %d, %s", + "round not switch when confirming: %s, %d, should switch at %d, %s", block, currentRound, changeNotaryHeight, newPos)) } recv.restartNotary <- newPos @@ -1207,9 +1217,6 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { // ProcessAgreementResult processes the randomness request. func (con *Consensus) ProcessAgreementResult( rand *types.AgreementResult) error { - if rand.Position.Round >= DKGDelayRound { - return nil - } if !con.baMgr.touchAgreementResult(rand) { return nil } @@ -1218,12 +1225,15 @@ func (con *Consensus) ProcessAgreementResult( con.baMgr.untouchAgreementResult(rand) return err } + if err := con.bcModule.processAgreementResult(rand); err != nil { + return err + } // Syncing BA Module. if err := con.baMgr.processAgreementResult(rand); err != nil { return err } - con.logger.Debug("Broadcast AgreementResult", + con.logger.Debug("Rebroadcast AgreementResult", "result", rand) con.network.BroadcastAgreementResult(rand) diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go index 98e86b1..a4a0f20 100644 --- a/core/syncer/agreement.go +++ b/core/syncer/agreement.go @@ -19,6 +19,7 @@ package syncer import ( "context" + "fmt" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -155,17 +156,9 @@ func (a *agreement) processFinalizedBlock(block *types.Block) { return } a.confirm(block) - if block.Position.Height > a.chainTip+1 { - if _, exist := a.confirmedBlocks[block.ParentHash]; !exist { - a.pullChan <- block.ParentHash - } - } } func (a *agreement) processAgreementResult(r *types.AgreementResult) { - if r.Position.Round >= core.DKGDelayRound { - return - } // Cache those results that CRS is not ready yet. if _, exists := a.confirmedBlocks[r.BlockHash]; exists { a.logger.Trace("Agreement result already confirmed", "result", r) @@ -187,9 +180,32 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { "error", err) return } + if r.Position.Round >= core.DKGDelayRound { + verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(r.Position.Round) + if err != nil { + a.logger.Error("error verifying agreement result randomness", + "result", r, + "error", err) + return + } + if !ok { + a.logger.Error("cannot verify agreement result randomness", "result", r) + return + } + if !verifier.VerifySignature(r.BlockHash, crypto.Signature{ + Type: "bls", + Signature: r.Randomness, + }) { + a.logger.Error("incorrect agreement result randomness", "result", r) + return + } + } if r.IsEmptyBlock { b := &types.Block{ Position: r.Position, + Finalization: types.FinalizationResult{ + Randomness: r.Randomness, + }, } // Empty blocks should be confirmed directly, they won't be sent over // the wire. @@ -198,6 +214,7 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { } if bs, exist := a.blocks[r.Position]; exist { if b, exist := bs[r.BlockHash]; exist { + b.Finalization.Randomness = r.Randomness a.confirm(b) return } @@ -250,6 +267,11 @@ func (a *agreement) processNewCRS(round uint64) { // confirm notifies consensus the confirmation of a block in BA. func (a *agreement) confirm(b *types.Block) { + if b.Position.Round >= core.DKGDelayRound && + len(b.Finalization.Randomness) == 0 { + panic(fmt.Errorf("confirm a block %s without randomness", b)) + } + b.Finalization.Height = b.Position.Height + 1 if _, exist := a.confirmedBlocks[b.Hash]; !exist { delete(a.blocks, b.Position) delete(a.agreementResults, b.Hash) @@ -267,4 +289,9 @@ func (a *agreement) confirm(b *types.Block) { } a.confirmedBlocks[b.Hash] = struct{}{} } + if b.Position.Height > a.chainTip+1 { + if _, exist := a.confirmedBlocks[b.ParentHash]; !exist { + a.pullChan <- b.ParentHash + } + } } diff --git a/core/test/network.go b/core/test/network.go index c903c57..b0ce3f7 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -41,6 +41,7 @@ const ( maxVoteCache = 128 // Gossiping parameter. + maxAgreementResultBroadcast = 3 gossipAgreementResultPercent = 33 ) @@ -282,9 +283,11 @@ func (n *Network) BroadcastBlock(block *types.Block) { // Avoid data race in fake transport. block = n.cloneForFake(block).(*types.Block) notarySet := n.getNotarySet(block.Position.Round) - if err := n.trans.Broadcast( - notarySet, n.config.DirectLatency, block); err != nil { - panic(err) + if !block.IsFinalized() { + if err := n.trans.Broadcast( + notarySet, n.config.DirectLatency, block); err != nil { + panic(err) + } } if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet), n.config.GossipLatency, block); err != nil { @@ -308,10 +311,10 @@ func (n *Network) BroadcastAgreementResult( n.addBlockFinalizationToCache( result.BlockHash, result.FinalizationHeight, - nil, + result.Randomness, ) notarySet := n.getNotarySet(result.Position.Round) - count := len(notarySet)*gossipAgreementResultPercent/100 + 1 + count := maxAgreementResultBroadcast for nID := range notarySet { if count--; count < 0 { break diff --git a/core/types/block-randomness.go b/core/types/block-randomness.go index a3ba631..b87e8a1 100644 --- a/core/types/block-randomness.go +++ b/core/types/block-randomness.go @@ -18,6 +18,7 @@ package types import ( + "encoding/hex" "fmt" "github.com/dexon-foundation/dexon-consensus/common" @@ -30,9 +31,15 @@ type AgreementResult struct { Votes []Vote `json:"votes"` IsEmptyBlock bool `json:"is_empty_block"` FinalizationHeight uint64 `json:"finalization_height"` + Randomness []byte `json:"randomness"` } func (r *AgreementResult) String() string { - return fmt.Sprintf("agreementResult{Block:%s Pos:%s}", - r.BlockHash.String()[:6], r.Position) + if len(r.Randomness) == 0 { + return fmt.Sprintf("agreementResult{Block:%s Pos:%s}", + r.BlockHash.String()[:6], r.Position) + } + return fmt.Sprintf("agreementResult{Block:%s Pos:%s Rand:%s}", + r.BlockHash.String()[:6], r.Position, + hex.EncodeToString(r.Randomness)[:6]) } -- cgit v1.2.3