aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-12-18 15:38:51 +0800
committerGitHub <noreply@github.com>2018-12-18 15:38:51 +0800
commit9f240d93507cdf03935ba7e4e3a7b226f150736d (patch)
tree3234b308e164aec346e6127be18060fc641867fd
parentff845cd0bddcaacb8d88da9296af22a45fef1dff (diff)
downloaddexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar
dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.gz
dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.bz2
dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.lz
dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.xz
dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.tar.zst
dexon-consensus-9f240d93507cdf03935ba7e4e3a7b226f150736d.zip
syncer: fix stuffs (#373)
* Add a new method: GetSyncedConsensus This method avoids calling BlockDelivered in SyncBlocks methods, which avoid potential deadlock. * Add a method to stop the syncer before synced * Enable nonBlockingApp for synced Consensus instance.
-rw-r--r--core/consensus.go7
-rw-r--r--core/syncer/consensus.go117
-rw-r--r--integration_test/consensus_test.go9
3 files changed, 84 insertions, 49 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 5beaf54..35f1db8 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -494,12 +494,17 @@ func NewConsensusFromSyncer(
db,
logger)
recv.cfgModule = cfgModule
+ // Check if the application implement Debug interface.
+ var debugApp Debug
+ if a, ok := app.(Debug); ok {
+ debugApp = a
+ }
// Setup Consensus instance.
con := &Consensus{
ID: ID,
ccModule: newCompactionChain(gov),
lattice: latticeModule,
- app: app,
+ app: newNonBlocking(app, debugApp),
gov: gov,
db: db,
network: networkModule,
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 901485e..eababa0 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -35,6 +35,8 @@ import (
var (
// ErrAlreadySynced is reported when syncer is synced.
ErrAlreadySynced = fmt.Errorf("already synced")
+ // ErrNotSynced is reported when syncer is not synced yet.
+ ErrNotSynced = fmt.Errorf("not synced yet")
// ErrGenesisBlockReached is reported when genesis block reached.
ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
// ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
@@ -78,7 +80,8 @@ type Consensus struct {
receiveChan chan *types.Block
ctx context.Context
ctxCancel context.CancelFunc
- isSynced bool
+ syncedLastBlock *types.Block
+ syncedConsensus *core.Consensus
}
// NewConsensus creates an instance for Consensus (syncer consensus).
@@ -100,7 +103,6 @@ func NewConsensus(
nodeSetCache: utils.NewNodeSetCache(gov),
prv: prv,
logger: logger,
- isSynced: false,
validatedChains: make(map[uint32]struct{}),
configs: []*types.Config{gov.Configuration(0)},
roundBeginTimes: []time.Time{dMoment},
@@ -385,17 +387,17 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error {
// many times.
// NOTICE: parameter "blocks" should be consecutive in compaction height.
func (con *Consensus) SyncBlocks(
- blocks []*types.Block, latest bool) (*core.Consensus, error) {
- if con.isSynced {
- return nil, ErrAlreadySynced
+ blocks []*types.Block, latest bool) (bool, error) {
+ if con.syncedLastBlock != nil {
+ return true, ErrAlreadySynced
}
if len(blocks) == 0 {
- return nil, nil
+ return false, nil
}
// Check if blocks are consecutive.
for i := 1; i < len(blocks); i++ {
if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 {
- return nil, ErrInvalidBlockOrder
+ return false, ErrInvalidBlockOrder
}
}
// Make sure the first block is the next block of current compaction chain
@@ -405,7 +407,7 @@ func (con *Consensus) SyncBlocks(
con.logger.Error("mismatched finalization height",
"now", blocks[0].Finalization.Height,
"expected", tipHeight+1)
- return nil, ErrInvalidSyncingFinalizationHeight
+ return false, ErrInvalidSyncingFinalizationHeight
}
con.logger.Info("syncBlocks",
"position", &blocks[0].Position,
@@ -423,15 +425,15 @@ func (con *Consensus) SyncBlocks(
err = con.db.UpdateBlock(*b)
}
if err != nil {
- return nil, err
+ return false, err
}
}
if err := con.db.PutCompactionChainTipInfo(
b.Hash, b.Finalization.Height); err != nil {
- return nil, err
+ return false, err
}
if err := con.processFinalizedBlock(b); err != nil {
- return nil, err
+ return false, err
}
}
if latest && con.lattice == nil {
@@ -439,7 +441,7 @@ func (con *Consensus) SyncBlocks(
// true for first time. Deliver set is found by block hashes.
syncBlock, err := con.findLatticeSyncBlock(blocks)
if err != nil {
- return nil, err
+ return false, err
}
if syncBlock != nil {
con.logger.Info("deliver set found", syncBlock)
@@ -457,13 +459,13 @@ func (con *Consensus) SyncBlocks(
}
b1, err := con.db.GetBlock(b.Finalization.ParentHash)
if err != nil {
- return nil, err
+ return false, err
}
b = &b1
}
for _, b := range blocksToProcess {
if err := con.processFinalizedBlock(b); err != nil {
- return nil, err
+ return false, err
}
}
}
@@ -472,40 +474,63 @@ func (con *Consensus) SyncBlocks(
// Check if compaction and agreements' blocks are overlapped. The
// overlapping of compaction chain and BA's oldest blocks means the
// syncing is done.
- con.isSynced = con.checkIfValidated() && con.checkIfSynced(blocks)
- }
- if con.isSynced {
- // Stop network and CRS routines, wait until they are all stoped.
- con.ctxCancel()
- con.moduleWaitGroup.Wait()
- // Stop agreements.
- con.stopAgreement()
- // flush all blocks in con.blocks into core.Consensus, and build
- // core.Consensus from syncer.
- lastBlock := blocks[len(blocks)-1]
- con.logger.Info("syncer.Consensus synced", "last-block", lastBlock)
- confirmedBlocks := []*types.Block{}
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- for _, bs := range con.blocks {
- confirmedBlocks = append(confirmedBlocks, bs...)
+ if con.checkIfValidated() && con.checkIfSynced(blocks) {
+ if err := con.Stop(); err != nil {
+ return false, err
}
- }()
- return core.NewConsensusFromSyncer(
- lastBlock,
- con.roundBeginTimes[lastBlock.Position.Round],
- con.app,
- con.gov,
- con.db,
- con.network,
- con.prv,
- con.lattice,
- confirmedBlocks,
- con.randomnessResults,
- con.logger)
+ con.syncedLastBlock = blocks[len(blocks)-1]
+ con.logger.Info("syncer.Consensus synced",
+ "last-block", con.syncedLastBlock)
+ }
}
- return nil, nil
+ return con.syncedLastBlock != nil, nil
+}
+
+// GetSyncedConsensus returns the core.Consensus instance after synced.
+func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+ if con.syncedConsensus != nil {
+ return con.syncedConsensus, nil
+ }
+ if con.syncedLastBlock == nil {
+ return nil, ErrNotSynced
+ }
+ // flush all blocks in con.blocks into core.Consensus, and build
+ // core.Consensus from syncer.
+ confirmedBlocks := []*types.Block{}
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for _, bs := range con.blocks {
+ confirmedBlocks = append(confirmedBlocks, bs...)
+ }
+ }()
+ var err error
+ con.syncedConsensus, err = core.NewConsensusFromSyncer(
+ con.syncedLastBlock,
+ con.roundBeginTimes[con.syncedLastBlock.Position.Round],
+ con.app,
+ con.gov,
+ con.db,
+ con.network,
+ con.prv,
+ con.lattice,
+ confirmedBlocks,
+ con.randomnessResults,
+ con.logger)
+ return con.syncedConsensus, err
+}
+
+// Stop the syncer.
+//
+// This method is mainly for caller to stop the syncer before synced, the syncer
+// would call this method automatically after synced.
+func (con *Consensus) Stop() error {
+ // Stop network and CRS routines, wait until they are all stoped.
+ con.ctxCancel()
+ con.moduleWaitGroup.Wait()
+ // Stop agreements.
+ con.stopAgreement()
+ return nil
}
// isEmptyBlock checks if a block is an empty block by both its hash and parent
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 08a1d10..2680cd2 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -145,8 +145,13 @@ func (s *ConsensusTestSuite) syncBlocksWithSomeNode(
syncNode.gov.CatchUpWithRound(
b.Position.Round + core.ConfigRoundShift)
}
- syncedCon, err = syncerObj.SyncBlocks(compactionChainBlocks, true)
- if syncedCon != nil || err != nil {
+ var synced bool
+ synced, err = syncerObj.SyncBlocks(compactionChainBlocks, true)
+ if err != nil {
+ done = true
+ }
+ if synced {
+ syncedCon, err = syncerObj.GetSyncedConsensus()
done = true
}
compactionChainBlocks = nil