From 809e8def862fdfa792061a448f952747f1af4d3c Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Fri, 11 Jan 2019 12:58:30 +0800 Subject: syncer: fix issues when switching to core.Consensus (#418) - when confirmed blocks passed to core.Consensus aren't continuous in position in some chain, the pulling would skip those missing blocks. - fix: when some block is missing, avoid adding it and all blocks after it to core.Consensus. - we need to avoid the receive channel of network module full. - fix: during switching to core.Consensus, we need to launch a dummy receiver to receive from receive channel of network module. - fix: between the period during core.Consensus created and before running, a dummy receiver is also required to receive from receive channel of network module. --- core/consensus.go | 129 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 10 deletions(-) (limited to 'core/consensus.go') diff --git a/core/consensus.go b/core/consensus.go index ad1efeb..2770018 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -415,6 +415,13 @@ type Consensus struct { nonFinalizedBlockDelivered bool resetRandomnessTicker chan struct{} resetDeliveryGuardTicker chan struct{} + msgChan chan interface{} + waitGroup sync.WaitGroup + + // Context of Dummy receiver during switching from syncer. + dummyCancel context.CancelFunc + dummyFinished <-chan struct{} + dummyMsgBuffer []interface{} } // NewConsensus construct an Consensus instance. @@ -450,6 +457,9 @@ func NewConsensusForSimulation( // You need to provide the initial block for this newly created Consensus // instance to bootstrap with. A proper choice is the last finalized block you // delivered to syncer. +// +// NOTE: those confirmed blocks should be organized by chainID and sorted by +// their positions, in ascending order. func NewConsensusFromSyncer( initBlock *types.Block, initRoundBeginTime time.Time, @@ -459,17 +469,43 @@ func NewConsensusFromSyncer( networkModule Network, prv crypto.PrivateKey, latticeModule *Lattice, - blocks []*types.Block, + confirmedBlocks [][]*types.Block, randomnessResults []*types.BlockRandomnessResult, + cachedMessages []interface{}, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db, networkModule, prv, logger, latticeModule, true) - // Dump all BA-confirmed blocks to the consensus instance. - for _, b := range blocks { - con.ccModule.registerBlock(b) - if err := con.processBlock(b); err != nil { - return nil, err + // Launch a dummy receiver before we start receiving from network module. + con.dummyMsgBuffer = cachedMessages + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + con.ctx, networkModule.ReceiveChan(), func(msg interface{}) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) + // Dump all BA-confirmed blocks to the consensus instance, make sure these + // added blocks forming a DAG. + for { + updated := false + for idx, bs := range confirmedBlocks { + for bIdx, b := range bs { + // Only when its parent block is already added to lattice, we can + // then add this block. If not, our pulling mechanism would stop at + // the block we added, and lost its parent block forever. + if !latticeModule.Exist(b.ParentHash) { + logger.Debug("Skip discontinuous confirmed block", + "from", b, + "until", bs[len(bs)-1]) + confirmedBlocks[idx] = bs[bIdx:] + break + } + con.ccModule.registerBlock(b) + if err := con.processBlock(b); err != nil { + return nil, err + } + } + } + if !updated { + break } } // Dump all randomness result to the consensus instance. @@ -549,6 +585,7 @@ func newConsensusForRound( logger: logger, resetRandomnessTicker: make(chan struct{}), resetDeliveryGuardTicker: make(chan struct{}), + msgChan: make(chan interface{}, 1024), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime) @@ -614,12 +651,39 @@ func (con *Consensus) Run() { con.baMgr.run() // Launch network handler. con.logger.Debug("Calling Network.ReceiveChan") - go con.processMsg(con.network.ReceiveChan()) + con.waitGroup.Add(1) + go con.deliverNetworkMsg() + con.waitGroup.Add(1) + go con.processMsg() // Sleep until dMoment come. time.Sleep(con.dMoment.Sub(time.Now().UTC())) // Take some time to bootstrap. time.Sleep(3 * time.Second) + con.waitGroup.Add(1) go con.pullRandomness() + // Stop dummy receiver if launched. + if con.dummyCancel != nil { + con.logger.Trace("Stop dummy receiver") + con.dummyCancel() + <-con.dummyFinished + // Replay those cached messages. + con.logger.Trace("Dummy receiver stoped, start dumping cached messages", + "count", len(con.dummyMsgBuffer)) + for _, msg := range con.dummyMsgBuffer { + loop: + for { + select { + case con.msgChan <- msg: + break loop + case <-time.After(50 * time.Millisecond): + con.logger.Debug( + "internal message channel is full when syncing") + } + } + } + con.logger.Trace("Finish dumping cached messages") + } + con.waitGroup.Add(1) go con.deliveryGuard() // Block until done. select { @@ -815,18 +879,51 @@ func (con *Consensus) Stop() { con.ctxCancel() con.baMgr.stop() con.event.Reset() + con.waitGroup.Wait() +} + +func (con *Consensus) deliverNetworkMsg() { + defer con.waitGroup.Done() + recv := con.network.ReceiveChan() + for { + select { + case <-con.ctx.Done(): + return + default: + } + select { + case msg := <-recv: + innerLoop: + for { + select { + case con.msgChan <- msg: + break innerLoop + case <-time.After(500 * time.Millisecond): + con.logger.Debug("internal message channel is full", + "pending", msg) + } + } + case <-con.ctx.Done(): + return + } + } } -func (con *Consensus) processMsg(msgChan <-chan interface{}) { +func (con *Consensus) processMsg() { + defer con.waitGroup.Done() MessageLoop: for { + select { + case <-con.ctx.Done(): + return + default: + } var msg interface{} select { - case msg = <-msgChan: + case msg = <-con.msgChan: case <-con.ctx.Done(): return } - switch val := msg.(type) { case *types.Block: if ch, exist := func() (chan<- *types.Block, bool) { @@ -1037,7 +1134,13 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { } func (con *Consensus) pullRandomness() { + defer con.waitGroup.Done() for { + select { + case <-con.ctx.Done(): + return + default: + } select { case <-con.ctx.Done(): return @@ -1055,10 +1158,16 @@ func (con *Consensus) pullRandomness() { } func (con *Consensus) deliveryGuard() { + defer con.waitGroup.Done() time.Sleep(con.dMoment.Sub(time.Now())) // Node takes time to start. time.Sleep(60 * time.Second) for { + select { + case <-con.ctx.Done(): + return + default: + } select { case <-con.ctx.Done(): return -- cgit v1.2.3