diff options
author | Felix Lange <fjl@twurst.com> | 2016-03-29 09:08:16 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2016-05-09 19:03:08 +0800 |
commit | 56ed6152a11592d20220daf6322e94a009e6236d (patch) | |
tree | 0a0d5985832e32fdd1d9c3dc1deff89a85811099 /eth/handler.go | |
parent | f821b0188a27bca08cada87c5b746ef9455a2e96 (diff) | |
download | go-tangerine-56ed6152a11592d20220daf6322e94a009e6236d.tar go-tangerine-56ed6152a11592d20220daf6322e94a009e6236d.tar.gz go-tangerine-56ed6152a11592d20220daf6322e94a009e6236d.tar.bz2 go-tangerine-56ed6152a11592d20220daf6322e94a009e6236d.tar.lz go-tangerine-56ed6152a11592d20220daf6322e94a009e6236d.tar.xz go-tangerine-56ed6152a11592d20220daf6322e94a009e6236d.tar.zst go-tangerine-56ed6152a11592d20220daf6322e94a009e6236d.zip |
core, eth, miner: improve shutdown synchronisation
Shutting down geth prints hundreds of annoying error messages in some
cases. The errors appear because the Stop method of eth.ProtocolManager,
miner.Miner and core.TxPool is asynchronous. Left over peer sessions
generate events which are processed after Stop even though the database
has already been closed.
The fix is to make Stop synchronous using sync.WaitGroup.
For eth.ProtocolManager, in order to make use of WaitGroup safe, we need
a way to stop new peer sessions from being added while waiting on the
WaitGroup. The eth protocol Run function now selects on a signaling
channel and adds to the WaitGroup only if ProtocolManager is not
shutting down.
For miner.worker and core.TxPool the number of goroutines is static,
WaitGroup can be used in the usual way without additional
synchronisation.
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 60 |
1 files changed, 38 insertions, 22 deletions
diff --git a/eth/handler.go b/eth/handler.go index d6b474a91..3980a625e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -74,14 +74,14 @@ type ProtocolManager struct { minedBlockSub event.Subscription // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - quitSync chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} // wait group is used for graceful shutdowns during downloading // and processing - wg sync.WaitGroup - quit bool + wg sync.WaitGroup } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, } // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkId: networkId, - fastSync: fastSync, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chaindb: chaindb, - peers: newPeerSet(), - newPeerCh: make(chan *peer, 1), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkId: networkId, + fastSync: fastSync, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + chaindb: chaindb, + peers: newPeerSet(), + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) @@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) - manager.newPeerCh <- peer - return manager.handle(peer) + select { + case manager.newPeerCh <- peer: + manager.wg.Add(1) + defer manager.wg.Done() + return manager.handle(peer) + case <-manager.quitSync: + return p2p.DiscQuitting + } }, NodeInfo: func() interface{} { return manager.NodeInfo() @@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() { } func (pm *ProtocolManager) Stop() { - // Showing a log message. During download / process this could actually - // take between 5 to 10 seconds and therefor feedback is required. glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...") - pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits syncer, fetcher, txsyncLoop - // Wait for any process action + // Quit the sync loop. + // After this send has completed, no new peers will be accepted. + pm.noMorePeers <- struct{}{} + + // Quit fetcher, txsyncLoop. + close(pm.quitSync) + + // Disconnect existing sessions. + // This also closes the gate for any new registrations on the peer set. + // sessions which are already established but not added to pm.peers yet + // will exit when they try to register. + pm.peers.Close() + + // Wait for all peer handler goroutines and the loops to come down. pm.wg.Wait() glog.V(logger.Info).Infoln("Ethereum protocol handler stopped") |