aboutsummaryrefslogtreecommitdiffstats
path: root/core/consensus.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-12-12 16:55:19 +0800
committerGitHub <noreply@github.com>2018-12-12 16:55:19 +0800
commit338bf8676563a103cc78bbacef75fbaaac4293d7 (patch)
tree33587f90c7d7b8d61c99bebeb4ffee9c0b69668f /core/consensus.go
parentd60fedadb35d56ed873bad301cf3e5fd9a96410d (diff)
downloaddexon-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar
dexon-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.gz
dexon-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.bz2
dexon-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.lz
dexon-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.xz
dexon-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.zst
dexon-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.zip
syncer: fix stuffs (#366)
* return delivered blocks when processing finalized blocks * check deliver sequence when processing finalized blocks * skip delivery of finalized blocks * remove duplicated calls to BlockConfirmed * add numChains change in test scenario * fix the bug that restartNotary is triggered by older block than current aID.
Diffstat (limited to 'core/consensus.go')
-rw-r--r--core/consensus.go76
1 files changed, 48 insertions, 28 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 11df5d4..3f4443f 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -71,7 +71,7 @@ type consensusBAReceiver struct {
changeNotaryTime time.Time
round uint64
isNotary bool
- restartNotary chan bool
+ restartNotary chan types.Position
}
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
@@ -145,6 +145,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block = <-ch
recv.consensus.logger.Info("Receive unknown block",
"hash", hash.String()[:6],
+ "position", &block.Position,
"chainID", recv.chainID)
recv.agreementModule.addCandidateBlock(block)
recv.agreementModule.lock.Lock()
@@ -161,8 +162,8 @@ func (recv *consensusBAReceiver) ConfirmBlock(
parentHash := hash
for {
recv.consensus.logger.Warn("Parent block not confirmed",
- "hash", parentHash,
- "chainID", recv.chainID)
+ "parent-hash", parentHash.String()[:6],
+ "cur-position", &block.Position)
ch := make(chan *types.Block)
if !func() bool {
recv.consensus.lock.Lock()
@@ -188,7 +189,8 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
recv.consensus.logger.Info("Receive parent block",
- "hash", block.ParentHash.String()[:6],
+ "parent-hash", block.ParentHash.String()[:6],
+ "cur-position", &block.Position,
"chainID", recv.chainID)
recv.consensus.ccModule.registerBlock(block)
if err := recv.consensus.processBlock(block); err != nil {
@@ -238,12 +240,12 @@ CleanChannelLoop:
break CleanChannelLoop
}
}
+ newPos := block.Position
if block.Timestamp.After(recv.changeNotaryTime) {
recv.round++
- recv.restartNotary <- true
- } else {
- recv.restartNotary <- false
+ newPos.Round++
}
+ recv.restartNotary <- newPos
}
func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
@@ -365,15 +367,16 @@ type Consensus struct {
network Network
// Misc.
- dMoment time.Time
- nodeSetCache *utils.NodeSetCache
- round uint64
- roundToNotify uint64
- lock sync.RWMutex
- ctx context.Context
- ctxCancel context.CancelFunc
- event *common.Event
- logger common.Logger
+ dMoment time.Time
+ nodeSetCache *utils.NodeSetCache
+ round uint64
+ roundToNotify uint64
+ lock sync.RWMutex
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ event *common.Event
+ logger common.Logger
+ nonFinalizedBlockDelivered bool
}
// NewConsensus construct an Consensus instance.
@@ -515,7 +518,6 @@ func NewConsensusFromSyncer(
}
// Dump all BA-confirmed blocks to the consensus instance.
for _, b := range blocks {
- con.app.BlockConfirmed(*b)
con.ccModule.registerBlock(b)
if err := con.processBlock(b); err != nil {
return nil, err
@@ -523,7 +525,7 @@ func NewConsensusFromSyncer(
}
// Dump all randomness result to the consensus instance.
for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r); err != nil {
+ if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
con.logger.Error("failed to process randomness result when syncing",
"result", r)
continue
@@ -815,27 +817,30 @@ MessageLoop:
// For sync mode.
if err := con.processFinalizedBlock(val); err != nil {
con.logger.Error("Failed to process finalized block",
+ "block", val,
"error", err)
}
} else {
if err := con.preProcessBlock(val); err != nil {
con.logger.Error("Failed to pre process block",
+ "block", val,
"error", err)
}
}
case *types.Vote:
if err := con.ProcessVote(val); err != nil {
con.logger.Error("Failed to process vote",
- "error", err,
- "vote", val)
+ "vote", val,
+ "error", err)
}
case *types.AgreementResult:
if err := con.ProcessAgreementResult(val); err != nil {
con.logger.Error("Failed to process agreement result",
+ "result", val,
"error", err)
}
case *types.BlockRandomnessResult:
- if err := con.ProcessBlockRandomnessResult(val); err != nil {
+ if err := con.ProcessBlockRandomnessResult(val, true); err != nil {
con.logger.Error("Failed to process block randomness result",
"hash", val.BlockHash.String()[:6],
"position", &val.Position,
@@ -952,7 +957,7 @@ func (con *Consensus) ProcessAgreementResult(
Position: rand.Position,
Randomness: tsig.Signature,
}
- if err := con.ProcessBlockRandomnessResult(result); err != nil {
+ if err := con.ProcessBlockRandomnessResult(result, true); err != nil {
con.logger.Error("Failed to process randomness result",
"error", err)
return
@@ -963,7 +968,7 @@ func (con *Consensus) ProcessAgreementResult(
// ProcessBlockRandomnessResult processes the randomness result.
func (con *Consensus) ProcessBlockRandomnessResult(
- rand *types.BlockRandomnessResult) error {
+ rand *types.BlockRandomnessResult, needBroadcast bool) error {
if rand.Position.Round == 0 {
return nil
}
@@ -974,11 +979,13 @@ func (con *Consensus) ProcessBlockRandomnessResult(
return err
}
}
- con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "hash", rand.BlockHash.String()[:6],
- "position", &rand.Position,
- "randomness", hex.EncodeToString(rand.Randomness))
- con.network.BroadcastRandomnessResult(rand)
+ if needBroadcast {
+ con.logger.Debug("Calling Network.BroadcastRandomnessResult",
+ "hash", rand.BlockHash.String()[:6],
+ "position", &rand.Position,
+ "randomness", hex.EncodeToString(rand.Randomness))
+ con.network.BroadcastRandomnessResult(rand)
+ }
return nil
}
@@ -1039,6 +1046,19 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
}
// Pass delivered blocks to compaction chain.
for _, b := range deliveredBlocks {
+ if b.IsFinalized() {
+ if con.nonFinalizedBlockDelivered {
+ panic(fmt.Errorf("attempting to skip finalized block: %s", b))
+ }
+ con.logger.Info("skip delivery of finalized block",
+ "block", b,
+ "finalization-height", b.Finalization.Height)
+ continue
+ } else {
+ // Mark that some non-finalized block delivered. After this flag
+ // turned on, it's not allowed to deliver finalized blocks anymore.
+ con.nonFinalizedBlockDelivered = true
+ }
if err = con.ccModule.processBlock(b); err != nil {
return
}