diff options
Diffstat (limited to 'eth/downloader/downloader_test.go')
-rw-r--r-- | eth/downloader/downloader_test.go | 341 |
1 files changed, 337 insertions, 4 deletions
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index dbcf93607..6ce19480f 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math/big" + "sync" "sync/atomic" "testing" "time" @@ -99,6 +100,8 @@ type downloadTester struct { peerHashes map[string][]common.Hash // Hash chain belonging to different test peers peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains + + lock sync.RWMutex } // newTester creates a new downloader test mocker. @@ -118,8 +121,8 @@ func newTester() *downloadTester { // sync starts synchronizing with a remote peer, blocking until it completes. func (dl *downloadTester) sync(id string, td *big.Int) error { + dl.lock.RLock() hash := dl.peerHashes[id][0] - // If no particular TD was requested, load from the peer's blockchain if td == nil { td = big.NewInt(1) @@ -127,14 +130,19 @@ func (dl *downloadTester) sync(id string, td *big.Int) error { td = diff } } - err := dl.downloader.synchronise(id, hash, td) + dl.lock.RUnlock() + err := dl.downloader.synchronise(id, hash, td) for { // If the queue is empty and processing stopped, break hashes, blocks := dl.downloader.queue.Size() if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 { break } + // If there are queued blocks, but the head is missing, it's a stale leftover + if hashes+blocks > 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 && dl.downloader.queue.GetHeadBlock() == nil { + break + } // Otherwise sleep a bit and retry time.Sleep(time.Millisecond) } @@ -143,26 +151,41 @@ func (dl *downloadTester) sync(id string, td *big.Int) error { // hasBlock checks if a block is pres ent in the testers canonical chain. func (dl *downloadTester) hasBlock(hash common.Hash) bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + return dl.getBlock(hash) != nil } // getBlock retrieves a block from the testers canonical chain. func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { + dl.lock.RLock() + defer dl.lock.RUnlock() + return dl.ownBlocks[hash] } // headBlock retrieves the current head block from the canonical chain. func (dl *downloadTester) headBlock() *types.Block { + dl.lock.RLock() + defer dl.lock.RUnlock() + return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1]) } // getTd retrieves the block's total difficulty from the canonical chain. func (dl *downloadTester) getTd(hash common.Hash) *big.Int { + dl.lock.RLock() + defer dl.lock.RUnlock() + return dl.ownChainTd[hash] } // insertChain injects a new batch of blocks into the simulated chain. func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { + dl.lock.Lock() + defer dl.lock.Unlock() + for i, block := range blocks { if _, ok := dl.ownBlocks[block.ParentHash()]; !ok { return i, errors.New("unknown parent") @@ -183,9 +206,12 @@ func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, // specific delay time on processing the network packets sent to it, simulating // potentially slow network IO. func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { + dl.lock.Lock() + defer dl.lock.Unlock() + err := dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), - nil, dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -207,6 +233,9 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha // dropPeer simulates a hard peer removal from the connection pool. func (dl *downloadTester) dropPeer(id string) { + dl.lock.Lock() + defer dl.lock.Unlock() + delete(dl.peerHashes, id) delete(dl.peerBlocks, id) delete(dl.peerChainTds, id) @@ -221,6 +250,9 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun return func(head common.Hash) error { time.Sleep(delay) + dl.lock.RLock() + defer dl.lock.RUnlock() + // Gather the next batch of hashes hashes := dl.peerHashes[id] result := make([]common.Hash, 0, MaxHashFetch) @@ -250,6 +282,9 @@ func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) fun return func(head uint64, count int) error { time.Sleep(delay) + dl.lock.RLock() + defer dl.lock.RUnlock() + // Gather the next batch of hashes hashes := dl.peerHashes[id] result := make([]common.Hash, 0, count) @@ -271,6 +306,10 @@ func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) fun func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error { return func(hashes []common.Hash) error { time.Sleep(delay) + + dl.lock.RLock() + defer dl.lock.RUnlock() + blocks := dl.peerBlocks[id] result := make([]*types.Block, 0, len(hashes)) for _, hash := range hashes { @@ -284,6 +323,27 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ } } +// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed +// origin; associated with a particular peer in the download tester. The returned +// function can be used to retrieve batches of headers from the particular peer. +func (dl *downloadTester) peerGetRelHeadersFn(id string, delay time.Duration) func(common.Hash, int, int, bool) error { + return func(origin common.Hash, amount int, skip int, reverse bool) error { + // Find the canonical number of the hash + dl.lock.RLock() + number := uint64(0) + for num, hash := range dl.peerHashes[id] { + if hash == origin { + number = uint64(len(dl.peerHashes[id]) - num - 1) + break + } + } + dl.lock.RUnlock() + + // Use the absolute header fetcher to satisfy the query + return dl.peerGetAbsHeadersFn(id, delay)(number, amount, skip, reverse) + } +} + // peerGetAbsHeadersFn constructs a GetBlockHeaders function based on a numbered // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. @@ -291,6 +351,9 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu return func(origin uint64, amount int, skip int, reverse bool) error { time.Sleep(delay) + dl.lock.RLock() + defer dl.lock.RUnlock() + // Gather the next batch of hashes hashes := dl.peerHashes[id] blocks := dl.peerBlocks[id] @@ -315,6 +378,10 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([]common.Hash) error { return func(hashes []common.Hash) error { time.Sleep(delay) + + dl.lock.RLock() + defer dl.lock.RUnlock() + blocks := dl.peerBlocks[id] transactions := make([][]*types.Transaction, 0, len(hashes)) @@ -384,13 +451,23 @@ func testThrottling(t *testing.T, protocol int) { errc <- tester.sync("peer", nil) }() // Iteratively take some blocks, always checking the retrieval count - for len(tester.ownBlocks) < targetBlocks+1 { + for { + // Check the retrieval count synchronously (! reason for this ugly block) + tester.lock.RLock() + retrieved := len(tester.ownBlocks) + tester.lock.RUnlock() + if retrieved >= targetBlocks+1 { + break + } // Wait a bit for sync to throttle itself var cached int for start := time.Now(); time.Since(start) < time.Second; { time.Sleep(25 * time.Millisecond) + tester.downloader.queue.lock.RLock() cached = len(tester.downloader.queue.blockPool) + tester.downloader.queue.lock.RUnlock() + if cached == blockCacheLimit || len(tester.ownBlocks)+cached+int(atomic.LoadUint32(&blocked)) == targetBlocks+1 { break } @@ -727,3 +804,259 @@ func testBlockBodyAttackerDropping(t *testing.T, protocol int) { } } } + +// Tests that synchronisation boundaries (origin block number and highest block +// number) is tracked and updated correctly. +func TestSyncBoundaries61(t *testing.T) { testSyncBoundaries(t, 61) } +func TestSyncBoundaries62(t *testing.T) { testSyncBoundaries(t, 62) } +func TestSyncBoundaries63(t *testing.T) { testSyncBoundaries(t, 63) } +func TestSyncBoundaries64(t *testing.T) { testSyncBoundaries(t, 64) } + +func testSyncBoundaries(t *testing.T, protocol int) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + // Set a sync init hook to catch boundary changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester := newTester() + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + // Retrieve the sync boundaries and ensure they are zero (pristine sync) + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != 0 { + t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) + } + // Synchronise half the blocks and check initial boundaries + tester.newPeer("peer-half", protocol, hashes[targetBlocks/2:], blocks) + pending := new(sync.WaitGroup) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("peer-half", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != uint64(targetBlocks/2+1) { + t.Fatalf("Initial boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, targetBlocks/2+1) + } + progress <- struct{}{} + pending.Wait() + + // Synchronise all the blocks and check continuation boundaries + tester.newPeer("peer-full", protocol, hashes, blocks) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("peer-full", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != uint64(targetBlocks/2+1) || latest != uint64(targetBlocks) { + t.Fatalf("Completing boundary mismatch: have %v/%v, want %v/%v", origin, latest, targetBlocks/2+1, targetBlocks) + } + progress <- struct{}{} + pending.Wait() +} + +// Tests that synchronisation boundaries (origin block number and highest block +// number) is tracked and updated correctly in case of a fork (or manual head +// revertal). +func TestForkedSyncBoundaries61(t *testing.T) { testForkedSyncBoundaries(t, 61) } +func TestForkedSyncBoundaries62(t *testing.T) { testForkedSyncBoundaries(t, 62) } +func TestForkedSyncBoundaries63(t *testing.T) { testForkedSyncBoundaries(t, 63) } +func TestForkedSyncBoundaries64(t *testing.T) { testForkedSyncBoundaries(t, 64) } + +func testForkedSyncBoundaries(t *testing.T, protocol int) { + // Create a forked chain to simulate origin revertal + common, fork := MaxHashFetch, 2*MaxHashFetch + hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) + + // Set a sync init hook to catch boundary changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester := newTester() + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + // Retrieve the sync boundaries and ensure they are zero (pristine sync) + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != 0 { + t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) + } + // Synchronise with one of the forks and check boundaries + tester.newPeer("fork A", protocol, hashesA, blocksA) + pending := new(sync.WaitGroup) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("fork A", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != uint64(len(hashesA)-1) { + t.Fatalf("Initial boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, len(hashesA)-1) + } + progress <- struct{}{} + pending.Wait() + + // Simulate a successful sync above the fork + tester.downloader.syncStatsOrigin = tester.downloader.syncStatsHeight + + // Synchronise with the second fork and check boundary resets + tester.newPeer("fork B", protocol, hashesB, blocksB) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("fork B", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != uint64(common) || latest != uint64(len(hashesB)-1) { + t.Fatalf("Forking boundary mismatch: have %v/%v, want %v/%v", origin, latest, common, len(hashesB)-1) + } + progress <- struct{}{} + pending.Wait() +} + +// Tests that if synchronisation is aborted due to some failure, then the boundary +// origin is not updated in the next sync cycle, as it should be considered the +// continuation of the previous sync and not a new instance. +func TestFailedSyncBoundaries61(t *testing.T) { testFailedSyncBoundaries(t, 61) } +func TestFailedSyncBoundaries62(t *testing.T) { testFailedSyncBoundaries(t, 62) } +func TestFailedSyncBoundaries63(t *testing.T) { testFailedSyncBoundaries(t, 63) } +func TestFailedSyncBoundaries64(t *testing.T) { testFailedSyncBoundaries(t, 64) } + +func testFailedSyncBoundaries(t *testing.T, protocol int) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + // Set a sync init hook to catch boundary changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester := newTester() + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + // Retrieve the sync boundaries and ensure they are zero (pristine sync) + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != 0 { + t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) + } + // Attempt a full sync with a faulty peer + tester.newPeer("faulty", protocol, hashes, blocks) + missing := targetBlocks / 2 + delete(tester.peerBlocks["faulty"], hashes[missing]) + + pending := new(sync.WaitGroup) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("faulty", nil); err == nil { + t.Fatalf("succeeded faulty synchronisation") + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != uint64(targetBlocks) { + t.Fatalf("Initial boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, targetBlocks) + } + progress <- struct{}{} + pending.Wait() + + // Synchronise with a good peer and check that the boundary origin remind the same after a failure + tester.newPeer("valid", protocol, hashes, blocks) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("valid", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != uint64(targetBlocks) { + t.Fatalf("Completing boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, targetBlocks) + } + progress <- struct{}{} + pending.Wait() +} + +// Tests that if an attacker fakes a chain height, after the attack is detected, +// the boundary height is successfully reduced at the next sync invocation. +func TestFakedSyncBoundaries61(t *testing.T) { testFakedSyncBoundaries(t, 61) } +func TestFakedSyncBoundaries62(t *testing.T) { testFakedSyncBoundaries(t, 62) } +func TestFakedSyncBoundaries63(t *testing.T) { testFakedSyncBoundaries(t, 63) } +func TestFakedSyncBoundaries64(t *testing.T) { testFakedSyncBoundaries(t, 64) } + +func testFakedSyncBoundaries(t *testing.T, protocol int) { + // Create a small block chain + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks+3, 0, genesis) + + // Set a sync init hook to catch boundary changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester := newTester() + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + // Retrieve the sync boundaries and ensure they are zero (pristine sync) + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != 0 { + t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) + } + // Create and sync with an attacker that promises a higher chain than available + tester.newPeer("attack", protocol, hashes, blocks) + for i := 1; i < 3; i++ { + delete(tester.peerBlocks["attack"], hashes[i]) + } + + pending := new(sync.WaitGroup) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("attack", nil); err == nil { + t.Fatalf("succeeded attacker synchronisation") + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != uint64(targetBlocks+3) { + t.Fatalf("Initial boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, targetBlocks+3) + } + progress <- struct{}{} + pending.Wait() + + // Synchronise with a good peer and check that the boundary height has been reduced to the true value + tester.newPeer("valid", protocol, hashes[3:], blocks) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("valid", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + }() + <-starting + if origin, latest := tester.downloader.Boundaries(); origin != 0 || latest != uint64(targetBlocks) { + t.Fatalf("Initial boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, targetBlocks) + } + progress <- struct{}{} + pending.Wait() +} |