diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 1 | ||||
-rw-r--r-- | eth/handler.go | 60 | ||||
-rw-r--r-- | eth/helper_test.go | 14 | ||||
-rw-r--r-- | eth/peer.go | 21 | ||||
-rw-r--r-- | eth/sync.go | 2 |
5 files changed, 66 insertions, 32 deletions
diff --git a/eth/backend.go b/eth/backend.go index 9722e9625..f43dea777 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -416,6 +416,7 @@ func (s *Ethereum) Stop() error { s.blockchain.Stop() s.protocolManager.Stop() s.txPool.Stop() + s.miner.Stop() s.eventMux.Stop() s.StopAutoDAG() diff --git a/eth/handler.go b/eth/handler.go index d6b474a91..3980a625e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -74,14 +74,14 @@ type ProtocolManager struct { minedBlockSub event.Subscription // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - quitSync chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} // wait group is used for graceful shutdowns during downloading // and processing - wg sync.WaitGroup - quit bool + wg sync.WaitGroup } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, } // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkId: networkId, - fastSync: fastSync, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chaindb: chaindb, - peers: newPeerSet(), - newPeerCh: make(chan *peer, 1), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkId: networkId, + fastSync: fastSync, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + chaindb: chaindb, + peers: newPeerSet(), + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) @@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) - manager.newPeerCh <- peer - return manager.handle(peer) + select { + case manager.newPeerCh <- peer: + manager.wg.Add(1) + defer manager.wg.Done() + return manager.handle(peer) + case <-manager.quitSync: + return p2p.DiscQuitting + } }, NodeInfo: func() interface{} { return manager.NodeInfo() @@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() { } func (pm *ProtocolManager) Stop() { - // Showing a log message. During download / process this could actually - // take between 5 to 10 seconds and therefor feedback is required. glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...") - pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits syncer, fetcher, txsyncLoop - // Wait for any process action + // Quit the sync loop. + // After this send has completed, no new peers will be accepted. + pm.noMorePeers <- struct{}{} + + // Quit fetcher, txsyncLoop. + close(pm.quitSync) + + // Disconnect existing sessions. + // This also closes the gate for any new registrations on the peer set. + // sessions which are already established but not added to pm.peers yet + // will exit when they try to register. + pm.peers.Close() + + // Wait for all peer handler goroutines and the loops to come down. pm.wg.Wait() glog.V(logger.Info).Infoln("Ethereum protocol handler stopped") diff --git a/eth/helper_test.go b/eth/helper_test.go index 5703d44cc..dacb1593f 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -140,14 +140,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te // Start the peer on a new thread errc := make(chan error, 1) go func() { - pm.newPeerCh <- peer - errc <- pm.handle(peer) + select { + case pm.newPeerCh <- peer: + errc <- pm.handle(peer) + case <-pm.quitSync: + errc <- p2p.DiscQuitting + } }() - tp := &testPeer{ - app: app, - net: net, - peer: peer, - } + tp := &testPeer{app: app, net: net, peer: peer} // Execute any implicitly requested handshakes and return if shake { td, head, genesis := pm.blockchain.Status() diff --git a/eth/peer.go b/eth/peer.go index 15ba22ff5..8eb41b0f9 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -34,6 +34,7 @@ import ( ) var ( + errClosed = errors.New("peer set is closed") errAlreadyRegistered = errors.New("peer is already registered") errNotRegistered = errors.New("peer is not registered") ) @@ -351,8 +352,9 @@ func (p *peer) String() string { // peerSet represents the collection of active peers currently participating in // the Ethereum sub-protocol. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex + peers map[string]*peer + lock sync.RWMutex + closed bool } // newPeerSet creates a new peer set to track the active participants. @@ -368,6 +370,9 @@ func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() defer ps.lock.Unlock() + if ps.closed { + return errClosed + } if _, ok := ps.peers[p.id]; ok { return errAlreadyRegistered } @@ -450,3 +455,15 @@ func (ps *peerSet) BestPeer() *peer { } return bestPeer } + +// Close disconnects all peers. +// No new peers can be registered after Close has returned. +func (ps *peerSet) Close() { + ps.lock.Lock() + defer ps.lock.Unlock() + + for _, p := range ps.peers { + p.Disconnect(p2p.DiscQuitting) + } + ps.closed = true +} diff --git a/eth/sync.go b/eth/sync.go index dd8aef8e4..69881530d 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -148,7 +148,7 @@ func (pm *ProtocolManager) syncer() { // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) - case <-pm.quitSync: + case <-pm.noMorePeers: return } } |