aboutsummaryrefslogtreecommitdiffstats
path: root/eth/fetcher
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-08-15 02:25:41 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-08-25 22:48:47 +0800
commit47a7fe5d22fe2a6be783f6576070814fe951eaaf (patch)
tree61f2f691c6775fa5ae3547b8d769a709b7b3f04c /eth/fetcher
parentca88e18f59af84f34ad67da21fd27a6407eea87c (diff)
downloaddexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.gz
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.bz2
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.lz
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.xz
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.zst
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.zip
eth: port the synchronisation algo to eth/62
Diffstat (limited to 'eth/fetcher')
-rw-r--r--eth/fetcher/fetcher.go401
-rw-r--r--eth/fetcher/fetcher_test.go471
2 files changed, 767 insertions, 105 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index 07eb165dc..f54256788 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -51,6 +51,12 @@ type blockRetrievalFn func(common.Hash) *types.Block
// blockRequesterFn is a callback type for sending a block retrieval request.
type blockRequesterFn func([]common.Hash) error
+// headerRequesterFn is a callback type for sending a header retrieval request.
+type headerRequesterFn func(common.Hash) error
+
+// bodyRequesterFn is a callback type for sending a body retrieval request.
+type bodyRequesterFn func([]common.Hash) error
+
// blockValidatorFn is a callback type to verify a block's header for fast propagation.
type blockValidatorFn func(block *types.Block, parent *types.Block) error
@@ -69,12 +75,30 @@ type peerDropFn func(id string)
// announce is the hash notification of the availability of a new block in the
// network.
type announce struct {
- hash common.Hash // Hash of the block being announced
- number uint64 // Number of the block being announced (0 = unknown | old protocol)
- time time.Time // Timestamp of the announcement
+ hash common.Hash // Hash of the block being announced
+ number uint64 // Number of the block being announced (0 = unknown | old protocol)
+ header *types.Header // Header of the block partially reassembled (new protocol)
+ time time.Time // Timestamp of the announcement
+
+ origin string // Identifier of the peer originating the notification
+
+ fetch61 blockRequesterFn // [eth/61] Fetcher function to retrieve an announced block
+ fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block
+ fetchBodies bodyRequesterFn // [eth/62] Fetcher function to retrieve the body of an announced block
+}
- origin string // Identifier of the peer originating the notification
- fetch blockRequesterFn // Fetcher function to retrieve
+// headerFilterTask represents a batch of headers needing fetcher filtering.
+type headerFilterTask struct {
+ headers []*types.Header // Collection of headers to filter
+ time time.Time // Arrival time of the headers
+}
+
+// headerFilterTask represents a batch of block bodies (transactions and uncles)
+// needing fetcher filtering.
+type bodyFilterTask struct {
+ transactions [][]*types.Transaction // Collection of transactions per block bodies
+ uncles [][]*types.Header // Collection of uncles per block bodies
+ time time.Time // Arrival time of the blocks' contents
}
// inject represents a schedules import operation.
@@ -89,14 +113,20 @@ type Fetcher struct {
// Various event channels
notify chan *announce
inject chan *inject
- filter chan chan []*types.Block
- done chan common.Hash
- quit chan struct{}
+
+ blockFilter chan chan []*types.Block
+ headerFilter chan chan *headerFilterTask
+ bodyFilter chan chan *bodyFilterTask
+
+ done chan common.Hash
+ quit chan struct{}
// Announce states
- announces map[string]int // Per peer announce counts to prevent memory exhaustion
- announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
- fetching map[common.Hash]*announce // Announced blocks, currently fetching
+ announces map[string]int // Per peer announce counts to prevent memory exhaustion
+ announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
+ fetching map[common.Hash]*announce // Announced blocks, currently fetching
+ fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
+ completing map[common.Hash]*announce // Blocks with headers, currently body-completing
// Block cache
queue *prque.Prque // Queue containing the import operations (block number sorted)
@@ -112,8 +142,9 @@ type Fetcher struct {
dropPeer peerDropFn // Drops a peer for misbehaving
// Testing hooks
- fetchingHook func([]common.Hash) // Method to call upon starting a block fetch
- importedHook func(*types.Block) // Method to call upon successful block import
+ fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
+ completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
+ importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
}
// New creates a block fetcher to retrieve blocks based on hash announcements.
@@ -121,12 +152,16 @@ func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlo
return &Fetcher{
notify: make(chan *announce),
inject: make(chan *inject),
- filter: make(chan chan []*types.Block),
+ blockFilter: make(chan chan []*types.Block),
+ headerFilter: make(chan chan *headerFilterTask),
+ bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*announce),
fetching: make(map[common.Hash]*announce),
+ fetched: make(map[common.Hash][]*announce),
+ completing: make(map[common.Hash]*announce),
queue: prque.New(),
queues: make(map[string]int),
queued: make(map[common.Hash]*inject),
@@ -153,13 +188,17 @@ func (f *Fetcher) Stop() {
// Notify announces the fetcher of the potential availability of a new block in
// the network.
-func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, fetcher blockRequesterFn) error {
+func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
+ blockFetcher blockRequesterFn, // eth/61 specific whole block fetcher
+ headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
block := &announce{
- hash: hash,
- number: number,
- time: time,
- origin: peer,
- fetch: fetcher,
+ hash: hash,
+ number: number,
+ time: time,
+ origin: peer,
+ fetch61: blockFetcher,
+ fetchHeader: headerFetcher,
+ fetchBodies: bodyFetcher,
}
select {
case f.notify <- block:
@@ -183,14 +222,16 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
}
}
-// Filter extracts all the blocks that were explicitly requested by the fetcher,
+// FilterBlocks extracts all the blocks that were explicitly requested by the fetcher,
// returning those that should be handled differently.
-func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
+func (f *Fetcher) FilterBlocks(blocks types.Blocks) types.Blocks {
+ glog.V(logger.Detail).Infof("[eth/61] filtering %d blocks", len(blocks))
+
// Send the filter channel to the fetcher
filter := make(chan []*types.Block)
select {
- case f.filter <- filter:
+ case f.blockFilter <- filter:
case <-f.quit:
return nil
}
@@ -209,11 +250,69 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
}
}
+// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
+// returning those that should be handled differently.
+func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
+ glog.V(logger.Detail).Infof("[eth/62] filtering %d headers", len(headers))
+
+ // Send the filter channel to the fetcher
+ filter := make(chan *headerFilterTask)
+
+ select {
+ case f.headerFilter <- filter:
+ case <-f.quit:
+ return nil
+ }
+ // Request the filtering of the header list
+ select {
+ case filter <- &headerFilterTask{headers: headers, time: time}:
+ case <-f.quit:
+ return nil
+ }
+ // Retrieve the headers remaining after filtering
+ select {
+ case task := <-filter:
+ return task.headers
+ case <-f.quit:
+ return nil
+ }
+}
+
+// FilterBodies extracts all the block bodies that were explicitly requested by
+// the fetcher, returning those that should be handled differently.
+func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
+ glog.V(logger.Detail).Infof("[eth/62] filtering %d:%d bodies", len(transactions), len(uncles))
+
+ // Send the filter channel to the fetcher
+ filter := make(chan *bodyFilterTask)
+
+ select {
+ case f.bodyFilter <- filter:
+ case <-f.quit:
+ return nil, nil
+ }
+ // Request the filtering of the body list
+ select {
+ case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}:
+ case <-f.quit:
+ return nil, nil
+ }
+ // Retrieve the bodies remaining after filtering
+ select {
+ case task := <-filter:
+ return task.transactions, task.uncles
+ case <-f.quit:
+ return nil, nil
+ }
+}
+
// Loop is the main fetcher loop, checking and processing various notification
// events.
func (f *Fetcher) loop() {
// Iterate the block fetching until a quit is requested
- fetch := time.NewTimer(0)
+ fetchTimer := time.NewTimer(0)
+ completeTimer := time.NewTimer(0)
+
for {
// Clean up any expired block fetches
for hash, announce := range f.fetching {
@@ -255,14 +354,25 @@ func (f *Fetcher) loop() {
glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)
break
}
+ // If we have a valid block number, check that it's potentially useful
+ if notification.number > 0 {
+ if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
+ glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)
+ discardMeter.Mark(1)
+ break
+ }
+ }
// All is well, schedule the announce if block's not yet downloading
if _, ok := f.fetching[notification.hash]; ok {
break
}
+ if _, ok := f.completing[notification.hash]; ok {
+ break
+ }
f.announces[notification.origin] = count
f.announced[notification.hash] = append(f.announced[notification.hash], notification)
if len(f.announced) == 1 {
- f.reschedule(fetch)
+ f.rescheduleFetch(fetchTimer)
}
case op := <-f.inject:
@@ -275,7 +385,7 @@ func (f *Fetcher) loop() {
f.forgetHash(hash)
f.forgetBlock(hash)
- case <-fetch.C:
+ case <-fetchTimer.C:
// At least one block's timer ran out, check for needing retrieval
request := make(map[string][]common.Hash)
@@ -292,30 +402,77 @@ func (f *Fetcher) loop() {
}
}
}
- // Send out all block requests
+ // Send out all block (eth/61) or header (eth/62) requests
for peer, hashes := range request {
if glog.V(logger.Detail) && len(hashes) > 0 {
list := "["
for _, hash := range hashes {
- list += fmt.Sprintf("%x, ", hash[:4])
+ list += fmt.Sprintf("%x…, ", hash[:4])
}
list = list[:len(list)-2] + "]"
- glog.V(logger.Detail).Infof("Peer %s: fetching %s", peer, list)
+ if f.fetching[hashes[0]].fetch61 != nil {
+ glog.V(logger.Detail).Infof("[eth/61] Peer %s: fetching blocks %s", peer, list)
+ } else {
+ glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching headers %s", peer, list)
+ }
}
// Create a closure of the fetch and schedule in on a new thread
- fetcher, hashes := f.fetching[hashes[0]].fetch, hashes
+ fetchBlocks, fetchHeader, hashes := f.fetching[hashes[0]].fetch61, f.fetching[hashes[0]].fetchHeader, hashes
go func() {
if f.fetchingHook != nil {
f.fetchingHook(hashes)
}
- fetcher(hashes)
+ if fetchBlocks != nil {
+ // Use old eth/61 protocol to retrieve whole blocks
+ fetchBlocks(hashes)
+ } else {
+ // Use new eth/62 protocol to retrieve headers first
+ for _, hash := range hashes {
+ fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
+ }
+ }
}()
}
// Schedule the next fetch if blocks are still pending
- f.reschedule(fetch)
+ f.rescheduleFetch(fetchTimer)
- case filter := <-f.filter:
+ case <-completeTimer.C:
+ // At least one header's timer ran out, retrieve everything
+ request := make(map[string][]common.Hash)
+
+ for hash, announces := range f.fetched {
+ // Pick a random peer to retrieve from, reset all others
+ announce := announces[rand.Intn(len(announces))]
+ f.forgetHash(hash)
+
+ // If the block still didn't arrive, queue for completion
+ if f.getBlock(hash) == nil {
+ request[announce.origin] = append(request[announce.origin], hash)
+ f.completing[hash] = announce
+ }
+ }
+ // Send out all block body requests
+ for peer, hashes := range request {
+ if glog.V(logger.Detail) && len(hashes) > 0 {
+ list := "["
+ for _, hash := range hashes {
+ list += fmt.Sprintf("%x…, ", hash[:4])
+ }
+ list = list[:len(list)-2] + "]"
+
+ glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching bodies %s", peer, list)
+ }
+ // Create a closure of the fetch and schedule in on a new thread
+ if f.completingHook != nil {
+ f.completingHook(hashes)
+ }
+ go f.completing[hashes[0]].fetchBodies(hashes)
+ }
+ // Schedule the next fetch if blocks are still pending
+ f.rescheduleComplete(completeTimer)
+
+ case filter := <-f.blockFilter:
// Blocks arrived, extract any explicit fetches, return all else
var blocks types.Blocks
select {
@@ -352,12 +509,135 @@ func (f *Fetcher) loop() {
f.enqueue(announce.origin, block)
}
}
+
+ case filter := <-f.headerFilter:
+ // Headers arrived from a remote peer. Extract those that were explicitly
+ // requested by the fetcher, and return everything else so it's delivered
+ // to other parts of the system.
+ var task *headerFilterTask
+ select {
+ case task = <-filter:
+ case <-f.quit:
+ return
+ }
+ // Split the batch of headers into unknown ones (to return to the caller),
+ // known incomplete ones (requiring body retrievals) and completed blocks.
+ unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
+ for _, header := range task.headers {
+ hash := header.Hash()
+
+ // Filter fetcher-requested headers from other synchronisation algorithms
+ if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
+ // If the delivered header does not match the promised number, drop the announcer
+ if header.Number.Uint64() != announce.number {
+ glog.V(logger.Detail).Infof("[eth/62] Peer %s: invalid block number for [%x…]: announced %d, provided %d", announce.origin, header.Hash().Bytes()[:4], announce.number, header.Number.Uint64())
+ f.dropPeer(announce.origin)
+ f.forgetHash(hash)
+ continue
+ }
+ // Only keep if not imported by other means
+ if f.getBlock(hash) == nil {
+ announce.header = header
+ announce.time = task.time
+
+ // If the block is empty (header only), short circuit into the final import queue
+ if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
+ glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
+
+ complete = append(complete, types.NewBlockWithHeader(header))
+ f.completing[hash] = announce
+ continue
+ }
+ // Otherwise add to the list of blocks needing completion
+ incomplete = append(incomplete, announce)
+ } else {
+ glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] already imported, discarding header", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
+ f.forgetHash(hash)
+ }
+ } else {
+ // Fetcher doesn't know about it, add to the return list
+ unknown = append(unknown, header)
+ }
+ }
+ select {
+ case filter <- &headerFilterTask{headers: unknown, time: task.time}:
+ case <-f.quit:
+ return
+ }
+ // Schedule the retrieved headers for body completion
+ for _, announce := range incomplete {
+ hash := announce.header.Hash()
+ if _, ok := f.completing[hash]; ok {
+ continue
+ }
+ f.fetched[hash] = append(f.fetched[hash], announce)
+ if len(f.fetched) == 1 {
+ f.rescheduleComplete(completeTimer)
+ }
+ }
+ // Schedule the header-only blocks for import
+ for _, block := range complete {
+ if announce := f.completing[block.Hash()]; announce != nil {
+ f.enqueue(announce.origin, block)
+ }
+ }
+
+ case filter := <-f.bodyFilter:
+ // Block bodies arrived, extract any explicitly requested blocks, return the rest
+ var task *bodyFilterTask
+ select {
+ case task = <-filter:
+ case <-f.quit:
+ return
+ }
+
+ blocks := []*types.Block{}
+ for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
+ // Match up a body to any possible completion request
+ matched := false
+
+ for hash, announce := range f.completing {
+ if f.queued[hash] == nil {
+ txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
+ uncleHash := types.CalcUncleHash(task.uncles[i])
+
+ if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash {
+ // Mark the body matched, reassemble if still unknown
+ matched = true
+
+ if f.getBlock(hash) == nil {
+ blocks = append(blocks, types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]))
+ } else {
+ f.forgetHash(hash)
+ }
+ }
+ }
+ }
+ if matched {
+ task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
+ task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
+ i--
+ continue
+ }
+ }
+
+ select {
+ case filter <- task:
+ case <-f.quit:
+ return
+ }
+ // Schedule the retrieved blocks for ordered import
+ for _, block := range blocks {
+ if announce := f.completing[block.Hash()]; announce != nil {
+ f.enqueue(announce.origin, block)
+ }
+ }
}
}
}
-// reschedule resets the specified fetch timer to the next announce timeout.
-func (f *Fetcher) reschedule(fetch *time.Timer) {
+// rescheduleFetch resets the specified fetch timer to the next announce timeout.
+func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
// Short circuit if no blocks are announced
if len(f.announced) == 0 {
return
@@ -372,6 +652,22 @@ func (f *Fetcher) reschedule(fetch *time.Timer) {
fetch.Reset(arriveTimeout - time.Since(earliest))
}
+// rescheduleComplete resets the specified completion timer to the next fetch timeout.
+func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
+ // Short circuit if no headers are fetched
+ if len(f.fetched) == 0 {
+ return
+ }
+ // Otherwise find the earliest expiring announcement
+ earliest := time.Now()
+ for _, announces := range f.fetched {
+ if earliest.After(announces[0].time) {
+ earliest = announces[0].time
+ }
+ }
+ complete.Reset(gatherSlack - time.Since(earliest))
+}
+
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
func (f *Fetcher) enqueue(peer string, block *types.Block) {
@@ -380,13 +676,15 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
// Ensure the peer isn't DOSing us
count := f.queues[peer] + 1
if count > blockLimit {
- glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
+ glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
+ f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
- glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
+ glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
discardMeter.Mark(1)
+ f.forgetHash(hash)
return
}
// Schedule the block for future importing
@@ -400,7 +698,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
f.queue.Push(op, -float32(block.NumberU64()))
if glog.V(logger.Debug) {
- glog.Infof("Peer %s: queued block #%d [%x], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size())
+ glog.Infof("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size())
}
}
}
@@ -412,13 +710,14 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
hash := block.Hash()
// Run the import on a new thread
- glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", peer, block.NumberU64(), hash[:4])
+ glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x…]", peer, block.NumberU64(), hash[:4])
go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
parent := f.getBlock(block.ParentHash())
if parent == nil {
+ glog.V(logger.Debug).Infof("Peer %s: parent []%x] of block #%d [%x…] unknown", block.ParentHash().Bytes()[:4], peer, block.NumberU64(), hash[:4])
return
}
// Quickly validate the header and propagate the block if it passes
@@ -434,13 +733,13 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
default:
// Something went very wrong, drop the peer
- glog.V(logger.Debug).Infof("Peer %s: block #%d [%x] verification failed: %v", peer, block.NumberU64(), hash[:4], err)
+ glog.V(logger.Debug).Infof("Peer %s: block #%d [%x…] verification failed: %v", peer, block.NumberU64(), hash[:4], err)
f.dropPeer(peer)
return
}
// Run the actual import and log any issues
if _, err := f.insertChain(types.Blocks{block}); err != nil {
- glog.V(logger.Warn).Infof("Peer %s: block #%d [%x] import failed: %v", peer, block.NumberU64(), hash[:4], err)
+ glog.V(logger.Warn).Infof("Peer %s: block #%d [%x…] import failed: %v", peer, block.NumberU64(), hash[:4], err)
return
}
// If import succeeded, broadcast the block
@@ -474,9 +773,27 @@ func (f *Fetcher) forgetHash(hash common.Hash) {
}
delete(f.fetching, hash)
}
+
+ // Remove any pending completion requests and decrement the DOS counters
+ for _, announce := range f.fetched[hash] {
+ f.announces[announce.origin]--
+ if f.announces[announce.origin] == 0 {
+ delete(f.announces, announce.origin)
+ }
+ }
+ delete(f.fetched, hash)
+
+ // Remove any pending completions and decrement the DOS counters
+ if announce := f.completing[hash]; announce != nil {
+ f.announces[announce.origin]--
+ if f.announces[announce.origin] == 0 {
+ delete(f.announces, announce.origin)
+ }
+ delete(f.completing, hash)
+ }
}
-// forgetBlock removes all traces of a queued block frmo the fetcher's internal
+// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
func (f *Fetcher) forgetBlock(hash common.Hash) {
if insert := f.queued[hash]; insert != nil {
diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go
index b0d9ce843..707d8d758 100644
--- a/eth/fetcher/fetcher_test.go
+++ b/eth/fetcher/fetcher_test.go
@@ -27,21 +27,39 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
)
var (
testdb, _ = ethdb.NewMemDatabase()
- genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0))
+ testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
+ genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil)
)
// makeChain creates a chain of n blocks starting at and including parent.
-// the returned hash chain is ordered head->parent.
+// the returned hash chain is ordered head->parent. In addition, every 3rd block
+// contains a transaction and every 5th an uncle to allow testing correct block
+// reassembly.
func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
- blocks := core.GenerateChain(parent, testdb, n, func(i int, gen *core.BlockGen) {
- gen.SetCoinbase(common.Address{seed})
+ blocks := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
+ block.SetCoinbase(common.Address{seed})
+
+ // If the block number is multiple of 3, send a bonus transaction to the miner
+ if parent == genesis && i%3 == 0 {
+ tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey)
+ if err != nil {
+ panic(err)
+ }
+ block.AddTx(tx)
+ }
+ // If the block number is a multiple of 5, add a bonus uncle to the block
+ if i%5 == 0 {
+ block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
+ }
})
hashes := make([]common.Hash, n+1)
hashes[len(hashes)-1] = parent.Hash()
@@ -60,6 +78,7 @@ type fetcherTester struct {
hashes []common.Hash // Hash chain belonging to the tester
blocks map[common.Hash]*types.Block // Blocks belonging to the tester
+ drops map[string]bool // Map of peers dropped by the fetcher
lock sync.RWMutex
}
@@ -69,6 +88,7 @@ func newTester() *fetcherTester {
tester := &fetcherTester{
hashes: []common.Hash{genesis.Hash()},
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
+ drops: make(map[string]bool),
}
tester.fetcher = New(tester.getBlock, tester.verifyBlock, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
tester.fetcher.Start()
@@ -122,12 +142,14 @@ func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
return 0, nil
}
-// dropPeer is a nop placeholder for the peer removal.
+// dropPeer is an emulator for the peer removal, simply accumulating the various
+// peers dropped by the fetcher.
func (f *fetcherTester) dropPeer(peer string) {
+ f.drops[peer] = true
}
-// peerFetcher retrieves a fetcher associated with a simulated peer.
-func (f *fetcherTester) makeFetcher(blocks map[common.Hash]*types.Block) blockRequesterFn {
+// makeBlockFetcher retrieves a block fetcher associated with a simulated peer.
+func (f *fetcherTester) makeBlockFetcher(blocks map[common.Hash]*types.Block) blockRequesterFn {
closure := make(map[common.Hash]*types.Block)
for hash, block := range blocks {
closure[hash] = block
@@ -142,18 +164,105 @@ func (f *fetcherTester) makeFetcher(blocks map[common.Hash]*types.Block) blockRe
}
}
// Return on a new thread
- go f.fetcher.Filter(blocks)
+ go f.fetcher.FilterBlocks(blocks)
+
+ return nil
+ }
+}
+
+// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
+func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
+ closure := make(map[common.Hash]*types.Block)
+ for hash, block := range blocks {
+ closure[hash] = block
+ }
+ // Create a function that return a header from the closure
+ return func(hash common.Hash) error {
+ // Gather the blocks to return
+ headers := make([]*types.Header, 0, 1)
+ if block, ok := closure[hash]; ok {
+ headers = append(headers, block.Header())
+ }
+ // Return on a new thread
+ go f.fetcher.FilterHeaders(headers, time.Now().Add(drift))
+
+ return nil
+ }
+}
+
+// makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
+func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
+ closure := make(map[common.Hash]*types.Block)
+ for hash, block := range blocks {
+ closure[hash] = block
+ }
+ // Create a function that returns blocks from the closure
+ return func(hashes []common.Hash) error {
+ // Gather the block bodies to return
+ transactions := make([][]*types.Transaction, 0, len(hashes))
+ uncles := make([][]*types.Header, 0, len(hashes))
+
+ for _, hash := range hashes {
+ if block, ok := closure[hash]; ok {
+ transactions = append(transactions, block.Transactions())
+ uncles = append(uncles, block.Uncles())
+ }
+ }
+ // Return on a new thread
+ go f.fetcher.FilterBodies(transactions, uncles, time.Now().Add(drift))
return nil
}
}
+// verifyFetchingEvent verifies that one single event arrive on an fetching channel.
+func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
+ if arrive {
+ select {
+ case <-fetching:
+ case <-time.After(time.Second):
+ t.Fatalf("fetching timeout")
+ }
+ } else {
+ select {
+ case <-fetching:
+ t.Fatalf("fetching invoked")
+ case <-time.After(10 * time.Millisecond):
+ }
+ }
+}
+
+// verifyCompletingEvent verifies that one single event arrive on an completing channel.
+func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
+ if arrive {
+ select {
+ case <-completing:
+ case <-time.After(time.Second):
+ t.Fatalf("completing timeout")
+ }
+ } else {
+ select {
+ case <-completing:
+ t.Fatalf("completing invoked")
+ case <-time.After(10 * time.Millisecond):
+ }
+ }
+}
+
// verifyImportEvent verifies that one single event arrive on an import channel.
-func verifyImportEvent(t *testing.T, imported chan *types.Block) {
- select {
- case <-imported:
- case <-time.After(time.Second):
- t.Fatalf("import timeout")
+func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
+ if arrive {
+ select {
+ case <-imported:
+ case <-time.After(time.Second):
+ t.Fatalf("import timeout")
+ }
+ } else {
+ select {
+ case <-imported:
+ t.Fatalf("import invoked")
+ case <-time.After(10 * time.Millisecond):
+ }
}
}
@@ -164,7 +273,7 @@ func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
select {
case <-imported:
case <-time.After(time.Second):
- t.Fatalf("block %d: import timeout", i)
+ t.Fatalf("block %d: import timeout", i+1)
}
}
verifyImportDone(t, imported)
@@ -181,51 +290,78 @@ func verifyImportDone(t *testing.T, imported chan *types.Block) {
// Tests that a fetcher accepts block announcements and initiates retrievals for
// them, successfully importing into the local chain.
-func TestSequentialAnnouncements(t *testing.T) {
+func TestSequentialAnnouncements61(t *testing.T) { testSequentialAnnouncements(t, 61) }
+func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
+func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
+func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
+
+func testSequentialAnnouncements(t *testing.T, protocol int) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester()
- fetcher := tester.makeFetcher(blocks)
+ blockFetcher := tester.makeBlockFetcher(blocks)
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
// Iteratively announce blocks until all are imported
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
for i := len(hashes) - 2; i >= 0; i-- {
- tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), fetcher)
- verifyImportEvent(t, imported)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ }
+ verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
}
// Tests that if blocks are announced by multiple peers (or even the same buggy
// peer), they will only get downloaded at most once.
-func TestConcurrentAnnouncements(t *testing.T) {
+func TestConcurrentAnnouncements61(t *testing.T) { testConcurrentAnnouncements(t, 61) }
+func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
+func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
+func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
+
+func testConcurrentAnnouncements(t *testing.T, protocol int) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
// Assemble a tester with a built in counter for the requests
tester := newTester()
- fetcher := tester.makeFetcher(blocks)
+ blockFetcher := tester.makeBlockFetcher(blocks)
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
counter := uint32(0)
- wrapper := func(hashes []common.Hash) error {
+ blockWrapper := func(hashes []common.Hash) error {
atomic.AddUint32(&counter, uint32(len(hashes)))
- return fetcher(hashes)
+ return blockFetcher(hashes)
+ }
+ headerWrapper := func(hash common.Hash) error {
+ atomic.AddUint32(&counter, 1)
+ return headerFetcher(hash)
}
// Iteratively announce blocks until all are imported
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
for i := len(hashes) - 2; i >= 0; i-- {
- tester.fetcher.Notify("first", hashes[i], 0, time.Now().Add(-arriveTimeout), wrapper)
- tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout+time.Millisecond), wrapper)
- tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout-time.Millisecond), wrapper)
-
- verifyImportEvent(t, imported)
+ if protocol < 62 {
+ tester.fetcher.Notify("first", hashes[i], 0, time.Now().Add(-arriveTimeout), blockWrapper, nil, nil)
+ tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout+time.Millisecond), blockWrapper, nil, nil)
+ tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout-time.Millisecond), blockWrapper, nil, nil)
+ } else {
+ tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerWrapper, bodyFetcher)
+ tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), nil, headerWrapper, bodyFetcher)
+ tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), nil, headerWrapper, bodyFetcher)
+ }
+ verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
@@ -237,56 +373,90 @@ func TestConcurrentAnnouncements(t *testing.T) {
// Tests that announcements arriving while a previous is being fetched still
// results in a valid import.
-func TestOverlappingAnnouncements(t *testing.T) {
+func TestOverlappingAnnouncements61(t *testing.T) { testOverlappingAnnouncements(t, 61) }
+func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
+func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
+func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
+
+func testOverlappingAnnouncements(t *testing.T, protocol int) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester()
- fetcher := tester.makeFetcher(blocks)
+ blockFetcher := tester.makeBlockFetcher(blocks)
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
// Iteratively announce blocks, but overlap them continuously
- fetching := make(chan []common.Hash)
+ overlap := 16
imported := make(chan *types.Block, len(hashes)-1)
- tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
+ for i := 0; i < overlap; i++ {
+ imported <- nil
+ }
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
for i := len(hashes) - 2; i >= 0; i-- {
- tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), fetcher)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ }
select {
- case <-fetching:
+ case <-imported:
case <-time.After(time.Second):
- t.Fatalf("hash %d: announce timeout", len(hashes)-i)
+ t.Fatalf("block %d: import timeout", len(hashes)-i)
}
}
// Wait for all the imports to complete and check count
- verifyImportCount(t, imported, len(hashes)-1)
+ verifyImportCount(t, imported, overlap)
}
// Tests that announces already being retrieved will not be duplicated.
-func TestPendingDeduplication(t *testing.T) {
+func TestPendingDeduplication61(t *testing.T) { testPendingDeduplication(t, 61) }
+func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
+func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
+func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
+
+func testPendingDeduplication(t *testing.T, protocol int) {
// Create a hash and corresponding block
hashes, blocks := makeChain(1, 0, genesis)
// Assemble a tester with a built in counter and delayed fetcher
tester := newTester()
- fetcher := tester.makeFetcher(blocks)
+ blockFetcher := tester.makeBlockFetcher(blocks)
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
delay := 50 * time.Millisecond
counter := uint32(0)
- wrapper := func(hashes []common.Hash) error {
+ blockWrapper := func(hashes []common.Hash) error {
atomic.AddUint32(&counter, uint32(len(hashes)))
// Simulate a long running fetch
go func() {
time.Sleep(delay)
- fetcher(hashes)
+ blockFetcher(hashes)
+ }()
+ return nil
+ }
+ headerWrapper := func(hash common.Hash) error {
+ atomic.AddUint32(&counter, 1)
+
+ // Simulate a long running fetch
+ go func() {
+ time.Sleep(delay)
+ headerFetcher(hash)
}()
return nil
}
// Announce the same block many times until it's fetched (wait for any pending ops)
for tester.getBlock(hashes[0]) == nil {
- tester.fetcher.Notify("repeater", hashes[0], 0, time.Now().Add(-arriveTimeout), wrapper)
+ if protocol < 62 {
+ tester.fetcher.Notify("repeater", hashes[0], 0, time.Now().Add(-arriveTimeout), blockWrapper, nil, nil)
+ } else {
+ tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerWrapper, bodyFetcher)
+ }
time.Sleep(time.Millisecond)
}
time.Sleep(delay)
@@ -302,14 +472,21 @@ func TestPendingDeduplication(t *testing.T) {
// Tests that announcements retrieved in a random order are cached and eventually
// imported when all the gaps are filled in.
-func TestRandomArrivalImport(t *testing.T) {
+func TestRandomArrivalImport61(t *testing.T) { testRandomArrivalImport(t, 61) }
+func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
+func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
+func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
+
+func testRandomArrivalImport(t *testing.T, protocol int) {
// Create a chain of blocks to import, and choose one to delay
targetBlocks := maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
skip := targetBlocks / 2
tester := newTester()
- fetcher := tester.makeFetcher(blocks)
+ blockFetcher := tester.makeBlockFetcher(blocks)
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan *types.Block, len(hashes)-1)
@@ -317,25 +494,40 @@ func TestRandomArrivalImport(t *testing.T) {
for i := len(hashes) - 1; i >= 0; i-- {
if i != skip {
- tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), fetcher)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ }
time.Sleep(time.Millisecond)
}
}
// Finally announce the skipped entry and check full import
- tester.fetcher.Notify("valid", hashes[skip], 0, time.Now().Add(-arriveTimeout), fetcher)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[skip], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ }
verifyImportCount(t, imported, len(hashes)-1)
}
// Tests that direct block enqueues (due to block propagation vs. hash announce)
// are correctly schedule, filling and import queue gaps.
-func TestQueueGapFill(t *testing.T) {
+func TestQueueGapFill61(t *testing.T) { testQueueGapFill(t, 61) }
+func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
+func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
+func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
+
+func testQueueGapFill(t *testing.T, protocol int) {
// Create a chain of blocks to import, and choose one to not announce at all
targetBlocks := maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
skip := targetBlocks / 2
tester := newTester()
- fetcher := tester.makeFetcher(blocks)
+ blockFetcher := tester.makeBlockFetcher(blocks)
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan *types.Block, len(hashes)-1)
@@ -343,7 +535,11 @@ func TestQueueGapFill(t *testing.T) {
for i := len(hashes) - 1; i >= 0; i-- {
if i != skip {
- tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), fetcher)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ }
time.Sleep(time.Millisecond)
}
}
@@ -354,13 +550,20 @@ func TestQueueGapFill(t *testing.T) {
// Tests that blocks arriving from various sources (multiple propagations, hash
// announces, etc) do not get scheduled for import multiple times.
-func TestImportDeduplication(t *testing.T) {
+func TestImportDeduplication61(t *testing.T) { testImportDeduplication(t, 61) }
+func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
+func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
+func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
+
+func testImportDeduplication(t *testing.T, protocol int) {
// Create two blocks to import (one for duplication, the other for stalling)
hashes, blocks := makeChain(2, 0, genesis)
// Create the tester and wrap the importer with a counter
tester := newTester()
- fetcher := tester.makeFetcher(blocks)
+ blockFetcher := tester.makeBlockFetcher(blocks)
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
counter := uint32(0)
tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
@@ -374,7 +577,11 @@ func TestImportDeduplication(t *testing.T) {
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
// Announce the duplicating block, wait for retrieval, and also propagate directly
- tester.fetcher.Notify("valid", hashes[0], 0, time.Now().Add(-arriveTimeout), fetcher)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[0], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ }
<-fetching
tester.fetcher.Enqueue("valid", blocks[hashes[0]])
@@ -391,35 +598,157 @@ func TestImportDeduplication(t *testing.T) {
}
// Tests that blocks with numbers much lower or higher than out current head get
-// discarded no prevent wasting resources on useless blocks from faulty peers.
-func TestDistantDiscarding(t *testing.T) {
- // Create a long chain to import
+// discarded to prevent wasting resources on useless blocks from faulty peers.
+func TestDistantPropagationDiscarding(t *testing.T) {
+ // Create a long chain to import and define the discard boundaries
hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
head := hashes[len(hashes)/2]
+ low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
+
// Create a tester and simulate a head block being the middle of the above chain
tester := newTester()
tester.hashes = []common.Hash{head}
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
// Ensure that a block with a lower number than the threshold is discarded
- tester.fetcher.Enqueue("lower", blocks[hashes[0]])
+ tester.fetcher.Enqueue("lower", blocks[hashes[low]])
time.Sleep(10 * time.Millisecond)
if !tester.fetcher.queue.Empty() {
t.Fatalf("fetcher queued stale block")
}
// Ensure that a block with a higher number than the threshold is discarded
- tester.fetcher.Enqueue("higher", blocks[hashes[len(hashes)-1]])
+ tester.fetcher.Enqueue("higher", blocks[hashes[high]])
time.Sleep(10 * time.Millisecond)
if !tester.fetcher.queue.Empty() {
t.Fatalf("fetcher queued future block")
}
}
+// Tests that announcements with numbers much lower or higher than out current
+// head get discarded to prevent wasting resources on useless blocks from faulty
+// peers.
+func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) }
+func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) }
+func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) }
+
+func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
+ // Create a long chain to import and define the discard boundaries
+ hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
+ head := hashes[len(hashes)/2]
+
+ low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
+
+ // Create a tester and simulate a head block being the middle of the above chain
+ tester := newTester()
+ tester.hashes = []common.Hash{head}
+ tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
+
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+
+ fetching := make(chan struct{}, 2)
+ tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
+
+ // Ensure that a block with a lower number than the threshold is discarded
+ tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ select {
+ case <-time.After(50 * time.Millisecond):
+ case <-fetching:
+ t.Fatalf("fetcher requested stale header")
+ }
+ // Ensure that a block with a higher number than the threshold is discarded
+ tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ select {
+ case <-time.After(50 * time.Millisecond):
+ case <-fetching:
+ t.Fatalf("fetcher requested future header")
+ }
+}
+
+// Tests that peers announcing blocks with invalid numbers (i.e. not matching
+// the headers provided afterwards) get dropped as malicious.
+func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) }
+func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) }
+func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) }
+
+func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
+ // Create a single block to import and check numbers against
+ hashes, blocks := makeChain(1, 0, genesis)
+
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+
+ imported := make(chan *types.Block)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ // Announce a block with a bad number, check for immediate drop
+ tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ verifyImportEvent(t, imported, false)
+
+ if !tester.drops["bad"] {
+ t.Fatalf("peer with invalid numbered announcement not dropped")
+ }
+ // Make sure a good announcement passes without a drop
+ tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+ verifyImportEvent(t, imported, true)
+
+ if tester.drops["good"] {
+ t.Fatalf("peer with valid numbered announcement dropped")
+ }
+ verifyImportDone(t, imported)
+}
+
+// Tests that if a block is empty (i.e. header only), no body request should be
+// made, and instead the header should be assembled into a whole block in itself.
+func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
+func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
+func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
+
+func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
+ // Create a chain of blocks to import
+ hashes, blocks := makeChain(32, 0, genesis)
+
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+
+ // Add a monitoring hook for all internal events
+ fetching := make(chan []common.Hash)
+ tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
+
+ completing := make(chan []common.Hash)
+ tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
+
+ imported := make(chan *types.Block)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ // Iteratively announce blocks until all are imported
+ for i := len(hashes) - 2; i >= 0; i-- {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
+
+ // All announces should fetch the header
+ verifyFetchingEvent(t, fetching, true)
+
+ // Only blocks with data contents should request bodies
+ verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
+
+ // Irrelevant of the construct, import should succeed
+ verifyImportEvent(t, imported, true)
+ }
+ verifyImportDone(t, imported)
+}
+
// Tests that a peer is unable to use unbounded memory with sending infinite
// block announcements to a node, but that even in the face of such an attack,
// the fetcher remains operational.
-func TestHashMemoryExhaustionAttack(t *testing.T) {
+func TestHashMemoryExhaustionAttack61(t *testing.T) { testHashMemoryExhaustionAttack(t, 61) }
+func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
+func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
+func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
+
+func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
// Create a tester with instrumented import hooks
tester := newTester()
@@ -429,17 +758,29 @@ func TestHashMemoryExhaustionAttack(t *testing.T) {
// Create a valid chain and an infinite junk chain
targetBlocks := hashLimit + 2*maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
- valid := tester.makeFetcher(blocks)
+ validBlockFetcher := tester.makeBlockFetcher(blocks)
+ validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
+ validBodyFetcher := tester.makeBodyFetcher(blocks, 0)
attack, _ := makeChain(targetBlocks, 0, unknownBlock)
- attacker := tester.makeFetcher(nil)
+ attackerBlockFetcher := tester.makeBlockFetcher(nil)
+ attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack)
+ attackerBodyFetcher := tester.makeBodyFetcher(nil, 0)
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
for i := 0; i < len(attack); i++ {
if i < maxQueueDist {
- tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], 0, time.Now(), valid)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], 0, time.Now(), validBlockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), nil, validHeaderFetcher, validBodyFetcher)
+ }
+ }
+ if protocol < 62 {
+ tester.fetcher.Notify("attacker", attack[i], 0, time.Now(), attackerBlockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), nil, attackerHeaderFetcher, attackerBodyFetcher)
}
- tester.fetcher.Notify("attacker", attack[i], 0, time.Now(), attacker)
}
if len(tester.fetcher.announced) != hashLimit+maxQueueDist {
t.Fatalf("queued announce count mismatch: have %d, want %d", len(tester.fetcher.announced), hashLimit+maxQueueDist)
@@ -449,8 +790,12 @@ func TestHashMemoryExhaustionAttack(t *testing.T) {
// Feed the remaining valid hashes to ensure DOS protection state remains clean
for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
- tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), valid)
- verifyImportEvent(t, imported)
+ if protocol < 62 {
+ tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), validBlockFetcher, nil, nil)
+ } else {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, validHeaderFetcher, validBodyFetcher)
+ }
+ verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
}
@@ -498,7 +843,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
// Insert the remaining blocks in chunks to ensure clean DOS protection
for i := maxQueueDist; i < len(hashes)-1; i++ {
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
- verifyImportEvent(t, imported)
+ verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
}