aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2016-05-09 20:59:41 +0800
committerPéter Szilágyi <peterke@gmail.com>2016-05-09 20:59:41 +0800
commit756b62988c15afc748c529610f29769a89f86c35 (patch)
tree4828236bb18861a9527f184d33f8df410d37ae19
parentdc7f202ecd39db925c2d425fea36084efcda5ecc (diff)
parent56ed6152a11592d20220daf6322e94a009e6236d (diff)
downloadgo-tangerine-756b62988c15afc748c529610f29769a89f86c35.tar
go-tangerine-756b62988c15afc748c529610f29769a89f86c35.tar.gz
go-tangerine-756b62988c15afc748c529610f29769a89f86c35.tar.bz2
go-tangerine-756b62988c15afc748c529610f29769a89f86c35.tar.lz
go-tangerine-756b62988c15afc748c529610f29769a89f86c35.tar.xz
go-tangerine-756b62988c15afc748c529610f29769a89f86c35.tar.zst
go-tangerine-756b62988c15afc748c529610f29769a89f86c35.zip
Merge pull request #2523 from fjl/shutdown
core, eth, miner: improve shutdown synchronisation
-rw-r--r--core/tx_pool.go11
-rw-r--r--eth/backend.go1
-rw-r--r--eth/handler.go60
-rw-r--r--eth/helper_test.go14
-rw-r--r--eth/peer.go21
-rw-r--r--eth/sync.go2
-rw-r--r--miner/worker.go62
7 files changed, 101 insertions, 70 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index e997e8cd0..f2eb2bbdd 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -60,8 +60,7 @@ type stateFn func() (*state.StateDB, error)
// two states over time as they are received and processed.
type TxPool struct {
config *ChainConfig
- quit chan bool // Quitting channel
- currentState stateFn // The state function which will allow us to do some pre checks
+ currentState stateFn // The state function which will allow us to do some pre checks
pendingState *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
minGasPrice *big.Int
@@ -72,6 +71,8 @@ type TxPool struct {
pending map[common.Hash]*types.Transaction // processable transactions
queue map[common.Address]map[common.Hash]*types.Transaction
+ wg sync.WaitGroup // for shutdown sync
+
homestead bool
}
@@ -80,7 +81,6 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
config: config,
pending: make(map[common.Hash]*types.Transaction),
queue: make(map[common.Address]map[common.Hash]*types.Transaction),
- quit: make(chan bool),
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
@@ -90,12 +90,15 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
}
+ pool.wg.Add(1)
go pool.eventLoop()
return pool
}
func (pool *TxPool) eventLoop() {
+ defer pool.wg.Done()
+
// Track chain events. When a chain events occurs (new chain canon block)
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
@@ -155,8 +158,8 @@ func (pool *TxPool) resetState() {
}
func (pool *TxPool) Stop() {
- close(pool.quit)
pool.events.Unsubscribe()
+ pool.wg.Wait()
glog.V(logger.Info).Infoln("Transaction pool stopped")
}
diff --git a/eth/backend.go b/eth/backend.go
index 9722e9625..f43dea777 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -416,6 +416,7 @@ func (s *Ethereum) Stop() error {
s.blockchain.Stop()
s.protocolManager.Stop()
s.txPool.Stop()
+ s.miner.Stop()
s.eventMux.Stop()
s.StopAutoDAG()
diff --git a/eth/handler.go b/eth/handler.go
index d6b474a91..3980a625e 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -74,14 +74,14 @@ type ProtocolManager struct {
minedBlockSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- txsyncCh chan *txsync
- quitSync chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
- wg sync.WaitGroup
- quit bool
+ wg sync.WaitGroup
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
}
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkId: networkId,
- fastSync: fastSync,
- eventMux: mux,
- txpool: txpool,
- blockchain: blockchain,
- chaindb: chaindb,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer, 1),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
+ networkId: networkId,
+ fastSync: fastSync,
+ eventMux: mux,
+ txpool: txpool,
+ blockchain: blockchain,
+ chaindb: chaindb,
+ peers: newPeerSet(),
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
@@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
- manager.newPeerCh <- peer
- return manager.handle(peer)
+ select {
+ case manager.newPeerCh <- peer:
+ manager.wg.Add(1)
+ defer manager.wg.Done()
+ return manager.handle(peer)
+ case <-manager.quitSync:
+ return p2p.DiscQuitting
+ }
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
@@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() {
}
func (pm *ProtocolManager) Stop() {
- // Showing a log message. During download / process this could actually
- // take between 5 to 10 seconds and therefor feedback is required.
glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
- pm.quit = true
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
- close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
- // Wait for any process action
+ // Quit the sync loop.
+ // After this send has completed, no new peers will be accepted.
+ pm.noMorePeers <- struct{}{}
+
+ // Quit fetcher, txsyncLoop.
+ close(pm.quitSync)
+
+ // Disconnect existing sessions.
+ // This also closes the gate for any new registrations on the peer set.
+ // sessions which are already established but not added to pm.peers yet
+ // will exit when they try to register.
+ pm.peers.Close()
+
+ // Wait for all peer handler goroutines and the loops to come down.
pm.wg.Wait()
glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
diff --git a/eth/helper_test.go b/eth/helper_test.go
index 5703d44cc..dacb1593f 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -140,14 +140,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
// Start the peer on a new thread
errc := make(chan error, 1)
go func() {
- pm.newPeerCh <- peer
- errc <- pm.handle(peer)
+ select {
+ case pm.newPeerCh <- peer:
+ errc <- pm.handle(peer)
+ case <-pm.quitSync:
+ errc <- p2p.DiscQuitting
+ }
}()
- tp := &testPeer{
- app: app,
- net: net,
- peer: peer,
- }
+ tp := &testPeer{app: app, net: net, peer: peer}
// Execute any implicitly requested handshakes and return
if shake {
td, head, genesis := pm.blockchain.Status()
diff --git a/eth/peer.go b/eth/peer.go
index 15ba22ff5..8eb41b0f9 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -34,6 +34,7 @@ import (
)
var (
+ errClosed = errors.New("peer set is closed")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
)
@@ -351,8 +352,9 @@ func (p *peer) String() string {
// peerSet represents the collection of active peers currently participating in
// the Ethereum sub-protocol.
type peerSet struct {
- peers map[string]*peer
- lock sync.RWMutex
+ peers map[string]*peer
+ lock sync.RWMutex
+ closed bool
}
// newPeerSet creates a new peer set to track the active participants.
@@ -368,6 +370,9 @@ func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
+ if ps.closed {
+ return errClosed
+ }
if _, ok := ps.peers[p.id]; ok {
return errAlreadyRegistered
}
@@ -450,3 +455,15 @@ func (ps *peerSet) BestPeer() *peer {
}
return bestPeer
}
+
+// Close disconnects all peers.
+// No new peers can be registered after Close has returned.
+func (ps *peerSet) Close() {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ for _, p := range ps.peers {
+ p.Disconnect(p2p.DiscQuitting)
+ }
+ ps.closed = true
+}
diff --git a/eth/sync.go b/eth/sync.go
index dd8aef8e4..69881530d 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -148,7 +148,7 @@ func (pm *ProtocolManager) syncer() {
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
- case <-pm.quitSync:
+ case <-pm.noMorePeers:
return
}
}
diff --git a/miner/worker.go b/miner/worker.go
index 21588e310..3d1928bf6 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -94,10 +94,13 @@ type worker struct {
mu sync.Mutex
+ // update loop
+ mux *event.TypeMux
+ events event.Subscription
+ wg sync.WaitGroup
+
agents map[Agent]struct{}
recv chan *Result
- mux *event.TypeMux
- quit chan struct{}
pow pow.PoW
eth core.Backend
@@ -138,13 +141,14 @@ func newWorker(config *core.ChainConfig, coinbase common.Address, eth core.Backe
possibleUncles: make(map[common.Hash]*types.Block),
coinbase: coinbase,
txQueue: make(map[common.Hash]*types.Transaction),
- quit: make(chan struct{}),
agents: make(map[Agent]struct{}),
fullValidation: false,
}
+ worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
+ worker.wg.Add(1)
go worker.update()
- go worker.wait()
+ go worker.wait()
worker.commitNewWork()
return worker
@@ -184,9 +188,12 @@ func (self *worker) start() {
}
func (self *worker) stop() {
+ // Quit update.
+ self.events.Unsubscribe()
+ self.wg.Wait()
+
self.mu.Lock()
defer self.mu.Unlock()
-
if atomic.LoadInt32(&self.mining) == 1 {
// Stop all agents.
for agent := range self.agents {
@@ -217,36 +224,23 @@ func (self *worker) unregister(agent Agent) {
}
func (self *worker) update() {
- eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
- defer eventSub.Unsubscribe()
-
- eventCh := eventSub.Chan()
- for {
- select {
- case event, ok := <-eventCh:
- if !ok {
- // Event subscription closed, set the channel to nil to stop spinning
- eventCh = nil
- continue
- }
- // A real event arrived, process interesting content
- switch ev := event.Data.(type) {
- case core.ChainHeadEvent:
- self.commitNewWork()
- case core.ChainSideEvent:
- self.uncleMu.Lock()
- self.possibleUncles[ev.Block.Hash()] = ev.Block
- self.uncleMu.Unlock()
- case core.TxPreEvent:
- // Apply transaction to the pending state if we're not mining
- if atomic.LoadInt32(&self.mining) == 0 {
- self.currentMu.Lock()
- self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
- self.currentMu.Unlock()
- }
+ defer self.wg.Done()
+ for event := range self.events.Chan() {
+ // A real event arrived, process interesting content
+ switch ev := event.Data.(type) {
+ case core.ChainHeadEvent:
+ self.commitNewWork()
+ case core.ChainSideEvent:
+ self.uncleMu.Lock()
+ self.possibleUncles[ev.Block.Hash()] = ev.Block
+ self.uncleMu.Unlock()
+ case core.TxPreEvent:
+ // Apply transaction to the pending state if we're not mining
+ if atomic.LoadInt32(&self.mining) == 0 {
+ self.currentMu.Lock()
+ self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
+ self.currentMu.Unlock()
}
- case <-self.quit:
- return
}
}
}