aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2016-05-20 20:25:28 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2016-05-20 20:25:28 +0800
commita8472e0fdbb4ac157671ba3bcc79001674d582d2 (patch)
tree07736b4a0d318671a502fa42e41e81cd5c76cebd /eth/downloader/peer.go
parente798e4fd750745cec99c5a531e42998d9a7be85e (diff)
parent8906b2fe0934c67ebb1db5d4d77acdf1a7e988f0 (diff)
downloadgo-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar
go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.gz
go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.bz2
go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.lz
go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.xz
go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.zst
go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.zip
Merge pull request #2315 from karalabe/concurrent-headers-2
eth/downloader: concurrent header downloads
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go66
1 files changed, 61 insertions, 5 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index c4846194b..6aab907d7 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -58,15 +58,18 @@ type peer struct {
id string // Unique identifier of the peer
head common.Hash // Hash of the peers latest known block
+ headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
+ headerThroughput float64 // Number of headers measured to be retrievable per second
blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateThroughput float64 // Number of node data pieces measured to be retrievable per second
- blockStarted time.Time // Time instance when the last block (body)fetch was started
+ headerStarted time.Time // Time instance when the last header fetch was started
+ blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
@@ -118,10 +121,12 @@ func (p *peer) Reset() {
p.lock.Lock()
defer p.lock.Unlock()
+ atomic.StoreInt32(&p.headerIdle, 0)
atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.stateIdle, 0)
+ p.headerThroughput = 0
p.blockThroughput = 0
p.receiptThroughput = 0
p.stateThroughput = 0
@@ -151,6 +156,24 @@ func (p *peer) Fetch61(request *fetchRequest) error {
return nil
}
+// FetchHeaders sends a header retrieval request to the remote peer.
+func (p *peer) FetchHeaders(from uint64, count int) error {
+ // Sanity check the protocol version
+ if p.version < 62 {
+ panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
+ }
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.headerStarted = time.Now()
+
+ // Issue the header retrieval request (absolut upwards without gaps)
+ go p.getAbsHeaders(from, count, 0, false)
+
+ return nil
+}
+
// FetchBodies sends a block body retrieval request to the remote peer.
func (p *peer) FetchBodies(request *fetchRequest) error {
// Sanity check the protocol version
@@ -217,6 +240,13 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return nil
}
+// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
+// requests. Its estimated header retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetHeadersIdle(delivered int) {
+ p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
+}
+
// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
// requests. Its estimated block retrieval throughput is updated with that measured
// just now.
@@ -264,6 +294,15 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
*throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
}
+// HeaderCapacity retrieves the peers header download allowance based on its
+// previously discovered throughput.
+func (p *peer) HeaderCapacity() int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch))))
+}
+
// BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput.
func (p *peer) BlockCapacity() int {
@@ -323,14 +362,15 @@ func (p *peer) String() string {
defer p.lock.RUnlock()
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
+ fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+
+ fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
fmt.Sprintf("lacking %4d", len(p.lacking)),
)
}
-// peerSet represents the collection of active peer participating in the block
+// peerSet represents the collection of active peer participating in the chain
// download procedure.
type peerSet struct {
peers map[string]*peer
@@ -359,7 +399,7 @@ func (ps *peerSet) Reset() {
// peer is already known.
//
// The method also sets the starting throughput values of the new peer to the
-// average of all existing peers, to give it a realistic change of being used
+// average of all existing peers, to give it a realistic chance of being used
// for data retrievals.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
@@ -369,15 +409,17 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered
}
if len(ps.peers) > 0 {
- p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
+ p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0
for _, peer := range ps.peers {
peer.lock.RLock()
+ p.headerThroughput += peer.headerThroughput
p.blockThroughput += peer.blockThroughput
p.receiptThroughput += peer.receiptThroughput
p.stateThroughput += peer.stateThroughput
peer.lock.RUnlock()
}
+ p.headerThroughput /= float64(len(ps.peers))
p.blockThroughput /= float64(len(ps.peers))
p.receiptThroughput /= float64(len(ps.peers))
p.stateThroughput /= float64(len(ps.peers))
@@ -441,6 +483,20 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
return ps.idlePeers(61, 61, idle, throughput)
}
+// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
+// within the active peer set, ordered by their reputation.
+func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.headerIdle) == 0
+ }
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.headerThroughput
+ }
+ return ps.idlePeers(62, 64, idle, throughput)
+}
+
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
// the active peer set, ordered by their reputation.
func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {