aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Mission Liao <mission.liao@dexon.org> 2018-12-12 16:55:19 +0800
committer: GitHub <noreply@github.com> 2018-12-12 16:55:19 +0800
commit: 338bf8676563a103cc78bbacef75fbaaac4293d7 (patch)
tree: 33587f90c7d7b8d61c99bebeb4ffee9c0b69668f
parent: d60fedadb35d56ed873bad301cf3e5fd9a96410d (diff)
downloadtangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar
tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.gz
tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.bz2
tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.lz
tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.xz
tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.zst
tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.zip
syncer: fix stuffs (#366)
* return delivered blocks when processing finalized blocks
* check deliver sequence when processing finalized blocks
* skip delivery of finalized blocks
* remove duplicated calls to BlockConfirmed
* add numChains change in test scenario
* fix the bug where restartNotary is triggered by a block older than the current aID
-rw-r--r-- core/agreement-mgr.go 24
-rw-r--r-- core/consensus.go 76
-rw-r--r-- core/consensus_test.go 11
-rw-r--r-- core/lattice.go 12
-rw-r--r-- core/syncer/consensus.go 89
-rw-r--r-- core/total-ordering.go 13
-rw-r--r-- integration_test/consensus_test.go 8
7 files changed, 160 insertions, 73 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index c95f913..4cb47b1 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -20,6 +20,7 @@ package core
import (
"context"
"errors"
+ "math"
"sync"
"time"
@@ -174,7 +175,7 @@ func (mgr *agreementMgr) appendConfig(
recv := &consensusBAReceiver{
consensus: mgr.con,
chainID: i,
- restartNotary: make(chan bool, 1),
+ restartNotary: make(chan types.Position, 1),
}
agrModule := newAgreement(
mgr.con.ID,
@@ -252,7 +253,9 @@ func (mgr *agreementMgr) processAgreementResult(
int(mgr.gov.Configuration(result.Position.Round).NotarySetSize),
types.NewNotarySetTarget(crs, result.Position.ChainID))
for key := range result.Votes {
- agreement.processVote(&result.Votes[key])
+ if err := agreement.processVote(&result.Votes[key]); err != nil {
+ return err
+ }
}
agreement.restart(nIDs, result.Position, crs)
}
@@ -388,7 +391,7 @@ Loop:
// Run BA for this round.
recv.round = currentRound
recv.changeNotaryTime = roundEndTime
- recv.restartNotary <- false
+ recv.restartNotary <- types.Position{ChainID: math.MaxUint32}
if err := mgr.baRoutineForOneRound(&setting); err != nil {
mgr.logger.Error("BA routine failed",
"error", err,
@@ -412,10 +415,17 @@ Loop:
default:
}
select {
- case newNotary := <-recv.restartNotary:
- if newNotary {
- // This round is finished.
- break Loop
+ case restartPos := <-recv.restartNotary:
+ if !isStop(restartPos) {
+ if restartPos.Round > oldPos.Round {
+ // This round is finished.
+ break Loop
+ }
+ if restartPos.Older(&oldPos) {
+ // The restartNotary event is triggered by 'BlockConfirmed'
+ // of some older block.
+ break
+ }
}
var nextHeight uint64
for {
diff --git a/core/consensus.go b/core/consensus.go
index 11df5d4..3f4443f 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -71,7 +71,7 @@ type consensusBAReceiver struct {
changeNotaryTime time.Time
round uint64
isNotary bool
- restartNotary chan bool
+ restartNotary chan types.Position
}
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
@@ -145,6 +145,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block = <-ch
recv.consensus.logger.Info("Receive unknown block",
"hash", hash.String()[:6],
+ "position", &block.Position,
"chainID", recv.chainID)
recv.agreementModule.addCandidateBlock(block)
recv.agreementModule.lock.Lock()
@@ -161,8 +162,8 @@ func (recv *consensusBAReceiver) ConfirmBlock(
parentHash := hash
for {
recv.consensus.logger.Warn("Parent block not confirmed",
- "hash", parentHash,
- "chainID", recv.chainID)
+ "parent-hash", parentHash.String()[:6],
+ "cur-position", &block.Position)
ch := make(chan *types.Block)
if !func() bool {
recv.consensus.lock.Lock()
@@ -188,7 +189,8 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
recv.consensus.logger.Info("Receive parent block",
- "hash", block.ParentHash.String()[:6],
+ "parent-hash", block.ParentHash.String()[:6],
+ "cur-position", &block.Position,
"chainID", recv.chainID)
recv.consensus.ccModule.registerBlock(block)
if err := recv.consensus.processBlock(block); err != nil {
@@ -238,12 +240,12 @@ CleanChannelLoop:
break CleanChannelLoop
}
}
+ newPos := block.Position
if block.Timestamp.After(recv.changeNotaryTime) {
recv.round++
- recv.restartNotary <- true
- } else {
- recv.restartNotary <- false
+ newPos.Round++
}
+ recv.restartNotary <- newPos
}
func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
@@ -365,15 +367,16 @@ type Consensus struct {
network Network
// Misc.
- dMoment time.Time
- nodeSetCache *utils.NodeSetCache
- round uint64
- roundToNotify uint64
- lock sync.RWMutex
- ctx context.Context
- ctxCancel context.CancelFunc
- event *common.Event
- logger common.Logger
+ dMoment time.Time
+ nodeSetCache *utils.NodeSetCache
+ round uint64
+ roundToNotify uint64
+ lock sync.RWMutex
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ event *common.Event
+ logger common.Logger
+ nonFinalizedBlockDelivered bool
}
// NewConsensus construct an Consensus instance.
@@ -515,7 +518,6 @@ func NewConsensusFromSyncer(
}
// Dump all BA-confirmed blocks to the consensus instance.
for _, b := range blocks {
- con.app.BlockConfirmed(*b)
con.ccModule.registerBlock(b)
if err := con.processBlock(b); err != nil {
return nil, err
@@ -523,7 +525,7 @@ func NewConsensusFromSyncer(
}
// Dump all randomness result to the consensus instance.
for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r); err != nil {
+ if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
con.logger.Error("failed to process randomness result when syncing",
"result", r)
continue
@@ -815,27 +817,30 @@ MessageLoop:
// For sync mode.
if err := con.processFinalizedBlock(val); err != nil {
con.logger.Error("Failed to process finalized block",
+ "block", val,
"error", err)
}
} else {
if err := con.preProcessBlock(val); err != nil {
con.logger.Error("Failed to pre process block",
+ "block", val,
"error", err)
}
}
case *types.Vote:
if err := con.ProcessVote(val); err != nil {
con.logger.Error("Failed to process vote",
- "error", err,
- "vote", val)
+ "vote", val,
+ "error", err)
}
case *types.AgreementResult:
if err := con.ProcessAgreementResult(val); err != nil {
con.logger.Error("Failed to process agreement result",
+ "result", val,
"error", err)
}
case *types.BlockRandomnessResult:
- if err := con.ProcessBlockRandomnessResult(val); err != nil {
+ if err := con.ProcessBlockRandomnessResult(val, true); err != nil {
con.logger.Error("Failed to process block randomness result",
"hash", val.BlockHash.String()[:6],
"position", &val.Position,
@@ -952,7 +957,7 @@ func (con *Consensus) ProcessAgreementResult(
Position: rand.Position,
Randomness: tsig.Signature,
}
- if err := con.ProcessBlockRandomnessResult(result); err != nil {
+ if err := con.ProcessBlockRandomnessResult(result, true); err != nil {
con.logger.Error("Failed to process randomness result",
"error", err)
return
@@ -963,7 +968,7 @@ func (con *Consensus) ProcessAgreementResult(
// ProcessBlockRandomnessResult processes the randomness result.
func (con *Consensus) ProcessBlockRandomnessResult(
- rand *types.BlockRandomnessResult) error {
+ rand *types.BlockRandomnessResult, needBroadcast bool) error {
if rand.Position.Round == 0 {
return nil
}
@@ -974,11 +979,13 @@ func (con *Consensus) ProcessBlockRandomnessResult(
return err
}
}
- con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "hash", rand.BlockHash.String()[:6],
- "position", &rand.Position,
- "randomness", hex.EncodeToString(rand.Randomness))
- con.network.BroadcastRandomnessResult(rand)
+ if needBroadcast {
+ con.logger.Debug("Calling Network.BroadcastRandomnessResult",
+ "hash", rand.BlockHash.String()[:6],
+ "position", &rand.Position,
+ "randomness", hex.EncodeToString(rand.Randomness))
+ con.network.BroadcastRandomnessResult(rand)
+ }
return nil
}
@@ -1039,6 +1046,19 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
}
// Pass delivered blocks to compaction chain.
for _, b := range deliveredBlocks {
+ if b.IsFinalized() {
+ if con.nonFinalizedBlockDelivered {
+ panic(fmt.Errorf("attempting to skip finalized block: %s", b))
+ }
+ con.logger.Info("skip delivery of finalized block",
+ "block", b,
+ "finalization-height", b.Finalization.Height)
+ continue
+ } else {
+ // Mark that some non-finalized block delivered. After this flag
+ // turned on, it's not allowed to deliver finalized blocks anymore.
+ con.nonFinalizedBlockDelivered = true
+ }
if err = con.ccModule.processBlock(b); err != nil {
return
}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 5fb804b..c1cdca8 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -155,7 +155,7 @@ func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) {
case *types.AgreementResult:
err = con.ProcessAgreementResult(val)
case *types.BlockRandomnessResult:
- err = con.ProcessBlockRandomnessResult(val)
+ err = con.ProcessBlockRandomnessResult(val, true)
case *typesDKG.PrivateShare:
err = con.cfgModule.processPrivateShare(val)
case *typesDKG.PartialSignature:
@@ -267,11 +267,12 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
// It's a helper function to emit one block
// to all core.Consensus objects.
broadcast := func(b *types.Block) {
+ h := common.NewRandomHash()
+ b.Finalization.Randomness = h[:]
for _, obj := range objs {
- h := common.NewRandomHash()
- b.Finalization.Randomness = h[:]
- obj.con.ccModule.registerBlock(b)
- req.Nil(obj.con.processBlock(b))
+ copied := b.Clone()
+ obj.con.ccModule.registerBlock(copied)
+ req.Nil(obj.con.processBlock(copied))
}
}
// Genesis blocks
diff --git a/core/lattice.go b/core/lattice.go
index e578e3f..6ea5f8b 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -304,21 +304,21 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
}
// ProcessFinalizedBlock is used for syncing lattice data.
-func (l *Lattice) ProcessFinalizedBlock(b *types.Block) {
+func (l *Lattice) ProcessFinalizedBlock(b *types.Block) ([]*types.Block, error) {
l.lock.Lock()
defer l.lock.Unlock()
// Syncing state for core.latticeData module.
if err := l.data.addFinalizedBlock(b); err != nil {
- panic(err)
+ return nil, err
}
l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
// Syncing state for core.totalOrdering module.
toDelivered, deliveredMode, err := l.toModule.processBlock(b)
if err != nil {
- panic(err)
+ return nil, err
}
if len(toDelivered) == 0 {
- return
+ return nil, nil
}
hashes := make(common.Hashes, len(toDelivered))
for idx := range toDelivered {
@@ -329,7 +329,7 @@ func (l *Lattice) ProcessFinalizedBlock(b *types.Block) {
}
// Sync core.consensusTimestamp module.
if err = l.ctModule.processBlocks(toDelivered); err != nil {
- panic(err)
+ return nil, err
}
- return
+ return toDelivered, nil
}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 176673b..dca6112 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -39,6 +39,9 @@ var (
ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
// ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
+ // ErrMismatchBlockHashSequence means the delivering sequence is not
+ // correct, compared to finalized blocks.
+ ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence")
)
// Consensus is for syncing consensus module.
@@ -52,14 +55,16 @@ type Consensus struct {
network core.Network
nodeSetCache *utils.NodeSetCache
- lattice *core.Lattice
- latticeLastRound uint64
- randomnessResults []*types.BlockRandomnessResult
- blocks []types.ByPosition
- agreements []*agreement
- configs []*types.Config
- roundBeginTimes []time.Time
- agreementRoundCut uint64
+ lattice *core.Lattice
+ validatedChains map[uint32]struct{}
+ finalizedBlockHashes common.Hashes
+ latticeLastRound uint64
+ randomnessResults []*types.BlockRandomnessResult
+ blocks []types.ByPosition
+ agreements []*agreement
+ configs []*types.Config
+ roundBeginTimes []time.Time
+ agreementRoundCut uint64
// lock for accessing all fields.
lock sync.RWMutex
@@ -92,6 +97,7 @@ func NewConsensus(
prv: prv,
logger: logger,
isSynced: false,
+ validatedChains: make(map[uint32]struct{}),
configs: []*types.Config{gov.Configuration(0)},
roundBeginTimes: []time.Time{dMoment},
receiveChan: make(chan *types.Block, 1000),
@@ -125,7 +131,27 @@ func (con *Consensus) initConsensusObj(initBlock *types.Block) {
con.startCRSMonitor()
}
-func (con *Consensus) checkIfSynced(blocks []*types.Block) {
+func (con *Consensus) checkIfValidated() bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
+ var validatedChainCount uint32
+ // Make sure we validate some block in all chains.
+ for chainID := range con.validatedChains {
+ if chainID < numChains {
+ validatedChainCount++
+ }
+ }
+ if validatedChainCount == numChains {
+ return true
+ }
+ con.logger.Info("not validated yet", "validated-chain", validatedChainCount)
+ return false
+}
+
+func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
var (
numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
compactionTips = make([]*types.Block, numChains)
@@ -142,7 +168,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) {
}
}
if (b.Finalization.ParentHash == common.Hash{}) {
- return
+ return false
}
b1, err := con.db.Get(b.Finalization.ParentHash)
if err != nil {
@@ -153,8 +179,6 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) {
// Check if chain tips of compaction chain and current cached confirmed
// blocks are overlapped on each chain, numChains is decided by the round
// of last block we seen on compaction chain.
- con.lock.RLock()
- defer con.lock.RUnlock()
for chainID, b := range compactionTips {
if len(con.blocks[chainID]) > 0 {
if !b.Position.Older(&con.blocks[chainID][0].Position) {
@@ -163,13 +187,13 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) {
}
}
if overlapCount == numChains {
- con.isSynced = true
- } else {
- con.logger.Info("not overlap yet",
- "overlap-count", overlapCount,
- "num-chain", numChains,
- "last-block", blocks[len(blocks)-1])
+ return true
}
+ con.logger.Info("not synced yet",
+ "overlap-count", overlapCount,
+ "num-chain", numChains,
+ "last-block", blocks[len(blocks)-1])
+ return false
}
// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
@@ -333,6 +357,25 @@ func (con *Consensus) findLatticeSyncBlock(
}
}
+func (con *Consensus) processFinalizedBlock(block *types.Block) error {
+ if con.lattice == nil {
+ return nil
+ }
+ con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
+ delivered, err := con.lattice.ProcessFinalizedBlock(block)
+ if err != nil {
+ return err
+ }
+ for idx, b := range delivered {
+ if con.finalizedBlockHashes[idx] != b.Hash {
+ return ErrMismatchBlockHashSequence
+ }
+ con.validatedChains[b.Position.ChainID] = struct{}{}
+ }
+ con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):]
+ return nil
+}
+
// SyncBlocks syncs blocks from compaction chain, latest is true if the caller
// regards the blocks are the latest ones. Notice that latest can be true for
// many times.
@@ -365,8 +408,8 @@ func (con *Consensus) SyncBlocks(
return nil, err
}
}
- if con.lattice != nil {
- con.lattice.ProcessFinalizedBlock(b)
+ if err := con.processFinalizedBlock(b); err != nil {
+ return nil, err
}
}
if latest && con.lattice == nil {
@@ -397,7 +440,9 @@ func (con *Consensus) SyncBlocks(
b = &b1
}
for _, b := range blocksToProcess {
- con.lattice.ProcessFinalizedBlock(b)
+ if err := con.processFinalizedBlock(b); err != nil {
+ return nil, err
+ }
}
}
}
@@ -405,7 +450,7 @@ func (con *Consensus) SyncBlocks(
// Check if compaction and agreements' blocks are overlapped. The
// overlapping of compaction chain and BA's oldest blocks means the
// syncing is done.
- con.checkIfSynced(blocks)
+ con.isSynced = con.checkIfValidated() && con.checkIfSynced(blocks)
}
if con.isSynced {
// Stop network and CRS routines, wait until they are all stoped.
diff --git a/core/total-ordering.go b/core/total-ordering.go
index 52f9270..3bf6946 100644
--- a/core/total-ordering.go
+++ b/core/total-ordering.go
@@ -535,11 +535,13 @@ type totalOrderingGlobalVector struct {
cachedCandidateInfo *totalOrderingCandidateInfo
}
-func newTotalOrderingGlobalVector(numChains uint32) *totalOrderingGlobalVector {
+func newTotalOrderingGlobalVector(
+ initRound uint64, numChains uint32) *totalOrderingGlobalVector {
return &totalOrderingGlobalVector{
blocks: make([][]*types.Block, numChains),
tips: make([]*types.Block, numChains),
breakpoints: make([][]*totalOrderingBreakpoint, numChains),
+ curRound: initRound,
}
}
@@ -792,14 +794,14 @@ type totalOrdering struct {
}
// newTotalOrdering constructs an totalOrdering instance.
-func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering {
+func newTotalOrdering(
+ dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering {
config := &totalOrderingConfig{}
config.fromConfig(round, cfg)
config.setRoundBeginTime(dMoment)
candidates := make([]*totalOrderingCandidateInfo, config.numChains)
to := &totalOrdering{
pendings: make(map[common.Hash]*types.Block),
- globalVector: newTotalOrderingGlobalVector(config.numChains),
dirtyChainIDs: make([]int, 0, config.numChains),
acked: make(map[common.Hash]map[common.Hash]struct{}),
objCache: newTotalOrderingObjectCache(config.numChains),
@@ -807,6 +809,8 @@ func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *total
candidates: candidates,
candidateChainIDs: make([]uint32, 0, config.numChains),
curRound: config.roundID,
+ globalVector: newTotalOrderingGlobalVector(
+ config.roundID, config.numChains),
}
to.configs = []*totalOrderingConfig{config}
return to
@@ -898,7 +902,8 @@ func (to *totalOrdering) clean(b *types.Block) {
}
// updateVectors is a helper function to update all cached vectors.
-func (to *totalOrdering) updateVectors(b *types.Block) (isOldest bool, err error) {
+func (to *totalOrdering) updateVectors(
+ b *types.Block) (isOldest bool, err error) {
var (
candidateHash common.Hash
chainID uint32
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 16cfa8f..6d693c8 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -322,6 +322,12 @@ func (s *ConsensusTestSuite) TestSync() {
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
test.StateChangeRoundInterval, 50*time.Second))
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeNumChains, uint32(5)))
+ seedGov.CatchUpWithRound(0)
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeNumChains, uint32(4)))
+ seedGov.CatchUpWithRound(1)
// A short round interval.
nodes := s.setupNodes(dMoment, prvKeys, seedGov)
// Choose the first node as "syncNode" that its consensus' Run() is called
@@ -444,7 +450,7 @@ ReachAlive:
// Stop a node, we should still be able to proceed.
stoppedNode.con.Stop()
stoppedNode.con = nil
- fmt.Println("one node stopped")
+ fmt.Println("one node stopped", stoppedNode.ID)
// Initiate a dummy routine to consume the receive channel.
go func() {
for {