diff options
Diffstat (limited to 'eth/fetcher/fetcher.go')
-rw-r--r-- | eth/fetcher/fetcher.go | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 98cc1a76b..50966f5ee 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -83,6 +83,7 @@ type announce struct { // headerFilterTask represents a batch of headers needing fetcher filtering. type headerFilterTask struct { + peer string // The source peer of block headers headers []*types.Header // Collection of headers to filter time time.Time // Arrival time of the headers } @@ -90,6 +91,7 @@ type headerFilterTask struct { // headerFilterTask represents a batch of block bodies (transactions and uncles) // needing fetcher filtering. type bodyFilterTask struct { + peer string // The source peer of block bodies transactions [][]*types.Transaction // Collection of transactions per block bodies uncles [][]*types.Header // Collection of uncles per block bodies time time.Time // Arrival time of the blocks' contents @@ -218,8 +220,8 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error { // FilterHeaders extracts all the headers that were explicitly requested by the fetcher, // returning those that should be handled differently. -func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header { - log.Trace("Filtering headers", "headers", len(headers)) +func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { + log.Trace("Filtering headers", "peer", peer, "headers", len(headers)) // Send the filter channel to the fetcher filter := make(chan *headerFilterTask) @@ -231,7 +233,7 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type } // Request the filtering of the header list select { - case filter <- &headerFilterTask{headers: headers, time: time}: + case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: case <-f.quit: return nil } @@ -246,8 +248,8 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type // FilterBodies extracts all the block bodies that were explicitly requested by // the fetcher, returning those that should be handled differently. -func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { - log.Trace("Filtering bodies", "txs", len(transactions), "uncles", len(uncles)) +func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { + log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles)) // Send the filter channel to the fetcher filter := make(chan *bodyFilterTask) @@ -259,7 +261,7 @@ func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]* } // Request the filtering of the body list select { - case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}: + case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}: case <-f.quit: return nil, nil } @@ -444,7 +446,7 @@ func (f *Fetcher) loop() { hash := header.Hash() // Filter fetcher-requested headers from other synchronisation algorithms - if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { + if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { // If the delivered header does not match the promised number, drop the announcer if header.Number.Uint64() != announce.number { log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) @@ -523,7 +525,7 @@ func (f *Fetcher) loop() { txnHash := types.DeriveSha(types.Transactions(task.transactions[i])) uncleHash := types.CalcUncleHash(task.uncles[i]) - if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash { + if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer { // Mark the body matched, reassemble if still unknown matched = true |