From 016f152b36106130fa42514ef6cfacc09dfc3142 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 1 May 2015 00:23:51 +0200 Subject: eth, eth/downloader: Moved block processing & graceful shutdown The downloader is no longer responsible for processing blocks. The eth-protocol handler now takes care of this instead. Added graceful shutdown during block processing. Closes #846 --- eth/handler.go | 139 ++++++++++----------------------------------------------- 1 file changed, 23 insertions(+), 116 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index 2b432f95d..1e0663816 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -1,39 +1,5 @@ package eth -// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change -// The idea is that most of the calls within the protocol will become synchronous. -// Block downloading and block processing will be complete seperate processes -/* -# Possible scenarios - -// Synching scenario -// Use the best peer to synchronise -blocks, err := pm.downloader.Synchronise() -if err != nil { - // handle - break -} -pm.chainman.InsertChain(blocks) - -// Receiving block with known parent -if parent_exist { - if err := pm.chainman.InsertChain(block); err != nil { - // handle - break - } - pm.BroadcastBlock(block) -} - -// Receiving block with unknown parent -blocks, err := pm.downloader.SynchroniseWithPeer(peer) -if err != nil { - // handle - break -} -pm.chainman.InsertChain(blocks) - -*/ - import ( "fmt" "math" @@ -54,7 +20,9 @@ import ( const ( peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcTimer = 500 * time.Millisecond + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcAmount = 256 ) func errResp(code errCode, format string, v ...interface{}) error { @@ -91,6 +59,10 @@ type ProtocolManager struct { newPeerCh chan *peer quitSync chan struct{} + // wait group is used for graceful shutdowns during downloading + // and processing + wg sync.WaitGroup + quit bool } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -129,65 +101,6 @@ func (pm *ProtocolManager) removePeer(peer *peer) { delete(pm.peers, peer.id) } -func (pm *ProtocolManager) syncHandler() { - // itimer is used to determine when to start ignoring `minDesiredPeerCount` - itimer := time.NewTimer(peerCountTimeout) -out: - for { - select { - case <-pm.newPeerCh: - // Meet the `minDesiredPeerCount` before we select our best peer - if len(pm.peers) < minDesiredPeerCount { - break - } - - // Find the best peer - peer := getBestPeer(pm.peers) - if peer == nil { - glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") - } - - itimer.Stop() - go pm.synchronise(peer) - case <-itimer.C: - // The timer will make sure that the downloader keeps an active state - // in which it attempts to always check the network for highest td peers - // Either select the peer or restart the timer if no peers could - // be selected. - if peer := getBestPeer(pm.peers); peer != nil { - go pm.synchronise(peer) - } else { - itimer.Reset(5 * time.Second) - } - case <-pm.quitSync: - break out - } - } -} - -func (pm *ProtocolManager) synchronise(peer *peer) { - // Make sure the peer's TD is higher than our own. If not drop. - if peer.td.Cmp(pm.chainman.Td()) <= 0 { - return - } - // Check downloader if it's busy so it doesn't show the sync message - // for every attempty - if pm.downloader.IsBusy() { - return - } - - glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td) - // Get the hashes from the peer (synchronously) - err := pm.downloader.Synchronise(peer.id, peer.recentHash) - if err != nil && err == downloader.ErrBadPeer { - glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action") - pm.removePeer(peer) - } else if err != nil { - // handle error - glog.V(logger.Debug).Infoln("error downloading:", err) - } -} - func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) @@ -197,18 +110,26 @@ func (pm *ProtocolManager) Start() { pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - // sync handler - go pm.syncHandler() + go pm.update() } func (pm *ProtocolManager) Stop() { + // Showing a log message. During download / process this could actually + // take between 5 to 10 seconds and therefor feedback is required. + glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...") + + pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop close(pm.quitSync) // quits the sync handler + + // Wait for any process action + pm.wg.Wait() + + glog.V(logger.Info).Infoln("Ethereum protocol handler stopped") } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - td, current, genesis := pm.chainman.Status() return newPeer(pv, nv, genesis, current, td, p, rw) @@ -359,6 +280,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Add the block hash as a known hash to the peer. This will later be used to determine // who should receive this. p.blockHashes.Add(hash) + // update the peer info + p.recentHash = hash + p.td = request.TD _, chainHead, _ := self.chainman.Status() @@ -383,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Attempt to insert the newly received by checking if the parent exists. // if the parent exists we process the block and propagate to our peers - // if the parent does not exists we delegate to the downloader. + // otherwise synchronise with the peer if self.chainman.HasBlock(request.Block.ParentHash()) { if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error") @@ -400,24 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } self.BroadcastBlock(hash, request.Block) } else { - // adding blocks is synchronous - go func() { - err := self.downloader.AddBlock(p.id, request.Block, request.TD) - if err != nil && err == downloader.ErrBadPeer { - glog.V(logger.Error).Infoln("removed peer (", p.id, ") with err:", err) - - self.removePeer(p) - return - } else if err != nil { - glog.V(logger.Detail).Infoln("downloader err:", err) - return - } - if err := self.verifyTd(p, request); err != nil { - glog.V(logger.Error).Infoln(err) - return - } - self.BroadcastBlock(hash, request.Block) - }() + go self.synchronise(p) } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) -- cgit v1.2.3