author | Mission Liao <mission.liao@dexon.org> | 2019-01-11 12:58:30 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-11 12:58:30 +0800 |
commit | 809e8def862fdfa792061a448f952747f1af4d3c (patch) | |
tree | bd038971e65a09bc9bb399f03a37b194ce67ae3c /core/consensus.go | |
parent | fa25817354d5b7d40f5911004232392acfe7fe53 (diff) | |
syncer: fix issues when switching to core.Consensus (#418)
- When the confirmed blocks passed to core.Consensus are not contiguous in
  position on some chain, the pulling mechanism would skip the missing blocks.
- fix: when a block is missing, avoid adding it and every block after it to
  core.Consensus.
- We need to keep the receive channel of the network module from filling up.
- fix: while switching to core.Consensus, launch a dummy receiver to drain the
  network module's receive channel (a minimal sketch of this pattern follows
  after this message).
- fix: between the time core.Consensus is created and the time it starts
  running, a dummy receiver is also required to drain the network module's
  receive channel.
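For context, here is a minimal sketch of the dummy-receiver pattern these fixes rely on: a goroutine keeps draining a receive channel into a buffer until it is cancelled, so the producing side never backs up, and the buffered messages can be replayed later. The `launchDummyReceiver` helper below is a hypothetical, simplified stand-in for the `utils.LaunchDummyReceiver` call that appears in the diff; only its shape (a cancel function plus a finished channel) mirrors the actual change.

```go
package main

import (
	"context"
	"fmt"
)

// launchDummyReceiver is a simplified, hypothetical stand-in for
// utils.LaunchDummyReceiver: it keeps draining recv into handler until the
// returned cancel function fires, and closes finished once the draining
// goroutine has fully stopped.
func launchDummyReceiver(
	ctx context.Context,
	recv <-chan interface{},
	handler func(interface{}),
) (context.CancelFunc, <-chan struct{}) {
	dummyCtx, cancel := context.WithCancel(ctx)
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		for {
			select {
			case msg := <-recv:
				if handler != nil {
					handler(msg)
				}
			case <-dummyCtx.Done():
				return
			}
		}
	}()
	return cancel, finished
}

func main() {
	recv := make(chan interface{}) // stands in for Network.ReceiveChan()
	var buffer []interface{}
	cancel, finished := launchDummyReceiver(
		context.Background(), recv, func(msg interface{}) {
			buffer = append(buffer, msg)
		})
	recv <- "some network message" // picked up immediately by the dummy receiver
	cancel()
	<-finished // wait until the receiver goroutine has stopped
	fmt.Println("buffered messages:", len(buffer))
}
```

Once the real consumer is ready, the buffer is replayed into its channel, which is what Run() does with dummyMsgBuffer in the diff below.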
Diffstat (limited to 'core/consensus.go')
-rw-r--r-- | core/consensus.go | 129 |
1 file changed, 119 insertions, 10 deletions
diff --git a/core/consensus.go b/core/consensus.go
index ad1efeb..2770018 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -415,6 +415,13 @@ type Consensus struct {
 	nonFinalizedBlockDelivered bool
 	resetRandomnessTicker      chan struct{}
 	resetDeliveryGuardTicker   chan struct{}
+	msgChan                    chan interface{}
+	waitGroup                  sync.WaitGroup
+
+	// Context of Dummy receiver during switching from syncer.
+	dummyCancel    context.CancelFunc
+	dummyFinished  <-chan struct{}
+	dummyMsgBuffer []interface{}
 }
 
 // NewConsensus construct an Consensus instance.
@@ -450,6 +457,9 @@ func NewConsensusForSimulation(
 // You need to provide the initial block for this newly created Consensus
 // instance to bootstrap with. A proper choice is the last finalized block you
 // delivered to syncer.
+//
+// NOTE: those confirmed blocks should be organized by chainID and sorted by
+//       their positions, in ascending order.
 func NewConsensusFromSyncer(
 	initBlock *types.Block,
 	initRoundBeginTime time.Time,
@@ -459,17 +469,43 @@ func NewConsensusFromSyncer(
 	networkModule Network,
 	prv crypto.PrivateKey,
 	latticeModule *Lattice,
-	blocks []*types.Block,
+	confirmedBlocks [][]*types.Block,
 	randomnessResults []*types.BlockRandomnessResult,
+	cachedMessages []interface{},
 	logger common.Logger) (*Consensus, error) {
 	// Setup Consensus instance.
 	con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
 		networkModule, prv, logger, latticeModule, true)
-	// Dump all BA-confirmed blocks to the consensus instance.
-	for _, b := range blocks {
-		con.ccModule.registerBlock(b)
-		if err := con.processBlock(b); err != nil {
-			return nil, err
+	// Launch a dummy receiver before we start receiving from network module.
+	con.dummyMsgBuffer = cachedMessages
+	con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+		con.ctx, networkModule.ReceiveChan(), func(msg interface{}) {
+			con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+		})
+	// Dump all BA-confirmed blocks to the consensus instance, make sure these
+	// added blocks forming a DAG.
+	for {
+		updated := false
+		for idx, bs := range confirmedBlocks {
+			for bIdx, b := range bs {
+				// Only when its parent block is already added to lattice, we can
+				// then add this block. If not, our pulling mechanism would stop at
+				// the block we added, and lost its parent block forever.
+				if !latticeModule.Exist(b.ParentHash) {
+					logger.Debug("Skip discontinuous confirmed block",
+						"from", b,
+						"until", bs[len(bs)-1])
+					confirmedBlocks[idx] = bs[bIdx:]
+					break
+				}
+				con.ccModule.registerBlock(b)
+				if err := con.processBlock(b); err != nil {
+					return nil, err
+				}
+			}
+		}
+		if !updated {
+			break
 		}
 	}
 	// Dump all randomness result to the consensus instance.
@@ -549,6 +585,7 @@ func newConsensusForRound(
 		logger:                   logger,
 		resetRandomnessTicker:    make(chan struct{}),
 		resetDeliveryGuardTicker: make(chan struct{}),
+		msgChan:                  make(chan interface{}, 1024),
 	}
 	con.ctx, con.ctxCancel = context.WithCancel(context.Background())
 	con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
@@ -614,12 +651,39 @@ func (con *Consensus) Run() {
 	con.baMgr.run()
 	// Launch network handler.
 	con.logger.Debug("Calling Network.ReceiveChan")
-	go con.processMsg(con.network.ReceiveChan())
+	con.waitGroup.Add(1)
+	go con.deliverNetworkMsg()
+	con.waitGroup.Add(1)
+	go con.processMsg()
 	// Sleep until dMoment come.
 	time.Sleep(con.dMoment.Sub(time.Now().UTC()))
 	// Take some time to bootstrap.
 	time.Sleep(3 * time.Second)
+	con.waitGroup.Add(1)
 	go con.pullRandomness()
+	// Stop dummy receiver if launched.
+	if con.dummyCancel != nil {
+		con.logger.Trace("Stop dummy receiver")
+		con.dummyCancel()
+		<-con.dummyFinished
+		// Replay those cached messages.
+		con.logger.Trace("Dummy receiver stoped, start dumping cached messages",
+			"count", len(con.dummyMsgBuffer))
+		for _, msg := range con.dummyMsgBuffer {
+		loop:
+			for {
+				select {
+				case con.msgChan <- msg:
+					break loop
+				case <-time.After(50 * time.Millisecond):
+					con.logger.Debug(
+						"internal message channel is full when syncing")
+				}
+			}
+		}
+		con.logger.Trace("Finish dumping cached messages")
+	}
+	con.waitGroup.Add(1)
 	go con.deliveryGuard()
 	// Block until done.
 	select {
@@ -815,18 +879,51 @@ func (con *Consensus) Stop() {
 	con.ctxCancel()
 	con.baMgr.stop()
 	con.event.Reset()
+	con.waitGroup.Wait()
+}
+
+func (con *Consensus) deliverNetworkMsg() {
+	defer con.waitGroup.Done()
+	recv := con.network.ReceiveChan()
+	for {
+		select {
+		case <-con.ctx.Done():
+			return
+		default:
+		}
+		select {
+		case msg := <-recv:
+		innerLoop:
+			for {
+				select {
+				case con.msgChan <- msg:
+					break innerLoop
+				case <-time.After(500 * time.Millisecond):
+					con.logger.Debug("internal message channel is full",
+						"pending", msg)
+				}
+			}
+		case <-con.ctx.Done():
+			return
+		}
+	}
 }
 
-func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+func (con *Consensus) processMsg() {
+	defer con.waitGroup.Done()
 MessageLoop:
 	for {
+		select {
+		case <-con.ctx.Done():
+			return
+		default:
+		}
 		var msg interface{}
 		select {
-		case msg = <-msgChan:
+		case msg = <-con.msgChan:
 		case <-con.ctx.Done():
			return
 		}
-
 		switch val := msg.(type) {
 		case *types.Block:
 			if ch, exist := func() (chan<- *types.Block, bool) {
@@ -1037,10 +1134,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
 }
 
 func (con *Consensus) pullRandomness() {
+	defer con.waitGroup.Done()
 	for {
 		select {
 		case <-con.ctx.Done():
 			return
+		default:
+		}
+		select {
+		case <-con.ctx.Done():
+			return
 		case <-con.resetRandomnessTicker:
 		case <-time.After(1500 * time.Millisecond):
 			// TODO(jimmy): pulling period should be related to lambdaBA.
@@ -1055,6 +1158,7 @@ func (con *Consensus) pullRandomness() {
 }
 
 func (con *Consensus) deliveryGuard() {
+	defer con.waitGroup.Done()
 	time.Sleep(con.dMoment.Sub(time.Now()))
 	// Node takes time to start.
 	time.Sleep(60 * time.Second)
@@ -1062,6 +1166,11 @@ func (con *Consensus) deliveryGuard() {
 		select {
 		case <-con.ctx.Done():
 			return
+		default:
+		}
+		select {
+		case <-con.ctx.Done():
+			return
 		case <-con.resetDeliveryGuardTicker:
 		case <-time.After(60 * time.Second):
 			con.logger.Error("no blocks delivered for too long", "ID", con.ID)
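As a companion to the NewConsensusFromSyncer hunk above, here is a minimal, self-contained sketch of the block-admission loop: a block is only added once its parent is already known, so the accepted blocks always form a DAG, and everything from the first gap onward stays queued for the pulling mechanism to fetch later. `Block`, `known`, and `addContiguousBlocks` are illustrative stand-ins, not the project's actual types or API, and the sketch assumes the outer loop should retry until a full pass makes no progress.

```go
package main

import "fmt"

// Block is a drastically simplified stand-in for types.Block: only the hash
// and the parent hash matter for this sketch.
type Block struct {
	Hash       string
	ParentHash string
}

// addContiguousBlocks accepts blocks chain by chain, but only when the parent
// is already known; the first block with a missing parent (and everything
// after it on that chain) is left queued. The outer loop retries until a full
// pass over all chains makes no progress.
func addContiguousBlocks(known map[string]bool, confirmed [][]*Block) (accepted []*Block) {
	for {
		updated := false
		for idx, bs := range confirmed {
			for bIdx, b := range bs {
				if !known[b.ParentHash] {
					// Gap found: keep this block and its successors queued.
					confirmed[idx] = bs[bIdx:]
					break
				}
				known[b.Hash] = true
				accepted = append(accepted, b)
				confirmed[idx] = bs[bIdx+1:]
				updated = true
			}
		}
		if !updated {
			break
		}
	}
	return accepted
}

func main() {
	known := map[string]bool{"genesis": true}
	chain := []*Block{
		{Hash: "b1", ParentHash: "genesis"},
		{Hash: "b3", ParentHash: "b2"}, // b2 is missing, so b3 must wait
	}
	accepted := addContiguousBlocks(known, [][]*Block{chain})
	for _, b := range accepted {
		fmt.Println("accepted", b.Hash) // prints: accepted b1
	}
}
```

This mirrors the intent described in the commit message: a missing block and everything after it are skipped for now instead of being force-fed into core.Consensus.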