aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go285
1 files changed, 186 insertions, 99 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
index 92f8fd8d0..75c106793 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
@@ -18,7 +18,6 @@
package syncer
import (
- "bytes"
"context"
"fmt"
"sort"
@@ -40,7 +39,8 @@ var (
ErrNotSynced = fmt.Errorf("not synced yet")
// ErrGenesisBlockReached is reported when genesis block reached.
ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
- // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
+ // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered
+ // blocks.
ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
// ErrMismatchBlockHashSequence means the delivering sequence is not
// correct, compared to finalized blocks.
@@ -61,6 +61,7 @@ type Consensus struct {
prv crypto.PrivateKey
network core.Network
nodeSetCache *utils.NodeSetCache
+ tsigVerifier *core.TSigVerifierCache
lattice *core.Lattice
validatedChains map[uint32]struct{}
@@ -83,6 +84,9 @@ type Consensus struct {
ctxCancel context.CancelFunc
syncedLastBlock *types.Block
syncedConsensus *core.Consensus
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []interface{}
}
// NewConsensus creates an instance for Consensus (syncer consensus).
@@ -102,6 +106,7 @@ func NewConsensus(
db: db,
network: network,
nodeSetCache: utils.NewNodeSetCache(gov),
+ tsigVerifier: core.NewTSigVerifierCache(gov, 7),
prv: prv,
logger: logger,
validatedChains: make(map[uint32]struct{}),
@@ -118,17 +123,15 @@ func NewConsensus(
}
func (con *Consensus) initConsensusObj(initBlock *types.Block) {
- var cfg *types.Config
func() {
con.lock.Lock()
defer con.lock.Unlock()
con.latticeLastRound = initBlock.Position.Round
- cfg = con.configs[con.latticeLastRound]
debugApp, _ := con.app.(core.Debug)
con.lattice = core.NewLattice(
con.roundBeginTimes[con.latticeLastRound],
con.latticeLastRound,
- cfg,
+ con.configs[con.latticeLastRound],
utils.NewSigner(con.prv),
con.app,
debugApp,
@@ -136,37 +139,49 @@ func (con *Consensus) initConsensusObj(initBlock *types.Block) {
con.logger,
)
}()
- con.startAgreement(cfg.NumChains)
+ con.startAgreement()
con.startNetwork()
con.startCRSMonitor()
}
-func (con *Consensus) checkIfValidated() bool {
+func (con *Consensus) checkIfValidated() (validated bool) {
con.lock.RLock()
defer con.lock.RUnlock()
- var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
- var validatedChainCount uint32
+ var (
+ round = con.blocks[0][0].Position.Round
+ numChains = con.configs[round].NumChains
+ validatedChainCount uint32
+ )
// Make sure we validate some block in all chains.
for chainID := range con.validatedChains {
if chainID < numChains {
validatedChainCount++
}
}
- if validatedChainCount == numChains {
- return true
- }
- con.logger.Debug("not validated yet", "validated-chain", validatedChainCount)
- return false
+ validated = validatedChainCount == numChains
+ con.logger.Debug("syncer chain-validation status",
+ "validated-chain", validatedChainCount,
+ "round", round,
+ "valid", validated)
+ return
}
-func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
+func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
con.lock.RLock()
defer con.lock.RUnlock()
var (
- numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
+ round = con.blocks[0][0].Position.Round
+ numChains = con.configs[round].NumChains
compactionTips = make([]*types.Block, numChains)
overlapCount = uint32(0)
)
+ defer func() {
+ con.logger.Debug("syncer synced status",
+ "overlap-count", overlapCount,
+ "num-chain", numChains,
+ "last-block", blocks[len(blocks)-1],
+ "synced", synced)
+ }()
// Find tips (newset blocks) of each chain in compaction chain.
b := blocks[len(blocks)-1]
for tipCount := uint32(0); tipCount < numChains; {
@@ -178,7 +193,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
}
}
if (b.Finalization.ParentHash == common.Hash{}) {
- return false
+ return
}
b1, err := con.db.GetBlock(b.Finalization.ParentHash)
if err != nil {
@@ -196,14 +211,8 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
}
}
}
- if overlapCount == numChains {
- return true
- }
- con.logger.Debug("not synced yet",
- "overlap-count", overlapCount,
- "num-chain", numChains,
- "last-block", blocks[len(blocks)-1])
- return false
+ synced = overlapCount == numChains
+ return
}
// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
@@ -212,6 +221,10 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
func (con *Consensus) ensureAgreementOverlapRound() bool {
con.lock.Lock()
defer con.lock.Unlock()
+ defer func() {
+ con.logger.Debug("ensureAgreementOverlapRound returned",
+ "round", con.agreementRoundCut)
+ }()
if con.agreementRoundCut > 0 {
return true
}
@@ -267,7 +280,6 @@ func (con *Consensus) ensureAgreementOverlapRound() bool {
"configs", len(con.configs))
if tipRoundMap[r] == con.configs[r].NumChains {
con.agreementRoundCut = r
- con.logger.Info("agreement round cut found, round", r)
return true
}
}
@@ -278,12 +290,17 @@ func (con *Consensus) findLatticeSyncBlock(
blocks []*types.Block) (*types.Block, error) {
lastBlock := blocks[len(blocks)-1]
round := lastBlock.Position.Round
+ isConfigChanged := func(prev, cur *types.Config) bool {
+ return prev.K != cur.K ||
+ prev.NumChains != cur.NumChains ||
+ prev.PhiRatio != cur.PhiRatio
+ }
for {
// Find round r which r-1, r, r+1 are all in same total ordering config.
for {
- sameAsPrevRound := round == 0 || !con.isConfigChanged(
+ sameAsPrevRound := round == 0 || !isConfigChanged(
con.configs[round-1], con.configs[round])
- sameAsNextRound := !con.isConfigChanged(
+ sameAsNextRound := !isConfigChanged(
con.configs[round], con.configs[round+1])
if sameAsPrevRound && sameAsNextRound {
break
@@ -306,8 +323,9 @@ func (con *Consensus) findLatticeSyncBlock(
lastBlock = &b
}
// Find the deliver set by hash for two times. Blocks in a deliver set
- // returned by total ordering is sorted by hash. If a block's parent hash
- // is greater than its hash means there is a cut between deliver sets.
+ // returned by total ordering is sorted by hash. If a block's parent
+ // hash is greater than its hash means there is a cut between deliver
+ // sets.
var curBlock, prevBlock *types.Block
var deliverSetFirstBlock, deliverSetLastBlock *types.Block
curBlock = lastBlock
@@ -374,11 +392,13 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error {
if con.lattice == nil {
return nil
}
- con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
delivered, err := con.lattice.ProcessFinalizedBlock(block)
if err != nil {
return err
}
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
for idx, b := range delivered {
if con.finalizedBlockHashes[idx] != b.Hash {
return ErrMismatchBlockHashSequence
@@ -393,18 +413,27 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error {
// regards the blocks are the latest ones. Notice that latest can be true for
// many times.
// NOTICE: parameter "blocks" should be consecutive in compaction height.
+// NOTICE: this method is not expected to be called concurrently.
func (con *Consensus) SyncBlocks(
- blocks []*types.Block, latest bool) (bool, error) {
+ blocks []*types.Block, latest bool) (synced bool, err error) {
+ defer func() {
+ con.logger.Debug("SyncBlocks returned",
+ "synced", synced,
+ "error", err,
+ "last-block", con.syncedLastBlock)
+ }()
if con.syncedLastBlock != nil {
- return true, ErrAlreadySynced
+ synced, err = true, ErrAlreadySynced
+ return
}
if len(blocks) == 0 {
- return false, nil
+ return
}
// Check if blocks are consecutive.
for i := 1; i < len(blocks); i++ {
if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 {
- return false, ErrInvalidBlockOrder
+ err = ErrInvalidBlockOrder
+ return
}
}
// Make sure the first block is the next block of current compaction chain
@@ -414,7 +443,8 @@ func (con *Consensus) SyncBlocks(
con.logger.Error("mismatched finalization height",
"now", blocks[0].Finalization.Height,
"expected", tipHeight+1)
- return false, ErrInvalidSyncingFinalizationHeight
+ err = ErrInvalidSyncingFinalizationHeight
+ return
}
con.logger.Trace("syncBlocks",
"position", &blocks[0].Position,
@@ -425,30 +455,35 @@ func (con *Consensus) SyncBlocks(
con.setupConfigs(blocks)
for _, b := range blocks {
// TODO(haoping) remove this if lattice puts blocks into db.
- if err := con.db.PutBlock(*b); err != nil {
+ if err = con.db.PutBlock(*b); err != nil {
// A block might be put into db when confirmed by BA, but not
// finalized yet.
if err == db.ErrBlockExists {
err = con.db.UpdateBlock(*b)
}
if err != nil {
- return false, err
+ return
}
}
- if err := con.db.PutCompactionChainTipInfo(
+ if err = con.db.PutCompactionChainTipInfo(
b.Hash, b.Finalization.Height); err != nil {
- return false, err
+ return
}
- if err := con.processFinalizedBlock(b); err != nil {
- return false, err
+ if err = con.processFinalizedBlock(b); err != nil {
+ return
}
}
if latest && con.lattice == nil {
- // New Lattice and find the deliver set of total ordering when "latest" is
- // true for first time. Deliver set is found by block hashes.
- syncBlock, err := con.findLatticeSyncBlock(blocks)
+ // New Lattice and find the deliver set of total ordering when "latest"
+ // is true for first time. Deliver set is found by block hashes.
+ var syncBlock *types.Block
+ syncBlock, err = con.findLatticeSyncBlock(blocks)
if err != nil {
- return false, err
+ if err == ErrGenesisBlockReached {
+ con.logger.Debug("SyncBlocks skip error", "error", err)
+ err = nil
+ }
+ return
}
if syncBlock != nil {
con.logger.Debug("deliver set found", "block", syncBlock)
@@ -457,7 +492,8 @@ func (con *Consensus) SyncBlocks(
con.setupConfigs(blocks)
// Process blocks from syncBlock to blocks' last block.
b := blocks[len(blocks)-1]
- blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
+ blocksCount :=
+ b.Finalization.Height - syncBlock.Finalization.Height + 1
blocksToProcess := make([]*types.Block, blocksCount)
for {
blocksToProcess[blocksCount-1] = b
@@ -465,15 +501,16 @@ func (con *Consensus) SyncBlocks(
if b.Hash == syncBlock.Hash {
break
}
- b1, err := con.db.GetBlock(b.Finalization.ParentHash)
+ var b1 types.Block
+ b1, err = con.db.GetBlock(b.Finalization.ParentHash)
if err != nil {
- return false, err
+ return
}
b = &b1
}
for _, b := range blocksToProcess {
- if err := con.processFinalizedBlock(b); err != nil {
- return false, err
+ if err = con.processFinalizedBlock(b); err != nil {
+ return
}
}
}
@@ -483,19 +520,25 @@ func (con *Consensus) SyncBlocks(
// overlapping of compaction chain and BA's oldest blocks means the
// syncing is done.
if con.checkIfValidated() && con.checkIfSynced(blocks) {
- if err := con.Stop(); err != nil {
- return false, err
+ if err = con.Stop(); err != nil {
+ return
}
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
con.syncedLastBlock = blocks[len(blocks)-1]
- con.logger.Info("syncer.Consensus synced",
- "last-block", con.syncedLastBlock)
+ synced = true
}
}
- return con.syncedLastBlock != nil, nil
+ return
}
// GetSyncedConsensus returns the core.Consensus instance after synced.
func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
if con.syncedConsensus != nil {
return con.syncedConsensus, nil
}
@@ -504,18 +547,16 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
}
// flush all blocks in con.blocks into core.Consensus, and build
// core.Consensus from syncer.
- confirmedBlocks := []*types.Block{}
+ confirmedBlocks := make([][]*types.Block, len(con.blocks))
+ for i, bs := range con.blocks {
+ confirmedBlocks[i] = []*types.Block(bs)
+ }
randomnessResults := []*types.BlockRandomnessResult{}
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- for _, bs := range con.blocks {
- confirmedBlocks = append(confirmedBlocks, bs...)
- }
- for _, r := range con.randomnessResults {
- randomnessResults = append(randomnessResults, r)
- }
- }()
+ for _, r := range con.randomnessResults {
+ randomnessResults = append(randomnessResults, r)
+ }
+ con.dummyCancel()
+ <-con.dummyFinished
var err error
con.syncedConsensus, err = core.NewConsensusFromSyncer(
con.syncedLastBlock,
@@ -528,6 +569,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
con.lattice,
confirmedBlocks,
randomnessResults,
+ con.dummyMsgBuffer,
con.logger)
return con.syncedConsensus, err
}
@@ -535,13 +577,17 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
// Stop the syncer.
//
// This method is mainly for caller to stop the syncer before synced, the syncer
-// would call this method automatically after synced.
+// would call this method automatically after being synced.
func (con *Consensus) Stop() error {
+ con.logger.Trace("syncer is about to stop")
// Stop network and CRS routines, wait until they are all stoped.
con.ctxCancel()
+ con.logger.Trace("stop syncer modules")
con.moduleWaitGroup.Wait()
// Stop agreements.
+ con.logger.Trace("stop syncer agreement modules")
con.stopAgreement()
+ con.logger.Trace("syncer stopped")
return nil
}
@@ -566,6 +612,10 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) {
func() {
con.lock.Lock()
defer con.lock.Unlock()
+ con.logger.Debug("syncer setupConfigs",
+ "until-round", round,
+ "length", len(con.configs),
+ "lattice", con.latticeLastRound)
for r := uint64(len(con.configs)); r <= round; r++ {
cfg := utils.GetConfigWithPanic(con.gov, r, con.logger)
con.configs = append(con.configs, cfg)
@@ -589,6 +639,7 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) {
}
}()
con.resizeByNumChains(curMaxNumChains)
+ con.logger.Trace("setupConfgis finished", "round", round)
}
// setupConfigs is called by SyncBlocks with blocks from compaction chain. In
@@ -601,9 +652,6 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) {
maxRound = b.Position.Round
}
}
- con.logger.Debug("syncer setupConfigs",
- "max", maxRound,
- "lattice", con.latticeLastRound)
// Get configs from governance.
//
// In fullnode, the notification of new round is yet another TX, which
@@ -623,7 +671,8 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
// Resize the pool of blocks.
con.blocks = append(con.blocks, types.ByPosition{})
// Resize agreement modules.
- a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+ a := newAgreement(
+ con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
con.agreements = append(con.agreements, a)
con.agreementWaitGroup.Add(1)
go func() {
@@ -635,7 +684,7 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
}
// startAgreement starts agreements for receiving votes and agreements.
-func (con *Consensus) startAgreement(numChains uint32) {
+func (con *Consensus) startAgreement() {
// Start a routine for listening receive channel and pull block channel.
go func() {
for {
@@ -648,8 +697,8 @@ func (con *Consensus) startAgreement(numChains uint32) {
func() {
con.lock.Lock()
defer con.lock.Unlock()
- // If round is cut in agreements, do not add blocks with round less
- // then cut round.
+ // If round is cut in agreements, do not add blocks with
+ // round less then cut round.
if b.Position.Round < con.agreementRoundCut {
return
}
@@ -667,6 +716,10 @@ func (con *Consensus) startAgreement(numChains uint32) {
}
func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
+ // There is no block randomness at round-0.
+ if r.Position.Round == 0 {
+ return
+ }
// We only have to cache randomness result after cutting round.
if r.Position.Round < func() uint64 {
con.lock.RLock()
@@ -675,23 +728,43 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
}() {
return
}
- con.lock.Lock()
- defer con.lock.Unlock()
- if old, exists := con.randomnessResults[r.BlockHash]; exists {
- if bytes.Compare(old.Randomness, r.Randomness) != 0 {
- panic(fmt.Errorf("receive different randomness result: %s, %s",
- r.BlockHash.String()[:6], &r.Position))
- }
- // We don't have to assign the map again.
+ if func() (exists bool) {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ _, exists = con.randomnessResults[r.BlockHash]
+ return
+ }() {
+ return
+ }
+ v, ok, err := con.tsigVerifier.UpdateAndGet(r.Position.Round)
+ if err != nil {
+ con.logger.Error("Unable to get tsig verifier",
+ "hash", r.BlockHash.String()[:6],
+ "position", &r.Position,
+ "error", err)
+ return
+ }
+ if !ok {
+ con.logger.Error("Tsig is not ready", "position", &r.Position)
return
}
+ if !v.VerifySignature(r.BlockHash, crypto.Signature{
+ Type: "bls",
+ Signature: r.Randomness}) {
+ con.logger.Info("Block randomness is not valid",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
+ return
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
con.randomnessResults[r.BlockHash] = r
}
// startNetwork starts network for receiving blocks and agreement results.
func (con *Consensus) startNetwork() {
+ con.moduleWaitGroup.Add(1)
go func() {
- con.moduleWaitGroup.Add(1)
defer con.moduleWaitGroup.Done()
Loop:
for {
@@ -709,15 +782,22 @@ func (con *Consensus) startNetwork() {
default:
continue Loop
}
- func() {
+ if func() bool {
con.lock.RLock()
defer con.lock.RUnlock()
if pos.ChainID >= uint32(len(con.agreements)) {
- con.logger.Error("Unknown chainID message received (syncer)",
+ // This error might be easily encountered when the
+ // "latest" parameter of SyncBlocks is turned on too
+ // early.
+ con.logger.Error(
+ "Unknown chainID message received (syncer)",
"position", &pos)
+ return false
}
- }()
- con.agreements[pos.ChainID].inputChan <- val
+ return true
+ }() {
+ con.agreements[pos.ChainID].inputChan <- val
+ }
case <-con.ctx.Done():
return
}
@@ -732,19 +812,32 @@ func (con *Consensus) startCRSMonitor() {
// Notify all agreements for new CRS.
notifyNewCRS := func(round uint64) {
con.setupConfigsUntilRound(round)
- con.lock.Lock()
- defer con.lock.Unlock()
if round == lastNotifiedRound {
return
}
con.logger.Debug("CRS is ready", "round", round)
lastNotifiedRound = round
- for _, a := range con.agreements {
- a.inputChan <- round
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for idx, a := range con.agreements {
+ loop:
+ for {
+ select {
+ case <-con.ctx.Done():
+ break loop
+ case a.inputChan <- round:
+ break loop
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug(
+ "agreement input channel is full when putting CRS",
+ "chainID", idx,
+ "round", round)
+ }
+ }
}
}
+ con.moduleWaitGroup.Add(1)
go func() {
- con.moduleWaitGroup.Add(1)
defer con.moduleWaitGroup.Done()
for {
select {
@@ -781,9 +874,3 @@ func (con *Consensus) stopAgreement() {
close(con.receiveChan)
close(con.pullChan)
}
-
-func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool {
- return prev.K != cur.K ||
- prev.NumChains != cur.NumChains ||
- prev.PhiRatio != cur.PhiRatio
-}