aboutsummaryrefslogtreecommitdiffstats
path: root/core/syncer/consensus.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-01-07 16:14:38 +0800
committerGitHub <noreply@github.com>2019-01-07 16:14:38 +0800
commit8d1069b61d847662d37f504937a346c56d6cb0eb (patch)
treed835081307a3c82a679a7d22ca0c9c804cbc06cd /core/syncer/consensus.go
parentd3b9af1343376862a473715e8211005cfbb5bdd7 (diff)
downloaddexon-consensus-8d1069b61d847662d37f504937a346c56d6cb0eb.tar
dexon-consensus-8d1069b61d847662d37f504937a346c56d6cb0eb.tar.gz
dexon-consensus-8d1069b61d847662d37f504937a346c56d6cb0eb.tar.bz2
dexon-consensus-8d1069b61d847662d37f504937a346c56d6cb0eb.tar.lz
dexon-consensus-8d1069b61d847662d37f504937a346c56d6cb0eb.tar.xz
dexon-consensus-8d1069b61d847662d37f504937a346c56d6cb0eb.tar.zst
dexon-consensus-8d1069b61d847662d37f504937a346c56d6cb0eb.zip
sync: add log for syncer to debug hanging issue (#407)
Besides adding logs, also include these minor fixes: * Access con.validatedChains under locking * Access con.latticeLastRound under locking * Fix incorrect waitGroup usage * Remove unused parameter in startAgreement
Diffstat (limited to 'core/syncer/consensus.go')
-rw-r--r--core/syncer/consensus.go153
1 files changed, 96 insertions, 57 deletions
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 92f8fd8..a9554cb 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -118,17 +118,15 @@ func NewConsensus(
}
func (con *Consensus) initConsensusObj(initBlock *types.Block) {
- var cfg *types.Config
func() {
con.lock.Lock()
defer con.lock.Unlock()
con.latticeLastRound = initBlock.Position.Round
- cfg = con.configs[con.latticeLastRound]
debugApp, _ := con.app.(core.Debug)
con.lattice = core.NewLattice(
con.roundBeginTimes[con.latticeLastRound],
con.latticeLastRound,
- cfg,
+ con.configs[con.latticeLastRound],
utils.NewSigner(con.prv),
con.app,
debugApp,
@@ -136,37 +134,49 @@ func (con *Consensus) initConsensusObj(initBlock *types.Block) {
con.logger,
)
}()
- con.startAgreement(cfg.NumChains)
+ con.startAgreement()
con.startNetwork()
con.startCRSMonitor()
}
-func (con *Consensus) checkIfValidated() bool {
+func (con *Consensus) checkIfValidated() (validated bool) {
con.lock.RLock()
defer con.lock.RUnlock()
- var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
- var validatedChainCount uint32
+ var (
+ round = con.blocks[0][0].Position.Round
+ numChains = con.configs[round].NumChains
+ validatedChainCount uint32
+ )
// Make sure we validate some block in all chains.
for chainID := range con.validatedChains {
if chainID < numChains {
validatedChainCount++
}
}
- if validatedChainCount == numChains {
- return true
- }
- con.logger.Debug("not validated yet", "validated-chain", validatedChainCount)
- return false
+ validated = validatedChainCount == numChains
+ con.logger.Debug("syncer chain-validation status",
+ "validated-chain", validatedChainCount,
+ "round", round,
+ "valid", validated)
+ return
}
-func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
+func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
con.lock.RLock()
defer con.lock.RUnlock()
var (
- numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
+ round = con.blocks[0][0].Position.Round
+ numChains = con.configs[round].NumChains
compactionTips = make([]*types.Block, numChains)
overlapCount = uint32(0)
)
+ defer func() {
+ con.logger.Debug("syncer synced status",
+ "overlap-count", overlapCount,
+ "num-chain", numChains,
+ "last-block", blocks[len(blocks)-1],
+ "synced", synced)
+ }()
// Find tips (newset blocks) of each chain in compaction chain.
b := blocks[len(blocks)-1]
for tipCount := uint32(0); tipCount < numChains; {
@@ -178,7 +188,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
}
}
if (b.Finalization.ParentHash == common.Hash{}) {
- return false
+ return
}
b1, err := con.db.GetBlock(b.Finalization.ParentHash)
if err != nil {
@@ -196,14 +206,8 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
}
}
}
- if overlapCount == numChains {
- return true
- }
- con.logger.Debug("not synced yet",
- "overlap-count", overlapCount,
- "num-chain", numChains,
- "last-block", blocks[len(blocks)-1])
- return false
+ synced = overlapCount == numChains
+ return
}
// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
@@ -212,6 +216,10 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
func (con *Consensus) ensureAgreementOverlapRound() bool {
con.lock.Lock()
defer con.lock.Unlock()
+ defer func() {
+ con.logger.Debug("ensureAgreementOverlapRound returned",
+ "round", con.agreementRoundCut)
+ }()
if con.agreementRoundCut > 0 {
return true
}
@@ -267,7 +275,6 @@ func (con *Consensus) ensureAgreementOverlapRound() bool {
"configs", len(con.configs))
if tipRoundMap[r] == con.configs[r].NumChains {
con.agreementRoundCut = r
- con.logger.Info("agreement round cut found, round", r)
return true
}
}
@@ -374,11 +381,13 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error {
if con.lattice == nil {
return nil
}
- con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
delivered, err := con.lattice.ProcessFinalizedBlock(block)
if err != nil {
return err
}
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
for idx, b := range delivered {
if con.finalizedBlockHashes[idx] != b.Hash {
return ErrMismatchBlockHashSequence
@@ -393,18 +402,27 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error {
// regards the blocks are the latest ones. Notice that latest can be true for
// many times.
// NOTICE: parameter "blocks" should be consecutive in compaction height.
+// NOTICE: this method is not expected to be called concurrently.
func (con *Consensus) SyncBlocks(
- blocks []*types.Block, latest bool) (bool, error) {
+ blocks []*types.Block, latest bool) (synced bool, err error) {
+ defer func() {
+ con.logger.Debug("SyncBlocks returned",
+ "synced", synced,
+ "error", err,
+ "last-block", con.syncedLastBlock)
+ }()
if con.syncedLastBlock != nil {
- return true, ErrAlreadySynced
+ synced, err = true, ErrAlreadySynced
+ return
}
if len(blocks) == 0 {
- return false, nil
+ return
}
// Check if blocks are consecutive.
for i := 1; i < len(blocks); i++ {
if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 {
- return false, ErrInvalidBlockOrder
+ err = ErrInvalidBlockOrder
+ return
}
}
// Make sure the first block is the next block of current compaction chain
@@ -414,7 +432,8 @@ func (con *Consensus) SyncBlocks(
con.logger.Error("mismatched finalization height",
"now", blocks[0].Finalization.Height,
"expected", tipHeight+1)
- return false, ErrInvalidSyncingFinalizationHeight
+ err = ErrInvalidSyncingFinalizationHeight
+ return
}
con.logger.Trace("syncBlocks",
"position", &blocks[0].Position,
@@ -425,30 +444,31 @@ func (con *Consensus) SyncBlocks(
con.setupConfigs(blocks)
for _, b := range blocks {
// TODO(haoping) remove this if lattice puts blocks into db.
- if err := con.db.PutBlock(*b); err != nil {
+ if err = con.db.PutBlock(*b); err != nil {
// A block might be put into db when confirmed by BA, but not
// finalized yet.
if err == db.ErrBlockExists {
err = con.db.UpdateBlock(*b)
}
if err != nil {
- return false, err
+ return
}
}
- if err := con.db.PutCompactionChainTipInfo(
+ if err = con.db.PutCompactionChainTipInfo(
b.Hash, b.Finalization.Height); err != nil {
- return false, err
+ return
}
- if err := con.processFinalizedBlock(b); err != nil {
- return false, err
+ if err = con.processFinalizedBlock(b); err != nil {
+ return
}
}
if latest && con.lattice == nil {
// New Lattice and find the deliver set of total ordering when "latest" is
// true for first time. Deliver set is found by block hashes.
- syncBlock, err := con.findLatticeSyncBlock(blocks)
+ var syncBlock *types.Block
+ syncBlock, err = con.findLatticeSyncBlock(blocks)
if err != nil {
- return false, err
+ return
}
if syncBlock != nil {
con.logger.Debug("deliver set found", "block", syncBlock)
@@ -465,15 +485,16 @@ func (con *Consensus) SyncBlocks(
if b.Hash == syncBlock.Hash {
break
}
- b1, err := con.db.GetBlock(b.Finalization.ParentHash)
+ var b1 types.Block
+ b1, err = con.db.GetBlock(b.Finalization.ParentHash)
if err != nil {
- return false, err
+ return
}
b = &b1
}
for _, b := range blocksToProcess {
- if err := con.processFinalizedBlock(b); err != nil {
- return false, err
+ if err = con.processFinalizedBlock(b); err != nil {
+ return
}
}
}
@@ -483,15 +504,14 @@ func (con *Consensus) SyncBlocks(
// overlapping of compaction chain and BA's oldest blocks means the
// syncing is done.
if con.checkIfValidated() && con.checkIfSynced(blocks) {
- if err := con.Stop(); err != nil {
- return false, err
+ if err = con.Stop(); err != nil {
+ return
}
con.syncedLastBlock = blocks[len(blocks)-1]
- con.logger.Info("syncer.Consensus synced",
- "last-block", con.syncedLastBlock)
+ synced = con.syncedLastBlock != nil
}
}
- return con.syncedLastBlock != nil, nil
+ return
}
// GetSyncedConsensus returns the core.Consensus instance after synced.
@@ -537,11 +557,15 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
// This method is mainly for caller to stop the syncer before synced, the syncer
// would call this method automatically after synced.
func (con *Consensus) Stop() error {
+ con.logger.Trace("syncer is about to stop")
// Stop network and CRS routines, wait until they are all stoped.
con.ctxCancel()
+ con.logger.Trace("stop syncer modules")
con.moduleWaitGroup.Wait()
// Stop agreements.
+ con.logger.Trace("stop syncer agreement modules")
con.stopAgreement()
+ con.logger.Trace("syncer stopped")
return nil
}
@@ -566,6 +590,10 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) {
func() {
con.lock.Lock()
defer con.lock.Unlock()
+ con.logger.Debug("syncer setupConfigs",
+ "until-round", round,
+ "length", len(con.configs),
+ "lattice", con.latticeLastRound)
for r := uint64(len(con.configs)); r <= round; r++ {
cfg := utils.GetConfigWithPanic(con.gov, r, con.logger)
con.configs = append(con.configs, cfg)
@@ -589,6 +617,7 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) {
}
}()
con.resizeByNumChains(curMaxNumChains)
+ con.logger.Trace("setupConfgis finished", "round", round)
}
// setupConfigs is called by SyncBlocks with blocks from compaction chain. In
@@ -601,9 +630,6 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) {
maxRound = b.Position.Round
}
}
- con.logger.Debug("syncer setupConfigs",
- "max", maxRound,
- "lattice", con.latticeLastRound)
// Get configs from governance.
//
// In fullnode, the notification of new round is yet another TX, which
@@ -635,7 +661,7 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
}
// startAgreement starts agreements for receiving votes and agreements.
-func (con *Consensus) startAgreement(numChains uint32) {
+func (con *Consensus) startAgreement() {
// Start a routine for listening receive channel and pull block channel.
go func() {
for {
@@ -690,8 +716,8 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
// startNetwork starts network for receiving blocks and agreement results.
func (con *Consensus) startNetwork() {
+ con.moduleWaitGroup.Add(1)
go func() {
- con.moduleWaitGroup.Add(1)
defer con.moduleWaitGroup.Done()
Loop:
for {
@@ -732,19 +758,32 @@ func (con *Consensus) startCRSMonitor() {
// Notify all agreements for new CRS.
notifyNewCRS := func(round uint64) {
con.setupConfigsUntilRound(round)
- con.lock.Lock()
- defer con.lock.Unlock()
if round == lastNotifiedRound {
return
}
con.logger.Debug("CRS is ready", "round", round)
lastNotifiedRound = round
- for _, a := range con.agreements {
- a.inputChan <- round
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for idx, a := range con.agreements {
+ loop:
+ for {
+ select {
+ case <-con.ctx.Done():
+ break loop
+ case a.inputChan <- round:
+ break loop
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug(
+ "agreement input channel is full when putting CRS",
+ "chainID", idx,
+ "round", round)
+ }
+ }
}
}
+ con.moduleWaitGroup.Add(1)
go func() {
- con.moduleWaitGroup.Add(1)
defer con.moduleWaitGroup.Done()
for {
select {