aboutsummaryrefslogtreecommitdiffstats
path: root/core/syncer/consensus.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-03-16 10:07:08 +0800
committerGitHub <noreply@github.com>2019-03-16 10:07:08 +0800
commit50c9419ba04ed44854fdb1afc1ef2e865be9876f (patch)
tree2acafd84bdb61d54da5621c29c3914986df45b65 /core/syncer/consensus.go
parentb02fa5ee430cff9dafc9d9c399099a88d554a083 (diff)
downloadtangerine-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar
tangerine-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.gz
tangerine-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.bz2
tangerine-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.lz
tangerine-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.xz
tangerine-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.zst
tangerine-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.zip
core, syncer: integrate utils.RoundEvent (#490)
Diffstat (limited to 'core/syncer/consensus.go')
-rw-r--r--core/syncer/consensus.go137
1 files changed, 76 insertions, 61 deletions
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index fd48793..f2f8f9e 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -69,12 +69,14 @@ type Consensus struct {
configs []*types.Config
roundBeginHeights []uint64
agreementRoundCut uint64
+ heightEvt *common.Event
+ roundEvt *utils.RoundEvent
// lock for accessing all fields.
lock sync.RWMutex
duringBuffering bool
latestCRSRound uint64
- moduleWaitGroup sync.WaitGroup
+ waitGroup sync.WaitGroup
agreementWaitGroup sync.WaitGroup
pullChan chan common.Hash
receiveChan chan *types.Block
@@ -116,6 +118,7 @@ func NewConsensus(
receiveChan: make(chan *types.Block, 1000),
pullChan: make(chan common.Hash, 1000),
randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult),
+ heightEvt: common.NewEvent(),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
_, con.initChainTipHeight = db.GetCompactionChainTipInfo()
@@ -143,9 +146,73 @@ func (con *Consensus) assureBuffering() {
return
}
con.duringBuffering = true
+ // Get latest block to prepare utils.RoundEvent.
+ var (
+ err error
+ blockHash, height = con.db.GetCompactionChainTipInfo()
+ )
+ if height == 0 {
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger,
+ uint64(0), uint64(0), uint64(0), core.ConfigRoundShift)
+ } else {
+ var b types.Block
+ if b, err = con.db.GetBlock(blockHash); err == nil {
+ beginHeight := con.roundBeginHeights[b.Position.Round]
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov,
+ con.logger, b.Position.Round, beginHeight, beginHeight,
+ core.ConfigRoundShift)
+ }
+ }
+ if err != nil {
+ panic(err)
+ }
+ // Make sure con.roundEvt stopped before stopping con.agreementModule.
+ con.waitGroup.Add(1)
+ // Register a round event handler to notify CRS to agreementModule.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ con.waitGroup.Add(1)
+ go func() {
+ defer con.waitGroup.Done()
+ for _, e := range evts {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ for func() bool {
+ select {
+ case <-con.ctx.Done():
+ return false
+ case con.agreementModule.inputChan <- e.Round:
+ return false
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Warn(
+ "agreement input channel is full when putting CRS",
+ "round", e.Round,
+ )
+ return true
+ }
+ }() {
+ }
+ }
+ }()
+ })
+ // Register a round event handler to validate next round.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func(
+ blockHeight uint64) {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ con.roundEvt.ValidateNextRound(blockHeight)
+ })
+ })
+ con.roundEvt.TriggerInitEvent()
con.startAgreement()
con.startNetwork()
- con.startCRSMonitor()
}
func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
@@ -265,6 +332,7 @@ func (con *Consensus) SyncBlocks(
b.Hash, b.Finalization.Height); err != nil {
return
}
+ go con.heightEvt.NotifyHeight(b.Finalization.Height)
}
if latest {
con.assureBuffering()
@@ -346,7 +414,10 @@ func (con *Consensus) stopBuffering() {
return
}
con.logger.Trace("stop syncer modules")
- con.moduleWaitGroup.Wait()
+ con.roundEvt.Stop()
+ con.waitGroup.Done()
+ // Wait for all routines depends on con.agreementModule stopped.
+ con.waitGroup.Wait()
// Since there is no one waiting for the receive channel of fullnode, we
// need to launch a dummy receiver right away.
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
@@ -492,9 +563,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
// startNetwork starts network for receiving blocks and agreement results.
func (con *Consensus) startNetwork() {
- con.moduleWaitGroup.Add(1)
+ con.waitGroup.Add(1)
go func() {
- defer con.moduleWaitGroup.Done()
+ defer con.waitGroup.Done()
loop:
for {
select {
@@ -522,62 +593,6 @@ func (con *Consensus) startNetwork() {
}()
}
-// startCRSMonitor is the dummiest way to verify if the CRS for one round
-// is ready or not.
-func (con *Consensus) startCRSMonitor() {
- var lastNotifiedRound uint64
- // Notify all agreements for new CRS.
- notifyNewCRS := func(round uint64) {
- con.setupConfigsUntilRound(round)
- if round == lastNotifiedRound {
- return
- }
- con.logger.Debug("CRS is ready", "round", round)
- lastNotifiedRound = round
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- con.latestCRSRound = round
- }()
- for func() bool {
- select {
- case <-con.ctx.Done():
- return false
- case con.agreementModule.inputChan <- round:
- return false
- case <-time.After(500 * time.Millisecond):
- con.logger.Debug(
- "agreement input channel is full when putting CRS",
- "round", round,
- )
- return true
- }
- }() {
- }
- }
- con.moduleWaitGroup.Add(1)
- go func() {
- defer con.moduleWaitGroup.Done()
- for {
- select {
- case <-con.ctx.Done():
- return
- case <-time.After(500 * time.Millisecond):
- }
- // Notify agreement modules for the latest round that CRS is
- // available if the round is not notified yet.
- checked := lastNotifiedRound + 1
- for (con.gov.CRS(checked) != common.Hash{}) {
- checked++
- }
- checked--
- if checked > lastNotifiedRound {
- notifyNewCRS(checked)
- }
- }
- }()
-}
-
func (con *Consensus) stopAgreement() {
if con.agreementModule.inputChan != nil {
close(con.agreementModule.inputChan)