diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-12-18 15:38:51 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-18 15:38:51 +0800 |
commit | 9f240d93507cdf03935ba7e4e3a7b226f150736d (patch) | |
tree | 3234b308e164aec346e6127be18060fc641867fd | |
parent | ff845cd0bddcaacb8d88da9296af22a45fef1dff (diff) | |
download | dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.gz dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.bz2 dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.lz dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.xz dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.zst dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.zip |
syncer: fix stuffs (#373)
* Add a new method: GetSyncedConsensus
This method avoids calling BlockDelivered
in SyncBlocks methods, which avoid potential
deadlock.
* Add a method to stop the syncer before synced
* Enable nonBlockingApp for synced Consensus
instance.
-rw-r--r-- | core/consensus.go | 7 | ||||
-rw-r--r-- | core/syncer/consensus.go | 117 | ||||
-rw-r--r-- | integration_test/consensus_test.go | 9 |
3 files changed, 84 insertions, 49 deletions
diff --git a/core/consensus.go b/core/consensus.go index 5beaf54..35f1db8 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -494,12 +494,17 @@ func NewConsensusFromSyncer( db, logger) recv.cfgModule = cfgModule + // Check if the application implement Debug interface. + var debugApp Debug + if a, ok := app.(Debug); ok { + debugApp = a + } // Setup Consensus instance. con := &Consensus{ ID: ID, ccModule: newCompactionChain(gov), lattice: latticeModule, - app: app, + app: newNonBlocking(app, debugApp), gov: gov, db: db, network: networkModule, diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 901485e..eababa0 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -35,6 +35,8 @@ import ( var ( // ErrAlreadySynced is reported when syncer is synced. ErrAlreadySynced = fmt.Errorf("already synced") + // ErrNotSynced is reported when syncer is not synced yet. + ErrNotSynced = fmt.Errorf("not synced yet") // ErrGenesisBlockReached is reported when genesis block reached. ErrGenesisBlockReached = fmt.Errorf("genesis block reached") // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks. @@ -78,7 +80,8 @@ type Consensus struct { receiveChan chan *types.Block ctx context.Context ctxCancel context.CancelFunc - isSynced bool + syncedLastBlock *types.Block + syncedConsensus *core.Consensus } // NewConsensus creates an instance for Consensus (syncer consensus). @@ -100,7 +103,6 @@ func NewConsensus( nodeSetCache: utils.NewNodeSetCache(gov), prv: prv, logger: logger, - isSynced: false, validatedChains: make(map[uint32]struct{}), configs: []*types.Config{gov.Configuration(0)}, roundBeginTimes: []time.Time{dMoment}, @@ -385,17 +387,17 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error { // many times. // NOTICE: parameter "blocks" should be consecutive in compaction height. func (con *Consensus) SyncBlocks( - blocks []*types.Block, latest bool) (*core.Consensus, error) { - if con.isSynced { - return nil, ErrAlreadySynced + blocks []*types.Block, latest bool) (bool, error) { + if con.syncedLastBlock != nil { + return true, ErrAlreadySynced } if len(blocks) == 0 { - return nil, nil + return false, nil } // Check if blocks are consecutive. for i := 1; i < len(blocks); i++ { if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 { - return nil, ErrInvalidBlockOrder + return false, ErrInvalidBlockOrder } } // Make sure the first block is the next block of current compaction chain @@ -405,7 +407,7 @@ func (con *Consensus) SyncBlocks( con.logger.Error("mismatched finalization height", "now", blocks[0].Finalization.Height, "expected", tipHeight+1) - return nil, ErrInvalidSyncingFinalizationHeight + return false, ErrInvalidSyncingFinalizationHeight } con.logger.Info("syncBlocks", "position", &blocks[0].Position, @@ -423,15 +425,15 @@ func (con *Consensus) SyncBlocks( err = con.db.UpdateBlock(*b) } if err != nil { - return nil, err + return false, err } } if err := con.db.PutCompactionChainTipInfo( b.Hash, b.Finalization.Height); err != nil { - return nil, err + return false, err } if err := con.processFinalizedBlock(b); err != nil { - return nil, err + return false, err } } if latest && con.lattice == nil { @@ -439,7 +441,7 @@ func (con *Consensus) SyncBlocks( // true for first time. Deliver set is found by block hashes. syncBlock, err := con.findLatticeSyncBlock(blocks) if err != nil { - return nil, err + return false, err } if syncBlock != nil { con.logger.Info("deliver set found", syncBlock) @@ -457,13 +459,13 @@ func (con *Consensus) SyncBlocks( } b1, err := con.db.GetBlock(b.Finalization.ParentHash) if err != nil { - return nil, err + return false, err } b = &b1 } for _, b := range blocksToProcess { if err := con.processFinalizedBlock(b); err != nil { - return nil, err + return false, err } } } @@ -472,40 +474,63 @@ func (con *Consensus) SyncBlocks( // Check if compaction and agreements' blocks are overlapped. The // overlapping of compaction chain and BA's oldest blocks means the // syncing is done. - con.isSynced = con.checkIfValidated() && con.checkIfSynced(blocks) - } - if con.isSynced { - // Stop network and CRS routines, wait until they are all stoped. - con.ctxCancel() - con.moduleWaitGroup.Wait() - // Stop agreements. - con.stopAgreement() - // flush all blocks in con.blocks into core.Consensus, and build - // core.Consensus from syncer. - lastBlock := blocks[len(blocks)-1] - con.logger.Info("syncer.Consensus synced", "last-block", lastBlock) - confirmedBlocks := []*types.Block{} - func() { - con.lock.Lock() - defer con.lock.Unlock() - for _, bs := range con.blocks { - confirmedBlocks = append(confirmedBlocks, bs...) + if con.checkIfValidated() && con.checkIfSynced(blocks) { + if err := con.Stop(); err != nil { + return false, err } - }() - return core.NewConsensusFromSyncer( - lastBlock, - con.roundBeginTimes[lastBlock.Position.Round], - con.app, - con.gov, - con.db, - con.network, - con.prv, - con.lattice, - confirmedBlocks, - con.randomnessResults, - con.logger) + con.syncedLastBlock = blocks[len(blocks)-1] + con.logger.Info("syncer.Consensus synced", + "last-block", con.syncedLastBlock) + } } - return nil, nil + return con.syncedLastBlock != nil, nil +} + +// GetSyncedConsensus returns the core.Consensus instance after synced. +func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { + if con.syncedConsensus != nil { + return con.syncedConsensus, nil + } + if con.syncedLastBlock == nil { + return nil, ErrNotSynced + } + // flush all blocks in con.blocks into core.Consensus, and build + // core.Consensus from syncer. + confirmedBlocks := []*types.Block{} + func() { + con.lock.Lock() + defer con.lock.Unlock() + for _, bs := range con.blocks { + confirmedBlocks = append(confirmedBlocks, bs...) + } + }() + var err error + con.syncedConsensus, err = core.NewConsensusFromSyncer( + con.syncedLastBlock, + con.roundBeginTimes[con.syncedLastBlock.Position.Round], + con.app, + con.gov, + con.db, + con.network, + con.prv, + con.lattice, + confirmedBlocks, + con.randomnessResults, + con.logger) + return con.syncedConsensus, err +} + +// Stop the syncer. +// +// This method is mainly for caller to stop the syncer before synced, the syncer +// would call this method automatically after synced. +func (con *Consensus) Stop() error { + // Stop network and CRS routines, wait until they are all stoped. + con.ctxCancel() + con.moduleWaitGroup.Wait() + // Stop agreements. + con.stopAgreement() + return nil } // isEmptyBlock checks if a block is an empty block by both its hash and parent diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index 08a1d10..2680cd2 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -145,8 +145,13 @@ func (s *ConsensusTestSuite) syncBlocksWithSomeNode( syncNode.gov.CatchUpWithRound( b.Position.Round + core.ConfigRoundShift) } - syncedCon, err = syncerObj.SyncBlocks(compactionChainBlocks, true) - if syncedCon != nil || err != nil { + var synced bool + synced, err = syncerObj.SyncBlocks(compactionChainBlocks, true) + if err != nil { + done = true + } + if synced { + syncedCon, err = syncerObj.GetSyncedConsensus() done = true } compactionChainBlocks = nil |