aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-15 17:26:05 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-06-15 17:26:05 +0800
commitb240983e2bafcde1c5902ce3a196b22475412f16 (patch)
tree95a2f06dcf9320dc8e3d3104ad0cec5930db6f03
parent30a9939388ac738aba39eb64c287bbf9bbda91c9 (diff)
downloadgo-tangerine-b240983e2bafcde1c5902ce3a196b22475412f16.tar
go-tangerine-b240983e2bafcde1c5902ce3a196b22475412f16.tar.gz
go-tangerine-b240983e2bafcde1c5902ce3a196b22475412f16.tar.bz2
go-tangerine-b240983e2bafcde1c5902ce3a196b22475412f16.tar.lz
go-tangerine-b240983e2bafcde1c5902ce3a196b22475412f16.tar.xz
go-tangerine-b240983e2bafcde1c5902ce3a196b22475412f16.tar.zst
go-tangerine-b240983e2bafcde1c5902ce3a196b22475412f16.zip
eth, eth/downloader: do async block fetches, add dl tests
-rw-r--r--eth/downloader/downloader_test.go48
-rw-r--r--eth/downloader/peer.go2
-rw-r--r--eth/sync.go2
3 files changed, 47 insertions, 5 deletions
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 9803ae534..f71c16237 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -122,7 +122,14 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
// newPeer registers a new block download source into the downloader.
func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
- err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id))
+ return dl.newSlowPeer(id, hashes, blocks, 0)
+}
+
+// newSlowPeer registers a new block download source into the downloader, with a
+// specific delay time on processing the network packets sent to it, simulating
+// potentially slow network IO.
+func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error {
+ err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay))
if err == nil {
// Assign the owned hashes and blocks to the peer (deep copy)
dl.peerHashes[id] = make([]common.Hash, len(hashes))
@@ -147,8 +154,10 @@ func (dl *downloadTester) dropPeer(id string) {
// peerGetBlocksFn constructs a getHashes function associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of hashes from the particularly requested peer.
-func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) error {
+func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error {
return func(head common.Hash) error {
+ time.Sleep(delay)
+
limit := MaxHashFetch
if dl.maxHashFetch > 0 {
limit = dl.maxHashFetch
@@ -178,8 +187,10 @@ func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) erro
// peerGetBlocksFn constructs a getBlocks function associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of blocks from the particularly requested peer.
-func (dl *downloadTester) peerGetBlocksFn(id string) func([]common.Hash) error {
+func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error {
return func(hashes []common.Hash) error {
+ time.Sleep(delay)
+
blocks := dl.peerBlocks[id]
result := make([]*types.Block, 0, len(hashes))
for _, hash := range hashes {
@@ -340,6 +351,37 @@ func TestMultiSynchronisation(t *testing.T) {
}
}
+// Tests that synchronising with a peer who's very slow at network IO does not
+// stall the other peers in the system.
+func TestSlowSynchronisation(t *testing.T) {
+ tester := newTester()
+
+ // Create a batch of blocks, with a slow and a full speed peer
+ targetCycles := 2
+ targetBlocks := targetCycles*blockCacheLimit - 15
+ targetIODelay := 500 * time.Millisecond
+
+ hashes := createHashes(targetBlocks, knownHash)
+ blocks := createBlocksFromHashes(hashes)
+
+ tester.newSlowPeer("fast", hashes, blocks, 0)
+ tester.newSlowPeer("slow", hashes, blocks, targetIODelay)
+
+ // Try to sync with the peers (pull hashes from fast)
+ start := time.Now()
+ if err := tester.sync("fast"); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+ t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
+ }
+ // Check that the slow peer got hit at most once per block-cache-size import
+ limit := time.Duration(targetCycles+1) * targetIODelay
+ if delay := time.Since(start); delay >= limit {
+ t.Fatalf("synchronisation exceeded delay limit: have %v, want %v", delay, limit)
+ }
+}
+
// Tests that if a peer returns an invalid chain with a block pointing to a non-
// existing parent, it is correctly detected and handled.
func TestNonExistingParentAttack(t *testing.T) {
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 9614a6951..f36e133e4 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -74,7 +74,7 @@ func (p *peer) Fetch(request *fetchRequest) error {
for hash, _ := range request.Hashes {
hashes = append(hashes, hash)
}
- p.getBlocks(hashes)
+ go p.getBlocks(hashes)
return nil
}
diff --git a/eth/sync.go b/eth/sync.go
index 88a76805c..917fc0fce 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -171,7 +171,7 @@ func (pm *ProtocolManager) fetcher() {
// Send out all block requests
for peer, hashes := range request {
glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
- peer.requestBlocks(hashes)
+ go peer.requestBlocks(hashes)
}
request = make(map[*peer][]common.Hash)