From 8d1069b61d847662d37f504937a346c56d6cb0eb Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Mon, 7 Jan 2019 16:14:38 +0800 Subject: sync: add log for syncer to debug hanging issue (#407) Besides adding logs, also include these minor fixes: * Accessing con.validatedChains under locking * Access con.latticeLastRound under locking * Fix incorrect waitGroup usage * Remove unused parameter in startAgreement --- core/syncer/agreement.go | 66 ++++++++++++++------ core/syncer/consensus.go | 153 +++++++++++++++++++++++++++++------------------ 2 files changed, 143 insertions(+), 76 deletions(-) (limited to 'core') diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go index 9b351ea..eaad860 100644 --- a/core/syncer/agreement.go +++ b/core/syncer/agreement.go @@ -18,6 +18,9 @@ package syncer import ( + "context" + "time" + "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core" "github.com/dexon-foundation/dexon-consensus/core/types" @@ -37,16 +40,14 @@ type agreement struct { pendings map[uint64]map[common.Hash]*types.AgreementResult logger common.Logger confirmedBlocks map[common.Hash]struct{} + ctx context.Context + ctxCancel context.CancelFunc } // newAgreement creates a new agreement instance. -func newAgreement( - ch chan<- *types.Block, - pullChan chan<- common.Hash, - cache *utils.NodeSetCache, - logger common.Logger) *agreement { - - return &agreement{ +func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash, + cache *utils.NodeSetCache, logger common.Logger) *agreement { + a := &agreement{ cache: cache, inputChan: make(chan interface{}, 1000), outputChan: ch, @@ -58,11 +59,14 @@ func newAgreement( map[uint64]map[common.Hash]*types.AgreementResult), confirmedBlocks: make(map[common.Hash]struct{}), } + a.ctx, a.ctxCancel = context.WithCancel(context.Background()) + return a } // run starts the agreement, this does not start a new routine, go a new // routine explicitly in the caller. func (a *agreement) run() { + defer a.ctxCancel() for { select { case val, ok := <-a.inputChan: @@ -119,22 +123,35 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { return } if r.IsEmptyBlock { - // Empty block is also confirmed. b := &types.Block{ Position: r.Position, } + // Empty blocks should be confirmed directly, they won't be sent over + // the wire. a.confirm(b) - } else { - needPull := true - if bs, exist := a.blocks[r.Position]; exist { - if b, exist := bs[r.BlockHash]; exist { - a.confirm(b) - needPull = false - } + return + } + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + a.confirm(b) + return } - if needPull { - a.agreementResults[r.BlockHash] = struct{}{} - a.pullChan <- r.BlockHash + } + a.agreementResults[r.BlockHash] = struct{}{} +loop: + for { + select { + case a.pullChan <- r.BlockHash: + break loop + case <-a.ctx.Done(): + a.logger.Error("pull request is not sent", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("pull request is unable to send", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) } } } @@ -168,7 +185,18 @@ func (a *agreement) confirm(b *types.Block) { if _, exist := a.confirmedBlocks[b.Hash]; !exist { delete(a.blocks, b.Position) delete(a.agreementResults, b.Hash) - a.outputChan <- b + loop: + for { + select { + case a.outputChan <- b: + break loop + case <-a.ctx.Done(): + a.logger.Error("confirmed block is not sent", "block", b) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("agreement output channel is full", "block", b) + } + } a.confirmedBlocks[b.Hash] = struct{}{} } } diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 92f8fd8..a9554cb 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -118,17 +118,15 @@ func NewConsensus( } func (con *Consensus) initConsensusObj(initBlock *types.Block) { - var cfg *types.Config func() { con.lock.Lock() defer con.lock.Unlock() con.latticeLastRound = initBlock.Position.Round - cfg = con.configs[con.latticeLastRound] debugApp, _ := con.app.(core.Debug) con.lattice = core.NewLattice( con.roundBeginTimes[con.latticeLastRound], con.latticeLastRound, - cfg, + con.configs[con.latticeLastRound], utils.NewSigner(con.prv), con.app, debugApp, @@ -136,37 +134,49 @@ func (con *Consensus) initConsensusObj(initBlock *types.Block) { con.logger, ) }() - con.startAgreement(cfg.NumChains) + con.startAgreement() con.startNetwork() con.startCRSMonitor() } -func (con *Consensus) checkIfValidated() bool { +func (con *Consensus) checkIfValidated() (validated bool) { con.lock.RLock() defer con.lock.RUnlock() - var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains - var validatedChainCount uint32 + var ( + round = con.blocks[0][0].Position.Round + numChains = con.configs[round].NumChains + validatedChainCount uint32 + ) // Make sure we validate some block in all chains. for chainID := range con.validatedChains { if chainID < numChains { validatedChainCount++ } } - if validatedChainCount == numChains { - return true - } - con.logger.Debug("not validated yet", "validated-chain", validatedChainCount) - return false + validated = validatedChainCount == numChains + con.logger.Debug("syncer chain-validation status", + "validated-chain", validatedChainCount, + "round", round, + "valid", validated) + return } -func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { +func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { con.lock.RLock() defer con.lock.RUnlock() var ( - numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + round = con.blocks[0][0].Position.Round + numChains = con.configs[round].NumChains compactionTips = make([]*types.Block, numChains) overlapCount = uint32(0) ) + defer func() { + con.logger.Debug("syncer synced status", + "overlap-count", overlapCount, + "num-chain", numChains, + "last-block", blocks[len(blocks)-1], + "synced", synced) + }() // Find tips (newset blocks) of each chain in compaction chain. b := blocks[len(blocks)-1] for tipCount := uint32(0); tipCount < numChains; { @@ -178,7 +188,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { } } if (b.Finalization.ParentHash == common.Hash{}) { - return false + return } b1, err := con.db.GetBlock(b.Finalization.ParentHash) if err != nil { @@ -196,14 +206,8 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { } } } - if overlapCount == numChains { - return true - } - con.logger.Debug("not synced yet", - "overlap-count", overlapCount, - "num-chain", numChains, - "last-block", blocks[len(blocks)-1]) - return false + synced = overlapCount == numChains + return } // ensureAgreementOverlapRound ensures the oldest blocks in each chain in @@ -212,6 +216,10 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { func (con *Consensus) ensureAgreementOverlapRound() bool { con.lock.Lock() defer con.lock.Unlock() + defer func() { + con.logger.Debug("ensureAgreementOverlapRound returned", + "round", con.agreementRoundCut) + }() if con.agreementRoundCut > 0 { return true } @@ -267,7 +275,6 @@ func (con *Consensus) ensureAgreementOverlapRound() bool { "configs", len(con.configs)) if tipRoundMap[r] == con.configs[r].NumChains { con.agreementRoundCut = r - con.logger.Info("agreement round cut found, round", r) return true } } @@ -374,11 +381,13 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error { if con.lattice == nil { return nil } - con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) delivered, err := con.lattice.ProcessFinalizedBlock(block) if err != nil { return err } + con.lock.Lock() + defer con.lock.Unlock() + con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) for idx, b := range delivered { if con.finalizedBlockHashes[idx] != b.Hash { return ErrMismatchBlockHashSequence @@ -393,18 +402,27 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error { // regards the blocks are the latest ones. Notice that latest can be true for // many times. // NOTICE: parameter "blocks" should be consecutive in compaction height. +// NOTICE: this method is not expected to be called concurrently. func (con *Consensus) SyncBlocks( - blocks []*types.Block, latest bool) (bool, error) { + blocks []*types.Block, latest bool) (synced bool, err error) { + defer func() { + con.logger.Debug("SyncBlocks returned", + "synced", synced, + "error", err, + "last-block", con.syncedLastBlock) + }() if con.syncedLastBlock != nil { - return true, ErrAlreadySynced + synced, err = true, ErrAlreadySynced + return } if len(blocks) == 0 { - return false, nil + return } // Check if blocks are consecutive. for i := 1; i < len(blocks); i++ { if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 { - return false, ErrInvalidBlockOrder + err = ErrInvalidBlockOrder + return } } // Make sure the first block is the next block of current compaction chain @@ -414,7 +432,8 @@ func (con *Consensus) SyncBlocks( con.logger.Error("mismatched finalization height", "now", blocks[0].Finalization.Height, "expected", tipHeight+1) - return false, ErrInvalidSyncingFinalizationHeight + err = ErrInvalidSyncingFinalizationHeight + return } con.logger.Trace("syncBlocks", "position", &blocks[0].Position, @@ -425,30 +444,31 @@ func (con *Consensus) SyncBlocks( con.setupConfigs(blocks) for _, b := range blocks { // TODO(haoping) remove this if lattice puts blocks into db. - if err := con.db.PutBlock(*b); err != nil { + if err = con.db.PutBlock(*b); err != nil { // A block might be put into db when confirmed by BA, but not // finalized yet. if err == db.ErrBlockExists { err = con.db.UpdateBlock(*b) } if err != nil { - return false, err + return } } - if err := con.db.PutCompactionChainTipInfo( + if err = con.db.PutCompactionChainTipInfo( b.Hash, b.Finalization.Height); err != nil { - return false, err + return } - if err := con.processFinalizedBlock(b); err != nil { - return false, err + if err = con.processFinalizedBlock(b); err != nil { + return } } if latest && con.lattice == nil { // New Lattice and find the deliver set of total ordering when "latest" is // true for first time. Deliver set is found by block hashes. - syncBlock, err := con.findLatticeSyncBlock(blocks) + var syncBlock *types.Block + syncBlock, err = con.findLatticeSyncBlock(blocks) if err != nil { - return false, err + return } if syncBlock != nil { con.logger.Debug("deliver set found", "block", syncBlock) @@ -465,15 +485,16 @@ func (con *Consensus) SyncBlocks( if b.Hash == syncBlock.Hash { break } - b1, err := con.db.GetBlock(b.Finalization.ParentHash) + var b1 types.Block + b1, err = con.db.GetBlock(b.Finalization.ParentHash) if err != nil { - return false, err + return } b = &b1 } for _, b := range blocksToProcess { - if err := con.processFinalizedBlock(b); err != nil { - return false, err + if err = con.processFinalizedBlock(b); err != nil { + return } } } @@ -483,15 +504,14 @@ func (con *Consensus) SyncBlocks( // overlapping of compaction chain and BA's oldest blocks means the // syncing is done. if con.checkIfValidated() && con.checkIfSynced(blocks) { - if err := con.Stop(); err != nil { - return false, err + if err = con.Stop(); err != nil { + return } con.syncedLastBlock = blocks[len(blocks)-1] - con.logger.Info("syncer.Consensus synced", - "last-block", con.syncedLastBlock) + synced = con.syncedLastBlock != nil } } - return con.syncedLastBlock != nil, nil + return } // GetSyncedConsensus returns the core.Consensus instance after synced. @@ -537,11 +557,15 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { // This method is mainly for caller to stop the syncer before synced, the syncer // would call this method automatically after synced. func (con *Consensus) Stop() error { + con.logger.Trace("syncer is about to stop") // Stop network and CRS routines, wait until they are all stoped. con.ctxCancel() + con.logger.Trace("stop syncer modules") con.moduleWaitGroup.Wait() // Stop agreements. + con.logger.Trace("stop syncer agreement modules") con.stopAgreement() + con.logger.Trace("syncer stopped") return nil } @@ -566,6 +590,10 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) { func() { con.lock.Lock() defer con.lock.Unlock() + con.logger.Debug("syncer setupConfigs", + "until-round", round, + "length", len(con.configs), + "lattice", con.latticeLastRound) for r := uint64(len(con.configs)); r <= round; r++ { cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) con.configs = append(con.configs, cfg) @@ -589,6 +617,7 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) { } }() con.resizeByNumChains(curMaxNumChains) + con.logger.Trace("setupConfgis finished", "round", round) } // setupConfigs is called by SyncBlocks with blocks from compaction chain. In @@ -601,9 +630,6 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) { maxRound = b.Position.Round } } - con.logger.Debug("syncer setupConfigs", - "max", maxRound, - "lattice", con.latticeLastRound) // Get configs from governance. // // In fullnode, the notification of new round is yet another TX, which @@ -635,7 +661,7 @@ func (con *Consensus) resizeByNumChains(numChains uint32) { } // startAgreement starts agreements for receiving votes and agreements. -func (con *Consensus) startAgreement(numChains uint32) { +func (con *Consensus) startAgreement() { // Start a routine for listening receive channel and pull block channel. go func() { for { @@ -690,8 +716,8 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { // startNetwork starts network for receiving blocks and agreement results. func (con *Consensus) startNetwork() { + con.moduleWaitGroup.Add(1) go func() { - con.moduleWaitGroup.Add(1) defer con.moduleWaitGroup.Done() Loop: for { @@ -732,19 +758,32 @@ func (con *Consensus) startCRSMonitor() { // Notify all agreements for new CRS. notifyNewCRS := func(round uint64) { con.setupConfigsUntilRound(round) - con.lock.Lock() - defer con.lock.Unlock() if round == lastNotifiedRound { return } con.logger.Debug("CRS is ready", "round", round) lastNotifiedRound = round - for _, a := range con.agreements { - a.inputChan <- round + con.lock.Lock() + defer con.lock.Unlock() + for idx, a := range con.agreements { + loop: + for { + select { + case <-con.ctx.Done(): + break loop + case a.inputChan <- round: + break loop + case <-time.After(500 * time.Millisecond): + con.logger.Debug( + "agreement input channel is full when putting CRS", + "chainID", idx, + "round", round) + } + } } } + con.moduleWaitGroup.Add(1) go func() { - con.moduleWaitGroup.Add(1) defer con.moduleWaitGroup.Done() for { select { -- cgit v1.2.3