diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-06-12 18:35:29 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-06-15 14:22:37 +0800 |
commit | fc7abd98865f3bdc6cc36258026db98a649cd577 (patch) | |
tree | 0bca7e6e40445df3f21304e84b6df4dd5d3e7bf5 /eth | |
parent | 0fc71877a7d7a46f35147f753cba0de7b937c77a (diff) | |
download | go-tangerine-fc7abd98865f3bdc6cc36258026db98a649cd577.tar go-tangerine-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.gz go-tangerine-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.bz2 go-tangerine-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.lz go-tangerine-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.xz go-tangerine-fc7abd98865f3bdc6cc36258026db98a649cd577.tar.zst go-tangerine-fc7abd98865f3bdc6cc36258026db98a649cd577.zip |
eth, eth/downloader: move block processing into the downlaoder
Diffstat (limited to 'eth')
-rw-r--r-- | eth/downloader/downloader.go | 177 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 252 | ||||
-rw-r--r-- | eth/handler.go | 2 | ||||
-rw-r--r-- | eth/sync.go | 53 |
4 files changed, 253 insertions, 231 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 88ceeb5ac..1bbba11ed 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -3,6 +3,7 @@ package downloader import ( "bytes" "errors" + "math" "math/rand" "sync" "sync/atomic" @@ -28,25 +29,27 @@ var ( crossCheckCycle = time.Second // Period after which to check for expired cross checks maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out + maxBlockProcess = 256 // Number of blocks to import at once into the chain ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errBannedHead = errors.New("peer head hash already banned") - errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errCrossCheckFailed = errors.New("block cross-check failed") - errCancelHashFetch = errors.New("hash fetching cancelled (requested)") - errCancelBlockFetch = errors.New("block downloading cancelled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errBannedHead = errors.New("peer head hash already banned") + errNoPeers = errors.New("no peers to keep download active") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errCrossCheckFailed = errors.New("block cross-check failed") + errCancelHashFetch = errors.New("hash fetching canceled (requested)") + errCancelBlockFetch = errors.New("block downloading canceled (requested)") + errCancelChainImport = errors.New("chain importing canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) // hashCheckFn is a callback type for verifying a hash's presence in the local chain. @@ -55,6 +58,9 @@ type hashCheckFn func(common.Hash) bool // blockRetrievalFn is a callback type for retrieving a block from the local chain. type blockRetrievalFn func(common.Hash) *types.Block +// chainInsertFn is a callback type to insert a batch of blocks into the local chain. +type chainInsertFn func(types.Blocks) (int, error) + // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) @@ -88,13 +94,15 @@ type Downloader struct { importLock sync.Mutex // Callbacks - hasBlock hashCheckFn // Checks if a block is present in the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - dropPeer peerDropFn // Retrieved the TD of our own chain + hasBlock hashCheckFn // Checks if a block is present in the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Retrieved the TD of our own chain // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 + processing int32 notified int32 // Channels @@ -113,18 +121,19 @@ type Block struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader { +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { // Create the base downloader downloader := &Downloader{ - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasBlock: hasBlock, - getBlock: getBlock, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + hasBlock: hasBlock, + getBlock: getBlock, + insertChain: insertChain, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan hashPack, 1), + blockCh: make(chan blockPack, 1), } // Inject all the known bad hashes downloader.banned = set.New() @@ -157,7 +166,7 @@ func (d *Downloader) Stats() (pending int, cached int, importing int, estimate t return } -// Synchronising returns the state of the downloader +// Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { return atomic.LoadInt32(&d.synchronising) > 0 } @@ -260,19 +269,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error { return d.syncWithPeer(p, hash) } -// TakeBlocks takes blocks from the queue and yields them to the caller. -func (d *Downloader) TakeBlocks() []*Block { - blocks := d.queue.TakeBlocks() - if len(blocks) > 0 { - d.importLock.Lock() - d.importStart = time.Now() - d.importQueue = blocks - d.importDone = 0 - d.importLock.Unlock() - } - return blocks -} - // Has checks if the downloader knows about a particular hash, meaning that its // either already downloaded of pending retrieval. func (d *Downloader) Has(hash common.Hash) bool { @@ -307,19 +303,16 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { // Cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. -func (d *Downloader) Cancel() bool { - // If we're not syncing just return. - hs, bs := d.queue.Size() - if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { - return false - } +func (d *Downloader) Cancel() { // Close the current cancel channel d.cancelLock.Lock() - select { - case <-d.cancelCh: - // Channel was already closed - default: - close(d.cancelCh) + if d.cancelCh != nil { + select { + case <-d.cancelCh: + // Channel was already closed + default: + close(d.cancelCh) + } } d.cancelLock.Unlock() @@ -330,11 +323,11 @@ func (d *Downloader) Cancel() bool { d.importQueue = nil d.importDone = 0 d.importLock.Unlock() - - return true } -// XXX Make synchronous +// fetchHahes starts retrieving hashes backwards from a specific peer and hash, +// up until it finds a common ancestor. If the source peer times out, alternative +// ones are tried for continuation. func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { var ( start = time.Now() @@ -530,10 +523,13 @@ out: glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } - // All was successful, promote the peer + // All was successful, promote the peer and potentially start processing peer.Promote() peer.SetIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + if atomic.LoadInt32(&d.processing) == 0 { + go d.process() + } case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort @@ -709,6 +705,71 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { } } +// process takes blocks from the queue and tries to import them into the chain. +func (d *Downloader) process() (err error) { + // Make sure only one goroutine is ever allowed to process blocks at once + if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { + return + } + // If the processor just exited, but there are freshly pending items, try to + // reenter. This is needed because the goroutine spinned up for processing + // the fresh blocks might have been rejected entry to to this present thread + // not yet releasing the `processing` state. + defer func() { + if err == nil && d.queue.GetHeadBlock() != nil { + err = d.process() + } + }() + // Release the lock upon exit (note, before checking for reentry!) + defer atomic.StoreInt32(&d.processing, 0) + + // Fetch the current cancel channel to allow termination + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + + // Repeat the processing as long as there are blocks to import + for { + // Fetch the next batch of blocks + blocks := d.queue.TakeBlocks() + if len(blocks) == 0 { + return nil + } + // Reset the import statistics + d.importLock.Lock() + d.importStart = time.Now() + d.importQueue = blocks + d.importDone = 0 + d.importLock.Unlock() + + // Actually import the blocks + glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) + for len(blocks) != 0 { // TODO: quit + // Check for any termination requests + select { + case <-cancel: + return errCancelChainImport + default: + } + // Retrieve the first batch of blocks to insert + max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) + raw := make(types.Blocks, 0, max) + for _, block := range blocks[:max] { + raw = append(raw, block.RawBlock) + } + // Try to inset the blocks, drop the originating peer if there's an error + index, err := d.insertChain(raw) + if err != nil { + glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err) + d.dropPeer(blocks[index].OriginPeer) + d.Cancel() + return errCancelChainImport + } + blocks = blocks[max:] + } + } +} + // DeliverBlocks injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5b85f01fb..6cd141ef7 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -2,8 +2,10 @@ package downloader import ( "encoding/binary" + "errors" "fmt" "math/big" + "sync/atomic" "testing" "time" @@ -81,7 +83,7 @@ func newTester() *downloadTester { peerBlocks: make(map[string]map[common.Hash]*types.Block), } var mux event.TypeMux - downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.dropPeer) + downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer) tester.downloader = downloader return tester @@ -89,44 +91,14 @@ func newTester() *downloadTester { // sync starts synchronizing with a remote peer, blocking until it completes. func (dl *downloadTester) sync(id string) error { - return dl.downloader.synchronise(id, dl.peerHashes[id][0]) -} - -// syncTake is starts synchronising with a remote peer, but concurrently it also -// starts fetching blocks that the downloader retrieved. IT blocks until both go -// routines terminate. -func (dl *downloadTester) syncTake(id string) ([]*Block, error) { - // Start a block collector to take blocks as they become available - done := make(chan struct{}) - took := []*Block{} - go func() { - for running := true; running; { - select { - case <-done: - running = false - default: - time.Sleep(time.Millisecond) - } - // Take a batch of blocks and accumulate - blocks := dl.downloader.TakeBlocks() - for _, block := range blocks { - dl.ownHashes = append(dl.ownHashes, block.RawBlock.Hash()) - dl.ownBlocks[block.RawBlock.Hash()] = block.RawBlock - } - took = append(took, blocks...) - } - done <- struct{}{} - }() - // Start the downloading, sync the taker and return - err := dl.sync(id) - - done <- struct{}{} - <-done - - return took, err + err := dl.downloader.synchronise(id, dl.peerHashes[id][0]) + for atomic.LoadInt32(&dl.downloader.processing) == 1 { + time.Sleep(time.Millisecond) + } + return err } -// hasBlock checks if a block is present in the testers canonical chain. +// hasBlock checks if a block is pres ent in the testers canonical chain. func (dl *downloadTester) hasBlock(hash common.Hash) bool { return dl.getBlock(hash) != nil } @@ -136,6 +108,18 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.ownBlocks[hash] } +// insertChain injects a new batch of blocks into the simulated chain. +func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { + for i, block := range blocks { + if _, ok := dl.ownBlocks[block.ParentHash()]; !ok { + return i, errors.New("unknown parent") + } + dl.ownHashes = append(dl.ownHashes, block.Hash()) + dl.ownBlocks[block.Hash()] = block + } + return len(blocks), nil +} + // newPeer registers a new block download source into the downloader. func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id)) @@ -223,27 +207,8 @@ func TestSynchronisation(t *testing.T) { if err := tester.sync("peer"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks { - t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks) - } -} - -// Tests that the synchronized blocks can be correctly retrieved. -func TestBlockTaking(t *testing.T) { - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 - hashes := createHashes(targetBlocks, knownHash) - blocks := createBlocksFromHashes(hashes) - - tester := newTester() - tester.newPeer("peer", hashes, blocks) - - // Synchronise with the peer and test block retrieval - if err := tester.sync("peer"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks { - t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks) + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) } } @@ -270,21 +235,21 @@ func TestCancel(t *testing.T) { tester := newTester() tester.newPeer("peer", hashes, blocks) + // Make sure canceling works with a pristine downloader + tester.downloader.Cancel() + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } // Synchronise with the peer, but cancel afterwards if err := tester.sync("peer"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if !tester.downloader.Cancel() { - t.Fatalf("cancel operation failed") - } - // Make sure the queue reports empty and no blocks can be taken - hashCount, blockCount := tester.downloader.queue.Size() + tester.downloader.Cancel() + hashCount, blockCount = tester.downloader.queue.Size() if hashCount > 0 || blockCount > 0 { t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) } - if took := tester.downloader.TakeBlocks(); len(took) != 0 { - t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0) - } } // Tests that if a large batch of blocks are being downloaded, it is throttled @@ -298,29 +263,46 @@ func TestThrottling(t *testing.T) { tester := newTester() tester.newPeer("peer", hashes, blocks) + // Wrap the importer to allow stepping + done := make(chan int) + tester.downloader.insertChain = func(blocks types.Blocks) (int, error) { + n, err := tester.insertChain(blocks) + done <- n + return n, err + } // Start a synchronisation concurrently errc := make(chan error) go func() { errc <- tester.sync("peer") }() // Iteratively take some blocks, always checking the retrieval count - for total := 0; total < targetBlocks; { - // Wait a bit for sync to complete + for len(tester.ownBlocks) < targetBlocks+1 { + // Wait a bit for sync to throttle itself + var cached int for start := time.Now(); time.Since(start) < 3*time.Second; { time.Sleep(25 * time.Millisecond) - if len(tester.downloader.queue.blockPool) == blockCacheLimit { + + cached = len(tester.downloader.queue.blockPool) + if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 { break } } - // Fetch the next batch of blocks - took := tester.downloader.TakeBlocks() - if len(took) != blockCacheLimit { - t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit) + // Make sure we filled up the cache, then exhaust it + time.Sleep(25 * time.Millisecond) // give it a chance to screw up + if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit) } - total += len(took) - if total > targetBlocks { - t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks) + <-done // finish previous blocking import + for cached > maxBlockProcess { + cached -= <-done } + time.Sleep(25 * time.Millisecond) // yield to the insertion + } + <-done // finish the last blocking import + + // Check that we haven't pulled more blocks than available + if len(tester.ownBlocks) > targetBlocks+1 { + t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1) } if err := <-errc; err != nil { t.Fatalf("block synchronization failed: %v", err) @@ -343,28 +325,18 @@ func TestNonExistingParentAttack(t *testing.T) { tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - if err := tester.sync("attack"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + if err := tester.sync("attack"); err == nil { + t.Fatalf("block synchronization succeeded") } - bs := tester.downloader.TakeBlocks() - if len(bs) != 1 { - t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) + if tester.hasBlock(hashes[0]) { + t.Fatalf("tester accepted unknown-parent block: %v", blocks[hashes[0]]) } - if tester.hasBlock(bs[0].RawBlock.ParentHash()) { - t.Fatalf("tester knows about the unknown hash") - } - tester.downloader.Cancel() - // Try to synchronize with the valid chain and make sure it succeeds if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - bs = tester.downloader.TakeBlocks() - if len(bs) != 1 { - t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) - } - if !tester.hasBlock(bs[0].RawBlock.ParentHash()) { - t.Fatalf("tester doesn't know about the origin hash") + if !tester.hasBlock(tester.peerHashes["valid"][0]) { + t.Fatalf("tester didn't accept known-parent block: %v", tester.peerBlocks["valid"][hashes[0]]) } } @@ -442,11 +414,11 @@ func TestInvalidHashOrderAttack(t *testing.T) { tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - if _, err := tester.syncTake("attack"); err != errInvalidChain { + if err := tester.sync("attack"); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Ensure that a valid chain can still pass sync - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -466,11 +438,11 @@ func TestMadeupHashChainAttack(t *testing.T) { tester.newPeer("attack", createHashes(1024*blockCacheLimit, knownHash), nil) // Try and sync with the malicious node and check that it fails - if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { + if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -487,7 +459,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 tester.newPeer("attack", hashes, nil) - if _, err := tester.syncTake("attack"); err != errStallingPeer { + if err := tester.sync("attack"); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } } @@ -512,7 +484,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester() tester.newPeer("attack", gapped, blocks) - if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { + if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync @@ -520,7 +492,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { crossCheckCycle = defaultCrossCheckCycle tester.newPeer("valid", hashes, blocks) - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -548,14 +520,14 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { + if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -582,7 +554,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { // the head of the invalid chain is blocked too. for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack"); err != errInvalidChain { + if err := tester.sync("attack"); err != errInvalidChain { if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead { break } @@ -603,7 +575,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { t.Fatalf("banned attacker registered: %v", peer) } // Ensure that a valid chain can still pass sync - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -637,7 +609,7 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { // the head of the invalid chain is blocked too. for { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack"); err != errInvalidChain { + if err := tester.sync("attack"); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Short circuit if the entire chain was banned @@ -658,33 +630,34 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { MaxBlockFetch = defaultMaxBlockFetch maxBannedHashes = defaultMaxBannedHashes - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } // Tests that misbehaving peers are disconnected, whilst behaving ones are not. -func TestAttackerDropping(t *testing.T) { - // Define the disconnection requirement for individual errors +func TestHashAttackerDropping(t *testing.T) { + // Define the disconnection requirement for individual hash fetch errors tests := []struct { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() @@ -706,3 +679,38 @@ func TestAttackerDropping(t *testing.T) { } } } + +// Tests that feeding bad blocks will result in a peer drop. +func TestBlockAttackerDropping(t *testing.T) { + // Define the disconnection requirement for individual block import errors + tests := []struct { + failure bool + drop bool + }{{true, true}, {false, false}} + + // Run the tests and check disconnection status + tester := newTester() + for i, tt := range tests { + // Register a new peer and ensure it's presence + id := fmt.Sprintf("test %d", i) + if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil { + t.Fatalf("test %d: failed to register new peer: %v", i, err) + } + if _, ok := tester.peerHashes[id]; !ok { + t.Fatalf("test %d: registered peer not found", i) + } + // Assemble a good or bad block, depending of the test + raw := createBlock(1, knownHash, common.Hash{}) + if tt.failure { + raw = createBlock(1, unknownHash, common.Hash{}) + } + block := &Block{OriginPeer: id, RawBlock: raw} + + // Simulate block processing and check the result + tester.downloader.queue.blockCache[0] = block + tester.downloader.process() + if _, ok := tester.peerHashes[id]; !ok != tt.drop { + t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop) + } + } +} diff --git a/eth/handler.go b/eth/handler.go index ac7fb8fcf..ec4f2d53a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -80,7 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.removePeer) + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), diff --git a/eth/sync.go b/eth/sync.go index b127ca979..88a76805c 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -1,9 +1,7 @@ package eth import ( - "math" "math/rand" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -15,12 +13,10 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockProcAmount = 256 // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. @@ -254,10 +250,10 @@ func (pm *ProtocolManager) fetcher() { // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as retrieving cached ones. func (pm *ProtocolManager) syncer() { - forceSync := time.Tick(forceSyncCycle) - blockProc := time.Tick(blockProcCycle) - blockProcPend := int32(0) + // Abort any pending syncs if we terminate + defer pm.downloader.Cancel() + forceSync := time.Tick(forceSyncCycle) for { select { case <-pm.newPeerCh: @@ -271,55 +267,12 @@ func (pm *ProtocolManager) syncer() { // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) - case <-blockProc: - // Try to pull some blocks from the downloaded - if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { - go func() { - pm.processBlocks() - atomic.StoreInt32(&blockProcPend, 0) - }() - } - case <-pm.quitSync: return } } } -// processBlocks retrieves downloaded blocks from the download cache and tries -// to construct the local block chain with it. Note, since the block retrieval -// order matters, access to this function *must* be synchronized/serialized. -func (pm *ProtocolManager) processBlocks() error { - pm.wg.Add(1) - defer pm.wg.Done() - - // Short circuit if no blocks are available for insertion - blocks := pm.downloader.TakeBlocks() - if len(blocks) == 0 { - return nil - } - glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) - - for len(blocks) != 0 && !pm.quit { - // Retrieve the first batch of blocks to insert - max := int(math.Min(float64(len(blocks)), float64(blockProcAmount))) - raw := make(types.Blocks, 0, max) - for _, block := range blocks[:max] { - raw = append(raw, block.RawBlock) - } - // Try to inset the blocks, drop the originating peer if there's an error - index, err := pm.chainman.InsertChain(raw) - if err != nil { - glog.V(logger.Debug).Infoln("Downloaded block import failed:", err) - pm.removePeer(blocks[index].OriginPeer) - pm.downloader.Cancel() - return err - } - blocks = blocks[max:] - } - return nil -} - // synchronise tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. func (pm *ProtocolManager) synchronise(peer *peer) { |