diff options
Diffstat (limited to 'eth/downloader/downloader_test.go')
-rw-r--r-- | eth/downloader/downloader_test.go | 334 |
1 files changed, 171 insertions, 163 deletions
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 1fb5a0910..b354682a1 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -96,9 +96,7 @@ func newTester() *downloadTester { tester.stateDb, _ = ethdb.NewMemDatabase() tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, - tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd, - tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer) + tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) return tester } @@ -218,14 +216,14 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { return err } -// hasHeader checks if a header is present in the testers canonical chain. -func (dl *downloadTester) hasHeader(hash common.Hash) bool { - return dl.getHeader(hash) != nil +// HasHeader checks if a header is present in the testers canonical chain. +func (dl *downloadTester) HasHeader(hash common.Hash) bool { + return dl.GetHeaderByHash(hash) != nil } -// hasBlock checks if a block and associated state is present in the testers canonical chain. -func (dl *downloadTester) hasBlock(hash common.Hash) bool { - block := dl.getBlock(hash) +// HasBlockAndState checks if a block and associated state is present in the testers canonical chain. +func (dl *downloadTester) HasBlockAndState(hash common.Hash) bool { + block := dl.GetBlockByHash(hash) if block == nil { return false } @@ -233,24 +231,24 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool { return err == nil } -// getHeader retrieves a header from the testers canonical chain. -func (dl *downloadTester) getHeader(hash common.Hash) *types.Header { +// GetHeader retrieves a header from the testers canonical chain. +func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header { dl.lock.RLock() defer dl.lock.RUnlock() return dl.ownHeaders[hash] } -// getBlock retrieves a block from the testers canonical chain. -func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { +// GetBlock retrieves a block from the testers canonical chain. +func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() return dl.ownBlocks[hash] } -// headHeader retrieves the current head header from the canonical chain. -func (dl *downloadTester) headHeader() *types.Header { +// CurrentHeader retrieves the current head header from the canonical chain. +func (dl *downloadTester) CurrentHeader() *types.Header { dl.lock.RLock() defer dl.lock.RUnlock() @@ -262,8 +260,8 @@ func (dl *downloadTester) headHeader() *types.Header { return dl.genesis.Header() } -// headBlock retrieves the current head block from the canonical chain. -func (dl *downloadTester) headBlock() *types.Block { +// CurrentBlock retrieves the current head block from the canonical chain. +func (dl *downloadTester) CurrentBlock() *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() @@ -277,8 +275,8 @@ func (dl *downloadTester) headBlock() *types.Block { return dl.genesis } -// headFastBlock retrieves the current head fast-sync block from the canonical chain. -func (dl *downloadTester) headFastBlock() *types.Block { +// CurrentFastBlock retrieves the current head fast-sync block from the canonical chain. +func (dl *downloadTester) CurrentFastBlock() *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() @@ -290,26 +288,26 @@ func (dl *downloadTester) headFastBlock() *types.Block { return dl.genesis } -// commitHeadBlock manually sets the head block to a given hash. -func (dl *downloadTester) commitHeadBlock(hash common.Hash) error { +// FastSyncCommitHead manually sets the head block to a given hash. +func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error { // For now only check that the state trie is correct - if block := dl.getBlock(hash); block != nil { + if block := dl.GetBlockByHash(hash); block != nil { _, err := trie.NewSecure(block.Root(), dl.stateDb, 0) return err } return fmt.Errorf("non existent block: %x", hash[:4]) } -// getTd retrieves the block's total difficulty from the canonical chain. -func (dl *downloadTester) getTd(hash common.Hash) *big.Int { +// GetTdByHash retrieves the block's total difficulty from the canonical chain. +func (dl *downloadTester) GetTdByHash(hash common.Hash) *big.Int { dl.lock.RLock() defer dl.lock.RUnlock() return dl.ownChainTd[hash] } -// insertHeaders injects a new batch of headers into the simulated chain. -func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int) (int, error) { +// InsertHeaderChain injects a new batch of headers into the simulated chain. +func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -337,8 +335,8 @@ func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int) return len(headers), nil } -// insertBlocks injects a new batch of blocks into the simulated chain. -func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { +// InsertChain injects a new batch of blocks into the simulated chain. +func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -359,8 +357,8 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { return len(blocks), nil } -// insertReceipts injects a new batch of receipts into the simulated chain. -func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) { +// InsertReceiptChain injects a new batch of receipts into the simulated chain. +func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -377,8 +375,8 @@ func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.R return len(blocks), nil } -// rollback removes some recently added elements from the chain. -func (dl *downloadTester) rollback(hashes []common.Hash) { +// Rollback removes some recently added elements from the chain. +func (dl *downloadTester) Rollback(hashes []common.Hash) { dl.lock.Lock() defer dl.lock.Unlock() @@ -406,14 +404,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha defer dl.lock.Unlock() var err error - switch version { - case 62: - err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) - case 63: - err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) - case 64: - err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) - } + err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl, id, delay}) if err == nil { // Assign the owned hashes, headers and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -471,139 +462,133 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } -// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash +type downloadTesterPeer struct { + dl *downloadTester + id string + delay time.Duration +} + +// Head constructs a function to retrieve a peer's current head hash // and total difficulty. -func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) { - return func() (common.Hash, *big.Int) { - dl.lock.RLock() - defer dl.lock.RUnlock() +func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() - return dl.peerHashes[id][0], nil - } + return dlp.dl.peerHashes[dlp.id][0], nil } -// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed +// RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. -func (dl *downloadTester) peerGetRelHeadersFn(id string, delay time.Duration) func(common.Hash, int, int, bool) error { - return func(origin common.Hash, amount int, skip int, reverse bool) error { - // Find the canonical number of the hash - dl.lock.RLock() - number := uint64(0) - for num, hash := range dl.peerHashes[id] { - if hash == origin { - number = uint64(len(dl.peerHashes[id]) - num - 1) - break - } +func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { + // Find the canonical number of the hash + dlp.dl.lock.RLock() + number := uint64(0) + for num, hash := range dlp.dl.peerHashes[dlp.id] { + if hash == origin { + number = uint64(len(dlp.dl.peerHashes[dlp.id]) - num - 1) + break } - dl.lock.RUnlock() - - // Use the absolute header fetcher to satisfy the query - return dl.peerGetAbsHeadersFn(id, delay)(number, amount, skip, reverse) } + dlp.dl.lock.RUnlock() + + // Use the absolute header fetcher to satisfy the query + return dlp.RequestHeadersByNumber(number, amount, skip, reverse) } -// peerGetAbsHeadersFn constructs a GetBlockHeaders function based on a numbered +// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. -func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) func(uint64, int, int, bool) error { - return func(origin uint64, amount int, skip int, reverse bool) error { - time.Sleep(delay) - - dl.lock.RLock() - defer dl.lock.RUnlock() - - // Gather the next batch of headers - hashes := dl.peerHashes[id] - headers := dl.peerHeaders[id] - result := make([]*types.Header, 0, amount) - for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ { - if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok { - result = append(result, header) - } +func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { + time.Sleep(dlp.delay) + + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() + + // Gather the next batch of headers + hashes := dlp.dl.peerHashes[dlp.id] + headers := dlp.dl.peerHeaders[dlp.id] + result := make([]*types.Header, 0, amount) + for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ { + if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok { + result = append(result, header) } - // Delay delivery a bit to allow attacks to unfold - go func() { - time.Sleep(time.Millisecond) - dl.downloader.DeliverHeaders(id, result) - }() - return nil } + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dlp.dl.downloader.DeliverHeaders(dlp.id, result) + }() + return nil } -// peerGetBodiesFn constructs a getBlockBodies method associated with a particular +// RequestBodies constructs a getBlockBodies method associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of block bodies from the particularly requested peer. -func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([]common.Hash) error { - return func(hashes []common.Hash) error { - time.Sleep(delay) +func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error { + time.Sleep(dlp.delay) - dl.lock.RLock() - defer dl.lock.RUnlock() + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() - blocks := dl.peerBlocks[id] + blocks := dlp.dl.peerBlocks[dlp.id] - transactions := make([][]*types.Transaction, 0, len(hashes)) - uncles := make([][]*types.Header, 0, len(hashes)) + transactions := make([][]*types.Transaction, 0, len(hashes)) + uncles := make([][]*types.Header, 0, len(hashes)) - for _, hash := range hashes { - if block, ok := blocks[hash]; ok { - transactions = append(transactions, block.Transactions()) - uncles = append(uncles, block.Uncles()) - } + for _, hash := range hashes { + if block, ok := blocks[hash]; ok { + transactions = append(transactions, block.Transactions()) + uncles = append(uncles, block.Uncles()) } - go dl.downloader.DeliverBodies(id, transactions, uncles) - - return nil } + go dlp.dl.downloader.DeliverBodies(dlp.id, transactions, uncles) + + return nil } -// peerGetReceiptsFn constructs a getReceipts method associated with a particular +// RequestReceipts constructs a getReceipts method associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of block receipts from the particularly requested peer. -func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func([]common.Hash) error { - return func(hashes []common.Hash) error { - time.Sleep(delay) +func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error { + time.Sleep(dlp.delay) - dl.lock.RLock() - defer dl.lock.RUnlock() + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() - receipts := dl.peerReceipts[id] + receipts := dlp.dl.peerReceipts[dlp.id] - results := make([][]*types.Receipt, 0, len(hashes)) - for _, hash := range hashes { - if receipt, ok := receipts[hash]; ok { - results = append(results, receipt) - } + results := make([][]*types.Receipt, 0, len(hashes)) + for _, hash := range hashes { + if receipt, ok := receipts[hash]; ok { + results = append(results, receipt) } - go dl.downloader.DeliverReceipts(id, results) - - return nil } + go dlp.dl.downloader.DeliverReceipts(dlp.id, results) + + return nil } -// peerGetNodeDataFn constructs a getNodeData method associated with a particular +// RequestNodeData constructs a getNodeData method associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of node state data from the particularly requested peer. -func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error { - return func(hashes []common.Hash) error { - time.Sleep(delay) - - dl.lock.RLock() - defer dl.lock.RUnlock() - - results := make([][]byte, 0, len(hashes)) - for _, hash := range hashes { - if data, err := dl.peerDb.Get(hash.Bytes()); err == nil { - if !dl.peerMissingStates[id][hash] { - results = append(results, data) - } +func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error { + time.Sleep(dlp.delay) + + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() + + results := make([][]byte, 0, len(hashes)) + for _, hash := range hashes { + if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil { + if !dlp.dl.peerMissingStates[dlp.id][hash] { + results = append(results, data) } } - go dl.downloader.DeliverNodeData(id, results) - - return nil } + go dlp.dl.downloader.DeliverNodeData(dlp.id, results) + + return nil } // assertOwnChain checks if the local chain contains the correct number of items @@ -1212,7 +1197,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("fast-attack", nil, mode); err == nil { t.Fatalf("succeeded fast attacker synchronisation") } - if head := tester.headHeader().Number.Int64(); int(head) > MaxHeaderFetch { + if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch { t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch) } // Attempt to sync with an attacker that feeds junk during the block import phase. @@ -1226,11 +1211,11 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("block-attack", nil, mode); err == nil { t.Fatalf("succeeded block attacker synchronisation") } - if head := tester.headHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { + if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch) } if mode == FastSync { - if head := tester.headBlock().NumberU64(); head != 0 { + if head := tester.CurrentBlock().NumberU64(); head != 0 { t.Errorf("fast sync pivot block #%d not rolled back", head) } } @@ -1251,11 +1236,11 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("withhold-attack", nil, mode); err == nil { t.Fatalf("succeeded withholding attacker synchronisation") } - if head := tester.headHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { + if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch) } if mode == FastSync { - if head := tester.headBlock().NumberU64(); head != 0 { + if head := tester.CurrentBlock().NumberU64(); head != 0 { t.Errorf("fast sync pivot block #%d not rolled back", head) } } @@ -1670,6 +1655,48 @@ func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) } +type floodingTestPeer struct { + peer Peer + tester *downloadTester +} + +func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() } +func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error { + return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse) +} +func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error { + return ftp.peer.RequestBodies(hashes) +} +func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error { + return ftp.peer.RequestReceipts(hashes) +} +func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error { + return ftp.peer.RequestNodeData(hashes) +} + +func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error { + deliveriesDone := make(chan struct{}, 500) + for i := 0; i < cap(deliveriesDone); i++ { + peer := fmt.Sprintf("fake-peer%d", i) + go func() { + ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}}) + deliveriesDone <- struct{}{} + }() + } + // Deliver the actual requested headers. + go ftp.peer.RequestHeadersByNumber(from, count, skip, reverse) + // None of the extra deliveries should block. + timeout := time.After(15 * time.Second) + for i := 0; i < cap(deliveriesDone); i++ { + select { + case <-deliveriesDone: + case <-timeout: + panic("blocked") + } + } + return nil +} + func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { t.Parallel() @@ -1677,7 +1704,6 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { defer master.terminate() hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false) - fakeHeads := []*types.Header{{}, {}, {}, {}} for i := 0; i < 200; i++ { tester := newTester() tester.peerDb = master.peerDb @@ -1685,29 +1711,11 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Whenever the downloader requests headers, flood it with // a lot of unrequested header deliveries. - tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error { - deliveriesDone := make(chan struct{}, 500) - for i := 0; i < cap(deliveriesDone); i++ { - peer := fmt.Sprintf("fake-peer%d", i) - go func() { - tester.downloader.DeliverHeaders(peer, fakeHeads) - deliveriesDone <- struct{}{} - }() - } - // Deliver the actual requested headers. - impl := tester.peerGetAbsHeadersFn("peer", 0) - go impl(from, count, skip, reverse) - // None of the extra deliveries should block. - timeout := time.After(15 * time.Second) - for i := 0; i < cap(deliveriesDone); i++ { - select { - case <-deliveriesDone: - case <-timeout: - panic("blocked") - } - } - return nil + tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{ + tester.downloader.peers.peers["peer"].peer, + tester, } + if err := tester.sync("peer", nil, mode); err != nil { t.Errorf("sync failed: %v", err) } @@ -1739,7 +1747,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) { for i := 0; i < fsPivotInterval; i++ { tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true } - tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 500*time.Millisecond) // Enough to reach the critical section + (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).delay = 500 * time.Millisecond // Enough to reach the critical section // Synchronise with the peer a few times and make sure they fail until the retry limit for i := 0; i < int(fsCriticalTrials)-1; i++ { @@ -1758,7 +1766,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) { tester.lock.Lock() tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]] tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} - tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 0) + (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).delay = 0 tester.lock.Unlock() } } |