aboutsummaryrefslogtreecommitdiffstats
path: root/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/consensus.go')
-rw-r--r--core/consensus.go286
1 files changed, 87 insertions, 199 deletions
diff --git a/core/consensus.go b/core/consensus.go
index e09ee25..49874d3 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -56,6 +56,8 @@ var (
"incorrect vote position")
ErrIncorrectVoteProposer = fmt.Errorf(
"incorrect vote proposer")
+ ErrCRSNotReady = fmt.Errorf(
+ "CRS not ready")
)
// consensusBAReceiver implements agreementReceiver.
@@ -103,20 +105,20 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
func (recv *consensusBAReceiver) ConfirmBlock(
hash common.Hash, votes map[types.NodeID]*types.Vote) {
var block *types.Block
- if (hash == common.Hash{}) {
+ isEmptyBlockConfirmed := hash == common.Hash{}
+ if isEmptyBlockConfirmed {
aID := recv.agreementModule.agreementID()
recv.consensus.logger.Info("Empty block is confirmed",
"position", &aID)
var err error
- block, err = recv.consensus.proposeEmptyBlock(recv.chainID)
+ block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID)
if err != nil {
recv.consensus.logger.Error("Propose empty block failed", "error", err)
return
}
} else {
var exist bool
- block, exist = recv.consensus.baModules[recv.chainID].
- findCandidateBlockNoLock(hash)
+ block, exist = recv.agreementModule.findCandidateBlockNoLock(hash)
if !exist {
recv.consensus.logger.Error("Unknown block confirmed",
"hash", hash,
@@ -150,9 +152,10 @@ func (recv *consensusBAReceiver) ConfirmBlock(
voteList = append(voteList, *vote)
}
result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ IsEmptyBlock: isEmptyBlockConfirmed,
}
recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult",
"result", result)
@@ -273,8 +276,7 @@ type Consensus struct {
authModule *Authenticator
// BA.
- baModules []*agreement
- receivers []*consensusBAReceiver
+ baMgr *agreementMgr
baConfirmedBlock map[common.Hash]chan<- *types.Block
// DKG.
@@ -365,49 +367,8 @@ func NewConsensus(
event: common.NewEvent(),
logger: logger,
}
-
- validLeader := func(block *types.Block) (bool, error) {
- if block.Timestamp.After(time.Now()) {
- return false, nil
- }
- if err := lattice.SanityCheck(block); err != nil {
- if err == ErrRetrySanityCheckLater {
- return false, nil
- }
- return false, err
- }
- logger.Debug("Calling Application.VerifyBlock", "block", block)
- switch app.VerifyBlock(block) {
- case types.VerifyInvalidBlock:
- return false, ErrInvalidBlock
- case types.VerifyRetryLater:
- return false, nil
- default:
- }
- return true, nil
- }
-
- con.baModules = make([]*agreement, config.NumChains)
- con.receivers = make([]*consensusBAReceiver, config.NumChains)
- for i := uint32(0); i < config.NumChains; i++ {
- chainID := i
- recv := &consensusBAReceiver{
- consensus: con,
- chainID: chainID,
- restartNotary: make(chan bool, 1),
- }
- agreementModule := newAgreement(
- con.ID,
- recv,
- newLeaderSelector(validLeader, logger),
- con.authModule,
- )
- // Hacky way to make agreement module self contained.
- recv.agreementModule = agreementModule
- recv.changeNotaryTime = dMoment
- con.baModules[chainID] = agreementModule
- con.receivers[chainID] = recv
- }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ con.baMgr = newAgreementMgr(con, dMoment)
return con
}
@@ -420,14 +381,27 @@ func (con *Consensus) Run(initBlock *types.Block) {
con.logger.Debug("Calling Governance.Configuration", "round", initRound)
initConfig := con.gov.Configuration(initRound)
// Setup context.
- con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.ccModule.init(initBlock)
// TODO(jimmy-dexon): change AppendConfig to add config for specific round.
- for i := uint64(0); i <= initRound; i++ {
- con.logger.Debug("Calling Governance.Configuration", "round", i+1)
- cfg := con.gov.Configuration(i + 1)
- if err := con.lattice.AppendConfig(i+1, cfg); err != nil {
- panic(err)
+ for i := uint64(0); i <= initRound+1; i++ {
+ con.logger.Debug("Calling Governance.Configuration", "round", i)
+ cfg := con.gov.Configuration(i)
+ // 0 round is already given to core.Lattice module when constructing.
+ if i > 0 {
+ if err := con.lattice.AppendConfig(i, cfg); err != nil {
+ panic(err)
+ }
+ }
+ // Corresponding CRS might not be ready for next round to initRound.
+ if i < initRound+1 {
+ con.logger.Debug("Calling Governance.CRS", "round", i)
+ crs := con.gov.CRS(i)
+ if (crs == common.Hash{}) {
+ panic(ErrCRSNotReady)
+ }
+ if err := con.baMgr.appendConfig(i, cfg, crs); err != nil {
+ panic(err)
+ }
}
}
dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
@@ -447,103 +421,9 @@ func (con *Consensus) Run(initBlock *types.Block) {
})
}
con.initialRound(con.dMoment, initRound, initConfig)
- ticks := make([]chan struct{}, 0, initConfig.NumChains)
- for i := uint32(0); i < initConfig.NumChains; i++ {
- tick := make(chan struct{})
- ticks = append(ticks, tick)
- // TODO(jimmy-dexon): this is a temporary solution to offset BA time.
- // The complelete solution should be delivered along with config change.
- offset := time.Duration(i*uint32(4)/initConfig.NumChains) *
- initConfig.LambdaBA
- go func(chainID uint32, offset time.Duration) {
- time.Sleep(offset)
- con.runBA(chainID, tick)
- }(i, offset)
- }
-
- // Reset ticker.
- <-con.tickerObj.Tick()
- <-con.tickerObj.Tick()
- for {
- <-con.tickerObj.Tick()
- for _, tick := range ticks {
- select {
- case tick <- struct{}{}:
- default:
- }
- }
- }
-}
-
-func (con *Consensus) runBA(chainID uint32, tick <-chan struct{}) {
- // TODO(jimmy-dexon): move this function inside agreement.
- agreement := con.baModules[chainID]
- recv := con.receivers[chainID]
- recv.restartNotary <- true
- nIDs := make(map[types.NodeID]struct{})
- crs := common.Hash{}
- // Reset ticker
- <-tick
-BALoop:
- for {
- select {
- case <-con.ctx.Done():
- break BALoop
- default:
- }
- select {
- case newNotary := <-recv.restartNotary:
- if newNotary {
- con.logger.Debug("Calling Governance.CRS", "round", recv.round)
- crs = con.gov.CRS(recv.round)
- if (crs == common.Hash{}) {
- // Governance is out-of-sync.
- continue BALoop
- }
- configForNewRound := con.gov.Configuration(recv.round)
- recv.changeNotaryTime =
- recv.changeNotaryTime.Add(configForNewRound.RoundInterval)
- nodes, err := con.nodeSetCache.GetNodeSet(recv.round)
- if err != nil {
- panic(err)
- }
- con.logger.Debug("Calling Governance.Configuration",
- "round", recv.round)
- nIDs = nodes.GetSubSet(
- int(configForNewRound.NotarySetSize),
- types.NewNotarySetTarget(crs, chainID))
- }
- nextPos := con.lattice.NextPosition(chainID)
- nextPos.Round = recv.round
- agreement.restart(nIDs, nextPos, crs)
- default:
- }
- if agreement.pullVotes() {
- pos := agreement.agreementID()
- con.logger.Debug("Calling Network.PullVotes for syncing votes",
- "position", &pos)
- con.network.PullVotes(pos)
- }
- err := agreement.nextState()
- if err != nil {
- con.logger.Error("Failed to proceed to next state",
- "nodeID", con.ID.String(),
- "error", err)
- break BALoop
- }
- for i := 0; i < agreement.clocks(); i++ {
- // Priority select for agreement.done().
- select {
- case <-agreement.done():
- continue BALoop
- default:
- }
- select {
- case <-agreement.done():
- continue BALoop
- case <-tick:
- }
- }
+ // Block until done.
+ select {
+ case <-con.ctx.Done():
}
}
@@ -622,6 +502,7 @@ func (con *Consensus) initialRound(
con.logger.Error("Error getting DKG set", "round", round, "error", err)
curDkgSet = make(map[types.NodeID]struct{})
}
+ // Initiate CRS routine.
if _, exist := curDkgSet[con.ID]; exist {
con.event.RegisterTime(startTime.Add(config.RoundInterval/2),
func(time.Time) {
@@ -630,7 +511,31 @@ func (con *Consensus) initialRound(
}()
})
}
-
+ // Initiate BA modules.
+ con.event.RegisterTime(
+ startTime.Add(config.RoundInterval/2+config.LambdaDKG),
+ func(time.Time) {
+ go func(nextRound uint64) {
+ for (con.gov.CRS(nextRound) == common.Hash{}) {
+ con.logger.Info("CRS is not ready yet. Try again later...",
+ "nodeID", con.ID,
+ "round", nextRound)
+ time.Sleep(500 * time.Millisecond)
+ }
+ // Notify BA for new round.
+ con.logger.Debug("Calling Governance.Configuration",
+ "round", nextRound)
+ nextConfig := con.gov.Configuration(nextRound)
+ con.logger.Debug("Calling Governance.CRS",
+ "round", nextRound)
+ nextCRS := con.gov.CRS(nextRound)
+ if err := con.baMgr.appendConfig(
+ nextRound, nextConfig, nextCRS); err != nil {
+ panic(err)
+ }
+ }(round + 1)
+ })
+ // Initiate DKG for this round.
con.event.RegisterTime(startTime.Add(config.RoundInterval/2+config.LambdaDKG),
func(time.Time) {
go func(nextRound uint64) {
@@ -670,6 +575,7 @@ func (con *Consensus) initialRound(
})
}(round + 1)
})
+ // Prepare lattice module for next round and next "initialRound" routine.
con.event.RegisterTime(startTime.Add(config.RoundInterval),
func(time.Time) {
// Change round.
@@ -685,9 +591,7 @@ func (con *Consensus) initialRound(
// Stop the Consensus core.
func (con *Consensus) Stop() {
- for _, a := range con.baModules {
- a.stop()
- }
+ con.baMgr.stop()
con.event.Reset()
con.ctxCancel()
}
@@ -785,9 +689,10 @@ func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block {
}
func (con *Consensus) proposeEmptyBlock(
- chainID uint32) (*types.Block, error) {
+ round uint64, chainID uint32) (*types.Block, error) {
block := &types.Block{
Position: types.Position{
+ Round: round,
ChainID: chainID,
},
}
@@ -799,15 +704,9 @@ func (con *Consensus) proposeEmptyBlock(
// ProcessVote is the entry point to submit ont vote to a Consensus instance.
func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
- if vote.Position.ChainID >= uint32(len(con.baModules)) {
- return nil
- }
- if isStop(con.baModules[vote.Position.ChainID].agreementID()) {
- return nil
- }
v := vote.Clone()
- err = con.baModules[v.Position.ChainID].processVote(v)
- return err
+ err = con.baMgr.processVote(v)
+ return
}
// ProcessAgreementResult processes the randomness request.
@@ -826,8 +725,14 @@ func (con *Consensus) ProcessAgreementResult(
return ErrIncorrectVoteProposer
}
for _, vote := range rand.Votes {
- if vote.BlockHash != rand.BlockHash {
- return ErrIncorrectVoteBlockHash
+ if rand.IsEmptyBlock {
+ if (vote.BlockHash != common.Hash{}) {
+ return ErrIncorrectVoteBlockHash
+ }
+ } else {
+ if vote.BlockHash != rand.BlockHash {
+ return ErrIncorrectVoteBlockHash
+ }
}
if vote.Type != types.VoteCom {
return ErrIncorrectVoteType
@@ -847,29 +752,8 @@ func (con *Consensus) ProcessAgreementResult(
}
}
// Syncing BA Module.
- agreement := con.baModules[rand.Position.ChainID]
- aID := agreement.agreementID()
- if isStop(aID) {
- return nil
- }
- if rand.Position.Newer(&aID) {
- con.logger.Info("Syncing BA", "position", &rand.Position)
- nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round)
- if err != nil {
- return err
- }
- con.logger.Debug("Calling Network.PullBlocks for syncing BA",
- "hash", rand.BlockHash)
- con.network.PullBlocks(common.Hashes{rand.BlockHash})
- con.logger.Debug("Calling Governance.CRS", "round", rand.Position.Round)
- crs := con.gov.CRS(rand.Position.Round)
- nIDs := nodes.GetSubSet(
- int(con.gov.Configuration(rand.Position.Round).NotarySetSize),
- types.NewNotarySetTarget(crs, rand.Position.ChainID))
- for _, vote := range rand.Votes {
- agreement.processVote(&vote)
- }
- agreement.restart(nIDs, rand.Position, crs)
+ if err := con.baMgr.processAgreementResult(rand); err != nil {
+ return err
}
// Calculating randomness.
if rand.Position.Round == 0 {
@@ -950,9 +834,7 @@ func (con *Consensus) ProcessBlockRandomnessResult(
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
- if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil {
- return err
- }
+ err = con.baMgr.processBlock(b)
return
}
@@ -1064,9 +946,15 @@ func (con *Consensus) prepareBlock(b *types.Block,
if err = con.lattice.PrepareBlock(b, proposeTime); err != nil {
return
}
- con.logger.Debug("Calling Governance.CRS", "round", 0)
- if err =
- con.authModule.SignCRS(b, con.gov.CRS(b.Position.Round)); err != nil {
+ con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round)
+ crs := con.gov.CRS(b.Position.Round)
+ if crs.Equal(common.Hash{}) {
+ con.logger.Error("CRS for round is not ready, unable to prepare block",
+ "position", &b.Position)
+ err = ErrCRSNotReady
+ return
+ }
+ if err = con.authModule.SignCRS(b, crs); err != nil {
return
}
return