From bdd9130c05828a158cb8088eec3561f8b1d0f956 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Tue, 12 Mar 2019 11:51:26 +0800 Subject: core/syncer: fix syncer deadlock (#479) * Fix dead lock * core/syncer: prevent selecting on a nil channel * Remove unnecessary reader lock holding --- core/syncer/consensus.go | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) (limited to 'core') diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 3053396..25911ce 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -250,11 +250,6 @@ func (con *Consensus) SyncBlocks( // syncing is done. if con.checkIfSynced(blocks) { con.stopBuffering() - con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( - context.Background(), con.network.ReceiveChan(), - func(msg interface{}) { - con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) - }) con.syncedLastBlock = blocks[len(blocks)-1] synced = true } @@ -302,24 +297,38 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { // This method is mainly for caller to stop the syncer before synced, the syncer // would call this method automatically after being synced. func (con *Consensus) stopBuffering() { - if func() bool { + if func() (notBuffering bool) { con.lock.RLock() defer con.lock.RUnlock() - return !con.duringBuffering + notBuffering = !con.duringBuffering + return }() { return } - con.lock.Lock() - defer con.lock.Unlock() - if !con.duringBuffering { + if func() (alreadyCanceled bool) { + con.lock.Lock() + defer con.lock.Unlock() + if !con.duringBuffering { + alreadyCanceled = true + return + } + con.duringBuffering = false + con.logger.Trace("syncer is about to stop") + // Stop network and CRS routines, wait until they are all stoped. + con.ctxCancel() + return + }() { return } - con.duringBuffering = false - con.logger.Trace("syncer is about to stop") - // Stop network and CRS routines, wait until they are all stoped. - con.ctxCancel() con.logger.Trace("stop syncer modules") con.moduleWaitGroup.Wait() + // Since there is no one waiting for the receive channel of fullnode, we + // need to launch a dummy receiver right away. + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + context.Background(), con.network.ReceiveChan(), + func(msg interface{}) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) // Stop agreements. con.logger.Trace("stop syncer agreement modules") con.stopAgreement() @@ -506,11 +515,6 @@ func (con *Consensus) startCRSMonitor() { con.latestCRSRound = round }() for func() bool { - con.lock.RLock() - defer con.lock.RUnlock() - if !con.duringBuffering { - return false - } select { case <-con.ctx.Done(): return false @@ -552,9 +556,9 @@ func (con *Consensus) startCRSMonitor() { func (con *Consensus) stopAgreement() { if con.agreementModule.inputChan != nil { close(con.agreementModule.inputChan) - con.agreementModule.inputChan = nil } con.agreementWaitGroup.Wait() + con.agreementModule.inputChan = nil close(con.receiveChan) close(con.pullChan) } -- cgit v1.2.3