diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-05-20 20:25:28 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-05-20 20:25:28 +0800 |
commit | a8472e0fdbb4ac157671ba3bcc79001674d582d2 (patch) | |
tree | 07736b4a0d318671a502fa42e41e81cd5c76cebd /eth/downloader/peer.go | |
parent | e798e4fd750745cec99c5a531e42998d9a7be85e (diff) | |
parent | 8906b2fe0934c67ebb1db5d4d77acdf1a7e988f0 (diff) | |
download | go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.gz go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.bz2 go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.lz go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.xz go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.tar.zst go-tangerine-a8472e0fdbb4ac157671ba3bcc79001674d582d2.zip |
Merge pull request #2315 from karalabe/concurrent-headers-2
eth/downloader: concurrent header downloads
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 66 |
1 files changed, 61 insertions, 5 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index c4846194b..6aab907d7 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -58,15 +58,18 @@ type peer struct { id string // Unique identifier of the peer head common.Hash // Hash of the peers latest known block + headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) + headerThroughput float64 // Number of headers measured to be retrievable per second blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second receiptThroughput float64 // Number of receipts measured to be retrievable per second stateThroughput float64 // Number of node data pieces measured to be retrievable per second - blockStarted time.Time // Time instance when the last block (body)fetch was started + headerStarted time.Time // Time instance when the last header fetch was started + blockStarted time.Time // Time instance when the last block (body) fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started stateStarted time.Time // Time instance when the last node data fetch was started @@ -118,10 +121,12 @@ func (p *peer) Reset() { p.lock.Lock() defer p.lock.Unlock() + atomic.StoreInt32(&p.headerIdle, 0) atomic.StoreInt32(&p.blockIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.stateIdle, 0) + p.headerThroughput = 0 p.blockThroughput = 0 p.receiptThroughput = 0 p.stateThroughput = 0 @@ -151,6 +156,24 @@ func (p *peer) Fetch61(request *fetchRequest) error { return nil } +// FetchHeaders sends a header retrieval request to the remote peer. +func (p *peer) FetchHeaders(from uint64, count int) error { + // Sanity check the protocol version + if p.version < 62 { + panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version)) + } + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) { + return errAlreadyFetching + } + p.headerStarted = time.Now() + + // Issue the header retrieval request (absolut upwards without gaps) + go p.getAbsHeaders(from, count, 0, false) + + return nil +} + // FetchBodies sends a block body retrieval request to the remote peer. func (p *peer) FetchBodies(request *fetchRequest) error { // Sanity check the protocol version @@ -217,6 +240,13 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return nil } +// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval +// requests. Its estimated header retrieval throughput is updated with that measured +// just now. +func (p *peer) SetHeadersIdle(delivered int) { + p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle) +} + // SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval // requests. Its estimated block retrieval throughput is updated with that measured // just now. @@ -264,6 +294,15 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured } +// HeaderCapacity retrieves the peers header download allowance based on its +// previously discovered throughput. +func (p *peer) HeaderCapacity() int { + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch)))) +} + // BlockCapacity retrieves the peers block download allowance based on its // previously discovered throughput. func (p *peer) BlockCapacity() int { @@ -323,14 +362,15 @@ func (p *peer) String() string { defer p.lock.RUnlock() return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ + fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+ + fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ fmt.Sprintf("lacking %4d", len(p.lacking)), ) } -// peerSet represents the collection of active peer participating in the block +// peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { peers map[string]*peer @@ -359,7 +399,7 @@ func (ps *peerSet) Reset() { // peer is already known. // // The method also sets the starting throughput values of the new peer to the -// average of all existing peers, to give it a realistic change of being used +// average of all existing peers, to give it a realistic chance of being used // for data retrievals. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() @@ -369,15 +409,17 @@ func (ps *peerSet) Register(p *peer) error { return errAlreadyRegistered } if len(ps.peers) > 0 { - p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0 + p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0 for _, peer := range ps.peers { peer.lock.RLock() + p.headerThroughput += peer.headerThroughput p.blockThroughput += peer.blockThroughput p.receiptThroughput += peer.receiptThroughput p.stateThroughput += peer.stateThroughput peer.lock.RUnlock() } + p.headerThroughput /= float64(len(ps.peers)) p.blockThroughput /= float64(len(ps.peers)) p.receiptThroughput /= float64(len(ps.peers)) p.stateThroughput /= float64(len(ps.peers)) @@ -441,6 +483,20 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { return ps.idlePeers(61, 61, idle, throughput) } +// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers +// within the active peer set, ordered by their reputation. +func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.headerIdle) == 0 + } + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.headerThroughput + } + return ps.idlePeers(62, 64, idle, throughput) +} + // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within // the active peer set, ordered by their reputation. func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { |