aboutsummaryrefslogtreecommitdiffstats
path: root/eth/fetcher/fetcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/fetcher/fetcher.go')
-rw-r--r--eth/fetcher/fetcher.go18
1 files changed, 10 insertions, 8 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index 98cc1a76b..50966f5ee 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -83,6 +83,7 @@ type announce struct {
// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
+ peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
}
@@ -90,6 +91,7 @@ type headerFilterTask struct {
// headerFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
+ peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
@@ -218,8 +220,8 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
-func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
- log.Trace("Filtering headers", "headers", len(headers))
+func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
+ log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
// Send the filter channel to the fetcher
filter := make(chan *headerFilterTask)
@@ -231,7 +233,7 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type
}
// Request the filtering of the header list
select {
- case filter <- &headerFilterTask{headers: headers, time: time}:
+ case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
case <-f.quit:
return nil
}
@@ -246,8 +248,8 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
-func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
- log.Trace("Filtering bodies", "txs", len(transactions), "uncles", len(uncles))
+func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
+ log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
// Send the filter channel to the fetcher
filter := make(chan *bodyFilterTask)
@@ -259,7 +261,7 @@ func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*
}
// Request the filtering of the body list
select {
- case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}:
+ case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
case <-f.quit:
return nil, nil
}
@@ -444,7 +446,7 @@ func (f *Fetcher) loop() {
hash := header.Hash()
// Filter fetcher-requested headers from other synchronisation algorithms
- if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
+ if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
// If the delivered header does not match the promised number, drop the announcer
if header.Number.Uint64() != announce.number {
log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
@@ -523,7 +525,7 @@ func (f *Fetcher) loop() {
txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
uncleHash := types.CalcUncleHash(task.uncles[i])
- if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash {
+ if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
// Mark the body matched, reassemble if still unknown
matched = true