aboutsummaryrefslogtreecommitdiffstats
path: root/core/syncer
diff options
context:
space:
mode:
Diffstat (limited to 'core/syncer')
-rw-r--r--core/syncer/agreement.go2
-rw-r--r--core/syncer/consensus.go87
2 files changed, 54 insertions, 35 deletions
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index eaad860..acc4f1c 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The dexon-consensus-core Authors
+// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 306017f..75c1067 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -39,7 +39,8 @@ var (
ErrNotSynced = fmt.Errorf("not synced yet")
// ErrGenesisBlockReached is reported when genesis block reached.
ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
- // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
+ // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered
+ // blocks.
ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
// ErrMismatchBlockHashSequence means the delivering sequence is not
// correct, compared to finalized blocks.
@@ -83,6 +84,9 @@ type Consensus struct {
ctxCancel context.CancelFunc
syncedLastBlock *types.Block
syncedConsensus *core.Consensus
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []interface{}
}
// NewConsensus creates an instance for Consensus (syncer consensus).
@@ -286,12 +290,17 @@ func (con *Consensus) findLatticeSyncBlock(
blocks []*types.Block) (*types.Block, error) {
lastBlock := blocks[len(blocks)-1]
round := lastBlock.Position.Round
+ isConfigChanged := func(prev, cur *types.Config) bool {
+ return prev.K != cur.K ||
+ prev.NumChains != cur.NumChains ||
+ prev.PhiRatio != cur.PhiRatio
+ }
for {
// Find round r which r-1, r, r+1 are all in same total ordering config.
for {
- sameAsPrevRound := round == 0 || !con.isConfigChanged(
+ sameAsPrevRound := round == 0 || !isConfigChanged(
con.configs[round-1], con.configs[round])
- sameAsNextRound := !con.isConfigChanged(
+ sameAsNextRound := !isConfigChanged(
con.configs[round], con.configs[round+1])
if sameAsPrevRound && sameAsNextRound {
break
@@ -314,8 +323,9 @@ func (con *Consensus) findLatticeSyncBlock(
lastBlock = &b
}
// Find the deliver set by hash for two times. Blocks in a deliver set
- // returned by total ordering is sorted by hash. If a block's parent hash
- // is greater than its hash means there is a cut between deliver sets.
+ // returned by total ordering is sorted by hash. If a block's parent
+ // hash is greater than its hash means there is a cut between deliver
+ // sets.
var curBlock, prevBlock *types.Block
var deliverSetFirstBlock, deliverSetLastBlock *types.Block
curBlock = lastBlock
@@ -464,8 +474,8 @@ func (con *Consensus) SyncBlocks(
}
}
if latest && con.lattice == nil {
- // New Lattice and find the deliver set of total ordering when "latest" is
- // true for first time. Deliver set is found by block hashes.
+ // New Lattice and find the deliver set of total ordering when "latest"
+ // is true for first time. Deliver set is found by block hashes.
var syncBlock *types.Block
syncBlock, err = con.findLatticeSyncBlock(blocks)
if err != nil {
@@ -482,7 +492,8 @@ func (con *Consensus) SyncBlocks(
con.setupConfigs(blocks)
// Process blocks from syncBlock to blocks' last block.
b := blocks[len(blocks)-1]
- blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
+ blocksCount :=
+ b.Finalization.Height - syncBlock.Finalization.Height + 1
blocksToProcess := make([]*types.Block, blocksCount)
for {
blocksToProcess[blocksCount-1] = b
@@ -512,8 +523,13 @@ func (con *Consensus) SyncBlocks(
if err = con.Stop(); err != nil {
return
}
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
con.syncedLastBlock = blocks[len(blocks)-1]
- synced = con.syncedLastBlock != nil
+ synced = true
}
}
return
@@ -521,6 +537,8 @@ func (con *Consensus) SyncBlocks(
// GetSyncedConsensus returns the core.Consensus instance after synced.
func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
if con.syncedConsensus != nil {
return con.syncedConsensus, nil
}
@@ -529,18 +547,16 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
}
// flush all blocks in con.blocks into core.Consensus, and build
// core.Consensus from syncer.
- confirmedBlocks := []*types.Block{}
+ confirmedBlocks := make([][]*types.Block, len(con.blocks))
+ for i, bs := range con.blocks {
+ confirmedBlocks[i] = []*types.Block(bs)
+ }
randomnessResults := []*types.BlockRandomnessResult{}
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- for _, bs := range con.blocks {
- confirmedBlocks = append(confirmedBlocks, bs...)
- }
- for _, r := range con.randomnessResults {
- randomnessResults = append(randomnessResults, r)
- }
- }()
+ for _, r := range con.randomnessResults {
+ randomnessResults = append(randomnessResults, r)
+ }
+ con.dummyCancel()
+ <-con.dummyFinished
var err error
con.syncedConsensus, err = core.NewConsensusFromSyncer(
con.syncedLastBlock,
@@ -553,6 +569,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
con.lattice,
confirmedBlocks,
randomnessResults,
+ con.dummyMsgBuffer,
con.logger)
return con.syncedConsensus, err
}
@@ -560,7 +577,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
// Stop the syncer.
//
// This method is mainly for caller to stop the syncer before synced, the syncer
-// would call this method automatically after synced.
+// would call this method automatically after being synced.
func (con *Consensus) Stop() error {
con.logger.Trace("syncer is about to stop")
// Stop network and CRS routines, wait until they are all stoped.
@@ -654,7 +671,8 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
// Resize the pool of blocks.
con.blocks = append(con.blocks, types.ByPosition{})
// Resize agreement modules.
- a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+ a := newAgreement(
+ con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
con.agreements = append(con.agreements, a)
con.agreementWaitGroup.Add(1)
go func() {
@@ -679,8 +697,8 @@ func (con *Consensus) startAgreement() {
func() {
con.lock.Lock()
defer con.lock.Unlock()
- // If round is cut in agreements, do not add blocks with round less
- // then cut round.
+ // If round is cut in agreements, do not add blocks with
+ // round less then cut round.
if b.Position.Round < con.agreementRoundCut {
return
}
@@ -764,15 +782,22 @@ func (con *Consensus) startNetwork() {
default:
continue Loop
}
- func() {
+ if func() bool {
con.lock.RLock()
defer con.lock.RUnlock()
if pos.ChainID >= uint32(len(con.agreements)) {
- con.logger.Error("Unknown chainID message received (syncer)",
+ // This error might be easily encountered when the
+ // "latest" parameter of SyncBlocks is turned on too
+ // early.
+ con.logger.Error(
+ "Unknown chainID message received (syncer)",
"position", &pos)
+ return false
}
- }()
- con.agreements[pos.ChainID].inputChan <- val
+ return true
+ }() {
+ con.agreements[pos.ChainID].inputChan <- val
+ }
case <-con.ctx.Done():
return
}
@@ -849,9 +874,3 @@ func (con *Consensus) stopAgreement() {
close(con.receiveChan)
close(con.pullChan)
}
-
-func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool {
- return prev.K != cur.K ||
- prev.NumChains != cur.NumChains ||
- prev.PhiRatio != cur.PhiRatio
-}