aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go159
1 files changed, 50 insertions, 109 deletions
diff --git a/eth/handler.go b/eth/handler.go
index fecd71632..1e0663816 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -1,39 +1,5 @@
package eth
-// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change
-// The idea is that most of the calls within the protocol will become synchronous.
-// Block downloading and block processing will be complete seperate processes
-/*
-# Possible scenarios
-
-// Synching scenario
-// Use the best peer to synchronise
-blocks, err := pm.downloader.Synchronise()
-if err != nil {
- // handle
- break
-}
-pm.chainman.InsertChain(blocks)
-
-// Receiving block with known parent
-if parent_exist {
- if err := pm.chainman.InsertChain(block); err != nil {
- // handle
- break
- }
- pm.BroadcastBlock(block)
-}
-
-// Receiving block with unknown parent
-blocks, err := pm.downloader.SynchroniseWithPeer(peer)
-if err != nil {
- // handle
- break
-}
-pm.chainman.InsertChain(blocks)
-
-*/
-
import (
"fmt"
"math"
@@ -54,7 +20,9 @@ import (
const (
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ blockProcTimer = 500 * time.Millisecond
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ blockProcAmount = 256
)
func errResp(code errCode, format string, v ...interface{}) error {
@@ -91,6 +59,10 @@ type ProtocolManager struct {
newPeerCh chan *peer
quitSync chan struct{}
+ // wait group is used for graceful shutdowns during downloading
+ // and processing
+ wg sync.WaitGroup
+ quit bool
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -122,60 +94,11 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
return manager
}
-func (pm *ProtocolManager) syncHandler() {
- // itimer is used to determine when to start ignoring `minDesiredPeerCount`
- itimer := time.NewTimer(peerCountTimeout)
-out:
- for {
- select {
- case <-pm.newPeerCh:
- // Meet the `minDesiredPeerCount` before we select our best peer
- if len(pm.peers) < minDesiredPeerCount {
- break
- }
-
- // Find the best peer
- peer := getBestPeer(pm.peers)
- if peer == nil {
- glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
- }
-
- itimer.Stop()
- go pm.synchronise(peer)
- case <-itimer.C:
- // The timer will make sure that the downloader keeps an active state
- // in which it attempts to always check the network for highest td peers
- // Either select the peer or restart the timer if no peers could
- // be selected.
- if peer := getBestPeer(pm.peers); peer != nil {
- go pm.synchronise(peer)
- } else {
- itimer.Reset(5 * time.Second)
- }
- case <-pm.quitSync:
- break out
- }
- }
-}
-
-func (pm *ProtocolManager) synchronise(peer *peer) {
- // Make sure the peer's TD is higher than our own. If not drop.
- if peer.td.Cmp(pm.chainman.Td()) <= 0 {
- return
- }
- // Check downloader if it's busy so it doesn't show the sync message
- // for every attempty
- if pm.downloader.IsBusy() {
- return
- }
-
- glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td)
- // Get the hashes from the peer (synchronously)
- err := pm.downloader.Synchronise(peer.id, peer.recentHash)
- if err != nil {
- // handle error
- glog.V(logger.Debug).Infoln("error downloading:", err)
- }
+func (pm *ProtocolManager) removePeer(peer *peer) {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+ pm.downloader.UnregisterPeer(peer.id)
+ delete(pm.peers, peer.id)
}
func (pm *ProtocolManager) Start() {
@@ -187,18 +110,26 @@ func (pm *ProtocolManager) Start() {
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
- // sync handler
- go pm.syncHandler()
+ go pm.update()
}
func (pm *ProtocolManager) Stop() {
+ // Showing a log message. During download / process this could actually
+ // take between 5 to 10 seconds and therefor feedback is required.
+ glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
+
+ pm.quit = true
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
close(pm.quitSync) // quits the sync handler
+
+ // Wait for any process action
+ pm.wg.Wait()
+
+ glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
-
td, current, genesis := pm.chainman.Status()
return newPeer(pv, nv, genesis, current, td, p, rw)
@@ -214,10 +145,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks)
defer func() {
- pm.pmu.Lock()
- defer pm.pmu.Unlock()
- delete(pm.peers, p.id)
- pm.downloader.UnregisterPeer(p.id)
+ pm.removePeer(p)
}()
// propagate existing transactions. new transactions appearing
@@ -352,6 +280,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Add the block hash as a known hash to the peer. This will later be used to determine
// who should receive this.
p.blockHashes.Add(hash)
+ // update the peer info
+ p.recentHash = hash
+ p.td = request.TD
_, chainHead, _ := self.chainman.Status()
@@ -376,24 +307,24 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Attempt to insert the newly received by checking if the parent exists.
// if the parent exists we process the block and propagate to our peers
- // if the parent does not exists we delegate to the downloader.
+ // otherwise synchronise with the peer
if self.chainman.HasBlock(request.Block.ParentHash()) {
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
- // handle error
+ glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
+
+ self.removePeer(p)
+
+ return nil
+ }
+
+ if err := self.verifyTd(p, request); err != nil {
+ glog.V(logger.Error).Infoln(err)
+ // XXX for now return nil so it won't disconnect (we should in the future)
return nil
}
self.BroadcastBlock(hash, request.Block)
} else {
- // adding blocks is synchronous
- go func() {
- // TODO check parent error
- err := self.downloader.AddBlock(p.id, request.Block, request.TD)
- if err != nil {
- glog.V(logger.Detail).Infoln("downloader err:", err)
- return
- }
- self.BroadcastBlock(hash, request.Block)
- }()
+ go self.synchronise(p)
}
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@@ -401,6 +332,16 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return nil
}
+func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error {
+ if request.Block.Td.Cmp(request.TD) != 0 {
+ glog.V(logger.Detail).Infoln(peer)
+
+ return fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", request.Block.Number(), peer.id, request.Block.Td, request.TD)
+ }
+
+ return nil
+}
+
// BroadcastBlock will propagate the block to its connected peers. It will sort
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
@@ -421,7 +362,7 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
for _, peer := range peers {
peer.sendNewBlock(block)
}
- glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total propagation time:", time.Since(block.ReceivedAt))
+ glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total processing time:", time.Since(block.ReceivedAt))
}
// BroadcastTx will propagate the block to its connected peers. It will sort