aboutsummaryrefslogtreecommitdiffstats
path: root/core/consensus.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-02-20 12:53:18 +0800
committerGitHub <noreply@github.com>2019-02-20 12:53:18 +0800
commit8ef4fc213703620fbfa13890dee042d40eea8545 (patch)
treeba9a07d2423314396e5677b7294122caa505ae9a /core/consensus.go
parent2cf18fd299ea0fc270b213343314cab652cac271 (diff)
downloadtangerine-consensus-8ef4fc213703620fbfa13890dee042d40eea8545.tar
tangerine-consensus-8ef4fc213703620fbfa13890dee042d40eea8545.tar.gz
tangerine-consensus-8ef4fc213703620fbfa13890dee042d40eea8545.tar.bz2
tangerine-consensus-8ef4fc213703620fbfa13890dee042d40eea8545.tar.lz
tangerine-consensus-8ef4fc213703620fbfa13890dee042d40eea8545.tar.xz
tangerine-consensus-8ef4fc213703620fbfa13890dee042d40eea8545.tar.zst
tangerine-consensus-8ef4fc213703620fbfa13890dee042d40eea8545.zip
core: switch round by block height (#450)
Diffstat (limited to 'core/consensus.go')
-rw-r--r--core/consensus.go224
1 files changed, 111 insertions, 113 deletions
diff --git a/core/consensus.go b/core/consensus.go
index f4c0a37..b3fd312 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -58,12 +58,12 @@ var (
// consensusBAReceiver implements agreementReceiver.
type consensusBAReceiver struct {
// TODO(mission): consensus would be replaced by blockChain and network.
- consensus *Consensus
- agreementModule *agreement
- changeNotaryTime time.Time
- roundValue *atomic.Value
- isNotary bool
- restartNotary chan types.Position
+ consensus *Consensus
+ agreementModule *agreement
+ changeNotaryHeight uint64
+ roundValue *atomic.Value
+ isNotary bool
+ restartNotary chan types.Position
}
func (recv *consensusBAReceiver) round() uint64 {
@@ -241,10 +241,17 @@ CleanChannelLoop:
}
}
newPos := block.Position
- if block.Timestamp.After(recv.changeNotaryTime) {
+ if block.Position.Height+1 == recv.changeNotaryHeight {
newPos.Round++
recv.roundValue.Store(newPos.Round)
}
+ currentRound := recv.round()
+ if block.Position.Height > recv.changeNotaryHeight &&
+ block.Position.Round <= currentRound {
+ panic(fmt.Errorf(
+ "round not switch when confirmig: %s, %d, should switch at %d",
+ block, currentRound, recv.changeNotaryHeight))
+ }
recv.restartNotary <- newPos
}
@@ -383,7 +390,6 @@ type Consensus struct {
bcModule *blockChain
dMoment time.Time
nodeSetCache *utils.NodeSetCache
- round uint64
roundForNewConfig uint64
lock sync.RWMutex
ctx context.Context
@@ -412,7 +418,7 @@ func NewConsensus(
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
return newConsensusForRound(
- nil, dMoment, app, gov, db, network, prv, logger, true)
+ nil, 0, dMoment, app, gov, db, network, prv, logger, true)
}
// NewConsensusForSimulation creates an instance of Consensus for simulation,
@@ -426,7 +432,7 @@ func NewConsensusForSimulation(
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
return newConsensusForRound(
- nil, dMoment, app, gov, db, network, prv, logger, false)
+ nil, 0, dMoment, app, gov, db, network, prv, logger, false)
}
// NewConsensusFromSyncer constructs an Consensus instance from information
@@ -440,7 +446,8 @@ func NewConsensusForSimulation(
// their positions, in ascending order.
func NewConsensusFromSyncer(
initBlock *types.Block,
- initRoundBeginTime time.Time,
+ initRoundBeginHeight uint64,
+ dMoment time.Time,
app Application,
gov Governance,
db db.Database,
@@ -451,8 +458,8 @@ func NewConsensusFromSyncer(
cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
- con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
- networkModule, prv, logger, true)
+ con := newConsensusForRound(initBlock, initRoundBeginHeight, dMoment, app,
+ gov, db, networkModule, prv, logger, true)
// Launch a dummy receiver before we start receiving from network module.
con.dummyMsgBuffer = cachedMessages
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
@@ -486,9 +493,11 @@ func NewConsensusFromSyncer(
}
// newConsensusForRound creates a Consensus instance.
+// TODO(mission): remove dMoment, it's no longer one part of consensus.
func newConsensusForRound(
initBlock *types.Block,
- initRoundBeginTime time.Time,
+ initRoundBeginHeight uint64,
+ dMoment time.Time,
app Application,
gov Governance,
db db.Database,
@@ -511,6 +520,7 @@ func newConsensusForRound(
initRound = initBlock.Position.Round
}
initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
+ initCRS := utils.GetCRSWithPanic(gov, initRound, logger)
// Init configuration chain.
ID := types.NewNodeID(prv.PublicKey())
recv := &consensusDKGReceiver{
@@ -529,8 +539,8 @@ func newConsensusForRound(
}
bcConfig := blockChainConfig{}
bcConfig.fromConfig(initRound, initConfig)
- bcConfig.setRoundBeginTime(initRoundBeginTime)
- bcModule := newBlockChain(ID, initBlock, bcConfig, appModule,
+ bcConfig.setRoundBeginHeight(initRoundBeginHeight)
+ bcModule := newBlockChain(ID, dMoment, initBlock, bcConfig, appModule,
NewTSigVerifierCache(gov, 7), signer, logger)
// Construct Consensus instance.
con := &Consensus{
@@ -544,7 +554,7 @@ func newConsensusForRound(
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
bcModule: bcModule,
- dMoment: initRoundBeginTime,
+ dMoment: dMoment,
nodeSetCache: nodeSetCache,
signer: signer,
event: common.NewEvent(),
@@ -555,8 +565,15 @@ func newConsensusForRound(
processBlockChan: make(chan *types.Block, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
- if err := con.prepare(initBlock); err != nil {
+ baConfig := agreementMgrConfig{}
+ baConfig.from(initRound, initConfig, initCRS)
+ baConfig.setRoundBeginHeight(initRoundBeginHeight)
+ var err error
+ con.baMgr, err = newAgreementMgr(con, initRound, baConfig)
+ if err != nil {
+ panic(err)
+ }
+ if err = con.prepare(initRoundBeginHeight, initBlock); err != nil {
panic(err)
}
return con
@@ -566,26 +583,17 @@ func newConsensusForRound(
// 'initBlock' could be either:
// - nil
// - the last finalized block
-func (con *Consensus) prepare(initBlock *types.Block) (err error) {
+func (con *Consensus) prepare(
+ initRoundBeginHeight uint64, initBlock *types.Block) (err error) {
// The block past from full node should be delivered already or known by
// full node. We don't have to notify it.
initRound := uint64(0)
if initBlock != nil {
initRound = initBlock.Position.Round
}
+ // Setup blockChain module.
con.roundForNewConfig = initRound + 1
initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger)
- // Setup context.
- con.logger.Debug("Calling Governance.CRS", "round", initRound)
- initCRS := con.gov.CRS(initRound)
- if (initCRS == common.Hash{}) {
- err = ErrCRSNotReady
- return
- }
- if err = con.baMgr.appendConfig(initRound, initConfig, initCRS); err != nil {
- return
- }
- // Setup blockChain module.
initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger)
if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil {
return
@@ -604,15 +612,16 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// DKG would success. Three is a magic number.
time.Sleep(initConfig.MinBlockInterval * 3)
con.cfgModule.registerDKG(initRound, getDKGThreshold(initConfig))
- con.event.RegisterTime(con.dMoment.Add(initConfig.RoundInterval/4),
- func(time.Time) {
+ con.event.RegisterHeight(
+ initConfig.RoundInterval/4,
+ func(uint64) {
con.runDKG(initRound, initConfig)
})
}()
}
}
// Register events.
- con.initialRound(con.dMoment, initRound, initConfig)
+ con.initialRound(initRoundBeginHeight, initRound, initConfig)
return
}
@@ -672,18 +681,11 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) {
}
con.dkgRunning = 1
go func() {
- startTime := time.Now().UTC()
defer func() {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
con.dkgReady.Broadcast()
con.dkgRunning = 2
- DKGTime := time.Now().Sub(startTime)
- if DKGTime.Nanoseconds() >=
- config.RoundInterval.Nanoseconds()/2 {
- con.logger.Warn("Your computer cannot finish DKG on time!",
- "nodeID", con.ID.String())
- }
}()
if err := con.cfgModule.runDKG(round); err != nil {
con.logger.Error("Failed to runDKG", "error", err)
@@ -739,7 +741,7 @@ func (con *Consensus) runCRS(round uint64) {
}
func (con *Consensus) initialRound(
- startTime time.Time, round uint64, config *types.Config) {
+ startHeight uint64, round uint64, config *types.Config) {
select {
case <-con.ctx.Done():
return
@@ -752,8 +754,9 @@ func (con *Consensus) initialRound(
}
// Initiate CRS routine.
if _, exist := curDkgSet[con.ID]; exist {
- con.event.RegisterTime(startTime.Add(config.RoundInterval/2),
- func(time.Time) {
+ con.event.RegisterHeight(
+ startHeight+config.RoundInterval/2,
+ func(uint64) {
go func() {
con.runCRS(round)
}()
@@ -774,76 +777,71 @@ func (con *Consensus) initialRound(
}
}
// Initiate BA modules.
- con.event.RegisterTime(
- startTime.Add(config.RoundInterval/2+config.LambdaDKG),
- func(time.Time) {
- go func(nextRound uint64) {
- if !checkWithCancel(
- con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for baMgr",
- "round", nextRound)
- return
- }
- // Notify BA for new round.
- nextConfig := utils.GetConfigWithPanic(
- con.gov, nextRound, con.logger)
- nextCRS := utils.GetCRSWithPanic(
- con.gov, nextRound, con.logger)
- con.logger.Info("appendConfig for baMgr", "round", nextRound)
- if err := con.baMgr.appendConfig(
- nextRound, nextConfig, nextCRS); err != nil {
- panic(err)
- }
- }(round + 1)
- })
+ con.event.RegisterHeight(startHeight+config.RoundInterval/2, func(uint64) {
+ go func(nextRound uint64) {
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Debug("unable to prepare CRS for baMgr",
+ "round", nextRound)
+ return
+ }
+ // Notify BA for new round.
+ nextConfig := utils.GetConfigWithPanic(
+ con.gov, nextRound, con.logger)
+ nextCRS := utils.GetCRSWithPanic(
+ con.gov, nextRound, con.logger)
+ con.logger.Info("appendConfig for baMgr", "round", nextRound)
+ if err := con.baMgr.appendConfig(
+ nextRound, nextConfig, nextCRS); err != nil {
+ panic(err)
+ }
+ }(round + 1)
+ })
// Initiate DKG for this round.
- con.event.RegisterTime(startTime.Add(config.RoundInterval/2+config.LambdaDKG),
- func(time.Time) {
- go func(nextRound uint64) {
- // Normally, gov.CRS would return non-nil. Use this for in case of
- // unexpected network fluctuation and ensure the robustness.
- if !checkWithCancel(
- con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for DKG set",
- "round", nextRound)
- return
- }
- nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
- if err != nil {
- con.logger.Error("Error getting DKG set",
- "round", nextRound,
- "error", err)
- return
- }
- if _, exist := nextDkgSet[con.ID]; !exist {
- return
- }
- con.logger.Info("Selected as DKG set", "round", nextRound)
- con.cfgModule.registerDKG(nextRound, getDKGThreshold(config))
- con.event.RegisterTime(
- startTime.Add(config.RoundInterval*2/3),
- func(time.Time) {
- func() {
- con.dkgReady.L.Lock()
- defer con.dkgReady.L.Unlock()
- con.dkgRunning = 0
- }()
- nextConfig := utils.GetConfigWithPanic(
- con.gov, nextRound, con.logger)
- con.runDKG(nextRound, nextConfig)
- })
- }(round + 1)
- })
+ con.event.RegisterHeight(startHeight+config.RoundInterval/2, func(uint64) {
+ go func(nextRound uint64) {
+ // Normally, gov.CRS would return non-nil. Use this for in case of
+ // unexpected network fluctuation and ensure the robustness.
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Debug("unable to prepare CRS for DKG set",
+ "round", nextRound)
+ return
+ }
+ nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+ if err != nil {
+ con.logger.Error("Error getting DKG set",
+ "round", nextRound,
+ "error", err)
+ return
+ }
+ if _, exist := nextDkgSet[con.ID]; !exist {
+ return
+ }
+ con.logger.Info("Selected as DKG set", "round", nextRound)
+ con.cfgModule.registerDKG(nextRound, getDKGThreshold(config))
+ con.event.RegisterHeight(startHeight+config.RoundInterval*2/3,
+ func(uint64) {
+ func() {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ con.dkgRunning = 0
+ }()
+ nextConfig := utils.GetConfigWithPanic(
+ con.gov, nextRound, con.logger)
+ con.runDKG(nextRound, nextConfig)
+ })
+ }(round + 1)
+ })
// Prepare blockChain module for next round and next "initialRound" routine.
- con.event.RegisterTime(startTime.Add(config.RoundInterval),
- func(time.Time) {
- // Change round.
- // Get configuration for next round.
- nextRound := round + 1
- nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger)
- con.initialRound(
- startTime.Add(config.RoundInterval), nextRound, nextConfig)
- })
+ con.event.RegisterHeight(startHeight+config.RoundInterval, func(uint64) {
+ // Change round.
+ // Get configuration for next round.
+ nextRound := round + 1
+ nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger)
+ con.initialRound(
+ startHeight+config.RoundInterval, nextRound, nextConfig)
+ })
}
// Stop the Consensus core.
@@ -1194,7 +1192,7 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
"pending", con.bcModule.lastPendingBlock())
for _, b := range deliveredBlocks {
con.deliverBlock(b)
- go con.event.NotifyTime(b.Finalization.Timestamp)
+ go con.event.NotifyHeight(b.Finalization.Height)
}
return
}