aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go199
1 files changed, 126 insertions, 73 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index bfe893cd5..a6d80371d 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -25,8 +25,8 @@ import (
"time"
"github.com/dexon-foundation/dexon-consensus/common"
- "github.com/dexon-foundation/dexon-consensus/core/blockdb"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
+ "github.com/dexon-foundation/dexon-consensus/core/db"
"github.com/dexon-foundation/dexon-consensus/core/types"
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
"github.com/dexon-foundation/dexon-consensus/core/utils"
@@ -70,17 +70,23 @@ type consensusBAReceiver struct {
chainID uint32
changeNotaryTime time.Time
round uint64
- restartNotary chan bool
+ isNotary bool
+ restartNotary chan types.Position
}
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
+ if !recv.isNotary {
+ return
+ }
if err := recv.agreementModule.prepareVote(vote); err != nil {
recv.consensus.logger.Error("Failed to prepare vote", "error", err)
return
}
go func() {
if err := recv.agreementModule.processVote(vote); err != nil {
- recv.consensus.logger.Error("Failed to process vote", "error", err)
+ recv.consensus.logger.Error("Failed to process self vote",
+ "error", err,
+ "vote", vote)
return
}
recv.consensus.logger.Debug("Calling Network.BroadcastVote",
@@ -90,6 +96,9 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
}
func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
+ if !recv.isNotary {
+ return common.Hash{}
+ }
block := recv.consensus.proposeBlock(recv.chainID, recv.round)
if block == nil {
recv.consensus.logger.Error("unable to propose block")
@@ -123,7 +132,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block, exist = recv.agreementModule.findCandidateBlockNoLock(hash)
if !exist {
recv.consensus.logger.Error("Unknown block confirmed",
- "hash", hash,
+ "hash", hash.String()[:6],
"chainID", recv.chainID)
ch := make(chan *types.Block)
func() {
@@ -135,7 +144,8 @@ func (recv *consensusBAReceiver) ConfirmBlock(
go func() {
block = <-ch
recv.consensus.logger.Info("Receive unknown block",
- "hash", hash,
+ "hash", hash.String()[:6],
+ "position", &block.Position,
"chainID", recv.chainID)
recv.agreementModule.addCandidateBlock(block)
recv.agreementModule.lock.Lock()
@@ -152,8 +162,8 @@ func (recv *consensusBAReceiver) ConfirmBlock(
parentHash := hash
for {
recv.consensus.logger.Warn("Parent block not confirmed",
- "hash", parentHash,
- "chainID", recv.chainID)
+ "parent-hash", parentHash.String()[:6],
+ "cur-position", &block.Position)
ch := make(chan *types.Block)
if !func() bool {
recv.consensus.lock.Lock()
@@ -179,11 +189,14 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
recv.consensus.logger.Info("Receive parent block",
- "hash", block.ParentHash,
+ "parent-hash", block.ParentHash.String()[:6],
+ "cur-position", &block.Position,
"chainID", recv.chainID)
recv.consensus.ccModule.registerBlock(block)
if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block", "error", err)
+ recv.consensus.logger.Error("Failed to process block",
+ "block", block,
+ "error", err)
return
}
parentHash = block.ParentHash
@@ -194,24 +207,28 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}(block.ParentHash)
}
- voteList := make([]types.Vote, 0, len(votes))
- for _, vote := range votes {
- if vote.BlockHash != hash {
- continue
+ if recv.isNotary {
+ voteList := make([]types.Vote, 0, len(votes))
+ for _, vote := range votes {
+ if vote.BlockHash != hash {
+ continue
+ }
+ voteList = append(voteList, *vote)
}
- voteList = append(voteList, *vote)
- }
- result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
- IsEmptyBlock: isEmptyBlockConfirmed,
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ IsEmptyBlock: isEmptyBlockConfirmed,
+ }
+ recv.consensus.logger.Debug("Propose AgreementResult",
+ "result", result)
+ recv.consensus.network.BroadcastAgreementResult(result)
}
- recv.consensus.logger.Debug("Propose AgreementResult",
- "result", result)
- recv.consensus.network.BroadcastAgreementResult(result)
if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block", "error", err)
+ recv.consensus.logger.Error("Failed to process block",
+ "block", block,
+ "error", err)
return
}
// Clean the restartNotary channel so BA will not stuck by deadlock.
@@ -223,15 +240,18 @@ CleanChannelLoop:
break CleanChannelLoop
}
}
+ newPos := block.Position
if block.Timestamp.After(recv.changeNotaryTime) {
recv.round++
- recv.restartNotary <- true
- } else {
- recv.restartNotary <- false
+ newPos.Round++
}
+ recv.restartNotary <- newPos
}
func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
+ if !recv.isNotary {
+ return
+ }
recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes)
recv.consensus.network.PullBlocks(hashes)
}
@@ -312,7 +332,7 @@ func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint(
// ProposeDKGFinalize propose a DKGFinalize message.
func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) {
if err := recv.authModule.SignDKGFinalize(final); err != nil {
- recv.logger.Error("Faield to sign DKG finalize", "error", err)
+ recv.logger.Error("Failed to sign DKG finalize", "error", err)
return
}
recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final)
@@ -340,21 +360,23 @@ type Consensus struct {
toSyncer *totalOrderingSyncer
// Interfaces.
- db blockdb.BlockDatabase
- app Application
- gov Governance
- network Network
+ db db.Database
+ app Application
+ debugApp Debug
+ gov Governance
+ network Network
// Misc.
- dMoment time.Time
- nodeSetCache *utils.NodeSetCache
- round uint64
- roundToNotify uint64
- lock sync.RWMutex
- ctx context.Context
- ctxCancel context.CancelFunc
- event *common.Event
- logger common.Logger
+ dMoment time.Time
+ nodeSetCache *utils.NodeSetCache
+ round uint64
+ roundToNotify uint64
+ lock sync.RWMutex
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ event *common.Event
+ logger common.Logger
+ nonFinalizedBlockDelivered bool
}
// NewConsensus construct an Consensus instance.
@@ -362,7 +384,7 @@ func NewConsensus(
dMoment time.Time,
app Application,
gov Governance,
- db blockdb.BlockDatabase,
+ db db.Database,
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
@@ -372,7 +394,10 @@ func NewConsensus(
// Setup auth module.
authModule := NewAuthenticator(prv)
// Check if the application implement Debug interface.
- debugApp, _ := app.(Debug)
+ var debugApp Debug
+ if a, ok := app.(Debug); ok {
+ debugApp = a
+ }
// Get configuration for genesis round.
var round uint64
logger.Debug("Calling Governance.Configuration", "round", round)
@@ -407,6 +432,7 @@ func NewConsensus(
ccModule: newCompactionChain(gov),
lattice: lattice,
app: newNonBlocking(app, debugApp),
+ debugApp: debugApp,
gov: gov,
db: db,
network: network,
@@ -438,7 +464,7 @@ func NewConsensusFromSyncer(
initRoundBeginTime time.Time,
app Application,
gov Governance,
- db blockdb.BlockDatabase,
+ db db.Database,
networkModule Network,
prv crypto.PrivateKey,
latticeModule *Lattice,
@@ -492,7 +518,6 @@ func NewConsensusFromSyncer(
}
// Dump all BA-confirmed blocks to the consensus instance.
for _, b := range blocks {
- con.app.BlockConfirmed(*b)
con.ccModule.registerBlock(b)
if err := con.processBlock(b); err != nil {
return nil, err
@@ -500,7 +525,7 @@ func NewConsensusFromSyncer(
}
// Dump all randomness result to the consensus instance.
for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r); err != nil {
+ if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
con.logger.Error("failed to process randomness result when syncing",
"result", r)
continue
@@ -605,21 +630,22 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) {
}
func (con *Consensus) runCRS(round uint64) {
- con.logger.Debug("Calling Governance.CRS to check if already proposed",
- "round", round+1)
- if (con.gov.CRS(round+1) != common.Hash{}) {
- con.logger.Info("CRS already proposed", "round", round+1)
- return
- }
- con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
- "round", round)
- for !con.gov.IsDKGFinal(round) {
+ for {
+ con.logger.Debug("Calling Governance.CRS to check if already proposed",
+ "round", round+1)
+ if (con.gov.CRS(round+1) != common.Hash{}) {
+ con.logger.Info("CRS already proposed", "round", round+1)
+ return
+ }
+ con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
+ "round", round)
+ if con.cfgModule.isDKGReady(round) {
+ break
+ }
con.logger.Debug("DKG is not ready for running CRS. Retry later...",
"round", round)
time.Sleep(500 * time.Millisecond)
}
- // Wait some time for DKG to recover private share.
- time.Sleep(100 * time.Millisecond)
// Start running next round CRS.
con.logger.Debug("Calling Governance.CRS", "round", round)
psig, err := con.cfgModule.preparePartialSignature(round, con.gov.CRS(round))
@@ -794,26 +820,30 @@ MessageLoop:
// For sync mode.
if err := con.processFinalizedBlock(val); err != nil {
con.logger.Error("Failed to process finalized block",
+ "block", val,
"error", err)
}
} else {
if err := con.preProcessBlock(val); err != nil {
con.logger.Error("Failed to pre process block",
+ "block", val,
"error", err)
}
}
case *types.Vote:
if err := con.ProcessVote(val); err != nil {
con.logger.Error("Failed to process vote",
+ "vote", val,
"error", err)
}
case *types.AgreementResult:
if err := con.ProcessAgreementResult(val); err != nil {
con.logger.Error("Failed to process agreement result",
+ "result", val,
"error", err)
}
case *types.BlockRandomnessResult:
- if err := con.ProcessBlockRandomnessResult(val); err != nil {
+ if err := con.ProcessBlockRandomnessResult(val, true); err != nil {
con.logger.Error("Failed to process block randomness result",
"hash", val.BlockHash.String()[:6],
"position", &val.Position,
@@ -884,9 +914,7 @@ func (con *Consensus) ProcessAgreementResult(
if rand.Position.Round == 0 {
return nil
}
- if !con.ccModule.blockRegistered(rand.BlockHash) {
- return nil
- }
+ // TODO(mission): find a way to avoid spamming by older agreement results.
// Sanity check done.
if !con.cfgModule.touchTSigHash(rand.BlockHash) {
return nil
@@ -914,13 +942,16 @@ func (con *Consensus) ProcessAgreementResult(
con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
"proposer", psig.ProposerID,
"round", psig.Round,
- "hash", psig.Hash)
+ "hash", psig.Hash.String()[:6])
con.network.BroadcastDKGPartialSignature(psig)
go func() {
tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
if err != nil {
if err != ErrTSigAlreadyRunning {
- con.logger.Error("Faield to run TSIG", "error", err)
+ con.logger.Error("Failed to run TSIG",
+ "position", &rand.Position,
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
}
return
}
@@ -929,7 +960,7 @@ func (con *Consensus) ProcessAgreementResult(
Position: rand.Position,
Randomness: tsig.Signature,
}
- if err := con.ProcessBlockRandomnessResult(result); err != nil {
+ if err := con.ProcessBlockRandomnessResult(result, true); err != nil {
con.logger.Error("Failed to process randomness result",
"error", err)
return
@@ -940,27 +971,33 @@ func (con *Consensus) ProcessAgreementResult(
// ProcessBlockRandomnessResult processes the randomness result.
func (con *Consensus) ProcessBlockRandomnessResult(
- rand *types.BlockRandomnessResult) error {
+ rand *types.BlockRandomnessResult, needBroadcast bool) error {
if rand.Position.Round == 0 {
return nil
}
if err := con.ccModule.processBlockRandomnessResult(rand); err != nil {
if err == ErrBlockNotRegistered {
err = nil
+ } else {
+ return err
}
- return err
}
- con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "hash", rand.BlockHash,
- "position", &rand.Position,
- "randomness", hex.EncodeToString(rand.Randomness))
- con.network.BroadcastRandomnessResult(rand)
+ if needBroadcast {
+ con.logger.Debug("Calling Network.BroadcastRandomnessResult",
+ "hash", rand.BlockHash.String()[:6],
+ "position", &rand.Position,
+ "randomness", hex.EncodeToString(rand.Randomness))
+ con.network.BroadcastRandomnessResult(rand)
+ }
return nil
}
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
err = con.baMgr.processBlock(b)
+ if err == nil && con.debugApp != nil {
+ con.debugApp.BlockReceived(b.Hash)
+ }
return
}
@@ -999,7 +1036,7 @@ func (con *Consensus) deliverBlock(b *types.Block) {
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
- if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists {
+ if err = con.db.PutBlock(*block); err != nil && err != db.ErrBlockExists {
return
}
con.lock.Lock()
@@ -1012,6 +1049,19 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
}
// Pass delivered blocks to compaction chain.
for _, b := range deliveredBlocks {
+ if b.IsFinalized() {
+ if con.nonFinalizedBlockDelivered {
+ panic(fmt.Errorf("attempting to skip finalized block: %s", b))
+ }
+ con.logger.Info("skip delivery of finalized block",
+ "block", b,
+ "finalization-height", b.Finalization.Height)
+ continue
+ } else {
+ // Mark that some non-finalized block delivered. After this flag
+ // turned on, it's not allowed to deliver finalized blocks anymore.
+ con.nonFinalizedBlockDelivered = true
+ }
if err = con.ccModule.processBlock(b); err != nil {
return
}
@@ -1022,11 +1072,14 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
"delivered", con.ccModule.lastDeliveredBlock(),
"pending", con.ccModule.lastPendingBlock())
for _, b := range deliveredBlocks {
- if err = con.db.Update(*b); err != nil {
+ if err = con.db.UpdateBlock(*b); err != nil {
panic(err)
}
con.cfgModule.untouchTSigHash(b.Hash)
con.deliverBlock(b)
+ if con.debugApp != nil {
+ con.debugApp.BlockReady(b.Hash)
+ }
}
if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil {
return