aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-12 18:35:29 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-06-15 14:22:37 +0800
commitfc7abd98865f3bdc6cc36258026db98a649cd577 (patch)
tree0bca7e6e40445df3f21304e84b6df4dd5d3e7bf5
parent0fc71877a7d7a46f35147f753cba0de7b937c77a (diff)
downloaddexon-fc7abd98865f3bdc6cc36258026db98a649cd577.tar
dexon-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.gz
dexon-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.bz2
dexon-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.lz
dexon-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.xz
dexon-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.zst
dexon-fc7abd98865f3bdc6cc36258026db98a649cd577.zip
eth, eth/downloader: move block processing into the downlaoder
-rw-r--r--eth/downloader/downloader.go177
-rw-r--r--eth/downloader/downloader_test.go252
-rw-r--r--eth/handler.go2
-rw-r--r--eth/sync.go53
4 files changed, 253 insertions, 231 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 88ceeb5ac..1bbba11ed 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -3,6 +3,7 @@ package downloader
import (
"bytes"
"errors"
+ "math"
"math/rand"
"sync"
"sync/atomic"
@@ -28,25 +29,27 @@ var (
crossCheckCycle = time.Second // Period after which to check for expired cross checks
maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
+ maxBlockProcess = 256 // Number of blocks to import at once into the chain
)
var (
- errBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer is unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errStallingPeer = errors.New("peer is stalling")
- errBannedHead = errors.New("peer head hash already banned")
- errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
- errTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- errInvalidChain = errors.New("retrieved hash chain is invalid")
- errCrossCheckFailed = errors.New("block cross-check failed")
- errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
- errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errBannedHead = errors.New("peer head hash already banned")
+ errNoPeers = errors.New("no peers to keep download active")
+ errPendingQueue = errors.New("pending items in queue")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
+ errAlreadyInPool = errors.New("hash already in pool")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errCrossCheckFailed = errors.New("block cross-check failed")
+ errCancelHashFetch = errors.New("hash fetching canceled (requested)")
+ errCancelBlockFetch = errors.New("block downloading canceled (requested)")
+ errCancelChainImport = errors.New("chain importing canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
@@ -55,6 +58,9 @@ type hashCheckFn func(common.Hash) bool
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block
+// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type chainInsertFn func(types.Blocks) (int, error)
+
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
@@ -88,13 +94,15 @@ type Downloader struct {
importLock sync.Mutex
// Callbacks
- hasBlock hashCheckFn // Checks if a block is present in the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- dropPeer peerDropFn // Retrieved the TD of our own chain
+ hasBlock hashCheckFn // Checks if a block is present in the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ insertChain chainInsertFn // Injects a batch of blocks into the chain
+ dropPeer peerDropFn // Retrieved the TD of our own chain
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
+ processing int32
notified int32
// Channels
@@ -113,18 +121,19 @@ type Block struct {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader {
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
// Create the base downloader
downloader := &Downloader{
- mux: mux,
- queue: newQueue(),
- peers: newPeerSet(),
- hasBlock: hasBlock,
- getBlock: getBlock,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- hashCh: make(chan hashPack, 1),
- blockCh: make(chan blockPack, 1),
+ mux: mux,
+ queue: newQueue(),
+ peers: newPeerSet(),
+ hasBlock: hasBlock,
+ getBlock: getBlock,
+ insertChain: insertChain,
+ dropPeer: dropPeer,
+ newPeerCh: make(chan *peer, 1),
+ hashCh: make(chan hashPack, 1),
+ blockCh: make(chan blockPack, 1),
}
// Inject all the known bad hashes
downloader.banned = set.New()
@@ -157,7 +166,7 @@ func (d *Downloader) Stats() (pending int, cached int, importing int, estimate t
return
}
-// Synchronising returns the state of the downloader
+// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
}
@@ -260,19 +269,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
return d.syncWithPeer(p, hash)
}
-// TakeBlocks takes blocks from the queue and yields them to the caller.
-func (d *Downloader) TakeBlocks() []*Block {
- blocks := d.queue.TakeBlocks()
- if len(blocks) > 0 {
- d.importLock.Lock()
- d.importStart = time.Now()
- d.importQueue = blocks
- d.importDone = 0
- d.importLock.Unlock()
- }
- return blocks
-}
-
// Has checks if the downloader knows about a particular hash, meaning that its
// either already downloaded of pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
@@ -307,19 +303,16 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
// Cancel cancels all of the operations and resets the queue. It returns true
// if the cancel operation was completed.
-func (d *Downloader) Cancel() bool {
- // If we're not syncing just return.
- hs, bs := d.queue.Size()
- if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
- return false
- }
+func (d *Downloader) Cancel() {
// Close the current cancel channel
d.cancelLock.Lock()
- select {
- case <-d.cancelCh:
- // Channel was already closed
- default:
- close(d.cancelCh)
+ if d.cancelCh != nil {
+ select {
+ case <-d.cancelCh:
+ // Channel was already closed
+ default:
+ close(d.cancelCh)
+ }
}
d.cancelLock.Unlock()
@@ -330,11 +323,11 @@ func (d *Downloader) Cancel() bool {
d.importQueue = nil
d.importDone = 0
d.importLock.Unlock()
-
- return true
}
-// XXX Make synchronous
+// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
+// up until it finds a common ancestor. If the source peer times out, alternative
+// ones are tried for continuation.
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
var (
start = time.Now()
@@ -530,10 +523,13 @@ out:
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
break
}
- // All was successful, promote the peer
+ // All was successful, promote the peer and potentially start processing
peer.Promote()
peer.SetIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+ if atomic.LoadInt32(&d.processing) == 0 {
+ go d.process()
+ }
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -709,6 +705,71 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
}
}
+// process takes blocks from the queue and tries to import them into the chain.
+func (d *Downloader) process() (err error) {
+ // Make sure only one goroutine is ever allowed to process blocks at once
+ if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
+ return
+ }
+ // If the processor just exited, but there are freshly pending items, try to
+ // reenter. This is needed because the goroutine spinned up for processing
+ // the fresh blocks might have been rejected entry to to this present thread
+ // not yet releasing the `processing` state.
+ defer func() {
+ if err == nil && d.queue.GetHeadBlock() != nil {
+ err = d.process()
+ }
+ }()
+ // Release the lock upon exit (note, before checking for reentry!)
+ defer atomic.StoreInt32(&d.processing, 0)
+
+ // Fetch the current cancel channel to allow termination
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
+
+ // Repeat the processing as long as there are blocks to import
+ for {
+ // Fetch the next batch of blocks
+ blocks := d.queue.TakeBlocks()
+ if len(blocks) == 0 {
+ return nil
+ }
+ // Reset the import statistics
+ d.importLock.Lock()
+ d.importStart = time.Now()
+ d.importQueue = blocks
+ d.importDone = 0
+ d.importLock.Unlock()
+
+ // Actually import the blocks
+ glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
+ for len(blocks) != 0 { // TODO: quit
+ // Check for any termination requests
+ select {
+ case <-cancel:
+ return errCancelChainImport
+ default:
+ }
+ // Retrieve the first batch of blocks to insert
+ max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
+ raw := make(types.Blocks, 0, max)
+ for _, block := range blocks[:max] {
+ raw = append(raw, block.RawBlock)
+ }
+ // Try to inset the blocks, drop the originating peer if there's an error
+ index, err := d.insertChain(raw)
+ if err != nil {
+ glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err)
+ d.dropPeer(blocks[index].OriginPeer)
+ d.Cancel()
+ return errCancelChainImport
+ }
+ blocks = blocks[max:]
+ }
+ }
+}
+
// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 5b85f01fb..6cd141ef7 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -2,8 +2,10 @@ package downloader
import (
"encoding/binary"
+ "errors"
"fmt"
"math/big"
+ "sync/atomic"
"testing"
"time"
@@ -81,7 +83,7 @@ func newTester() *downloadTester {
peerBlocks: make(map[string]map[common.Hash]*types.Block),
}
var mux event.TypeMux
- downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.dropPeer)
+ downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer)
tester.downloader = downloader
return tester
@@ -89,44 +91,14 @@ func newTester() *downloadTester {
// sync starts synchronizing with a remote peer, blocking until it completes.
func (dl *downloadTester) sync(id string) error {
- return dl.downloader.synchronise(id, dl.peerHashes[id][0])
-}
-
-// syncTake is starts synchronising with a remote peer, but concurrently it also
-// starts fetching blocks that the downloader retrieved. IT blocks until both go
-// routines terminate.
-func (dl *downloadTester) syncTake(id string) ([]*Block, error) {
- // Start a block collector to take blocks as they become available
- done := make(chan struct{})
- took := []*Block{}
- go func() {
- for running := true; running; {
- select {
- case <-done:
- running = false
- default:
- time.Sleep(time.Millisecond)
- }
- // Take a batch of blocks and accumulate
- blocks := dl.downloader.TakeBlocks()
- for _, block := range blocks {
- dl.ownHashes = append(dl.ownHashes, block.RawBlock.Hash())
- dl.ownBlocks[block.RawBlock.Hash()] = block.RawBlock
- }
- took = append(took, blocks...)
- }
- done <- struct{}{}
- }()
- // Start the downloading, sync the taker and return
- err := dl.sync(id)
-
- done <- struct{}{}
- <-done
-
- return took, err
+ err := dl.downloader.synchronise(id, dl.peerHashes[id][0])
+ for atomic.LoadInt32(&dl.downloader.processing) == 1 {
+ time.Sleep(time.Millisecond)
+ }
+ return err
}
-// hasBlock checks if a block is present in the testers canonical chain.
+// hasBlock checks if a block is pres ent in the testers canonical chain.
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
return dl.getBlock(hash) != nil
}
@@ -136,6 +108,18 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
return dl.ownBlocks[hash]
}
+// insertChain injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
+ for i, block := range blocks {
+ if _, ok := dl.ownBlocks[block.ParentHash()]; !ok {
+ return i, errors.New("unknown parent")
+ }
+ dl.ownHashes = append(dl.ownHashes, block.Hash())
+ dl.ownBlocks[block.Hash()] = block
+ }
+ return len(blocks), nil
+}
+
// newPeer registers a new block download source into the downloader.
func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id))
@@ -223,27 +207,8 @@ func TestSynchronisation(t *testing.T) {
if err := tester.sync("peer"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks {
- t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks)
- }
-}
-
-// Tests that the synchronized blocks can be correctly retrieved.
-func TestBlockTaking(t *testing.T) {
- // Create a small enough block chain to download and the tester
- targetBlocks := blockCacheLimit - 15
- hashes := createHashes(targetBlocks, knownHash)
- blocks := createBlocksFromHashes(hashes)
-
- tester := newTester()
- tester.newPeer("peer", hashes, blocks)
-
- // Synchronise with the peer and test block retrieval
- if err := tester.sync("peer"); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
- }
- if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks {
- t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks)
+ if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+ t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
}
}
@@ -270,21 +235,21 @@ func TestCancel(t *testing.T) {
tester := newTester()
tester.newPeer("peer", hashes, blocks)
+ // Make sure canceling works with a pristine downloader
+ tester.downloader.Cancel()
+ hashCount, blockCount := tester.downloader.queue.Size()
+ if hashCount > 0 || blockCount > 0 {
+ t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
+ }
// Synchronise with the peer, but cancel afterwards
if err := tester.sync("peer"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if !tester.downloader.Cancel() {
- t.Fatalf("cancel operation failed")
- }
- // Make sure the queue reports empty and no blocks can be taken
- hashCount, blockCount := tester.downloader.queue.Size()
+ tester.downloader.Cancel()
+ hashCount, blockCount = tester.downloader.queue.Size()
if hashCount > 0 || blockCount > 0 {
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
}
- if took := tester.downloader.TakeBlocks(); len(took) != 0 {
- t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0)
- }
}
// Tests that if a large batch of blocks are being downloaded, it is throttled
@@ -298,29 +263,46 @@ func TestThrottling(t *testing.T) {
tester := newTester()
tester.newPeer("peer", hashes, blocks)
+ // Wrap the importer to allow stepping
+ done := make(chan int)
+ tester.downloader.insertChain = func(blocks types.Blocks) (int, error) {
+ n, err := tester.insertChain(blocks)
+ done <- n
+ return n, err
+ }
// Start a synchronisation concurrently
errc := make(chan error)
go func() {
errc <- tester.sync("peer")
}()
// Iteratively take some blocks, always checking the retrieval count
- for total := 0; total < targetBlocks; {
- // Wait a bit for sync to complete
+ for len(tester.ownBlocks) < targetBlocks+1 {
+ // Wait a bit for sync to throttle itself
+ var cached int
for start := time.Now(); time.Since(start) < 3*time.Second; {
time.Sleep(25 * time.Millisecond)
- if len(tester.downloader.queue.blockPool) == blockCacheLimit {
+
+ cached = len(tester.downloader.queue.blockPool)
+ if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 {
break
}
}
- // Fetch the next batch of blocks
- took := tester.downloader.TakeBlocks()
- if len(took) != blockCacheLimit {
- t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit)
+ // Make sure we filled up the cache, then exhaust it
+ time.Sleep(25 * time.Millisecond) // give it a chance to screw up
+ if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 {
+ t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit)
}
- total += len(took)
- if total > targetBlocks {
- t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks)
+ <-done // finish previous blocking import
+ for cached > maxBlockProcess {
+ cached -= <-done
}
+ time.Sleep(25 * time.Millisecond) // yield to the insertion
+ }
+ <-done // finish the last blocking import
+
+ // Check that we haven't pulled more blocks than available
+ if len(tester.ownBlocks) > targetBlocks+1 {
+ t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1)
}
if err := <-errc; err != nil {
t.Fatalf("block synchronization failed: %v", err)
@@ -343,28 +325,18 @@ func TestNonExistingParentAttack(t *testing.T) {
tester.newPeer("attack", hashes, blocks)
// Try and sync with the malicious node and check that it fails
- if err := tester.sync("attack"); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ if err := tester.sync("attack"); err == nil {
+ t.Fatalf("block synchronization succeeded")
}
- bs := tester.downloader.TakeBlocks()
- if len(bs) != 1 {
- t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
+ if tester.hasBlock(hashes[0]) {
+ t.Fatalf("tester accepted unknown-parent block: %v", blocks[hashes[0]])
}
- if tester.hasBlock(bs[0].RawBlock.ParentHash()) {
- t.Fatalf("tester knows about the unknown hash")
- }
- tester.downloader.Cancel()
-
// Try to synchronize with the valid chain and make sure it succeeds
if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- bs = tester.downloader.TakeBlocks()
- if len(bs) != 1 {
- t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
- }
- if !tester.hasBlock(bs[0].RawBlock.ParentHash()) {
- t.Fatalf("tester doesn't know about the origin hash")
+ if !tester.hasBlock(tester.peerHashes["valid"][0]) {
+ t.Fatalf("tester didn't accept known-parent block: %v", tester.peerBlocks["valid"][hashes[0]])
}
}
@@ -442,11 +414,11 @@ func TestInvalidHashOrderAttack(t *testing.T) {
tester.newPeer("attack", hashes, blocks)
// Try and sync with the malicious node and check that it fails
- if _, err := tester.syncTake("attack"); err != errInvalidChain {
+ if err := tester.sync("attack"); err != errInvalidChain {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
}
// Ensure that a valid chain can still pass sync
- if _, err := tester.syncTake("valid"); err != nil {
+ if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
@@ -466,11 +438,11 @@ func TestMadeupHashChainAttack(t *testing.T) {
tester.newPeer("attack", createHashes(1024*blockCacheLimit, knownHash), nil)
// Try and sync with the malicious node and check that it fails
- if _, err := tester.syncTake("attack"); err != errCrossCheckFailed {
+ if err := tester.sync("attack"); err != errCrossCheckFailed {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
}
// Ensure that a valid chain can still pass sync
- if _, err := tester.syncTake("valid"); err != nil {
+ if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
@@ -487,7 +459,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) {
// Try and sync with the attacker, one hash at a time
tester.maxHashFetch = 1
tester.newPeer("attack", hashes, nil)
- if _, err := tester.syncTake("attack"); err != errStallingPeer {
+ if err := tester.sync("attack"); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
}
}
@@ -512,7 +484,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
// Try and sync with the malicious node and check that it fails
tester := newTester()
tester.newPeer("attack", gapped, blocks)
- if _, err := tester.syncTake("attack"); err != errCrossCheckFailed {
+ if err := tester.sync("attack"); err != errCrossCheckFailed {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
}
// Ensure that a valid chain can still pass sync
@@ -520,7 +492,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
crossCheckCycle = defaultCrossCheckCycle
tester.newPeer("valid", hashes, blocks)
- if _, err := tester.syncTake("valid"); err != nil {
+ if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
@@ -548,14 +520,14 @@ func TestMadeupParentBlockChainAttack(t *testing.T) {
tester.newPeer("attack", hashes, blocks)
// Try and sync with the malicious node and check that it fails
- if _, err := tester.syncTake("attack"); err != errCrossCheckFailed {
+ if err := tester.sync("attack"); err != errCrossCheckFailed {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
}
// Ensure that a valid chain can still pass sync
blockSoftTTL = defaultBlockTTL
crossCheckCycle = defaultCrossCheckCycle
- if _, err := tester.syncTake("valid"); err != nil {
+ if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
@@ -582,7 +554,7 @@ func TestBannedChainStarvationAttack(t *testing.T) {
// the head of the invalid chain is blocked too.
for banned := tester.downloader.banned.Size(); ; {
// Try to sync with the attacker, check hash chain failure
- if _, err := tester.syncTake("attack"); err != errInvalidChain {
+ if err := tester.sync("attack"); err != errInvalidChain {
if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead {
break
}
@@ -603,7 +575,7 @@ func TestBannedChainStarvationAttack(t *testing.T) {
t.Fatalf("banned attacker registered: %v", peer)
}
// Ensure that a valid chain can still pass sync
- if _, err := tester.syncTake("valid"); err != nil {
+ if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
@@ -637,7 +609,7 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
// the head of the invalid chain is blocked too.
for {
// Try to sync with the attacker, check hash chain failure
- if _, err := tester.syncTake("attack"); err != errInvalidChain {
+ if err := tester.sync("attack"); err != errInvalidChain {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
}
// Short circuit if the entire chain was banned
@@ -658,33 +630,34 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
MaxBlockFetch = defaultMaxBlockFetch
maxBannedHashes = defaultMaxBannedHashes
- if _, err := tester.syncTake("valid"); err != nil {
+ if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
-func TestAttackerDropping(t *testing.T) {
- // Define the disconnection requirement for individual errors
+func TestHashAttackerDropping(t *testing.T) {
+ // Define the disconnection requirement for individual hash fetch errors
tests := []struct {
result error
drop bool
}{
- {nil, false}, // Sync succeeded, all is well
- {errBusy, false}, // Sync is already in progress, no problem
- {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
- {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
- {errStallingPeer, true}, // Peer was detected to be stalling, drop it
- {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it
- {errNoPeers, false}, // No peers to download from, soft race, no issue
- {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
- {errTimeout, true}, // No hashes received in due time, drop the peer
- {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
- {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
- {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
- {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop
- {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop
+ {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
tester := newTester()
@@ -706,3 +679,38 @@ func TestAttackerDropping(t *testing.T) {
}
}
}
+
+// Tests that feeding bad blocks will result in a peer drop.
+func TestBlockAttackerDropping(t *testing.T) {
+ // Define the disconnection requirement for individual block import errors
+ tests := []struct {
+ failure bool
+ drop bool
+ }{{true, true}, {false, false}}
+
+ // Run the tests and check disconnection status
+ tester := newTester()
+ for i, tt := range tests {
+ // Register a new peer and ensure it's presence
+ id := fmt.Sprintf("test %d", i)
+ if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil {
+ t.Fatalf("test %d: failed to register new peer: %v", i, err)
+ }
+ if _, ok := tester.peerHashes[id]; !ok {
+ t.Fatalf("test %d: registered peer not found", i)
+ }
+ // Assemble a good or bad block, depending of the test
+ raw := createBlock(1, knownHash, common.Hash{})
+ if tt.failure {
+ raw = createBlock(1, unknownHash, common.Hash{})
+ }
+ block := &Block{OriginPeer: id, RawBlock: raw}
+
+ // Simulate block processing and check the result
+ tester.downloader.queue.blockCache[0] = block
+ tester.downloader.process()
+ if _, ok := tester.peerHashes[id]; !ok != tt.drop {
+ t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop)
+ }
+ }
+}
diff --git a/eth/handler.go b/eth/handler.go
index ac7fb8fcf..ec4f2d53a 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -80,7 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
- manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.removePeer)
+ manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
manager.SubProtocol = p2p.Protocol{
Name: "eth",
Version: uint(protocolVersion),
diff --git a/eth/sync.go b/eth/sync.go
index b127ca979..88a76805c 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -1,9 +1,7 @@
package eth
import (
- "math"
"math/rand"
- "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -15,12 +13,10 @@ import (
const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
- blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
- blockProcAmount = 256
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
@@ -254,10 +250,10 @@ func (pm *ProtocolManager) fetcher() {
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as retrieving cached ones.
func (pm *ProtocolManager) syncer() {
- forceSync := time.Tick(forceSyncCycle)
- blockProc := time.Tick(blockProcCycle)
- blockProcPend := int32(0)
+ // Abort any pending syncs if we terminate
+ defer pm.downloader.Cancel()
+ forceSync := time.Tick(forceSyncCycle)
for {
select {
case <-pm.newPeerCh:
@@ -271,55 +267,12 @@ func (pm *ProtocolManager) syncer() {
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
- case <-blockProc:
- // Try to pull some blocks from the downloaded
- if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) {
- go func() {
- pm.processBlocks()
- atomic.StoreInt32(&blockProcPend, 0)
- }()
- }
-
case <-pm.quitSync:
return
}
}
}
-// processBlocks retrieves downloaded blocks from the download cache and tries
-// to construct the local block chain with it. Note, since the block retrieval
-// order matters, access to this function *must* be synchronized/serialized.
-func (pm *ProtocolManager) processBlocks() error {
- pm.wg.Add(1)
- defer pm.wg.Done()
-
- // Short circuit if no blocks are available for insertion
- blocks := pm.downloader.TakeBlocks()
- if len(blocks) == 0 {
- return nil
- }
- glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
-
- for len(blocks) != 0 && !pm.quit {
- // Retrieve the first batch of blocks to insert
- max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
- raw := make(types.Blocks, 0, max)
- for _, block := range blocks[:max] {
- raw = append(raw, block.RawBlock)
- }
- // Try to inset the blocks, drop the originating peer if there's an error
- index, err := pm.chainman.InsertChain(raw)
- if err != nil {
- glog.V(logger.Debug).Infoln("Downloaded block import failed:", err)
- pm.removePeer(blocks[index].OriginPeer)
- pm.downloader.Cancel()
- return err
- }
- blocks = blocks[max:]
- }
- return nil
-}
-
// synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (pm *ProtocolManager) synchronise(peer *peer) {