aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-05-01 06:23:51 +0800
committerobscuren <geffobscura@gmail.com>2015-05-01 21:58:44 +0800
commit016f152b36106130fa42514ef6cfacc09dfc3142 (patch)
treea38fa42c59a8a4e0c18b68fc8e5dcb6bd533719b /eth/handler.go
parent8595198c1b56364bb9589a912d2a9797b93a3357 (diff)
downloadgo-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.gz
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.bz2
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.lz
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.xz
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.zst
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.zip
eth, eth/downloader: Moved block processing & graceful shutdown
The downloader is no longer responsible for processing blocks. The eth-protocol handler now takes care of this instead. Added graceful shutdown during block processing. Closes #846
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go139
1 files changed, 23 insertions, 116 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 2b432f95d..1e0663816 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -1,39 +1,5 @@
package eth
-// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change
-// The idea is that most of the calls within the protocol will become synchronous.
-// Block downloading and block processing will be complete seperate processes
-/*
-# Possible scenarios
-
-// Synching scenario
-// Use the best peer to synchronise
-blocks, err := pm.downloader.Synchronise()
-if err != nil {
- // handle
- break
-}
-pm.chainman.InsertChain(blocks)
-
-// Receiving block with known parent
-if parent_exist {
- if err := pm.chainman.InsertChain(block); err != nil {
- // handle
- break
- }
- pm.BroadcastBlock(block)
-}
-
-// Receiving block with unknown parent
-blocks, err := pm.downloader.SynchroniseWithPeer(peer)
-if err != nil {
- // handle
- break
-}
-pm.chainman.InsertChain(blocks)
-
-*/
-
import (
"fmt"
"math"
@@ -54,7 +20,9 @@ import (
const (
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ blockProcTimer = 500 * time.Millisecond
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ blockProcAmount = 256
)
func errResp(code errCode, format string, v ...interface{}) error {
@@ -91,6 +59,10 @@ type ProtocolManager struct {
newPeerCh chan *peer
quitSync chan struct{}
+ // wait group is used for graceful shutdowns during downloading
+ // and processing
+ wg sync.WaitGroup
+ quit bool
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -129,65 +101,6 @@ func (pm *ProtocolManager) removePeer(peer *peer) {
delete(pm.peers, peer.id)
}
-func (pm *ProtocolManager) syncHandler() {
- // itimer is used to determine when to start ignoring `minDesiredPeerCount`
- itimer := time.NewTimer(peerCountTimeout)
-out:
- for {
- select {
- case <-pm.newPeerCh:
- // Meet the `minDesiredPeerCount` before we select our best peer
- if len(pm.peers) < minDesiredPeerCount {
- break
- }
-
- // Find the best peer
- peer := getBestPeer(pm.peers)
- if peer == nil {
- glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
- }
-
- itimer.Stop()
- go pm.synchronise(peer)
- case <-itimer.C:
- // The timer will make sure that the downloader keeps an active state
- // in which it attempts to always check the network for highest td peers
- // Either select the peer or restart the timer if no peers could
- // be selected.
- if peer := getBestPeer(pm.peers); peer != nil {
- go pm.synchronise(peer)
- } else {
- itimer.Reset(5 * time.Second)
- }
- case <-pm.quitSync:
- break out
- }
- }
-}
-
-func (pm *ProtocolManager) synchronise(peer *peer) {
- // Make sure the peer's TD is higher than our own. If not drop.
- if peer.td.Cmp(pm.chainman.Td()) <= 0 {
- return
- }
- // Check downloader if it's busy so it doesn't show the sync message
- // for every attempty
- if pm.downloader.IsBusy() {
- return
- }
-
- glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td)
- // Get the hashes from the peer (synchronously)
- err := pm.downloader.Synchronise(peer.id, peer.recentHash)
- if err != nil && err == downloader.ErrBadPeer {
- glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
- pm.removePeer(peer)
- } else if err != nil {
- // handle error
- glog.V(logger.Debug).Infoln("error downloading:", err)
- }
-}
-
func (pm *ProtocolManager) Start() {
// broadcast transactions
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
@@ -197,18 +110,26 @@ func (pm *ProtocolManager) Start() {
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
- // sync handler
- go pm.syncHandler()
+ go pm.update()
}
func (pm *ProtocolManager) Stop() {
+ // Showing a log message. During download / process this could actually
+ // take between 5 to 10 seconds and therefor feedback is required.
+ glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
+
+ pm.quit = true
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
close(pm.quitSync) // quits the sync handler
+
+ // Wait for any process action
+ pm.wg.Wait()
+
+ glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
-
td, current, genesis := pm.chainman.Status()
return newPeer(pv, nv, genesis, current, td, p, rw)
@@ -359,6 +280,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Add the block hash as a known hash to the peer. This will later be used to determine
// who should receive this.
p.blockHashes.Add(hash)
+ // update the peer info
+ p.recentHash = hash
+ p.td = request.TD
_, chainHead, _ := self.chainman.Status()
@@ -383,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Attempt to insert the newly received by checking if the parent exists.
// if the parent exists we process the block and propagate to our peers
- // if the parent does not exists we delegate to the downloader.
+ // otherwise synchronise with the peer
if self.chainman.HasBlock(request.Block.ParentHash()) {
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
@@ -400,24 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
}
self.BroadcastBlock(hash, request.Block)
} else {
- // adding blocks is synchronous
- go func() {
- err := self.downloader.AddBlock(p.id, request.Block, request.TD)
- if err != nil && err == downloader.ErrBadPeer {
- glog.V(logger.Error).Infoln("removed peer (", p.id, ") with err:", err)
-
- self.removePeer(p)
- return
- } else if err != nil {
- glog.V(logger.Detail).Infoln("downloader err:", err)
- return
- }
- if err := self.verifyTd(p, request); err != nil {
- glog.V(logger.Error).Infoln(err)
- return
- }
- self.BroadcastBlock(hash, request.Block)
- }()
+ go self.synchronise(p)
}
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)