From aa34cc1069a815ed66ec8fae0988fc4f29687bfd Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Thu, 13 Dec 2018 11:12:59 +0800 Subject: vendor: sync to latest core and fix conflict --- .../dexon-consensus/core/consensus.go | 199 +++++++++++++-------- 1 file changed, 126 insertions(+), 73 deletions(-) (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go') diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index bfe893cd5..a6d80371d 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -25,8 +25,8 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/blockdb" "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/db" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" "github.com/dexon-foundation/dexon-consensus/core/utils" @@ -70,17 +70,23 @@ type consensusBAReceiver struct { chainID uint32 changeNotaryTime time.Time round uint64 - restartNotary chan bool + isNotary bool + restartNotary chan types.Position } func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { + if !recv.isNotary { + return + } if err := recv.agreementModule.prepareVote(vote); err != nil { recv.consensus.logger.Error("Failed to prepare vote", "error", err) return } go func() { if err := recv.agreementModule.processVote(vote); err != nil { - recv.consensus.logger.Error("Failed to process vote", "error", err) + recv.consensus.logger.Error("Failed to process self vote", + "error", err, + "vote", vote) return } recv.consensus.logger.Debug("Calling Network.BroadcastVote", @@ -90,6 +96,9 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { } func (recv *consensusBAReceiver) ProposeBlock() common.Hash { + if !recv.isNotary { + return common.Hash{} + } block := recv.consensus.proposeBlock(recv.chainID, recv.round) if block == nil { recv.consensus.logger.Error("unable to propose block") @@ -123,7 +132,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( block, exist = recv.agreementModule.findCandidateBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", - "hash", hash, + "hash", hash.String()[:6], "chainID", recv.chainID) ch := make(chan *types.Block) func() { @@ -135,7 +144,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( go func() { block = <-ch recv.consensus.logger.Info("Receive unknown block", - "hash", hash, + "hash", hash.String()[:6], + "position", &block.Position, "chainID", recv.chainID) recv.agreementModule.addCandidateBlock(block) recv.agreementModule.lock.Lock() @@ -152,8 +162,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( parentHash := hash for { recv.consensus.logger.Warn("Parent block not confirmed", - "hash", parentHash, - "chainID", recv.chainID) + "parent-hash", parentHash.String()[:6], + "cur-position", &block.Position) ch := make(chan *types.Block) if !func() bool { recv.consensus.lock.Lock() @@ -179,11 +189,14 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } recv.consensus.logger.Info("Receive parent block", - "hash", block.ParentHash, + "parent-hash", block.ParentHash.String()[:6], + "cur-position", &block.Position, "chainID", recv.chainID) recv.consensus.ccModule.registerBlock(block) if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", "error", err) + recv.consensus.logger.Error("Failed to process block", + "block", block, + "error", err) return } parentHash = block.ParentHash @@ -194,24 +207,28 @@ func (recv *consensusBAReceiver) ConfirmBlock( } }(block.ParentHash) } - voteList := make([]types.Vote, 0, len(votes)) - for _, vote := range votes { - if vote.BlockHash != hash { - continue + if recv.isNotary { + voteList := make([]types.Vote, 0, len(votes)) + for _, vote := range votes { + if vote.BlockHash != hash { + continue + } + voteList = append(voteList, *vote) } - voteList = append(voteList, *vote) - } - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - IsEmptyBlock: isEmptyBlockConfirmed, + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + IsEmptyBlock: isEmptyBlockConfirmed, + } + recv.consensus.logger.Debug("Propose AgreementResult", + "result", result) + recv.consensus.network.BroadcastAgreementResult(result) } - recv.consensus.logger.Debug("Propose AgreementResult", - "result", result) - recv.consensus.network.BroadcastAgreementResult(result) if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", "error", err) + recv.consensus.logger.Error("Failed to process block", + "block", block, + "error", err) return } // Clean the restartNotary channel so BA will not stuck by deadlock. @@ -223,15 +240,18 @@ CleanChannelLoop: break CleanChannelLoop } } + newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { recv.round++ - recv.restartNotary <- true - } else { - recv.restartNotary <- false + newPos.Round++ } + recv.restartNotary <- newPos } func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { + if !recv.isNotary { + return + } recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes) recv.consensus.network.PullBlocks(hashes) } @@ -312,7 +332,7 @@ func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint( // ProposeDKGFinalize propose a DKGFinalize message. func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) { if err := recv.authModule.SignDKGFinalize(final); err != nil { - recv.logger.Error("Faield to sign DKG finalize", "error", err) + recv.logger.Error("Failed to sign DKG finalize", "error", err) return } recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final) @@ -340,21 +360,23 @@ type Consensus struct { toSyncer *totalOrderingSyncer // Interfaces. - db blockdb.BlockDatabase - app Application - gov Governance - network Network + db db.Database + app Application + debugApp Debug + gov Governance + network Network // Misc. - dMoment time.Time - nodeSetCache *utils.NodeSetCache - round uint64 - roundToNotify uint64 - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc - event *common.Event - logger common.Logger + dMoment time.Time + nodeSetCache *utils.NodeSetCache + round uint64 + roundToNotify uint64 + lock sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc + event *common.Event + logger common.Logger + nonFinalizedBlockDelivered bool } // NewConsensus construct an Consensus instance. @@ -362,7 +384,7 @@ func NewConsensus( dMoment time.Time, app Application, gov Governance, - db blockdb.BlockDatabase, + db db.Database, network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { @@ -372,7 +394,10 @@ func NewConsensus( // Setup auth module. authModule := NewAuthenticator(prv) // Check if the application implement Debug interface. - debugApp, _ := app.(Debug) + var debugApp Debug + if a, ok := app.(Debug); ok { + debugApp = a + } // Get configuration for genesis round. var round uint64 logger.Debug("Calling Governance.Configuration", "round", round) @@ -407,6 +432,7 @@ func NewConsensus( ccModule: newCompactionChain(gov), lattice: lattice, app: newNonBlocking(app, debugApp), + debugApp: debugApp, gov: gov, db: db, network: network, @@ -438,7 +464,7 @@ func NewConsensusFromSyncer( initRoundBeginTime time.Time, app Application, gov Governance, - db blockdb.BlockDatabase, + db db.Database, networkModule Network, prv crypto.PrivateKey, latticeModule *Lattice, @@ -492,7 +518,6 @@ func NewConsensusFromSyncer( } // Dump all BA-confirmed blocks to the consensus instance. for _, b := range blocks { - con.app.BlockConfirmed(*b) con.ccModule.registerBlock(b) if err := con.processBlock(b); err != nil { return nil, err @@ -500,7 +525,7 @@ func NewConsensusFromSyncer( } // Dump all randomness result to the consensus instance. for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r); err != nil { + if err := con.ProcessBlockRandomnessResult(r, false); err != nil { con.logger.Error("failed to process randomness result when syncing", "result", r) continue @@ -605,21 +630,22 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) { } func (con *Consensus) runCRS(round uint64) { - con.logger.Debug("Calling Governance.CRS to check if already proposed", - "round", round+1) - if (con.gov.CRS(round+1) != common.Hash{}) { - con.logger.Info("CRS already proposed", "round", round+1) - return - } - con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", - "round", round) - for !con.gov.IsDKGFinal(round) { + for { + con.logger.Debug("Calling Governance.CRS to check if already proposed", + "round", round+1) + if (con.gov.CRS(round+1) != common.Hash{}) { + con.logger.Info("CRS already proposed", "round", round+1) + return + } + con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", + "round", round) + if con.cfgModule.isDKGReady(round) { + break + } con.logger.Debug("DKG is not ready for running CRS. Retry later...", "round", round) time.Sleep(500 * time.Millisecond) } - // Wait some time for DKG to recover private share. - time.Sleep(100 * time.Millisecond) // Start running next round CRS. con.logger.Debug("Calling Governance.CRS", "round", round) psig, err := con.cfgModule.preparePartialSignature(round, con.gov.CRS(round)) @@ -794,26 +820,30 @@ MessageLoop: // For sync mode. if err := con.processFinalizedBlock(val); err != nil { con.logger.Error("Failed to process finalized block", + "block", val, "error", err) } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", + "block", val, "error", err) } } case *types.Vote: if err := con.ProcessVote(val); err != nil { con.logger.Error("Failed to process vote", + "vote", val, "error", err) } case *types.AgreementResult: if err := con.ProcessAgreementResult(val); err != nil { con.logger.Error("Failed to process agreement result", + "result", val, "error", err) } case *types.BlockRandomnessResult: - if err := con.ProcessBlockRandomnessResult(val); err != nil { + if err := con.ProcessBlockRandomnessResult(val, true); err != nil { con.logger.Error("Failed to process block randomness result", "hash", val.BlockHash.String()[:6], "position", &val.Position, @@ -884,9 +914,7 @@ func (con *Consensus) ProcessAgreementResult( if rand.Position.Round == 0 { return nil } - if !con.ccModule.blockRegistered(rand.BlockHash) { - return nil - } + // TODO(mission): find a way to avoid spamming by older agreement results. // Sanity check done. if !con.cfgModule.touchTSigHash(rand.BlockHash) { return nil @@ -914,13 +942,16 @@ func (con *Consensus) ProcessAgreementResult( con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", "proposer", psig.ProposerID, "round", psig.Round, - "hash", psig.Hash) + "hash", psig.Hash.String()[:6]) con.network.BroadcastDKGPartialSignature(psig) go func() { tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) if err != nil { if err != ErrTSigAlreadyRunning { - con.logger.Error("Faield to run TSIG", "error", err) + con.logger.Error("Failed to run TSIG", + "position", &rand.Position, + "hash", rand.BlockHash.String()[:6], + "error", err) } return } @@ -929,7 +960,7 @@ func (con *Consensus) ProcessAgreementResult( Position: rand.Position, Randomness: tsig.Signature, } - if err := con.ProcessBlockRandomnessResult(result); err != nil { + if err := con.ProcessBlockRandomnessResult(result, true); err != nil { con.logger.Error("Failed to process randomness result", "error", err) return @@ -940,27 +971,33 @@ func (con *Consensus) ProcessAgreementResult( // ProcessBlockRandomnessResult processes the randomness result. func (con *Consensus) ProcessBlockRandomnessResult( - rand *types.BlockRandomnessResult) error { + rand *types.BlockRandomnessResult, needBroadcast bool) error { if rand.Position.Round == 0 { return nil } if err := con.ccModule.processBlockRandomnessResult(rand); err != nil { if err == ErrBlockNotRegistered { err = nil + } else { + return err } - return err } - con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash, - "position", &rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) - con.network.BroadcastRandomnessResult(rand) + if needBroadcast { + con.logger.Debug("Calling Network.BroadcastRandomnessResult", + "hash", rand.BlockHash.String()[:6], + "position", &rand.Position, + "randomness", hex.EncodeToString(rand.Randomness)) + con.network.BroadcastRandomnessResult(rand) + } return nil } // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { err = con.baMgr.processBlock(b) + if err == nil && con.debugApp != nil { + con.debugApp.BlockReceived(b.Hash) + } return } @@ -999,7 +1036,7 @@ func (con *Consensus) deliverBlock(b *types.Block) { // processBlock is the entry point to submit one block to a Consensus instance. func (con *Consensus) processBlock(block *types.Block) (err error) { - if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists { + if err = con.db.PutBlock(*block); err != nil && err != db.ErrBlockExists { return } con.lock.Lock() @@ -1012,6 +1049,19 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { } // Pass delivered blocks to compaction chain. for _, b := range deliveredBlocks { + if b.IsFinalized() { + if con.nonFinalizedBlockDelivered { + panic(fmt.Errorf("attempting to skip finalized block: %s", b)) + } + con.logger.Info("skip delivery of finalized block", + "block", b, + "finalization-height", b.Finalization.Height) + continue + } else { + // Mark that some non-finalized block delivered. After this flag + // turned on, it's not allowed to deliver finalized blocks anymore. + con.nonFinalizedBlockDelivered = true + } if err = con.ccModule.processBlock(b); err != nil { return } @@ -1022,11 +1072,14 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { "delivered", con.ccModule.lastDeliveredBlock(), "pending", con.ccModule.lastPendingBlock()) for _, b := range deliveredBlocks { - if err = con.db.Update(*b); err != nil { + if err = con.db.UpdateBlock(*b); err != nil { panic(err) } con.cfgModule.untouchTSigHash(b.Hash) con.deliverBlock(b) + if con.debugApp != nil { + con.debugApp.BlockReady(b.Hash) + } } if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil { return -- cgit v1.2.3