Diffstat (limited to 'eth/downloader')
 -rw-r--r--  eth/downloader/downloader.go       89
 -rw-r--r--  eth/downloader/downloader_test.go  79
 2 files changed, 101 insertions(+), 67 deletions(-)
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 306c4fd2d..39976aae1 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1,3 +1,4 @@
+// Package downloader contains the manual full chain synchronisation.
package downloader
import (
@@ -33,23 +34,22 @@ var (
)
var (
- errBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer is unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errStallingPeer = errors.New("peer is stalling")
- errBannedHead = errors.New("peer head hash already banned")
- errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
- errTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- errInvalidChain = errors.New("retrieved hash chain is invalid")
- errCrossCheckFailed = errors.New("block cross-check failed")
- errCancelHashFetch = errors.New("hash fetching canceled (requested)")
- errCancelBlockFetch = errors.New("block downloading canceled (requested)")
- errCancelChainImport = errors.New("chain importing canceled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errBannedHead = errors.New("peer head hash already banned")
+ errNoPeers = errors.New("no peers to keep download active")
+ errPendingQueue = errors.New("pending items in queue")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
+ errAlreadyInPool = errors.New("hash already in pool")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errCrossCheckFailed = errors.New("block cross-check failed")
+ errCancelHashFetch = errors.New("hash fetching canceled (requested)")
+ errCancelBlockFetch = errors.New("block downloading canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
@@ -87,6 +87,8 @@ type Downloader struct {
checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
banned *set.Set // Set of hashes we've received and banned
+ interrupt int32 // Atomic boolean to signal termination
+
// Statistics
importStart time.Time // Instance when the last blocks were taken from the cache
importQueue []*Block // Previously taken blocks to check import progress
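The new interrupt field is an int32 used as an atomic boolean (sync/atomic has no bool type): Terminate stores 1, and the processing code loads it to decide whether to keep going. A minimal sketch of that pattern in isolation; the worker type and its methods are illustrative stand-ins, not downloader code:

package main

import (
	"fmt"
	"sync/atomic"
)

type worker struct {
	interrupt int32 // atomic boolean: 0 = running, 1 = terminate requested
}

// Terminate flips the flag; safe to call from any goroutine.
func (w *worker) Terminate() {
	atomic.StoreInt32(&w.interrupt, 1)
}

// running reports whether termination has been requested yet.
func (w *worker) running() bool {
	return atomic.LoadInt32(&w.interrupt) == 0
}

func main() {
	w := new(worker)
	fmt.Println(w.running()) // true
	w.Terminate()
	fmt.Println(w.running()) // false
}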
@@ -97,7 +99,7 @@ type Downloader struct {
hasBlock hashCheckFn // Checks if a block is present in the chain
getBlock blockRetrievalFn // Retrieves a block from the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
- dropPeer peerDropFn // Retrieved the TD of our own chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -245,12 +247,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started")
}
-
- // Create cancel channel for aborting mid-flight
- d.cancelLock.Lock()
- d.cancelCh = make(chan struct{})
- d.cancelLock.Unlock()
-
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
return errPendingQueue
@@ -260,12 +256,16 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
d.peers.Reset()
d.checks = make(map[common.Hash]*crossCheck)
+ // Create cancel channel for aborting mid-flight
+ d.cancelLock.Lock()
+ d.cancelCh = make(chan struct{})
+ d.cancelLock.Unlock()
+
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
if p == nil {
return errUnknownPeer
}
-
return d.syncWithPeer(p, hash)
}
@@ -282,7 +282,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
defer func() {
// reset on error
if err != nil {
- d.Cancel()
+ d.cancel()
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
@@ -301,9 +301,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
return nil
}
-// Cancel cancels all of the operations and resets the queue. It returns true
-// if the cancel operation was completed.
+// cancel cancels all of the operations and resets the queue.
-func (d *Downloader) Cancel() {
+func (d *Downloader) cancel() {
// Close the current cancel channel
d.cancelLock.Lock()
if d.cancelCh != nil {
@@ -320,6 +320,12 @@ func (d *Downloader) Cancel() {
d.queue.Reset()
}
+// Terminate interrupts the downloader, canceling all pending operations.
+func (d *Downloader) Terminate() {
+ atomic.StoreInt32(&d.interrupt, 1)
+ d.cancel()
+}
+
// fetchHashes starts retrieving hashes backwards from a specific peer and hash,
// up until it finds a common ancestor. If the source peer times out, alternative
// ones are tried for continuation.
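cancel only aborts the current run, whereas Terminate also sets the persistent interrupt flag so no further processing is re-entered. Since closing an already-closed channel panics, the close inside cancel has to be made idempotent; the lines above show the cancelLock and nil check, and one common guard for the close itself is a select with a default branch, sketched below (an assumption for illustration, not necessarily the exact guard used in the unshown lines, and only safe when callers are serialised by a lock as they are here):

package main

import "fmt"

// closeOnce closes ch only if it is not already closed; a receive on a closed
// channel succeeds immediately, so the default branch is taken only while the
// channel is still open.
func closeOnce(ch chan struct{}) {
	select {
	case <-ch: // already closed, nothing to do
	default:
		close(ch)
	}
}

func main() {
	ch := make(chan struct{})
	closeOnce(ch)
	closeOnce(ch) // harmless no-op the second time
	fmt.Println("closed exactly once, no panic")
}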
@@ -548,6 +554,7 @@ out:
peer.Demote()
peer.SetIdle()
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
+ go d.process()
}
}
@@ -712,7 +719,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
// between these state changes, a block may have arrived, but a processing
// attempt denied, so we need to re-enter to ensure the block isn't left
// to idle in the cache.
-func (d *Downloader) process() (err error) {
+func (d *Downloader) process() {
// Make sure only one goroutine is ever allowed to process blocks at once
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
return
@@ -722,8 +729,8 @@ func (d *Downloader) process() (err error) {
// the fresh blocks might have been rejected entry due to this present thread
// not yet releasing the `processing` state.
defer func() {
- if err == nil && d.queue.GetHeadBlock() != nil {
- err = d.process()
+ if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
+ d.process()
}
}()
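process no longer returns an error: a CompareAndSwap on the processing flag makes it single-flight, and the deferred re-entry picks up blocks that arrived while the flag was still held, unless Terminate has set interrupt. A simplified sketch of that control flow; processor and pending are stand-ins for the downloader and its queue, not real types:

package main

import (
	"fmt"
	"sync/atomic"
)

type processor struct {
	processing int32 // atomic flag: only one process() run at a time
	interrupt  int32 // atomic flag: set by Terminate
	pending    []int // stand-in for the queued blocks
}

func (p *processor) process() {
	// Only one goroutine may process at once; concurrent callers bail out.
	if !atomic.CompareAndSwapInt32(&p.processing, 0, 1) {
		return
	}
	// Re-enter after exit if work slipped in while the flag was still held
	// (registered first, so it runs last per LIFO defer order).
	defer func() {
		if atomic.LoadInt32(&p.interrupt) == 0 && len(p.pending) > 0 {
			p.process()
		}
	}()
	// Release the flag before the re-entry check above runs.
	defer atomic.StoreInt32(&p.processing, 0)

	for len(p.pending) > 0 {
		if atomic.LoadInt32(&p.interrupt) == 1 {
			return // terminated mid-run
		}
		fmt.Println("importing", p.pending[0])
		p.pending = p.pending[1:]
	}
}

func main() {
	p := &processor{pending: []int{1, 2, 3}}
	p.process()
}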
// Release the lock upon exit (note, before checking for reentry!), and set
@@ -736,18 +743,12 @@ func (d *Downloader) process() (err error) {
atomic.StoreInt32(&d.processing, 0)
}()
-
- // Fetch the current cancel channel to allow termination
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
// Repeat the processing as long as there are blocks to import
for {
// Fetch the next batch of blocks
blocks := d.queue.TakeBlocks()
if len(blocks) == 0 {
- return nil
+ return
}
// Reset the import statistics
d.importLock.Lock()
@@ -758,12 +759,10 @@ func (d *Downloader) process() (err error) {
// Actually import the blocks
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
- for len(blocks) != 0 { // TODO: quit
+ for len(blocks) != 0 {
// Check for any termination requests
- select {
- case <-cancel:
- return errCancelChainImport
- default:
+ if atomic.LoadInt32(&d.interrupt) == 1 {
+ return
}
// Retrieve the first batch of blocks to insert
max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
@@ -776,8 +775,8 @@ func (d *Downloader) process() (err error) {
if err != nil {
glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
d.dropPeer(blocks[index].OriginPeer)
- d.Cancel()
- return errCancelChainImport
+ d.cancel()
+ return
}
blocks = blocks[max:]
}
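Imports still happen in bounded batches of maxBlockProcess blocks, with the interrupt flag checked between batches and the offending peer dropped on failure. A rough sketch of the batching loop; insert here only mimics the (index, error) shape that the surrounding code suggests for the insertChain callback, it is not the real API:

package main

import (
	"errors"
	"fmt"
	"math"
)

// maxBatch plays the role of maxBlockProcess: how many blocks are handed to
// the inserter per call, so termination requests are noticed between batches.
const maxBatch = 16

// insert is a stand-in for the chain inserter; it reports the index of the
// offending element alongside the error.
func insert(batch []int) (int, error) {
	for i, v := range batch {
		if v < 0 {
			return i, errors.New("invalid block")
		}
	}
	return 0, nil
}

// importAll drains the slice in bounded batches, mirroring the loop at the
// end of Downloader.process.
func importAll(blocks []int) error {
	for len(blocks) != 0 {
		max := int(math.Min(float64(len(blocks)), float64(maxBatch)))
		if index, err := insert(blocks[:max]); err != nil {
			return fmt.Errorf("block #%d import failed: %v", blocks[index], err)
		}
		blocks = blocks[max:]
	}
	return nil
}

func main() {
	fmt.Println(importAll([]int{1, 2, 3}))  // <nil>
	fmt.Println(importAll([]int{1, -2, 3})) // block #-2 import failed: invalid block
}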
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index f71c16237..4fc4e1434 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -52,6 +52,8 @@ func copyBlock(block *types.Block) *types.Block {
return createBlock(int(block.Number().Int64()), block.ParentHeaderHash, block.HeaderHash)
}
+// createBlocksFromHashes assembles a collection of blocks, each having a correct
+// place in the given hash chain.
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
blocks := make(map[common.Hash]*types.Block)
for i := 0; i < len(hashes); i++ {
@@ -64,6 +66,7 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
return blocks
}
+// downloadTester is a test simulator for mocking out the local block chain.
type downloadTester struct {
downloader *Downloader
@@ -75,6 +78,7 @@ type downloadTester struct {
maxHashFetch int // Overrides the maximum number of retrieved hashes
}
+// newTester creates a new downloader test mocker.
func newTester() *downloadTester {
tester := &downloadTester{
ownHashes: []common.Hash{knownHash},
@@ -82,9 +86,7 @@ func newTester() *downloadTester {
peerHashes: make(map[string][]common.Hash),
peerBlocks: make(map[string]map[common.Hash]*types.Block),
}
- var mux event.TypeMux
- downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer)
- tester.downloader = downloader
+ tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer)
return tester
}
@@ -247,7 +249,7 @@ func TestCancel(t *testing.T) {
tester.newPeer("peer", hashes, blocks)
// Make sure canceling works with a pristine downloader
- tester.downloader.Cancel()
+ tester.downloader.cancel()
hashCount, blockCount := tester.downloader.queue.Size()
if hashCount > 0 || blockCount > 0 {
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
@@ -256,7 +258,7 @@ func TestCancel(t *testing.T) {
if err := tester.sync("peer"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- tester.downloader.Cancel()
+ tester.downloader.cancel()
hashCount, blockCount = tester.downloader.queue.Size()
if hashCount > 0 || blockCount > 0 {
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
@@ -359,7 +361,7 @@ func TestSlowSynchronisation(t *testing.T) {
// Create a batch of blocks, with a slow and a full speed peer
targetCycles := 2
targetBlocks := targetCycles*blockCacheLimit - 15
- targetIODelay := 500 * time.Millisecond
+ targetIODelay := time.Second
hashes := createHashes(targetBlocks, knownHash)
blocks := createBlocksFromHashes(hashes)
@@ -708,6 +710,40 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
}
}
+// Tests a corner case (potential attack) where a peer delivers both requested
+// as well as unrequested blocks in reply to a hash request. This may trigger a
+// different code path than a fully correct or a fully invalid delivery,
+// potentially causing internal state problems.
+//
+// No, don't delete this test, it actually did happen!
+func TestOverlappingDeliveryAttack(t *testing.T) {
+ // Create an arbitrary batch of blocks (< cache size, so the download doesn't block)
+ targetBlocks := blockCacheLimit - 23
+ hashes := createHashes(targetBlocks, knownHash)
+ blocks := createBlocksFromHashes(hashes)
+
+ // Register an attacker that always returns non-requested blocks too
+ tester := newTester()
+ tester.newPeer("attack", hashes, blocks)
+
+ rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks
+ tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error {
+ // Add a non-requested hash to screw up the delivery (genesis should be fine)
+ return rawGetBlocks(append(request, hashes[0]))
+ }
+ // Test that synchronisation can complete, check for import success
+ if err := tester.sync("attack"); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ start := time.Now()
+ for len(tester.ownHashes) != len(hashes) && time.Since(start) < time.Second {
+ time.Sleep(50 * time.Millisecond)
+ }
+ if len(tester.ownHashes) != len(hashes) {
+ t.Fatalf("chain length mismatch: have %v, want %v", len(tester.ownHashes), len(hashes))
+ }
+}
+
// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
func TestHashAttackerDropping(t *testing.T) {
// Define the disconnection requirement for individual hash fetch errors
@@ -715,22 +751,21 @@ func TestHashAttackerDropping(t *testing.T) {
result error
drop bool
}{
- {nil, false}, // Sync succeeded, all is well
- {errBusy, false}, // Sync is already in progress, no problem
- {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
- {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
- {errStallingPeer, true}, // Peer was detected to be stalling, drop it
- {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it
- {errNoPeers, false}, // No peers to download from, soft race, no issue
- {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
- {errTimeout, true}, // No hashes received in due time, drop the peer
- {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
- {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
- {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
- {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop
- {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop
+ {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
tester := newTester()