aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go60
1 files changed, 38 insertions, 22 deletions
diff --git a/eth/handler.go b/eth/handler.go
index d6b474a91..3980a625e 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -74,14 +74,14 @@ type ProtocolManager struct {
minedBlockSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- txsyncCh chan *txsync
- quitSync chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
- wg sync.WaitGroup
- quit bool
+ wg sync.WaitGroup
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
}
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkId: networkId,
- fastSync: fastSync,
- eventMux: mux,
- txpool: txpool,
- blockchain: blockchain,
- chaindb: chaindb,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer, 1),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
+ networkId: networkId,
+ fastSync: fastSync,
+ eventMux: mux,
+ txpool: txpool,
+ blockchain: blockchain,
+ chaindb: chaindb,
+ peers: newPeerSet(),
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
@@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
- manager.newPeerCh <- peer
- return manager.handle(peer)
+ select {
+ case manager.newPeerCh <- peer:
+ manager.wg.Add(1)
+ defer manager.wg.Done()
+ return manager.handle(peer)
+ case <-manager.quitSync:
+ return p2p.DiscQuitting
+ }
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
@@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() {
}
func (pm *ProtocolManager) Stop() {
- // Showing a log message. During download / process this could actually
- // take between 5 to 10 seconds and therefor feedback is required.
glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
- pm.quit = true
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
- close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
- // Wait for any process action
+ // Quit the sync loop.
+ // After this send has completed, no new peers will be accepted.
+ pm.noMorePeers <- struct{}{}
+
+ // Quit fetcher, txsyncLoop.
+ close(pm.quitSync)
+
+ // Disconnect existing sessions.
+ // This also closes the gate for any new registrations on the peer set.
+ // sessions which are already established but not added to pm.peers yet
+ // will exit when they try to register.
+ pm.peers.Close()
+
+ // Wait for all peer handler goroutines and the loops to come down.
pm.wg.Wait()
glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")