From b86e7526e12a5a49c1739ec02d3c1c5cc667dcb3 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 24 Apr 2015 14:40:32 +0200 Subject: eth, eth/downloader: moved peer selection to protocol handler --- eth/handler.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 61 insertions(+), 4 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index a634b5bfd..a1b03f57c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -39,6 +39,7 @@ import ( "math" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -51,6 +52,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +const ( + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount + minDesiredPeerCount = 5 // Amount of peers desired to start syncing +) + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -82,6 +88,9 @@ type ProtocolManager struct { eventMux *event.TypeMux txSub event.Subscription minedBlockSub event.Subscription + + newPeerCh chan *peer + quit chan struct{} } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -93,7 +102,10 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo chainman: chainman, downloader: downloader, peers: make(map[string]*peer), + newPeerCh: make(chan *peer, 1), + quit: make(chan struct{}), } + go manager.peerHandler() manager.SubProtocol = p2p.Protocol{ Name: "eth", @@ -101,16 +113,61 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo Length: ProtocolLength, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(protocolVersion, networkId, p, rw) - err := manager.handle(peer) - //glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err) - return err + manager.newPeerCh <- peer + + return manager.handle(peer) }, } return manager } +func (pm *ProtocolManager) peerHandler() { + // itimer is used to determine when to start ignoring `minDesiredPeerCount` + itimer := time.NewTimer(peerCountTimeout) +out: + for { + select { + case <-pm.newPeerCh: + // Meet the `minDesiredPeerCount` before we select our best peer + if len(pm.peers) < minDesiredPeerCount { + break + } + itimer.Stop() + + // Find the best peer + peer := getBestPeer(pm.peers) + if peer == nil { + glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") + return + } + go pm.synchronise(peer) + case <-itimer.C: + // The timer will make sure that the downloader keeps an active state + // in which it attempts to always check the network for highest td peers + // Either select the peer or restart the timer if no peers could + // be selected. + if peer := getBestPeer(pm.peers); peer != nil { + go pm.synchronise(peer) + } else { + itimer.Reset(5 * time.Second) + } + case <-pm.quit: + break out + } + } +} + +func (pm *ProtocolManager) synchronise(peer *peer) { + // Get the hashes from the peer (synchronously) + _, err := pm.downloader.Synchronise(peer.id, peer.recentHash) + if err != nil { + // handle error + glog.V(logger.Debug).Infoln("error downloading:", err) + } +} + func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) @@ -141,7 +198,7 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.peers[p.id] = p pm.pmu.Unlock() - pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks) + pm.downloader.RegisterPeer(p.id, p.td, p.recentHash, p.requestHashes, p.requestBlocks) defer func() { pm.pmu.Lock() defer pm.pmu.Unlock() -- cgit v1.2.3 From 31f82eb3347454f64f3d41de3087109d09597806 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 24 Apr 2015 15:04:58 +0200 Subject: eth, eth/downloader: don't require td on downloader. Fixed tests --- eth/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index a1b03f57c..a5bc125da 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -134,14 +134,14 @@ out: if len(pm.peers) < minDesiredPeerCount { break } - itimer.Stop() // Find the best peer peer := getBestPeer(pm.peers) if peer == nil { glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") - return } + + itimer.Stop() go pm.synchronise(peer) case <-itimer.C: // The timer will make sure that the downloader keeps an active state -- cgit v1.2.3 From d84c2202e79c30ec906b1a078bfd9fdf5ae94a31 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 24 Apr 2015 15:37:32 +0200 Subject: eth, eth/downloader: simplified synchronisation process --- eth/handler.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index a5bc125da..8db476eb4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -90,7 +90,7 @@ type ProtocolManager struct { minedBlockSub event.Subscription newPeerCh chan *peer - quit chan struct{} + quitSync chan struct{} } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -103,9 +103,8 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo downloader: downloader, peers: make(map[string]*peer), newPeerCh: make(chan *peer, 1), - quit: make(chan struct{}), + quitSync: make(chan struct{}), } - go manager.peerHandler() manager.SubProtocol = p2p.Protocol{ Name: "eth", @@ -123,7 +122,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo return manager } -func (pm *ProtocolManager) peerHandler() { +func (pm *ProtocolManager) syncHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` itimer := time.NewTimer(peerCountTimeout) out: @@ -153,7 +152,7 @@ out: } else { itimer.Reset(5 * time.Second) } - case <-pm.quit: + case <-pm.quitSync: break out } } @@ -161,7 +160,7 @@ out: func (pm *ProtocolManager) synchronise(peer *peer) { // Get the hashes from the peer (synchronously) - _, err := pm.downloader.Synchronise(peer.id, peer.recentHash) + err := pm.downloader.Synchronise(peer.id, peer.recentHash) if err != nil { // handle error glog.V(logger.Debug).Infoln("error downloading:", err) @@ -176,11 +175,15 @@ func (pm *ProtocolManager) Start() { // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + + // sync handler + go pm.syncHandler() } func (pm *ProtocolManager) Stop() { pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + close(pm.quitSync) // quits the sync handler } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { @@ -198,7 +201,7 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.peers[p.id] = p pm.pmu.Unlock() - pm.downloader.RegisterPeer(p.id, p.td, p.recentHash, p.requestHashes, p.requestBlocks) + pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks) defer func() { pm.pmu.Lock() defer pm.pmu.Unlock() @@ -370,6 +373,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } else { // adding blocks is synchronous go func() { + // TODO check parent error err := self.downloader.AddBlock(p.id, request.Block, request.TD) if err != nil { glog.V(logger.Detail).Infoln("downloader err:", err) -- cgit v1.2.3 From 1681ee9883a5cd2dbcb423d08b491343c682c655 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 24 Apr 2015 17:03:09 +0200 Subject: eth: added a few informative messages regarding downloading --- eth/handler.go | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index 8db476eb4..d00d00f23 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -159,6 +159,12 @@ out: } func (pm *ProtocolManager) synchronise(peer *peer) { + // Make sure the peer's TD is higher than our own. If not drop. + if peer.td.Cmp(pm.chainman.Td()) <= 0 { + return + } + + glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td) // Get the hashes from the peer (synchronously) err := pm.downloader.Synchronise(peer.id, peer.recentHash) if err != nil { -- cgit v1.2.3