diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/downloader/peer.go | 45 | ||||
-rw-r--r-- | eth/downloader/queue.go | 12 |
2 files changed, 46 insertions, 11 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 1f457cb15..9ba6dabbd 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -28,9 +28,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "gopkg.in/fatih/set.v0" ) +// Maximum number of entries allowed on the list or lacking items. +const maxLackingHashes = 4096 + // Hash and block fetchers belonging to eth/61 and below type relativeHashFetcherFn func(common.Hash) error type absoluteHashFetcherFn func(uint64, int) error @@ -67,7 +69,8 @@ type peer struct { receiptStarted time.Time // Time instance when the last receipt fetch was started stateStarted time.Time // Time instance when the last node data fetch was started - ignored *set.Set // Set of hashes not to request (didn't have previously) + lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) + lackingLock sync.RWMutex // Lock protecting the lacking hashes list getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position @@ -95,7 +98,7 @@ func newPeer(id string, version int, head common.Hash, blockCapacity: 1, receiptCapacity: 1, stateCapacity: 1, - ignored: set.New(), + lacking: make(map[common.Hash]struct{}), getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, @@ -119,7 +122,10 @@ func (p *peer) Reset() { atomic.StoreInt32(&p.blockCapacity, 1) atomic.StoreInt32(&p.receiptCapacity, 1) atomic.StoreInt32(&p.stateCapacity, 1) - p.ignored.Clear() + + p.lackingLock.Lock() + p.lacking = make(map[common.Hash]struct{}) + p.lackingLock.Unlock() } // Fetch61 sends a block retrieval request to the remote peer. @@ -305,13 +311,42 @@ func (p *peer) Demote() { } } +// MarkLacking appends a new entity to the set of items (blocks, receipts, states) +// that a peer is known not to have (i.e. have been requested before). If the +// set reaches its maximum allowed capacity, items are randomly dropped off. +func (p *peer) MarkLacking(hash common.Hash) { + p.lackingLock.Lock() + defer p.lackingLock.Unlock() + + for len(p.lacking) >= maxLackingHashes { + for drop, _ := range p.lacking { + delete(p.lacking, drop) + break + } + } + p.lacking[hash] = struct{}{} +} + +// Lacks retrieves whether the hash of a blockchain item is on the peers lacking +// list (i.e. whether we know that the peer does not have it). +func (p *peer) Lacks(hash common.Hash) bool { + p.lackingLock.RLock() + defer p.lackingLock.RUnlock() + + _, ok := p.lacking[hash] + return ok +} + // String implements fmt.Stringer. func (p *peer) String() string { + p.lackingLock.RLock() + defer p.lackingLock.RUnlock() + return fmt.Sprintf("Peer %s [%s]", p.id, fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ - fmt.Sprintf("ignored %4d", p.ignored.Size()), + fmt.Sprintf("lacking %4d", len(p.lacking)), ) } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 56b46e285..1fb5b6e12 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -501,7 +501,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { hash, priority := taskQueue.Pop() - if p.ignored.Has(hash) { + if p.Lacks(hash.(common.Hash)) { skip[hash.(common.Hash)] = int(priority) } else { send[hash.(common.Hash)] = int(priority) @@ -607,7 +607,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ continue } // Otherwise unless the peer is known not to have the data, add to the retrieve list - if p.ignored.Has(header.Hash()) { + if p.Lacks(header.Hash()) { skip = append(skip, header) } else { send = append(send, header) @@ -781,7 +781,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { // If no blocks were retrieved, mark them as unavailable for the origin peer if len(blocks) == 0 { for hash, _ := range request.Hashes { - request.Peer.ignored.Add(hash) + request.Peer.MarkLacking(hash) } } // Iterate over the downloaded blocks and add each of them @@ -877,8 +877,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ // If no data items were retrieved, mark them as unavailable for the origin peer if results == 0 { - for hash, _ := range request.Headers { - request.Peer.ignored.Add(hash) + for _, header := range request.Headers { + request.Peer.MarkLacking(header.Hash()) } } // Assemble each of the results with their headers and retrieved data parts @@ -944,7 +944,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // If no data was retrieved, mark their hashes as unavailable for the origin peer if len(data) == 0 { for hash, _ := range request.Hashes { - request.Peer.ignored.Add(hash) + request.Peer.MarkLacking(hash) } } // Iterate over the downloaded data and verify each of them |