From 4e158ea3b851e3886f5682a78c69ce06e3690865 Mon Sep 17 00:00:00 2001
From: Sonic <sonic@dexon.org>
Date: Fri, 9 Nov 2018 14:47:57 +0800
Subject: dex: copy fetcher and downloader from eth

---
 dex/downloader/api.go             |  166 ++++
 dex/downloader/downloader.go      | 1684 +++++++++++++++++++++++++++++++++++++
 dex/downloader/downloader_test.go | 1481 ++++++++++++++++++++++++++++++++
 dex/downloader/events.go          |   21 +
 dex/downloader/fakepeer.go        |  161 ++++
 dex/downloader/metrics.go         |   43 +
 dex/downloader/modes.go           |   73 ++
 dex/downloader/peer.go            |  573 +++++++++++++
 dex/downloader/queue.go           |  885 +++++++++++++++++++
 dex/downloader/statesync.go       |  484 +++++++++++
 dex/downloader/testchain_test.go  |  221 +++++
 dex/downloader/types.go           |   79 ++
 dex/fetcher/fetcher.go            |  736 ++++++++++++++++
 dex/fetcher/fetcher_test.go       |  790 +++++++++++++++++
 dex/fetcher/metrics.go            |   43 +
 15 files changed, 7440 insertions(+)
 create mode 100644 dex/downloader/api.go
 create mode 100644 dex/downloader/downloader.go
 create mode 100644 dex/downloader/downloader_test.go
 create mode 100644 dex/downloader/events.go
 create mode 100644 dex/downloader/fakepeer.go
 create mode 100644 dex/downloader/metrics.go
 create mode 100644 dex/downloader/modes.go
 create mode 100644 dex/downloader/peer.go
 create mode 100644 dex/downloader/queue.go
 create mode 100644 dex/downloader/statesync.go
 create mode 100644 dex/downloader/testchain_test.go
 create mode 100644 dex/downloader/types.go
 create mode 100644 dex/fetcher/fetcher.go
 create mode 100644 dex/fetcher/fetcher_test.go
 create mode 100644 dex/fetcher/metrics.go

diff --git a/dex/downloader/api.go b/dex/downloader/api.go
new file mode 100644
index 000000000..721818e75
--- /dev/null
+++ b/dex/downloader/api.go
@@ -0,0 +1,166 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"context"
+	"sync"
+
+	ethereum "github.com/dexon-foundation/dexon"
+	"github.com/dexon-foundation/dexon/event"
+	"github.com/dexon-foundation/dexon/rpc"
+)
+
+// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
+// It offers only methods that operates on data that can be available to anyone without security risks.
+type PublicDownloaderAPI struct {
+	d                         *Downloader
+	mux                       *event.TypeMux
+	installSyncSubscription   chan chan interface{}
+	uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
+}
+
+// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
+// listens for events from the downloader through the global event mux. In case it receives one of
+// these events it broadcasts it to all syncing subscriptions that are installed through the
+// installSyncSubscription channel.
+func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
+	api := &PublicDownloaderAPI{
+		d:                         d,
+		mux:                       m,
+		installSyncSubscription:   make(chan chan interface{}),
+		uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
+	}
+
+	go api.eventLoop()
+
+	return api
+}
+
+// eventLoop runs a loop until the event mux closes. It will install and uninstall new
+// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
+func (api *PublicDownloaderAPI) eventLoop() {
+	var (
+		sub               = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+		syncSubscriptions = make(map[chan interface{}]struct{})
+	)
+
+	for {
+		select {
+		case i := <-api.installSyncSubscription:
+			syncSubscriptions[i] = struct{}{}
+		case u := <-api.uninstallSyncSubscription:
+			delete(syncSubscriptions, u.c)
+			close(u.uninstalled)
+		case event := <-sub.Chan():
+			if event == nil {
+				return
+			}
+
+			var notification interface{}
+			switch event.Data.(type) {
+			case StartEvent:
+				notification = &SyncingResult{
+					Syncing: true,
+					Status:  api.d.Progress(),
+				}
+			case DoneEvent, FailedEvent:
+				notification = false
+			}
+			// broadcast
+			for c := range syncSubscriptions {
+				c <- notification
+			}
+		}
+	}
+}
+
+// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
+func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
+	notifier, supported := rpc.NotifierFromContext(ctx)
+	if !supported {
+		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+	}
+
+	rpcSub := notifier.CreateSubscription()
+
+	go func() {
+		statuses := make(chan interface{})
+		sub := api.SubscribeSyncStatus(statuses)
+
+		for {
+			select {
+			case status := <-statuses:
+				notifier.Notify(rpcSub.ID, status)
+			case <-rpcSub.Err():
+				sub.Unsubscribe()
+				return
+			case <-notifier.Closed():
+				sub.Unsubscribe()
+				return
+			}
+		}
+	}()
+
+	return rpcSub, nil
+}
+
+// SyncingResult provides information about the current synchronisation status for this node.
+type SyncingResult struct {
+	Syncing bool                  `json:"syncing"`
+	Status  ethereum.SyncProgress `json:"status"`
+}
+
+// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop.
+type uninstallSyncSubscriptionRequest struct {
+	c           chan interface{}
+	uninstalled chan interface{}
+}
+
+// SyncStatusSubscription represents a syncing subscription.
+type SyncStatusSubscription struct {
+	api       *PublicDownloaderAPI // register subscription in event loop of this api instance
+	c         chan interface{}     // channel where events are broadcasted to
+	unsubOnce sync.Once            // make sure unsubscribe logic is executed once
+}
+
+// Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
+// The status channel that was passed to subscribeSyncStatus isn't used anymore
+// after this method returns.
+func (s *SyncStatusSubscription) Unsubscribe() {
+	s.unsubOnce.Do(func() {
+		req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
+		s.api.uninstallSyncSubscription <- &req
+
+		for {
+			select {
+			case <-s.c:
+				// drop new status events until uninstall confirmation
+				continue
+			case <-req.uninstalled:
+				return
+			}
+		}
+	})
+}
+
+// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
+// The given channel must receive interface values, the result can either
+func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
+	api.installSyncSubscription <- status
+	return &SyncStatusSubscription{api: api, c: status}
+}
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go
new file mode 100644
index 000000000..0383a3709
--- /dev/null
+++ b/dex/downloader/downloader.go
@@ -0,0 +1,1684 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package downloader contains the manual full chain synchronisation.
+package downloader
+
+import (
+	"errors"
+	"fmt"
+	"math/big"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	ethereum "github.com/dexon-foundation/dexon"
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/core/rawdb"
+	"github.com/dexon-foundation/dexon/core/types"
+	"github.com/dexon-foundation/dexon/ethdb"
+	"github.com/dexon-foundation/dexon/event"
+	"github.com/dexon-foundation/dexon/log"
+	"github.com/dexon-foundation/dexon/metrics"
+	"github.com/dexon-foundation/dexon/params"
+)
+
+var (
+	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
+	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
+	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
+	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
+	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
+	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
+	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
+
+	MaxForkAncestry  = 3 * params.EpochDuration // Maximum chain reorganisation
+	rttMinEstimate   = 2 * time.Second          // Minimum round-trip time to target for download requests
+	rttMaxEstimate   = 20 * time.Second         // Maximum round-trip time to target for download requests
+	rttMinConfidence = 0.1                      // Worse confidence factor in our estimated RTT value
+	ttlScaling       = 3                        // Constant scaling factor for RTT -> TTL conversion
+	ttlLimit         = time.Minute              // Maximum TTL allowance to prevent reaching crazy timeouts
+
+	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
+	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
+	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value
+
+	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
+	maxResultsProcess = 2048      // Number of content download results to import at once into the chain
+
+	reorgProtThreshold   = 48 // Threshold number of recent blocks to disable mini reorg protection
+	reorgProtHeaderDelay = 2  // Number of headers to delay delivering to cover mini reorgs
+
+	fsHeaderCheckFrequency = 100             // Verification frequency of the downloaded headers during fast sync
+	fsHeaderSafetyNet      = 2048            // Number of headers to discard in case a chain violation is detected
+	fsHeaderForceVerify    = 24              // Number of headers to verify before and after the pivot to accept it
+	fsHeaderContCheck      = 3 * time.Second // Time interval to check for header continuations during state download
+	fsMinFullBlocks        = 64              // Number of blocks to retrieve fully even in fast sync
+)
+
+var (
+	errBusy                    = errors.New("busy")
+	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
+	errBadPeer                 = errors.New("action from bad peer ignored")
+	errStallingPeer            = errors.New("peer is stalling")
+	errNoPeers                 = errors.New("no peers to keep download active")
+	errTimeout                 = errors.New("timeout")
+	errEmptyHeaderSet          = errors.New("empty header set by peer")
+	errPeersUnavailable        = errors.New("no peers available or all tried for download")
+	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
+	errInvalidChain            = errors.New("retrieved hash chain is invalid")
+	errInvalidBlock            = errors.New("retrieved block is invalid")
+	errInvalidBody             = errors.New("retrieved block body is invalid")
+	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
+	errCancelBlockFetch        = errors.New("block download canceled (requested)")
+	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
+	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
+	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
+	errCancelStateFetch        = errors.New("state data download canceled (requested)")
+	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
+	errCancelContentProcessing = errors.New("content processing canceled (requested)")
+	errNoSyncActive            = errors.New("no sync active")
+	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
+)
+
+type Downloader struct {
+	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
+	mux  *event.TypeMux // Event multiplexer to announce sync operation events
+
+	queue   *queue   // Scheduler for selecting the hashes to download
+	peers   *peerSet // Set of active peers from which download can proceed
+	stateDB ethdb.Database
+
+	rttEstimate   uint64 // Round trip time to target for download requests
+	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
+
+	// Statistics
+	syncStatsChainOrigin uint64 // Origin block number where syncing started at
+	syncStatsChainHeight uint64 // Highest block number known when syncing started
+	syncStatsState       stateSyncStats
+	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
+
+	lightchain LightChain
+	blockchain BlockChain
+
+	// Callbacks
+	dropPeer peerDropFn // Drops a peer for misbehaving
+
+	// Status
+	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
+	synchronising   int32
+	notified        int32
+	committed       int32
+
+	// Channels
+	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
+	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
+	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
+	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
+	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
+	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+
+	// for stateFetcher
+	stateSyncStart chan *stateSync
+	trackStateReq  chan *stateReq
+	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data
+
+	// Cancellation and termination
+	cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
+	cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
+	cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
+	cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.
+
+	quitCh   chan struct{} // Quit channel to signal termination
+	quitLock sync.RWMutex  // Lock to prevent double closes
+
+	// Testing hooks
+	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
+	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
+	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
+	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
+}
+
+// LightChain encapsulates functions required to synchronise a light chain.
+type LightChain interface {
+	// HasHeader verifies a header's presence in the local chain.
+	HasHeader(common.Hash, uint64) bool
+
+	// GetHeaderByHash retrieves a header from the local chain.
+	GetHeaderByHash(common.Hash) *types.Header
+
+	// CurrentHeader retrieves the head header from the local chain.
+	CurrentHeader() *types.Header
+
+	// GetTd returns the total difficulty of a local block.
+	GetTd(common.Hash, uint64) *big.Int
+
+	// InsertHeaderChain inserts a batch of headers into the local chain.
+	InsertHeaderChain([]*types.Header, int) (int, error)
+
+	// Rollback removes a few recently added elements from the local chain.
+	Rollback([]common.Hash)
+}
+
+// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
+type BlockChain interface {
+	LightChain
+
+	// HasBlock verifies a block's presence in the local chain.
+	HasBlock(common.Hash, uint64) bool
+
+	// GetBlockByHash retrieves a block from the local chain.
+	GetBlockByHash(common.Hash) *types.Block
+
+	// CurrentBlock retrieves the head block from the local chain.
+	CurrentBlock() *types.Block
+
+	// CurrentFastBlock retrieves the head fast block from the local chain.
+	CurrentFastBlock() *types.Block
+
+	// FastSyncCommitHead directly commits the head block to a certain entity.
+	FastSyncCommitHead(common.Hash) error
+
+	// InsertChain inserts a batch of blocks into the local chain.
+	InsertChain(types.Blocks) (int, error)
+
+	// InsertReceiptChain inserts a batch of receipts into the local chain.
+	InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
+}
+
+// New creates a new downloader to fetch hashes and blocks from remote peers.
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
+	if lightchain == nil {
+		lightchain = chain
+	}
+
+	dl := &Downloader{
+		mode:           mode,
+		stateDB:        stateDb,
+		mux:            mux,
+		queue:          newQueue(),
+		peers:          newPeerSet(),
+		rttEstimate:    uint64(rttMaxEstimate),
+		rttConfidence:  uint64(1000000),
+		blockchain:     chain,
+		lightchain:     lightchain,
+		dropPeer:       dropPeer,
+		headerCh:       make(chan dataPack, 1),
+		bodyCh:         make(chan dataPack, 1),
+		receiptCh:      make(chan dataPack, 1),
+		bodyWakeCh:     make(chan bool, 1),
+		receiptWakeCh:  make(chan bool, 1),
+		headerProcCh:   make(chan []*types.Header, 1),
+		quitCh:         make(chan struct{}),
+		stateCh:        make(chan dataPack),
+		stateSyncStart: make(chan *stateSync),
+		syncStatsState: stateSyncStats{
+			processed: rawdb.ReadFastTrieProgress(stateDb),
+		},
+		trackStateReq: make(chan *stateReq),
+	}
+	go dl.qosTuner()
+	go dl.stateFetcher()
+	return dl
+}
+
+// Progress retrieves the synchronisation boundaries, specifically the origin
+// block where synchronisation started at (may have failed/suspended); the block
+// or header sync is currently at; and the latest known block which the sync targets.
+//
+// In addition, during the state download phase of fast synchronisation the number
+// of processed and the total number of known states are also returned. Otherwise
+// these are zero.
+func (d *Downloader) Progress() ethereum.SyncProgress {
+	// Lock the current stats and return the progress
+	d.syncStatsLock.RLock()
+	defer d.syncStatsLock.RUnlock()
+
+	current := uint64(0)
+	switch d.mode {
+	case FullSync:
+		current = d.blockchain.CurrentBlock().NumberU64()
+	case FastSync:
+		current = d.blockchain.CurrentFastBlock().NumberU64()
+	case LightSync:
+		current = d.lightchain.CurrentHeader().Number.Uint64()
+	}
+	return ethereum.SyncProgress{
+		StartingBlock: d.syncStatsChainOrigin,
+		CurrentBlock:  current,
+		HighestBlock:  d.syncStatsChainHeight,
+		PulledStates:  d.syncStatsState.processed,
+		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
+	}
+}
+
+// Synchronising returns whether the downloader is currently retrieving blocks.
+func (d *Downloader) Synchronising() bool {
+	return atomic.LoadInt32(&d.synchronising) > 0
+}
+
+// RegisterPeer injects a new download peer into the set of block source to be
+// used for fetching hashes and blocks from.
+func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
+	logger := log.New("peer", id)
+	logger.Trace("Registering sync peer")
+	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
+		logger.Error("Failed to register sync peer", "err", err)
+		return err
+	}
+	d.qosReduceConfidence()
+
+	return nil
+}
+
+// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
+func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
+	return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
+}
+
+// UnregisterPeer remove a peer from the known list, preventing any action from
+// the specified peer. An effort is also made to return any pending fetches into
+// the queue.
+func (d *Downloader) UnregisterPeer(id string) error {
+	// Unregister the peer from the active peer set and revoke any fetch tasks
+	logger := log.New("peer", id)
+	logger.Trace("Unregistering sync peer")
+	if err := d.peers.Unregister(id); err != nil {
+		logger.Error("Failed to unregister sync peer", "err", err)
+		return err
+	}
+	d.queue.Revoke(id)
+
+	// If this peer was the master peer, abort sync immediately
+	d.cancelLock.RLock()
+	master := id == d.cancelPeer
+	d.cancelLock.RUnlock()
+
+	if master {
+		d.cancel()
+	}
+	return nil
+}
+
+// Synchronise tries to sync up our local block chain with a remote peer, both
+// adding various sanity checks as well as wrapping it with various log entries.
+func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
+	err := d.synchronise(id, head, td, mode)
+	switch err {
+	case nil:
+	case errBusy:
+
+	case errTimeout, errBadPeer, errStallingPeer,
+		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
+		errInvalidAncestor, errInvalidChain:
+		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
+		if d.dropPeer == nil {
+			// The dropPeer method is nil when `--copydb` is used for a local copy.
+			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
+			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
+		} else {
+			d.dropPeer(id)
+		}
+	default:
+		log.Warn("Synchronisation failed, retrying", "err", err)
+	}
+	return err
+}
+
+// synchronise will select the peer and use it for synchronising. If an empty string is given
+// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
+// checks fail an error will be returned. This method is synchronous
+func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
+	// Mock out the synchronisation if testing
+	if d.synchroniseMock != nil {
+		return d.synchroniseMock(id, hash)
+	}
+	// Make sure only one goroutine is ever allowed past this point at once
+	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
+		return errBusy
+	}
+	defer atomic.StoreInt32(&d.synchronising, 0)
+
+	// Post a user notification of the sync (only once per session)
+	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
+		log.Info("Block synchronisation started")
+	}
+	// Reset the queue, peer set and wake channels to clean any internal leftover state
+	d.queue.Reset()
+	d.peers.Reset()
+
+	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+		select {
+		case <-ch:
+		default:
+		}
+	}
+	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
+		for empty := false; !empty; {
+			select {
+			case <-ch:
+			default:
+				empty = true
+			}
+		}
+	}
+	for empty := false; !empty; {
+		select {
+		case <-d.headerProcCh:
+		default:
+			empty = true
+		}
+	}
+	// Create cancel channel for aborting mid-flight and mark the master peer
+	d.cancelLock.Lock()
+	d.cancelCh = make(chan struct{})
+	d.cancelPeer = id
+	d.cancelLock.Unlock()
+
+	defer d.Cancel() // No matter what, we can't leave the cancel channel open
+
+	// Set the requested sync mode, unless it's forbidden
+	d.mode = mode
+
+	// Retrieve the origin peer and initiate the downloading process
+	p := d.peers.Peer(id)
+	if p == nil {
+		return errUnknownPeer
+	}
+	return d.syncWithPeer(p, hash, td)
+}
+
+// syncWithPeer starts a block synchronization based on the hash chain from the
+// specified peer and head hash.
+func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
+	d.mux.Post(StartEvent{})
+	defer func() {
+		// reset on error
+		if err != nil {
+			d.mux.Post(FailedEvent{err})
+		} else {
+			d.mux.Post(DoneEvent{})
+		}
+	}()
+	if p.version < 62 {
+		return errTooOld
+	}
+
+	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
+	defer func(start time.Time) {
+		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
+	}(time.Now())
+
+	// Look up the sync boundaries: the common ancestor and the target block
+	latest, err := d.fetchHeight(p)
+	if err != nil {
+		return err
+	}
+	height := latest.Number.Uint64()
+
+	origin, err := d.findAncestor(p, height)
+	if err != nil {
+		return err
+	}
+	d.syncStatsLock.Lock()
+	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+		d.syncStatsChainOrigin = origin
+	}
+	d.syncStatsChainHeight = height
+	d.syncStatsLock.Unlock()
+
+	// Ensure our origin point is below any fast sync pivot point
+	pivot := uint64(0)
+	if d.mode == FastSync {
+		if height <= uint64(fsMinFullBlocks) {
+			origin = 0
+		} else {
+			pivot = height - uint64(fsMinFullBlocks)
+			if pivot <= origin {
+				origin = pivot - 1
+			}
+		}
+	}
+	d.committed = 1
+	if d.mode == FastSync && pivot != 0 {
+		d.committed = 0
+	}
+	// Initiate the sync using a concurrent header and content retrieval algorithm
+	d.queue.Prepare(origin+1, d.mode)
+	if d.syncInitHook != nil {
+		d.syncInitHook(origin, height)
+	}
+
+	fetchers := []func() error{
+		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
+		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
+		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
+		func() error { return d.processHeaders(origin+1, pivot, td) },
+	}
+	if d.mode == FastSync {
+		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+	} else if d.mode == FullSync {
+		fetchers = append(fetchers, d.processFullSyncContent)
+	}
+	return d.spawnSync(fetchers)
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers []func() error) error {
+	errc := make(chan error, len(fetchers))
+	d.cancelWg.Add(len(fetchers))
+	for _, fn := range fetchers {
+		fn := fn
+		go func() { defer d.cancelWg.Done(); errc <- fn() }()
+	}
+	// Wait for the first error, then terminate the others.
+	var err error
+	for i := 0; i < len(fetchers); i++ {
+		if i == len(fetchers)-1 {
+			// Close the queue when all fetchers have exited.
+			// This will cause the block processor to end when
+			// it has processed the queue.
+			d.queue.Close()
+		}
+		if err = <-errc; err != nil {
+			break
+		}
+	}
+	d.queue.Close()
+	d.Cancel()
+	return err
+}
+
+// cancel aborts all of the operations and resets the queue. However, cancel does
+// not wait for the running download goroutines to finish. This method should be
+// used when cancelling the downloads from inside the downloader.
+func (d *Downloader) cancel() {
+	// Close the current cancel channel
+	d.cancelLock.Lock()
+	if d.cancelCh != nil {
+		select {
+		case <-d.cancelCh:
+			// Channel was already closed
+		default:
+			close(d.cancelCh)
+		}
+	}
+	d.cancelLock.Unlock()
+}
+
+// Cancel aborts all of the operations and waits for all download goroutines to
+// finish before returning.
+func (d *Downloader) Cancel() {
+	d.cancel()
+	d.cancelWg.Wait()
+}
+
+// Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
+func (d *Downloader) Terminate() {
+	// Close the termination channel (make sure double close is allowed)
+	d.quitLock.Lock()
+	select {
+	case <-d.quitCh:
+	default:
+		close(d.quitCh)
+	}
+	d.quitLock.Unlock()
+
+	// Cancel any pending download requests
+	d.Cancel()
+}
+
+// fetchHeight retrieves the head header of the remote peer to aid in estimating
+// the total time a pending synchronisation would take.
+func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
+	p.log.Debug("Retrieving remote chain height")
+
+	// Request the advertised remote head block and wait for the response
+	head, _ := p.peer.Head()
+	go p.peer.RequestHeadersByHash(head, 1, 0, false)
+
+	ttl := d.requestTTL()
+	timeout := time.After(ttl)
+	for {
+		select {
+		case <-d.cancelCh:
+			return nil, errCancelBlockFetch
+
+		case packet := <-d.headerCh:
+			// Discard anything not from the origin peer
+			if packet.PeerId() != p.id {
+				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
+				break
+			}
+			// Make sure the peer actually gave something valid
+			headers := packet.(*headerPack).headers
+			if len(headers) != 1 {
+				p.log.Debug("Multiple headers for single request", "headers", len(headers))
+				return nil, errBadPeer
+			}
+			head := headers[0]
+			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
+			return head, nil
+
+		case <-timeout:
+			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
+			return nil, errTimeout
+
+		case <-d.bodyCh:
+		case <-d.receiptCh:
+			// Out of bounds delivery, ignore
+		}
+	}
+}
+
+// findAncestor tries to locate the common ancestor link of the local chain and
+// a remote peers blockchain. In the general case when our node was in sync and
+// on the correct chain, checking the top N links should already get us a match.
+// In the rare scenario when we ended up on a long reorganisation (i.e. none of
+// the head links match), we do a binary search to find the common ancestor.
+func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
+	// Figure out the valid ancestor range to prevent rewrite attacks
+	floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
+
+	if d.mode == FullSync {
+		ceil = d.blockchain.CurrentBlock().NumberU64()
+	} else if d.mode == FastSync {
+		ceil = d.blockchain.CurrentFastBlock().NumberU64()
+	}
+	if ceil >= MaxForkAncestry {
+		floor = int64(ceil - MaxForkAncestry)
+	}
+	p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
+
+	// Request the topmost blocks to short circuit binary ancestor lookup
+	head := ceil
+	if head > height {
+		head = height
+	}
+	from := int64(head) - int64(MaxHeaderFetch)
+	if from < 0 {
+		from = 0
+	}
+	// Span out with 15 block gaps into the future to catch bad head reports
+	limit := 2 * MaxHeaderFetch / 16
+	count := 1 + int((int64(ceil)-from)/16)
+	if count > limit {
+		count = limit
+	}
+	go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
+
+	// Wait for the remote response to the head fetch
+	number, hash := uint64(0), common.Hash{}
+
+	ttl := d.requestTTL()
+	timeout := time.After(ttl)
+
+	for finished := false; !finished; {
+		select {
+		case <-d.cancelCh:
+			return 0, errCancelHeaderFetch
+
+		case packet := <-d.headerCh:
+			// Discard anything not from the origin peer
+			if packet.PeerId() != p.id {
+				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
+				break
+			}
+			// Make sure the peer actually gave something valid
+			headers := packet.(*headerPack).headers
+			if len(headers) == 0 {
+				p.log.Warn("Empty head header set")
+				return 0, errEmptyHeaderSet
+			}
+			// Make sure the peer's reply conforms to the request
+			for i := 0; i < len(headers); i++ {
+				if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
+					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
+					return 0, errInvalidChain
+				}
+			}
+			// Check if a common ancestor was found
+			finished = true
+			for i := len(headers) - 1; i >= 0; i-- {
+				// Skip any headers that underflow/overflow our requested set
+				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
+					continue
+				}
+				// Otherwise check if we already know the header or not
+				h := headers[i].Hash()
+				n := headers[i].Number.Uint64()
+				if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) {
+					number, hash = n, h
+
+					// If every header is known, even future ones, the peer straight out lied about its head
+					if number > height && i == limit-1 {
+						p.log.Warn("Lied about chain head", "reported", height, "found", number)
+						return 0, errStallingPeer
+					}
+					break
+				}
+			}
+
+		case <-timeout:
+			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
+			return 0, errTimeout
+
+		case <-d.bodyCh:
+		case <-d.receiptCh:
+			// Out of bounds delivery, ignore
+		}
+	}
+	// If the head fetch already found an ancestor, return
+	if hash != (common.Hash{}) {
+		if int64(number) <= floor {
+			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
+			return 0, errInvalidAncestor
+		}
+		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
+		return number, nil
+	}
+	// Ancestor not found, we need to binary search over our chain
+	start, end := uint64(0), head
+	if floor > 0 {
+		start = uint64(floor)
+	}
+	for start+1 < end {
+		// Split our chain interval in two, and request the hash to cross check
+		check := (start + end) / 2
+
+		ttl := d.requestTTL()
+		timeout := time.After(ttl)
+
+		go p.peer.RequestHeadersByNumber(check, 1, 0, false)
+
+		// Wait until a reply arrives to this request
+		for arrived := false; !arrived; {
+			select {
+			case <-d.cancelCh:
+				return 0, errCancelHeaderFetch
+
+			case packer := <-d.headerCh:
+				// Discard anything not from the origin peer
+				if packer.PeerId() != p.id {
+					log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
+					break
+				}
+				// Make sure the peer actually gave something valid
+				headers := packer.(*headerPack).headers
+				if len(headers) != 1 {
+					p.log.Debug("Multiple headers for single request", "headers", len(headers))
+					return 0, errBadPeer
+				}
+				arrived = true
+
+				// Modify the search interval based on the response
+				h := headers[0].Hash()
+				n := headers[0].Number.Uint64()
+				if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) {
+					end = check
+					break
+				}
+				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
+				if header.Number.Uint64() != check {
+					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
+					return 0, errBadPeer
+				}
+				start = check
+				hash = h
+
+			case <-timeout:
+				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
+				return 0, errTimeout
+
+			case <-d.bodyCh:
+			case <-d.receiptCh:
+				// Out of bounds delivery, ignore
+			}
+		}
+	}
+	// Ensure valid ancestry and return
+	if int64(start) <= floor {
+		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
+		return 0, errInvalidAncestor
+	}
+	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
+	return start, nil
+}
+
+// fetchHeaders keeps retrieving headers concurrently from the number
+// requested, until no more are returned, potentially throttling on the way. To
+// facilitate concurrency but still protect against malicious nodes sending bad
+// headers, we construct a header chain skeleton using the "origin" peer we are
+// syncing with, and fill in the missing headers using anyone else. Headers from
+// other peers are only accepted if they map cleanly to the skeleton. If no one
+// can fill in the skeleton - not even the origin peer - it's assumed invalid and
+// the origin is dropped.
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
+	p.log.Debug("Directing header downloads", "origin", from)
+	defer p.log.Debug("Header download terminated")
+
+	// Create a timeout timer, and the associated header fetcher
+	skeleton := true            // Skeleton assembly phase or finishing up
+	request := time.Now()       // time of the last skeleton fetch request
+	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
+	<-timeout.C                 // timeout channel should be initially empty
+	defer timeout.Stop()
+
+	var ttl time.Duration
+	getHeaders := func(from uint64) {
+		request = time.Now()
+
+		ttl = d.requestTTL()
+		timeout.Reset(ttl)
+
+		if skeleton {
+			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
+			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+		} else {
+			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
+			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
+		}
+	}
+	// Start pulling the header chain skeleton until all is done
+	getHeaders(from)
+
+	for {
+		select {
+		case <-d.cancelCh:
+			return errCancelHeaderFetch
+
+		case packet := <-d.headerCh:
+			// Make sure the active peer is giving us the skeleton headers
+			if packet.PeerId() != p.id {
+				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
+				break
+			}
+			headerReqTimer.UpdateSince(request)
+			timeout.Stop()
+
+			// If the skeleton's finished, pull any remaining head headers directly from the origin
+			if packet.Items() == 0 && skeleton {
+				skeleton = false
+				getHeaders(from)
+				continue
+			}
+			// If no more headers are inbound, notify the content fetchers and return
+			if packet.Items() == 0 {
+				// Don't abort header fetches while the pivot is downloading
+				if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
+					p.log.Debug("No headers, waiting for pivot commit")
+					select {
+					case <-time.After(fsHeaderContCheck):
+						getHeaders(from)
+						continue
+					case <-d.cancelCh:
+						return errCancelHeaderFetch
+					}
+				}
+				// Pivot done (or not in fast sync) and no more headers, terminate the process
+				p.log.Debug("No more headers available")
+				select {
+				case d.headerProcCh <- nil:
+					return nil
+				case <-d.cancelCh:
+					return errCancelHeaderFetch
+				}
+			}
+			headers := packet.(*headerPack).headers
+
+			// If we received a skeleton batch, resolve internals concurrently
+			if skeleton {
+				filled, proced, err := d.fillHeaderSkeleton(from, headers)
+				if err != nil {
+					p.log.Debug("Skeleton chain invalid", "err", err)
+					return errInvalidChain
+				}
+				headers = filled[proced:]
+				from += uint64(proced)
+			} else {
+				// If we're closing in on the chain head, but haven't yet reached it, delay
+				// the last few headers so mini reorgs on the head don't cause invalid hash
+				// chain errors.
+				if n := len(headers); n > 0 {
+					// Retrieve the current head we're at
+					head := uint64(0)
+					if d.mode == LightSync {
+						head = d.lightchain.CurrentHeader().Number.Uint64()
+					} else {
+						head = d.blockchain.CurrentFastBlock().NumberU64()
+						if full := d.blockchain.CurrentBlock().NumberU64(); head < full {
+							head = full
+						}
+					}
+					// If the head is way older than this batch, delay the last few headers
+					if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
+						delay := reorgProtHeaderDelay
+						if delay > n {
+							delay = n
+						}
+						headers = headers[:n-delay]
+					}
+				}
+			}
+			// Insert all the new headers and fetch the next batch
+			if len(headers) > 0 {
+				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
+				select {
+				case d.headerProcCh <- headers:
+				case <-d.cancelCh:
+					return errCancelHeaderFetch
+				}
+				from += uint64(len(headers))
+				getHeaders(from)
+			} else {
+				// No headers delivered, or all of them being delayed, sleep a bit and retry
+				p.log.Trace("All headers delayed, waiting")
+				select {
+				case <-time.After(fsHeaderContCheck):
+					getHeaders(from)
+					continue
+				case <-d.cancelCh:
+					return errCancelHeaderFetch
+				}
+			}
+
+		case <-timeout.C:
+			if d.dropPeer == nil {
+				// The dropPeer method is nil when `--copydb` is used for a local copy.
+				// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
+				p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id)
+				break
+			}
+			// Header retrieval timed out, consider the peer bad and drop
+			p.log.Debug("Header request timed out", "elapsed", ttl)
+			headerTimeoutMeter.Mark(1)
+			d.dropPeer(p.id)
+
+			// Finish the sync gracefully instead of dumping the gathered data though
+			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+				select {
+				case ch <- false:
+				case <-d.cancelCh:
+				}
+			}
+			select {
+			case d.headerProcCh <- nil:
+			case <-d.cancelCh:
+			}
+			return errBadPeer
+		}
+	}
+}
+
+// fillHeaderSkeleton concurrently retrieves headers from all our available peers
+// and maps them to the provided skeleton header chain.
+//
+// Any partial results from the beginning of the skeleton is (if possible) forwarded
+// immediately to the header processor to keep the rest of the pipeline full even
+// in the case of header stalls.
+//
+// The method returns the entire filled skeleton and also the number of headers
+// already forwarded for processing.
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
+	log.Debug("Filling up skeleton", "from", from)
+	d.queue.ScheduleSkeleton(from, skeleton)
+
+	var (
+		deliver = func(packet dataPack) (int, error) {
+			pack := packet.(*headerPack)
+			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
+		}
+		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
+		throttle = func() bool { return false }
+		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
+			return d.queue.ReserveHeaders(p, count), false, nil
+		}
+		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
+		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
+	)
+	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
+		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
+		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
+
+	log.Debug("Skeleton fill terminated", "err", err)
+
+	filled, proced := d.queue.RetrieveHeaders()
+	return filled, proced, err
+}
+
+// fetchBodies iteratively downloads the scheduled block bodies, taking any
+// available peers, reserving a chunk of blocks for each, waiting for delivery
+// and also periodically checking for timeouts.
+func (d *Downloader) fetchBodies(from uint64) error {
+	log.Debug("Downloading block bodies", "origin", from)
+
+	var (
+		deliver = func(packet dataPack) (int, error) {
+			pack := packet.(*bodyPack)
+			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
+		}
+		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
+		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
+		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
+		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
+	)
+	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
+		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
+		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
+
+	log.Debug("Block body download terminated", "err", err)
+	return err
+}
+
+// fetchReceipts iteratively downloads the scheduled block receipts, taking any
+// available peers, reserving a chunk of receipts for each, waiting for delivery
+// and also periodically checking for timeouts.
+func (d *Downloader) fetchReceipts(from uint64) error {
+	log.Debug("Downloading transaction receipts", "origin", from)
+
+	var (
+		deliver = func(packet dataPack) (int, error) {
+			pack := packet.(*receiptPack)
+			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
+		}
+		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
+		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
+		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
+		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
+	)
+	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
+		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
+		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
+
+	log.Debug("Transaction receipt download terminated", "err", err)
+	return err
+}
+
+// fetchParts iteratively downloads scheduled block parts, taking any available
+// peers, reserving a chunk of fetch requests for each, waiting for delivery and
+// also periodically checking for timeouts.
+//
+// As the scheduling/timeout logic mostly is the same for all downloaded data
+// types, this method is used by each for data gathering and is instrumented with
+// various callbacks to handle the slight differences between processing them.
+//
+// The instrumentation parameters:
+//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
+//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
+//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
+//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
+//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
+//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
+//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
+//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
+//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
+//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
+//  - fetch:       network callback to actually send a particular download request to a physical remote peer
+//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
+//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
+//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
+//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
+//  - kind:        textual label of the type being downloaded to display in log mesages
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
+	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
+	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
+
+	// Create a ticker to detect expired retrieval tasks
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	update := make(chan struct{}, 1)
+
+	// Prepare the queue and fetch block parts until the block header fetcher's done
+	finished := false
+	for {
+		select {
+		case <-d.cancelCh:
+			return errCancel
+
+		case packet := <-deliveryCh:
+			// If the peer was previously banned and failed to deliver its pack
+			// in a reasonable time frame, ignore its message.
+			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
+				// Deliver the received chunk of data and check chain validity
+				accepted, err := deliver(packet)
+				if err == errInvalidChain {
+					return err
+				}
+				// Unless a peer delivered something completely else than requested (usually
+				// caused by a timed out request which came through in the end), set it to
+				// idle. If the delivery's stale, the peer should have already been idled.
+				if err != errStaleDelivery {
+					setIdle(peer, accepted)
+				}
+				// Issue a log to the user to see what's going on
+				switch {
+				case err == nil && packet.Items() == 0:
+					peer.log.Trace("Requested data not delivered", "type", kind)
+				case err == nil:
+					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
+				default:
+					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
+				}
+			}
+			// Blocks assembled, try to update the progress
+			select {
+			case update <- struct{}{}:
+			default:
+			}
+
+		case cont := <-wakeCh:
+			// The header fetcher sent a continuation flag, check if it's done
+			if !cont {
+				finished = true
+			}
+			// Headers arrive, try to update the progress
+			select {
+			case update <- struct{}{}:
+			default:
+			}
+
+		case <-ticker.C:
+			// Sanity check update the progress
+			select {
+			case update <- struct{}{}:
+			default:
+			}
+
+		case <-update:
+			// Short circuit if we lost all our peers
+			if d.peers.Len() == 0 {
+				return errNoPeers
+			}
+			// Check for fetch request timeouts and demote the responsible peers
+			for pid, fails := range expire() {
+				if peer := d.peers.Peer(pid); peer != nil {
+					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
+					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
+					// out that sync wise we need to get rid of the peer.
+					//
+					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
+					// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
+					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
+					if fails > 2 {
+						peer.log.Trace("Data delivery timed out", "type", kind)
+						setIdle(peer, 0)
+					} else {
+						peer.log.Debug("Stalling delivery, dropping", "type", kind)
+						if d.dropPeer == nil {
+							// The dropPeer method is nil when `--copydb` is used for a local copy.
+							// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
+							peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
+						} else {
+							d.dropPeer(pid)
+						}
+					}
+				}
+			}
+			// If there's nothing more to fetch, wait or terminate
+			if pending() == 0 {
+				if !inFlight() && finished {
+					log.Debug("Data fetching completed", "type", kind)
+					return nil
+				}
+				break
+			}
+			// Send a download request to all idle peers, until throttled
+			progressed, throttled, running := false, false, inFlight()
+			idles, total := idle()
+
+			for _, peer := range idles {
+				// Short circuit if throttling activated
+				if throttle() {
+					throttled = true
+					break
+				}
+				// Short circuit if there is no more available task.
+				if pending() == 0 {
+					break
+				}
+				// Reserve a chunk of fetches for a peer. A nil can mean either that
+				// no more headers are available, or that the peer is known not to
+				// have them.
+				request, progress, err := reserve(peer, capacity(peer))
+				if err != nil {
+					return err
+				}
+				if progress {
+					progressed = true
+				}
+				if request == nil {
+					continue
+				}
+				if request.From > 0 {
+					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
+				} else {
+					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
+				}
+				// Fetch the chunk and make sure any errors return the hashes to the queue
+				if fetchHook != nil {
+					fetchHook(request.Headers)
+				}
+				if err := fetch(peer, request); err != nil {
+					// Although we could try and make an attempt to fix this, this error really
+					// means that we've double allocated a fetch task to a peer. If that is the
+					// case, the internal state of the downloader and the queue is very wrong so
+					// better hard crash and note the error instead of silently accumulating into
+					// a much bigger issue.
+					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
+				}
+				running = true
+			}
+			// Make sure that we have peers available for fetching. If all peers have been tried
+			// and all failed throw an error
+			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
+				return errPeersUnavailable
+			}
+		}
+	}
+}
+
+// processHeaders takes batches of retrieved headers from an input channel and
+// keeps processing and scheduling them into the header chain and downloader's
+// queue until the stream ends or a failure occurs.
+func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
+	// Keep a count of uncertain headers to roll back
+	rollback := []*types.Header{}
+	defer func() {
+		if len(rollback) > 0 {
+			// Flatten the headers and roll them back
+			hashes := make([]common.Hash, len(rollback))
+			for i, header := range rollback {
+				hashes[i] = header.Hash()
+			}
+			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
+			if d.mode != LightSync {
+				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
+				lastBlock = d.blockchain.CurrentBlock().Number()
+			}
+			d.lightchain.Rollback(hashes)
+			curFastBlock, curBlock := common.Big0, common.Big0
+			if d.mode != LightSync {
+				curFastBlock = d.blockchain.CurrentFastBlock().Number()
+				curBlock = d.blockchain.CurrentBlock().Number()
+			}
+			log.Warn("Rolled back headers", "count", len(hashes),
+				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
+				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
+				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
+		}
+	}()
+
+	// Wait for batches of headers to process
+	gotHeaders := false
+
+	for {
+		select {
+		case <-d.cancelCh:
+			return errCancelHeaderProcessing
+
+		case headers := <-d.headerProcCh:
+			// Terminate header processing if we synced up
+			if len(headers) == 0 {
+				// Notify everyone that headers are fully processed
+				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+					select {
+					case ch <- false:
+					case <-d.cancelCh:
+					}
+				}
+				// If no headers were retrieved at all, the peer violated its TD promise that it had a
+				// better chain compared to ours. The only exception is if its promised blocks were
+				// already imported by other means (e.g. fecher):
+				//
+				// R <remote peer>, L <local node>: Both at block 10
+				// R: Mine block 11, and propagate it to L
+				// L: Queue block 11 for import
+				// L: Notice that R's head and TD increased compared to ours, start sync
+				// L: Import of block 11 finishes
+				// L: Sync begins, and finds common ancestor at 11
+				// L: Request new headers up from 11 (R's TD was higher, it must have something)
+				// R: Nothing to give
+				if d.mode != LightSync {
+					head := d.blockchain.CurrentBlock()
+					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
+						return errStallingPeer
+					}
+				}
+				// If fast or light syncing, ensure promised headers are indeed delivered. This is
+				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
+				// of delivering the post-pivot blocks that would flag the invalid content.
+				//
+				// This check cannot be executed "as is" for full imports, since blocks may still be
+				// queued for processing when the header download completes. However, as long as the
+				// peer gave us something useful, we're already happy/progressed (above check).
+				if d.mode == FastSync || d.mode == LightSync {
+					head := d.lightchain.CurrentHeader()
+					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
+						return errStallingPeer
+					}
+				}
+				// Disable any rollback and return
+				rollback = nil
+				return nil
+			}
+			// Otherwise split the chunk of headers into batches and process them
+			gotHeaders = true
+
+			for len(headers) > 0 {
+				// Terminate if something failed in between processing chunks
+				select {
+				case <-d.cancelCh:
+					return errCancelHeaderProcessing
+				default:
+				}
+				// Select the next chunk of headers to import
+				limit := maxHeadersProcess
+				if limit > len(headers) {
+					limit = len(headers)
+				}
+				chunk := headers[:limit]
+
+				// In case of header only syncing, validate the chunk immediately
+				if d.mode == FastSync || d.mode == LightSync {
+					// Collect the yet unknown headers to mark them as uncertain
+					unknown := make([]*types.Header, 0, len(headers))
+					for _, header := range chunk {
+						if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
+							unknown = append(unknown, header)
+						}
+					}
+					// If we're importing pure headers, verify based on their recentness
+					frequency := fsHeaderCheckFrequency
+					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
+						frequency = 1
+					}
+					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
+						// If some headers were inserted, add them too to the rollback list
+						if n > 0 {
+							rollback = append(rollback, chunk[:n]...)
+						}
+						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
+						return errInvalidChain
+					}
+					// All verifications passed, store newly found uncertain headers
+					rollback = append(rollback, unknown...)
+					if len(rollback) > fsHeaderSafetyNet {
+						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
+					}
+				}
+				// Unless we're doing light chains, schedule the headers for associated content retrieval
+				if d.mode == FullSync || d.mode == FastSync {
+					// If we've reached the allowed number of pending headers, stall a bit
+					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+						select {
+						case <-d.cancelCh:
+							return errCancelHeaderProcessing
+						case <-time.After(time.Second):
+						}
+					}
+					// Otherwise insert the headers for content retrieval
+					inserts := d.queue.Schedule(chunk, origin)
+					if len(inserts) != len(chunk) {
+						log.Debug("Stale headers")
+						return errBadPeer
+					}
+				}
+				headers = headers[limit:]
+				origin += uint64(limit)
+			}
+
+			// Update the highest block number we know if a higher one is found.
+			d.syncStatsLock.Lock()
+			if d.syncStatsChainHeight < origin {
+				d.syncStatsChainHeight = origin - 1
+			}
+			d.syncStatsLock.Unlock()
+
+			// Signal the content downloaders of the availablility of new tasks
+			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+				select {
+				case ch <- true:
+				default:
+				}
+			}
+		}
+	}
+}
+
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
+	for {
+		results := d.queue.Results(true)
+		if len(results) == 0 {
+			return nil
+		}
+		if d.chainInsertHook != nil {
+			d.chainInsertHook(results)
+		}
+		if err := d.importBlockResults(results); err != nil {
+			return err
+		}
+	}
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+	// Check for any early termination requests
+	if len(results) == 0 {
+		return nil
+	}
+	select {
+	case <-d.quitCh:
+		return errCancelContentProcessing
+	default:
+	}
+	// Retrieve the a batch of results to import
+	first, last := results[0].Header, results[len(results)-1].Header
+	log.Debug("Inserting downloaded chain", "items", len(results),
+		"firstnum", first.Number, "firsthash", first.Hash(),
+		"lastnum", last.Number, "lasthash", last.Hash(),
+	)
+	blocks := make([]*types.Block, len(results))
+	for i, result := range results {
+		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+	}
+	if index, err := d.blockchain.InsertChain(blocks); err != nil {
+		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+		return errInvalidChain
+	}
+	return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+	// Start syncing state of the reported head block. This should get us most of
+	// the state of the pivot block.
+	stateSync := d.syncState(latest.Root)
+	defer stateSync.Cancel()
+	go func() {
+		if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
+			d.queue.Close() // wake up WaitResults
+		}
+	}()
+	// Figure out the ideal pivot block. Note, that this goalpost may move if the
+	// sync takes long enough for the chain head to move significantly.
+	pivot := uint64(0)
+	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
+		pivot = height - uint64(fsMinFullBlocks)
+	}
+	// To cater for moving pivot points, track the pivot block and subsequently
+	// accumulated download results separately.
+	var (
+		oldPivot *fetchResult   // Locked in pivot block, might change eventually
+		oldTail  []*fetchResult // Downloaded content after the pivot
+	)
+	for {
+		// Wait for the next batch of downloaded data to be available, and if the pivot
+		// block became stale, move the goalpost
+		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
+		if len(results) == 0 {
+			// If pivot sync is done, stop
+			if oldPivot == nil {
+				return stateSync.Cancel()
+			}
+			// If sync failed, stop
+			select {
+			case <-d.cancelCh:
+				return stateSync.Cancel()
+			default:
+			}
+		}
+		if d.chainInsertHook != nil {
+			d.chainInsertHook(results)
+		}
+		if oldPivot != nil {
+			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
+		}
+		// Split around the pivot block and process the two sides via fast/full sync
+		if atomic.LoadInt32(&d.committed) == 0 {
+			latest = results[len(results)-1].Header
+			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
+				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
+				pivot = height - uint64(fsMinFullBlocks)
+			}
+		}
+		P, beforeP, afterP := splitAroundPivot(pivot, results)
+		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+			return err
+		}
+		if P != nil {
+			// If new pivot block found, cancel old state retrieval and restart
+			if oldPivot != P {
+				stateSync.Cancel()
+
+				stateSync = d.syncState(P.Header.Root)
+				defer stateSync.Cancel()
+				go func() {
+					if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
+						d.queue.Close() // wake up WaitResults
+					}
+				}()
+				oldPivot = P
+			}
+			// Wait for completion, occasionally checking for pivot staleness
+			select {
+			case <-stateSync.done:
+				if stateSync.err != nil {
+					return stateSync.err
+				}
+				if err := d.commitPivotBlock(P); err != nil {
+					return err
+				}
+				oldPivot = nil
+
+			case <-time.After(time.Second):
+				oldTail = afterP
+				continue
+			}
+		}
+		// Fast sync done, pivot commit done, full import
+		if err := d.importBlockResults(afterP); err != nil {
+			return err
+		}
+	}
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+	for _, result := range results {
+		num := result.Header.Number.Uint64()
+		switch {
+		case num < pivot:
+			before = append(before, result)
+		case num == pivot:
+			p = result
+		default:
+			after = append(after, result)
+		}
+	}
+	return p, before, after
+}
+
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+	// Check for any early termination requests
+	if len(results) == 0 {
+		return nil
+	}
+	select {
+	case <-d.quitCh:
+		return errCancelContentProcessing
+	case <-stateSync.done:
+		if err := stateSync.Wait(); err != nil {
+			return err
+		}
+	default:
+	}
+	// Retrieve the a batch of results to import
+	first, last := results[0].Header, results[len(results)-1].Header
+	log.Debug("Inserting fast-sync blocks", "items", len(results),
+		"firstnum", first.Number, "firsthash", first.Hash(),
+		"lastnumn", last.Number, "lasthash", last.Hash(),
+	)
+	blocks := make([]*types.Block, len(results))
+	receipts := make([]types.Receipts, len(results))
+	for i, result := range results {
+		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+		receipts[i] = result.Receipts
+	}
+	if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+		return errInvalidChain
+	}
+	return nil
+}
+
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+	block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+	log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
+	if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
+		return err
+	}
+	if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
+		return err
+	}
+	atomic.StoreInt32(&d.committed, 1)
+	return nil
+}
+
+// DeliverHeaders injects a new batch of block headers received from a remote
+// node into the download schedule.
+func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
+	return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
+}
+
+// DeliverBodies injects a new batch of block bodies received from a remote node.
+func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
+	return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
+}
+
+// DeliverReceipts injects a new batch of receipts received from a remote node.
+func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
+	return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
+}
+
+// DeliverNodeData injects a new batch of node state data received from a remote node.
+func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
+	return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
+}
+
+// deliver injects a new batch of data received from a remote node.
+func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
+	// Update the delivery metrics for both good and failed deliveries
+	inMeter.Mark(int64(packet.Items()))
+	defer func() {
+		if err != nil {
+			dropMeter.Mark(int64(packet.Items()))
+		}
+	}()
+	// Deliver or abort if the sync is canceled while queuing
+	d.cancelLock.RLock()
+	cancel := d.cancelCh
+	d.cancelLock.RUnlock()
+	if cancel == nil {
+		return errNoSyncActive
+	}
+	select {
+	case destCh <- packet:
+		return nil
+	case <-cancel:
+		return errNoSyncActive
+	}
+}
+
+// qosTuner is the quality of service tuning loop that occasionally gathers the
+// peer latency statistics and updates the estimated request round trip time.
+func (d *Downloader) qosTuner() {
+	for {
+		// Retrieve the current median RTT and integrate into the previoust target RTT
+		rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
+
+		// A new RTT cycle passed, increase our confidence in the estimated RTT
+		conf := atomic.LoadUint64(&d.rttConfidence)
+		conf = conf + (1000000-conf)/2
+		atomic.StoreUint64(&d.rttConfidence, conf)
+
+		// Log the new QoS values and sleep until the next RTT
+		log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
+		select {
+		case <-d.quitCh:
+			return
+		case <-time.After(rtt):
+		}
+	}
+}
+
+// qosReduceConfidence is meant to be called when a new peer joins the downloader's
+// peer set, needing to reduce the confidence we have in out QoS estimates.
+func (d *Downloader) qosReduceConfidence() {
+	// If we have a single peer, confidence is always 1
+	peers := uint64(d.peers.Len())
+	if peers == 0 {
+		// Ensure peer connectivity races don't catch us off guard
+		return
+	}
+	if peers == 1 {
+		atomic.StoreUint64(&d.rttConfidence, 1000000)
+		return
+	}
+	// If we have a ton of peers, don't drop confidence)
+	if peers >= uint64(qosConfidenceCap) {
+		return
+	}
+	// Otherwise drop the confidence factor
+	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
+	if float64(conf)/1000000 < rttMinConfidence {
+		conf = uint64(rttMinConfidence * 1000000)
+	}
+	atomic.StoreUint64(&d.rttConfidence, conf)
+
+	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
+	log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
+}
+
+// requestRTT returns the current target round trip time for a download request
+// to complete in.
+//
+// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
+// the downloader tries to adapt queries to the RTT, so multiple RTT values can
+// be adapted to, but smaller ones are preferred (stabler download stream).
+func (d *Downloader) requestRTT() time.Duration {
+	return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
+}
+
+// requestTTL returns the current timeout allowance for a single download request
+// to finish under.
+func (d *Downloader) requestTTL() time.Duration {
+	var (
+		rtt  = time.Duration(atomic.LoadUint64(&d.rttEstimate))
+		conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
+	)
+	ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
+	if ttl > ttlLimit {
+		ttl = ttlLimit
+	}
+	return ttl
+}
diff --git a/dex/downloader/downloader_test.go b/dex/downloader/downloader_test.go
new file mode 100644
index 000000000..093b751ca
--- /dev/null
+++ b/dex/downloader/downloader_test.go
@@ -0,0 +1,1481 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"errors"
+	"fmt"
+	"math/big"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	ethereum "github.com/dexon-foundation/dexon"
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/core/types"
+	"github.com/dexon-foundation/dexon/ethdb"
+	"github.com/dexon-foundation/dexon/event"
+	"github.com/dexon-foundation/dexon/trie"
+)
+
+// Reduce some of the parameters to make the tester faster.
+func init() {
+	MaxForkAncestry = uint64(10000)
+	blockCacheItems = 1024
+	fsHeaderContCheck = 500 * time.Millisecond
+}
+
+// downloadTester is a test simulator for mocking out local block chain.
+type downloadTester struct {
+	downloader *Downloader
+
+	genesis *types.Block   // Genesis blocks used by the tester and peers
+	stateDb ethdb.Database // Database used by the tester for syncing from peers
+	peerDb  ethdb.Database // Database of the peers containing all data
+	peers   map[string]*downloadTesterPeer
+
+	ownHashes   []common.Hash                  // Hash chain belonging to the tester
+	ownHeaders  map[common.Hash]*types.Header  // Headers belonging to the tester
+	ownBlocks   map[common.Hash]*types.Block   // Blocks belonging to the tester
+	ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
+	ownChainTd  map[common.Hash]*big.Int       // Total difficulties of the blocks in the local chain
+
+	lock sync.RWMutex
+}
+
+// newTester creates a new downloader test mocker.
+func newTester() *downloadTester {
+	tester := &downloadTester{
+		genesis:     testGenesis,
+		peerDb:      testDB,
+		peers:       make(map[string]*downloadTesterPeer),
+		ownHashes:   []common.Hash{testGenesis.Hash()},
+		ownHeaders:  map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
+		ownBlocks:   map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
+		ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
+		ownChainTd:  map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
+	}
+	tester.stateDb = ethdb.NewMemDatabase()
+	tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
+	tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
+	return tester
+}
+
+// terminate aborts any operations on the embedded downloader and releases all
+// held resources.
+func (dl *downloadTester) terminate() {
+	dl.downloader.Terminate()
+}
+
+// sync starts synchronizing with a remote peer, blocking until it completes.
+func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
+	dl.lock.RLock()
+	hash := dl.peers[id].chain.headBlock().Hash()
+	// If no particular TD was requested, load from the peer's blockchain
+	if td == nil {
+		td = dl.peers[id].chain.td(hash)
+	}
+	dl.lock.RUnlock()
+
+	// Synchronise with the chosen peer and ensure proper cleanup afterwards
+	err := dl.downloader.synchronise(id, hash, td, mode)
+	select {
+	case <-dl.downloader.cancelCh:
+		// Ok, downloader fully cancelled after sync cycle
+	default:
+		// Downloader is still accepting packets, can block a peer up
+		panic("downloader active post sync cycle") // panic will be caught by tester
+	}
+	return err
+}
+
+// HasHeader checks if a header is present in the testers canonical chain.
+func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
+	return dl.GetHeaderByHash(hash) != nil
+}
+
+// HasBlock checks if a block is present in the testers canonical chain.
+func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool {
+	return dl.GetBlockByHash(hash) != nil
+}
+
+// GetHeader retrieves a header from the testers canonical chain.
+func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
+	dl.lock.RLock()
+	defer dl.lock.RUnlock()
+
+	return dl.ownHeaders[hash]
+}
+
+// GetBlock retrieves a block from the testers canonical chain.
+func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
+	dl.lock.RLock()
+	defer dl.lock.RUnlock()
+
+	return dl.ownBlocks[hash]
+}
+
+// CurrentHeader retrieves the current head header from the canonical chain.
+func (dl *downloadTester) CurrentHeader() *types.Header {
+	dl.lock.RLock()
+	defer dl.lock.RUnlock()
+
+	for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+		if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil {
+			return header
+		}
+	}
+	return dl.genesis.Header()
+}
+
+// CurrentBlock retrieves the current head block from the canonical chain.
+func (dl *downloadTester) CurrentBlock() *types.Block {
+	dl.lock.RLock()
+	defer dl.lock.RUnlock()
+
+	for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+		if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
+			if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
+				return block
+			}
+		}
+	}
+	return dl.genesis
+}
+
+// CurrentFastBlock retrieves the current head fast-sync block from the canonical chain.
+func (dl *downloadTester) CurrentFastBlock() *types.Block {
+	dl.lock.RLock()
+	defer dl.lock.RUnlock()
+
+	for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+		if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
+			return block
+		}
+	}
+	return dl.genesis
+}
+
+// FastSyncCommitHead manually sets the head block to a given hash.
+func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error {
+	// For now only check that the state trie is correct
+	if block := dl.GetBlockByHash(hash); block != nil {
+		_, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb), 0)
+		return err
+	}
+	return fmt.Errorf("non existent block: %x", hash[:4])
+}
+
+// GetTd retrieves the block's total difficulty from the canonical chain.
+func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
+	dl.lock.RLock()
+	defer dl.lock.RUnlock()
+
+	return dl.ownChainTd[hash]
+}
+
+// InsertHeaderChain injects a new batch of headers into the simulated chain.
+func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
+	dl.lock.Lock()
+	defer dl.lock.Unlock()
+
+	// Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
+	if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
+		return 0, errors.New("unknown parent")
+	}
+	for i := 1; i < len(headers); i++ {
+		if headers[i].ParentHash != headers[i-1].Hash() {
+			return i, errors.New("unknown parent")
+		}
+	}
+	// Do a full insert if pre-checks passed
+	for i, header := range headers {
+		if _, ok := dl.ownHeaders[header.Hash()]; ok {
+			continue
+		}
+		if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
+			return i, errors.New("unknown parent")
+		}
+		dl.ownHashes = append(dl.ownHashes, header.Hash())
+		dl.ownHeaders[header.Hash()] = header
+		dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
+	}
+	return len(headers), nil
+}
+
+// InsertChain injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
+	dl.lock.Lock()
+	defer dl.lock.Unlock()
+
+	for i, block := range blocks {
+		if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
+			return i, errors.New("unknown parent")
+		} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
+			return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
+		}
+		if _, ok := dl.ownHeaders[block.Hash()]; !ok {
+			dl.ownHashes = append(dl.ownHashes, block.Hash())
+			dl.ownHeaders[block.Hash()] = block.Header()
+		}
+		dl.ownBlocks[block.Hash()] = block
+		dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
+		dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty())
+	}
+	return len(blocks), nil
+}
+
+// InsertReceiptChain injects a new batch of receipts into the simulated chain.
+func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) {
+	dl.lock.Lock()
+	defer dl.lock.Unlock()
+
+	for i := 0; i < len(blocks) && i < len(receipts); i++ {
+		if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
+			return i, errors.New("unknown owner")
+		}
+		if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
+			return i, errors.New("unknown parent")
+		}
+		dl.ownBlocks[blocks[i].Hash()] = blocks[i]
+		dl.ownReceipts[blocks[i].Hash()] = receipts[i]
+	}
+	return len(blocks), nil
+}
+
+// Rollback removes some recently added elements from the chain.
+func (dl *downloadTester) Rollback(hashes []common.Hash) {
+	dl.lock.Lock()
+	defer dl.lock.Unlock()
+
+	for i := len(hashes) - 1; i >= 0; i-- {
+		if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] {
+			dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1]
+		}
+		delete(dl.ownChainTd, hashes[i])
+		delete(dl.ownHeaders, hashes[i])
+		delete(dl.ownReceipts, hashes[i])
+		delete(dl.ownBlocks, hashes[i])
+	}
+}
+
+// newPeer registers a new block download source into the downloader.
+func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error {
+	dl.lock.Lock()
+	defer dl.lock.Unlock()
+
+	peer := &downloadTesterPeer{dl: dl, id: id, chain: chain}
+	dl.peers[id] = peer
+	return dl.downloader.RegisterPeer(id, version, peer)
+}
+
+// dropPeer simulates a hard peer removal from the connection pool.
+func (dl *downloadTester) dropPeer(id string) {
+	dl.lock.Lock()
+	defer dl.lock.Unlock()
+
+	delete(dl.peers, id)
+	dl.downloader.UnregisterPeer(id)
+}
+
+type downloadTesterPeer struct {
+	dl            *downloadTester
+	id            string
+	lock          sync.RWMutex
+	chain         *testChain
+	missingStates map[common.Hash]bool // State entries that fast sync should not return
+}
+
+// Head constructs a function to retrieve a peer's current head hash
+// and total difficulty.
+func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
+	b := dlp.chain.headBlock()
+	return b.Hash(), dlp.chain.td(b.Hash())
+}
+
+// RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
+// origin; associated with a particular peer in the download tester. The returned
+// function can be used to retrieve batches of headers from the particular peer.
+func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
+	if reverse {
+		panic("reverse header requests not supported")
+	}
+
+	result := dlp.chain.headersByHash(origin, amount, skip)
+	go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
+	return nil
+}
+
+// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
+// origin; associated with a particular peer in the download tester. The returned
+// function can be used to retrieve batches of headers from the particular peer.
+func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
+	if reverse {
+		panic("reverse header requests not supported")
+	}
+
+	result := dlp.chain.headersByNumber(origin, amount, skip)
+	go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
+	return nil
+}
+
+// RequestBodies constructs a getBlockBodies method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of block bodies from the particularly requested peer.
+func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
+	txs, uncles := dlp.chain.bodies(hashes)
+	go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles)
+	return nil
+}
+
+// RequestReceipts constructs a getReceipts method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of block receipts from the particularly requested peer.
+func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
+	receipts := dlp.chain.receipts(hashes)
+	go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts)
+	return nil
+}
+
+// RequestNodeData constructs a getNodeData method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of node state data from the particularly requested peer.
+func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
+	dlp.dl.lock.RLock()
+	defer dlp.dl.lock.RUnlock()
+
+	results := make([][]byte, 0, len(hashes))
+	for _, hash := range hashes {
+		if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
+			if !dlp.missingStates[hash] {
+				results = append(results, data)
+			}
+		}
+	}
+	go dlp.dl.downloader.DeliverNodeData(dlp.id, results)
+	return nil
+}
+
+// assertOwnChain checks if the local chain contains the correct number of items
+// of the various chain components.
+func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
+	assertOwnForkedChain(t, tester, 1, []int{length})
+}
+
+// assertOwnForkedChain checks if the local forked chain contains the correct
+// number of items of the various chain components.
+func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
+	// Initialize the counters for the first fork
+	headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks
+
+	if receipts < 0 {
+		receipts = 1
+	}
+	// Update the counters for each subsequent fork
+	for _, length := range lengths[1:] {
+		headers += length - common
+		blocks += length - common
+		receipts += length - common - fsMinFullBlocks
+	}
+	switch tester.downloader.mode {
+	case FullSync:
+		receipts = 1
+	case LightSync:
+		blocks, receipts = 1, 1
+	}
+	if hs := len(tester.ownHeaders); hs != headers {
+		t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
+	}
+	if bs := len(tester.ownBlocks); bs != blocks {
+		t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
+	}
+	if rs := len(tester.ownReceipts); rs != receipts {
+		t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
+	}
+}
+
+// Tests that simple synchronization against a canonical chain works correctly.
+// In this test common ancestor lookup should be short circuited and not require
+// binary searching.
+func TestCanonicalSynchronisation62(t *testing.T)      { testCanonicalSynchronisation(t, 62, FullSync) }
+func TestCanonicalSynchronisation63Full(t *testing.T)  { testCanonicalSynchronisation(t, 63, FullSync) }
+func TestCanonicalSynchronisation63Fast(t *testing.T)  { testCanonicalSynchronisation(t, 63, FastSync) }
+func TestCanonicalSynchronisation64Full(t *testing.T)  { testCanonicalSynchronisation(t, 64, FullSync) }
+func TestCanonicalSynchronisation64Fast(t *testing.T)  { testCanonicalSynchronisation(t, 64, FastSync) }
+func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
+
+func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Create a small enough block chain to download
+	chain := testChainBase.shorten(blockCacheItems - 15)
+	tester.newPeer("peer", protocol, chain)
+
+	// Synchronise with the peer and make sure all relevant data was retrieved
+	if err := tester.sync("peer", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that if a large batch of blocks are being downloaded, it is throttled
+// until the cached blocks are retrieved.
+func TestThrottling62(t *testing.T)     { testThrottling(t, 62, FullSync) }
+func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
+func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
+func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
+func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
+
+func testThrottling(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+	tester := newTester()
+	defer tester.terminate()
+
+	// Create a long block chain to download and the tester
+	targetBlocks := testChainBase.len() - 1
+	tester.newPeer("peer", protocol, testChainBase)
+
+	// Wrap the importer to allow stepping
+	blocked, proceed := uint32(0), make(chan struct{})
+	tester.downloader.chainInsertHook = func(results []*fetchResult) {
+		atomic.StoreUint32(&blocked, uint32(len(results)))
+		<-proceed
+	}
+	// Start a synchronisation concurrently
+	errc := make(chan error)
+	go func() {
+		errc <- tester.sync("peer", nil, mode)
+	}()
+	// Iteratively take some blocks, always checking the retrieval count
+	for {
+		// Check the retrieval count synchronously (! reason for this ugly block)
+		tester.lock.RLock()
+		retrieved := len(tester.ownBlocks)
+		tester.lock.RUnlock()
+		if retrieved >= targetBlocks+1 {
+			break
+		}
+		// Wait a bit for sync to throttle itself
+		var cached, frozen int
+		for start := time.Now(); time.Since(start) < 3*time.Second; {
+			time.Sleep(25 * time.Millisecond)
+
+			tester.lock.Lock()
+			tester.downloader.queue.lock.Lock()
+			cached = len(tester.downloader.queue.blockDonePool)
+			if mode == FastSync {
+				if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
+					cached = receipts
+				}
+			}
+			frozen = int(atomic.LoadUint32(&blocked))
+			retrieved = len(tester.ownBlocks)
+			tester.downloader.queue.lock.Unlock()
+			tester.lock.Unlock()
+
+			if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
+				break
+			}
+		}
+		// Make sure we filled up the cache, then exhaust it
+		time.Sleep(25 * time.Millisecond) // give it a chance to screw up
+
+		tester.lock.RLock()
+		retrieved = len(tester.ownBlocks)
+		tester.lock.RUnlock()
+		if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
+			t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
+		}
+		// Permit the blocked blocks to import
+		if atomic.LoadUint32(&blocked) > 0 {
+			atomic.StoreUint32(&blocked, uint32(0))
+			proceed <- struct{}{}
+		}
+	}
+	// Check that we haven't pulled more blocks than available
+	assertOwnChain(t, tester, targetBlocks+1)
+	if err := <-errc; err != nil {
+		t.Fatalf("block synchronization failed: %v", err)
+	}
+}
+
+// Tests that simple synchronization against a forked chain works correctly. In
+// this test common ancestor lookup should *not* be short circuited, and a full
+// binary search should be executed.
+func TestForkedSync62(t *testing.T)      { testForkedSync(t, 62, FullSync) }
+func TestForkedSync63Full(t *testing.T)  { testForkedSync(t, 63, FullSync) }
+func TestForkedSync63Fast(t *testing.T)  { testForkedSync(t, 63, FastSync) }
+func TestForkedSync64Full(t *testing.T)  { testForkedSync(t, 64, FullSync) }
+func TestForkedSync64Fast(t *testing.T)  { testForkedSync(t, 64, FastSync) }
+func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
+
+func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
+	chainB := testChainForkLightB.shorten(testChainBase.len() + 80)
+	tester.newPeer("fork A", protocol, chainA)
+	tester.newPeer("fork B", protocol, chainB)
+
+	// Synchronise with the peer and make sure all blocks were retrieved
+	if err := tester.sync("fork A", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chainA.len())
+
+	// Synchronise with the second peer and make sure that fork is pulled too
+	if err := tester.sync("fork B", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
+}
+
+// Tests that synchronising against a much shorter but much heavyer fork works
+// corrently and is not dropped.
+func TestHeavyForkedSync62(t *testing.T)      { testHeavyForkedSync(t, 62, FullSync) }
+func TestHeavyForkedSync63Full(t *testing.T)  { testHeavyForkedSync(t, 63, FullSync) }
+func TestHeavyForkedSync63Fast(t *testing.T)  { testHeavyForkedSync(t, 63, FastSync) }
+func TestHeavyForkedSync64Full(t *testing.T)  { testHeavyForkedSync(t, 64, FullSync) }
+func TestHeavyForkedSync64Fast(t *testing.T)  { testHeavyForkedSync(t, 64, FastSync) }
+func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, LightSync) }
+
+func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
+	chainB := testChainForkHeavy.shorten(testChainBase.len() + 80)
+	tester.newPeer("light", protocol, chainA)
+	tester.newPeer("heavy", protocol, chainB)
+
+	// Synchronise with the peer and make sure all blocks were retrieved
+	if err := tester.sync("light", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chainA.len())
+
+	// Synchronise with the second peer and make sure that fork is pulled too
+	if err := tester.sync("heavy", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
+}
+
+// Tests that chain forks are contained within a certain interval of the current
+// chain head, ensuring that malicious peers cannot waste resources by feeding
+// long dead chains.
+func TestBoundedForkedSync62(t *testing.T)      { testBoundedForkedSync(t, 62, FullSync) }
+func TestBoundedForkedSync63Full(t *testing.T)  { testBoundedForkedSync(t, 63, FullSync) }
+func TestBoundedForkedSync63Fast(t *testing.T)  { testBoundedForkedSync(t, 63, FastSync) }
+func TestBoundedForkedSync64Full(t *testing.T)  { testBoundedForkedSync(t, 64, FullSync) }
+func TestBoundedForkedSync64Fast(t *testing.T)  { testBoundedForkedSync(t, 64, FastSync) }
+func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, LightSync) }
+
+func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	chainA := testChainForkLightA
+	chainB := testChainForkLightB
+	tester.newPeer("original", protocol, chainA)
+	tester.newPeer("rewriter", protocol, chainB)
+
+	// Synchronise with the peer and make sure all blocks were retrieved
+	if err := tester.sync("original", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chainA.len())
+
+	// Synchronise with the second peer and ensure that the fork is rejected to being too old
+	if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor {
+		t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
+	}
+}
+
+// Tests that chain forks are contained within a certain interval of the current
+// chain head for short but heavy forks too. These are a bit special because they
+// take different ancestor lookup paths.
+func TestBoundedHeavyForkedSync62(t *testing.T)      { testBoundedHeavyForkedSync(t, 62, FullSync) }
+func TestBoundedHeavyForkedSync63Full(t *testing.T)  { testBoundedHeavyForkedSync(t, 63, FullSync) }
+func TestBoundedHeavyForkedSync63Fast(t *testing.T)  { testBoundedHeavyForkedSync(t, 63, FastSync) }
+func TestBoundedHeavyForkedSync64Full(t *testing.T)  { testBoundedHeavyForkedSync(t, 64, FullSync) }
+func TestBoundedHeavyForkedSync64Fast(t *testing.T)  { testBoundedHeavyForkedSync(t, 64, FastSync) }
+func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSync(t, 64, LightSync) }
+
+func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Create a long enough forked chain
+	chainA := testChainForkLightA
+	chainB := testChainForkHeavy
+	tester.newPeer("original", protocol, chainA)
+	tester.newPeer("heavy-rewriter", protocol, chainB)
+
+	// Synchronise with the peer and make sure all blocks were retrieved
+	if err := tester.sync("original", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chainA.len())
+
+	// Synchronise with the second peer and ensure that the fork is rejected to being too old
+	if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
+		t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
+	}
+}
+
+// Tests that an inactive downloader will not accept incoming block headers and
+// bodies.
+func TestInactiveDownloader62(t *testing.T) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Check that neither block headers nor bodies are accepted
+	if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
+		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+	}
+	if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
+		t.Errorf("error mismatch: have %v, want  %v", err, errNoSyncActive)
+	}
+}
+
+// Tests that an inactive downloader will not accept incoming block headers,
+// bodies and receipts.
+func TestInactiveDownloader63(t *testing.T) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Check that neither block headers nor bodies are accepted
+	if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
+		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+	}
+	if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
+		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+	}
+	if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive {
+		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+	}
+}
+
+// Tests that a canceled download wipes all previously accumulated state.
+func TestCancel62(t *testing.T)      { testCancel(t, 62, FullSync) }
+func TestCancel63Full(t *testing.T)  { testCancel(t, 63, FullSync) }
+func TestCancel63Fast(t *testing.T)  { testCancel(t, 63, FastSync) }
+func TestCancel64Full(t *testing.T)  { testCancel(t, 64, FullSync) }
+func TestCancel64Fast(t *testing.T)  { testCancel(t, 64, FastSync) }
+func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
+
+func testCancel(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	chain := testChainBase.shorten(MaxHeaderFetch)
+	tester.newPeer("peer", protocol, chain)
+
+	// Make sure canceling works with a pristine downloader
+	tester.downloader.Cancel()
+	if !tester.downloader.queue.Idle() {
+		t.Errorf("download queue not idle")
+	}
+	// Synchronise with the peer, but cancel afterwards
+	if err := tester.sync("peer", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	tester.downloader.Cancel()
+	if !tester.downloader.queue.Idle() {
+		t.Errorf("download queue not idle")
+	}
+}
+
+// Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
+func TestMultiSynchronisation62(t *testing.T)      { testMultiSynchronisation(t, 62, FullSync) }
+func TestMultiSynchronisation63Full(t *testing.T)  { testMultiSynchronisation(t, 63, FullSync) }
+func TestMultiSynchronisation63Fast(t *testing.T)  { testMultiSynchronisation(t, 63, FastSync) }
+func TestMultiSynchronisation64Full(t *testing.T)  { testMultiSynchronisation(t, 64, FullSync) }
+func TestMultiSynchronisation64Fast(t *testing.T)  { testMultiSynchronisation(t, 64, FastSync) }
+func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
+
+func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Create various peers with various parts of the chain
+	targetPeers := 8
+	chain := testChainBase.shorten(targetPeers * 100)
+
+	for i := 0; i < targetPeers; i++ {
+		id := fmt.Sprintf("peer #%d", i)
+		tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1)))
+	}
+	if err := tester.sync("peer #0", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that synchronisations behave well in multi-version protocol environments
+// and not wreak havoc on other nodes in the network.
+func TestMultiProtoSynchronisation62(t *testing.T)      { testMultiProtoSync(t, 62, FullSync) }
+func TestMultiProtoSynchronisation63Full(t *testing.T)  { testMultiProtoSync(t, 63, FullSync) }
+func TestMultiProtoSynchronisation63Fast(t *testing.T)  { testMultiProtoSync(t, 63, FastSync) }
+func TestMultiProtoSynchronisation64Full(t *testing.T)  { testMultiProtoSync(t, 64, FullSync) }
+func TestMultiProtoSynchronisation64Fast(t *testing.T)  { testMultiProtoSync(t, 64, FastSync) }
+func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
+
+func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Create a small enough block chain to download
+	chain := testChainBase.shorten(blockCacheItems - 15)
+
+	// Create peers of every type
+	tester.newPeer("peer 62", 62, chain)
+	tester.newPeer("peer 63", 63, chain)
+	tester.newPeer("peer 64", 64, chain)
+
+	// Synchronise with the requested peer and make sure all blocks were retrieved
+	if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chain.len())
+
+	// Check that no peers have been dropped off
+	for _, version := range []int{62, 63, 64} {
+		peer := fmt.Sprintf("peer %d", version)
+		if _, ok := tester.peers[peer]; !ok {
+			t.Errorf("%s dropped", peer)
+		}
+	}
+}
+
+// Tests that if a block is empty (e.g. header only), no body request should be
+// made, and instead the header should be assembled into a whole block in itself.
+func TestEmptyShortCircuit62(t *testing.T)      { testEmptyShortCircuit(t, 62, FullSync) }
+func TestEmptyShortCircuit63Full(t *testing.T)  { testEmptyShortCircuit(t, 63, FullSync) }
+func TestEmptyShortCircuit63Fast(t *testing.T)  { testEmptyShortCircuit(t, 63, FastSync) }
+func TestEmptyShortCircuit64Full(t *testing.T)  { testEmptyShortCircuit(t, 64, FullSync) }
+func TestEmptyShortCircuit64Fast(t *testing.T)  { testEmptyShortCircuit(t, 64, FastSync) }
+func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
+
+func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Create a block chain to download
+	chain := testChainBase
+	tester.newPeer("peer", protocol, chain)
+
+	// Instrument the downloader to signal body requests
+	bodiesHave, receiptsHave := int32(0), int32(0)
+	tester.downloader.bodyFetchHook = func(headers []*types.Header) {
+		atomic.AddInt32(&bodiesHave, int32(len(headers)))
+	}
+	tester.downloader.receiptFetchHook = func(headers []*types.Header) {
+		atomic.AddInt32(&receiptsHave, int32(len(headers)))
+	}
+	// Synchronise with the peer and make sure all blocks were retrieved
+	if err := tester.sync("peer", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chain.len())
+
+	// Validate the number of block bodies that should have been requested
+	bodiesNeeded, receiptsNeeded := 0, 0
+	for _, block := range chain.blockm {
+		if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
+			bodiesNeeded++
+		}
+	}
+	for _, receipt := range chain.receiptm {
+		if mode == FastSync && len(receipt) > 0 {
+			receiptsNeeded++
+		}
+	}
+	if int(bodiesHave) != bodiesNeeded {
+		t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
+	}
+	if int(receiptsHave) != receiptsNeeded {
+		t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
+	}
+}
+
+// Tests that headers are enqueued continuously, preventing malicious nodes from
+// stalling the downloader by feeding gapped header chains.
+func TestMissingHeaderAttack62(t *testing.T)      { testMissingHeaderAttack(t, 62, FullSync) }
+func TestMissingHeaderAttack63Full(t *testing.T)  { testMissingHeaderAttack(t, 63, FullSync) }
+func TestMissingHeaderAttack63Fast(t *testing.T)  { testMissingHeaderAttack(t, 63, FastSync) }
+func TestMissingHeaderAttack64Full(t *testing.T)  { testMissingHeaderAttack(t, 64, FullSync) }
+func TestMissingHeaderAttack64Fast(t *testing.T)  { testMissingHeaderAttack(t, 64, FastSync) }
+func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
+
+func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	chain := testChainBase.shorten(blockCacheItems - 15)
+	brokenChain := chain.shorten(chain.len())
+	delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
+	tester.newPeer("attack", protocol, brokenChain)
+
+	if err := tester.sync("attack", nil, mode); err == nil {
+		t.Fatalf("succeeded attacker synchronisation")
+	}
+	// Synchronise with the valid peer and make sure sync succeeds
+	tester.newPeer("valid", protocol, chain)
+	if err := tester.sync("valid", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that if requested headers are shifted (i.e. first is missing), the queue
+// detects the invalid numbering.
+func TestShiftedHeaderAttack62(t *testing.T)      { testShiftedHeaderAttack(t, 62, FullSync) }
+func TestShiftedHeaderAttack63Full(t *testing.T)  { testShiftedHeaderAttack(t, 63, FullSync) }
+func TestShiftedHeaderAttack63Fast(t *testing.T)  { testShiftedHeaderAttack(t, 63, FastSync) }
+func TestShiftedHeaderAttack64Full(t *testing.T)  { testShiftedHeaderAttack(t, 64, FullSync) }
+func TestShiftedHeaderAttack64Fast(t *testing.T)  { testShiftedHeaderAttack(t, 64, FastSync) }
+func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
+
+func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	chain := testChainBase.shorten(blockCacheItems - 15)
+
+	// Attempt a full sync with an attacker feeding shifted headers
+	brokenChain := chain.shorten(chain.len())
+	delete(brokenChain.headerm, brokenChain.chain[1])
+	delete(brokenChain.blockm, brokenChain.chain[1])
+	delete(brokenChain.receiptm, brokenChain.chain[1])
+	tester.newPeer("attack", protocol, brokenChain)
+	if err := tester.sync("attack", nil, mode); err == nil {
+		t.Fatalf("succeeded attacker synchronisation")
+	}
+
+	// Synchronise with the valid peer and make sure sync succeeds
+	tester.newPeer("valid", protocol, chain)
+	if err := tester.sync("valid", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that upon detecting an invalid header, the recent ones are rolled back
+// for various failure scenarios. Afterwards a full sync is attempted to make
+// sure no state was corrupted.
+func TestInvalidHeaderRollback63Fast(t *testing.T)  { testInvalidHeaderRollback(t, 63, FastSync) }
+func TestInvalidHeaderRollback64Fast(t *testing.T)  { testInvalidHeaderRollback(t, 64, FastSync) }
+func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }
+
+func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	// Create a small enough block chain to download
+	targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
+	chain := testChainBase.shorten(targetBlocks)
+
+	// Attempt to sync with an attacker that feeds junk during the fast sync phase.
+	// This should result in the last fsHeaderSafetyNet headers being rolled back.
+	missing := fsHeaderSafetyNet + MaxHeaderFetch + 1
+	fastAttackChain := chain.shorten(chain.len())
+	delete(fastAttackChain.headerm, fastAttackChain.chain[missing])
+	tester.newPeer("fast-attack", protocol, fastAttackChain)
+
+	if err := tester.sync("fast-attack", nil, mode); err == nil {
+		t.Fatalf("succeeded fast attacker synchronisation")
+	}
+	if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
+		t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
+	}
+
+	// Attempt to sync with an attacker that feeds junk during the block import phase.
+	// This should result in both the last fsHeaderSafetyNet number of headers being
+	// rolled back, and also the pivot point being reverted to a non-block status.
+	missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
+	blockAttackChain := chain.shorten(chain.len())
+	delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in
+	delete(blockAttackChain.headerm, blockAttackChain.chain[missing])
+	tester.newPeer("block-attack", protocol, blockAttackChain)
+
+	if err := tester.sync("block-attack", nil, mode); err == nil {
+		t.Fatalf("succeeded block attacker synchronisation")
+	}
+	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
+		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
+	}
+	if mode == FastSync {
+		if head := tester.CurrentBlock().NumberU64(); head != 0 {
+			t.Errorf("fast sync pivot block #%d not rolled back", head)
+		}
+	}
+
+	// Attempt to sync with an attacker that withholds promised blocks after the
+	// fast sync pivot point. This could be a trial to leave the node with a bad
+	// but already imported pivot block.
+	withholdAttackChain := chain.shorten(chain.len())
+	tester.newPeer("withhold-attack", protocol, withholdAttackChain)
+	tester.downloader.syncInitHook = func(uint64, uint64) {
+		for i := missing; i < withholdAttackChain.len(); i++ {
+			delete(withholdAttackChain.headerm, withholdAttackChain.chain[i])
+		}
+		tester.downloader.syncInitHook = nil
+	}
+	if err := tester.sync("withhold-attack", nil, mode); err == nil {
+		t.Fatalf("succeeded withholding attacker synchronisation")
+	}
+	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
+		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
+	}
+	if mode == FastSync {
+		if head := tester.CurrentBlock().NumberU64(); head != 0 {
+			t.Errorf("fast sync pivot block #%d not rolled back", head)
+		}
+	}
+
+	// synchronise with the valid peer and make sure sync succeeds. Since the last rollback
+	// should also disable fast syncing for this process, verify that we did a fresh full
+	// sync. Note, we can't assert anything about the receipts since we won't purge the
+	// database of them, hence we can't use assertOwnChain.
+	tester.newPeer("valid", protocol, chain)
+	if err := tester.sync("valid", nil, mode); err != nil {
+		t.Fatalf("failed to synchronise blocks: %v", err)
+	}
+	if hs := len(tester.ownHeaders); hs != chain.len() {
+		t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len())
+	}
+	if mode != LightSync {
+		if bs := len(tester.ownBlocks); bs != chain.len() {
+			t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
+		}
+	}
+}
+
+// Tests that a peer advertising an high TD doesn't get to stall the downloader
+// afterwards by not sending any useful hashes.
+func TestHighTDStarvationAttack62(t *testing.T)      { testHighTDStarvationAttack(t, 62, FullSync) }
+func TestHighTDStarvationAttack63Full(t *testing.T)  { testHighTDStarvationAttack(t, 63, FullSync) }
+func TestHighTDStarvationAttack63Fast(t *testing.T)  { testHighTDStarvationAttack(t, 63, FastSync) }
+func TestHighTDStarvationAttack64Full(t *testing.T)  { testHighTDStarvationAttack(t, 64, FullSync) }
+func TestHighTDStarvationAttack64Fast(t *testing.T)  { testHighTDStarvationAttack(t, 64, FastSync) }
+func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
+
+func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+
+	chain := testChainBase.shorten(1)
+	tester.newPeer("attack", protocol, chain)
+	if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
+		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
+	}
+}
+
+// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
+func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) }
+func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
+func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
+
+func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
+	t.Parallel()
+
+	// Define the disconnection requirement for individual hash fetch errors
+	tests := []struct {
+		result error
+		drop   bool
+	}{
+		{nil, false},                        // Sync succeeded, all is well
+		{errBusy, false},                    // Sync is already in progress, no problem
+		{errUnknownPeer, false},             // Peer is unknown, was already dropped, don't double drop
+		{errBadPeer, true},                  // Peer was deemed bad for some reason, drop it
+		{errStallingPeer, true},             // Peer was detected to be stalling, drop it
+		{errNoPeers, false},                 // No peers to download from, soft race, no issue
+		{errTimeout, true},                  // No hashes received in due time, drop the peer
+		{errEmptyHeaderSet, true},           // No headers were returned as a response, drop as it's a dead end
+		{errPeersUnavailable, true},         // Nobody had the advertised blocks, drop the advertiser
+		{errInvalidAncestor, true},          // Agreed upon ancestor is not acceptable, drop the chain rewriter
+		{errInvalidChain, true},             // Hash chain was detected as invalid, definitely drop
+		{errInvalidBlock, false},            // A bad peer was detected, but not the sync origin
+		{errInvalidBody, false},             // A bad peer was detected, but not the sync origin
+		{errInvalidReceipt, false},          // A bad peer was detected, but not the sync origin
+		{errCancelBlockFetch, false},        // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelHeaderFetch, false},       // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelBodyFetch, false},         // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelReceiptFetch, false},      // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelHeaderProcessing, false},  // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+	}
+	// Run the tests and check disconnection status
+	tester := newTester()
+	defer tester.terminate()
+	chain := testChainBase.shorten(1)
+
+	for i, tt := range tests {
+		// Register a new peer and ensure it's presence
+		id := fmt.Sprintf("test %d", i)
+		if err := tester.newPeer(id, protocol, chain); err != nil {
+			t.Fatalf("test %d: failed to register new peer: %v", i, err)
+		}
+		if _, ok := tester.peers[id]; !ok {
+			t.Fatalf("test %d: registered peer not found", i)
+		}
+		// Simulate a synchronisation and check the required result
+		tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
+
+		tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync)
+		if _, ok := tester.peers[id]; !ok != tt.drop {
+			t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
+		}
+	}
+}
+
+// Tests that synchronisation progress (origin block number, current block number
+// and highest block number) is tracked and updated correctly.
+func TestSyncProgress62(t *testing.T)      { testSyncProgress(t, 62, FullSync) }
+func TestSyncProgress63Full(t *testing.T)  { testSyncProgress(t, 63, FullSync) }
+func TestSyncProgress63Fast(t *testing.T)  { testSyncProgress(t, 63, FastSync) }
+func TestSyncProgress64Full(t *testing.T)  { testSyncProgress(t, 64, FullSync) }
+func TestSyncProgress64Fast(t *testing.T)  { testSyncProgress(t, 64, FastSync) }
+func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
+
+func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+	chain := testChainBase.shorten(blockCacheItems - 15)
+
+	// Set a sync init hook to catch progress changes
+	starting := make(chan struct{})
+	progress := make(chan struct{})
+
+	tester.downloader.syncInitHook = func(origin, latest uint64) {
+		starting <- struct{}{}
+		<-progress
+	}
+	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+	// Synchronise half the blocks and check initial progress
+	tester.newPeer("peer-half", protocol, chain.shorten(chain.len()/2))
+	pending := new(sync.WaitGroup)
+	pending.Add(1)
+
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("peer-half", nil, mode); err != nil {
+			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+		}
+	}()
+	<-starting
+	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+		HighestBlock: uint64(chain.len()/2 - 1),
+	})
+	progress <- struct{}{}
+	pending.Wait()
+
+	// Synchronise all the blocks and check continuation progress
+	tester.newPeer("peer-full", protocol, chain)
+	pending.Add(1)
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("peer-full", nil, mode); err != nil {
+			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+		}
+	}()
+	<-starting
+	checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
+		StartingBlock: uint64(chain.len()/2 - 1),
+		CurrentBlock:  uint64(chain.len()/2 - 1),
+		HighestBlock:  uint64(chain.len() - 1),
+	})
+
+	// Check final progress after successful sync
+	progress <- struct{}{}
+	pending.Wait()
+	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+		StartingBlock: uint64(chain.len()/2 - 1),
+		CurrentBlock:  uint64(chain.len() - 1),
+		HighestBlock:  uint64(chain.len() - 1),
+	})
+}
+
+func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) {
+	t.Helper()
+	p := d.Progress()
+	p.KnownStates, p.PulledStates = 0, 0
+	want.KnownStates, want.PulledStates = 0, 0
+	if p != want {
+		t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want)
+	}
+}
+
+// Tests that synchronisation progress (origin block number and highest block
+// number) is tracked and updated correctly in case of a fork (or manual head
+// revertal).
+func TestForkedSyncProgress62(t *testing.T)      { testForkedSyncProgress(t, 62, FullSync) }
+func TestForkedSyncProgress63Full(t *testing.T)  { testForkedSyncProgress(t, 63, FullSync) }
+func TestForkedSyncProgress63Fast(t *testing.T)  { testForkedSyncProgress(t, 63, FastSync) }
+func TestForkedSyncProgress64Full(t *testing.T)  { testForkedSyncProgress(t, 64, FullSync) }
+func TestForkedSyncProgress64Fast(t *testing.T)  { testForkedSyncProgress(t, 64, FastSync) }
+func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
+
+func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+	chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch)
+	chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch)
+
+	// Set a sync init hook to catch progress changes
+	starting := make(chan struct{})
+	progress := make(chan struct{})
+
+	tester.downloader.syncInitHook = func(origin, latest uint64) {
+		starting <- struct{}{}
+		<-progress
+	}
+	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+	// Synchronise with one of the forks and check progress
+	tester.newPeer("fork A", protocol, chainA)
+	pending := new(sync.WaitGroup)
+	pending.Add(1)
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("fork A", nil, mode); err != nil {
+			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+		}
+	}()
+	<-starting
+
+	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+		HighestBlock: uint64(chainA.len() - 1),
+	})
+	progress <- struct{}{}
+	pending.Wait()
+
+	// Simulate a successful sync above the fork
+	tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight
+
+	// Synchronise with the second fork and check progress resets
+	tester.newPeer("fork B", protocol, chainB)
+	pending.Add(1)
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("fork B", nil, mode); err != nil {
+			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+		}
+	}()
+	<-starting
+	checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{
+		StartingBlock: uint64(testChainBase.len()) - 1,
+		CurrentBlock:  uint64(chainA.len() - 1),
+		HighestBlock:  uint64(chainB.len() - 1),
+	})
+
+	// Check final progress after successful sync
+	progress <- struct{}{}
+	pending.Wait()
+	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+		StartingBlock: uint64(testChainBase.len()) - 1,
+		CurrentBlock:  uint64(chainB.len() - 1),
+		HighestBlock:  uint64(chainB.len() - 1),
+	})
+}
+
+// Tests that if synchronisation is aborted due to some failure, then the progress
+// origin is not updated in the next sync cycle, as it should be considered the
+// continuation of the previous sync and not a new instance.
+func TestFailedSyncProgress62(t *testing.T)      { testFailedSyncProgress(t, 62, FullSync) }
+func TestFailedSyncProgress63Full(t *testing.T)  { testFailedSyncProgress(t, 63, FullSync) }
+func TestFailedSyncProgress63Fast(t *testing.T)  { testFailedSyncProgress(t, 63, FastSync) }
+func TestFailedSyncProgress64Full(t *testing.T)  { testFailedSyncProgress(t, 64, FullSync) }
+func TestFailedSyncProgress64Fast(t *testing.T)  { testFailedSyncProgress(t, 64, FastSync) }
+func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
+
+func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+	chain := testChainBase.shorten(blockCacheItems - 15)
+
+	// Set a sync init hook to catch progress changes
+	starting := make(chan struct{})
+	progress := make(chan struct{})
+
+	tester.downloader.syncInitHook = func(origin, latest uint64) {
+		starting <- struct{}{}
+		<-progress
+	}
+	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+	// Attempt a full sync with a faulty peer
+	brokenChain := chain.shorten(chain.len())
+	missing := brokenChain.len() / 2
+	delete(brokenChain.headerm, brokenChain.chain[missing])
+	delete(brokenChain.blockm, brokenChain.chain[missing])
+	delete(brokenChain.receiptm, brokenChain.chain[missing])
+	tester.newPeer("faulty", protocol, brokenChain)
+
+	pending := new(sync.WaitGroup)
+	pending.Add(1)
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("faulty", nil, mode); err == nil {
+			panic("succeeded faulty synchronisation")
+		}
+	}()
+	<-starting
+	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+		HighestBlock: uint64(brokenChain.len() - 1),
+	})
+	progress <- struct{}{}
+	pending.Wait()
+	afterFailedSync := tester.downloader.Progress()
+
+	// Synchronise with a good peer and check that the progress origin remind the same
+	// after a failure
+	tester.newPeer("valid", protocol, chain)
+	pending.Add(1)
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("valid", nil, mode); err != nil {
+			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+		}
+	}()
+	<-starting
+	checkProgress(t, tester.downloader, "completing", afterFailedSync)
+
+	// Check final progress after successful sync
+	progress <- struct{}{}
+	pending.Wait()
+	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+		CurrentBlock: uint64(chain.len() - 1),
+		HighestBlock: uint64(chain.len() - 1),
+	})
+}
+
+// Tests that if an attacker fakes a chain height, after the attack is detected,
+// the progress height is successfully reduced at the next sync invocation.
+func TestFakedSyncProgress62(t *testing.T)      { testFakedSyncProgress(t, 62, FullSync) }
+func TestFakedSyncProgress63Full(t *testing.T)  { testFakedSyncProgress(t, 63, FullSync) }
+func TestFakedSyncProgress63Fast(t *testing.T)  { testFakedSyncProgress(t, 63, FastSync) }
+func TestFakedSyncProgress64Full(t *testing.T)  { testFakedSyncProgress(t, 64, FullSync) }
+func TestFakedSyncProgress64Fast(t *testing.T)  { testFakedSyncProgress(t, 64, FastSync) }
+func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
+
+func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
+	tester := newTester()
+	defer tester.terminate()
+	chain := testChainBase.shorten(blockCacheItems - 15)
+
+	// Set a sync init hook to catch progress changes
+	starting := make(chan struct{})
+	progress := make(chan struct{})
+	tester.downloader.syncInitHook = func(origin, latest uint64) {
+		starting <- struct{}{}
+		<-progress
+	}
+	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+	// Create and sync with an attacker that promises a higher chain than available.
+	brokenChain := chain.shorten(chain.len())
+	numMissing := 5
+	for i := brokenChain.len() - 2; i > brokenChain.len()-numMissing; i-- {
+		delete(brokenChain.headerm, brokenChain.chain[i])
+	}
+	tester.newPeer("attack", protocol, brokenChain)
+
+	pending := new(sync.WaitGroup)
+	pending.Add(1)
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("attack", nil, mode); err == nil {
+			panic("succeeded attacker synchronisation")
+		}
+	}()
+	<-starting
+	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+		HighestBlock: uint64(brokenChain.len() - 1),
+	})
+	progress <- struct{}{}
+	pending.Wait()
+	afterFailedSync := tester.downloader.Progress()
+
+	// Synchronise with a good peer and check that the progress height has been reduced to
+	// the true value.
+	validChain := chain.shorten(chain.len() - numMissing)
+	tester.newPeer("valid", protocol, validChain)
+	pending.Add(1)
+
+	go func() {
+		defer pending.Done()
+		if err := tester.sync("valid", nil, mode); err != nil {
+			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+		}
+	}()
+	<-starting
+	checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
+		CurrentBlock: afterFailedSync.CurrentBlock,
+		HighestBlock: uint64(validChain.len() - 1),
+	})
+
+	// Check final progress after successful sync.
+	progress <- struct{}{}
+	pending.Wait()
+	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+		CurrentBlock: uint64(validChain.len() - 1),
+		HighestBlock: uint64(validChain.len() - 1),
+	})
+}
+
+// This test reproduces an issue where unexpected deliveries would
+// block indefinitely if they arrived at the right time.
+func TestDeliverHeadersHang(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		protocol int
+		syncMode SyncMode
+	}{
+		{62, FullSync},
+		{63, FullSync},
+		{63, FastSync},
+		{64, FullSync},
+		{64, FastSync},
+		{64, LightSync},
+	}
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) {
+			t.Parallel()
+			testDeliverHeadersHang(t, tc.protocol, tc.syncMode)
+		})
+	}
+}
+
+func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
+	master := newTester()
+	defer master.terminate()
+	chain := testChainBase.shorten(15)
+
+	for i := 0; i < 200; i++ {
+		tester := newTester()
+		tester.peerDb = master.peerDb
+		tester.newPeer("peer", protocol, chain)
+
+		// Whenever the downloader requests headers, flood it with
+		// a lot of unrequested header deliveries.
+		tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
+			peer:   tester.downloader.peers.peers["peer"].peer,
+			tester: tester,
+		}
+		if err := tester.sync("peer", nil, mode); err != nil {
+			t.Errorf("test %d: sync failed: %v", i, err)
+		}
+		tester.terminate()
+	}
+}
+
+type floodingTestPeer struct {
+	peer   Peer
+	tester *downloadTester
+}
+
+func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }
+func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error {
+	return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse)
+}
+func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error {
+	return ftp.peer.RequestBodies(hashes)
+}
+func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error {
+	return ftp.peer.RequestReceipts(hashes)
+}
+func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error {
+	return ftp.peer.RequestNodeData(hashes)
+}
+
+func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error {
+	deliveriesDone := make(chan struct{}, 500)
+	for i := 0; i < cap(deliveriesDone)-1; i++ {
+		peer := fmt.Sprintf("fake-peer%d", i)
+		go func() {
+			ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
+			deliveriesDone <- struct{}{}
+		}()
+	}
+
+	// None of the extra deliveries should block.
+	timeout := time.After(60 * time.Second)
+	launched := false
+	for i := 0; i < cap(deliveriesDone); i++ {
+		select {
+		case <-deliveriesDone:
+			if !launched {
+				// Start delivering the requested headers
+				// after one of the flooding responses has arrived.
+				go func() {
+					ftp.peer.RequestHeadersByNumber(from, count, skip, reverse)
+					deliveriesDone <- struct{}{}
+				}()
+				launched = true
+			}
+		case <-timeout:
+			panic("blocked")
+		}
+	}
+	return nil
+}
diff --git a/dex/downloader/events.go b/dex/downloader/events.go
new file mode 100644
index 000000000..64905b8f2
--- /dev/null
+++ b/dex/downloader/events.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+type DoneEvent struct{}
+type StartEvent struct{}
+type FailedEvent struct{ Err error }
diff --git a/dex/downloader/fakepeer.go b/dex/downloader/fakepeer.go
new file mode 100644
index 000000000..3e29357ba
--- /dev/null
+++ b/dex/downloader/fakepeer.go
@@ -0,0 +1,161 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"math/big"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/core"
+	"github.com/dexon-foundation/dexon/core/rawdb"
+	"github.com/dexon-foundation/dexon/core/types"
+	"github.com/dexon-foundation/dexon/ethdb"
+)
+
+// FakePeer is a mock downloader peer that operates on a local database instance
+// instead of being an actual live node. It's useful for testing and to implement
+// sync commands from an existing local database.
+type FakePeer struct {
+	id string
+	db ethdb.Database
+	hc *core.HeaderChain
+	dl *Downloader
+}
+
+// NewFakePeer creates a new mock downloader peer with the given data sources.
+func NewFakePeer(id string, db ethdb.Database, hc *core.HeaderChain, dl *Downloader) *FakePeer {
+	return &FakePeer{id: id, db: db, hc: hc, dl: dl}
+}
+
+// Head implements downloader.Peer, returning the current head hash and number
+// of the best known header.
+func (p *FakePeer) Head() (common.Hash, *big.Int) {
+	header := p.hc.CurrentHeader()
+	return header.Hash(), header.Number
+}
+
+// RequestHeadersByHash implements downloader.Peer, returning a batch of headers
+// defined by the origin hash and the associated query parameters.
+func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, reverse bool) error {
+	var (
+		headers []*types.Header
+		unknown bool
+	)
+	for !unknown && len(headers) < amount {
+		origin := p.hc.GetHeaderByHash(hash)
+		if origin == nil {
+			break
+		}
+		number := origin.Number.Uint64()
+		headers = append(headers, origin)
+		if reverse {
+			for i := 0; i <= skip; i++ {
+				if header := p.hc.GetHeader(hash, number); header != nil {
+					hash = header.ParentHash
+					number--
+				} else {
+					unknown = true
+					break
+				}
+			}
+		} else {
+			var (
+				current = origin.Number.Uint64()
+				next    = current + uint64(skip) + 1
+			)
+			if header := p.hc.GetHeaderByNumber(next); header != nil {
+				if p.hc.GetBlockHashesFromHash(header.Hash(), uint64(skip+1))[skip] == hash {
+					hash = header.Hash()
+				} else {
+					unknown = true
+				}
+			} else {
+				unknown = true
+			}
+		}
+	}
+	p.dl.DeliverHeaders(p.id, headers)
+	return nil
+}
+
+// RequestHeadersByNumber implements downloader.Peer, returning a batch of headers
+// defined by the origin number and the associated query parameters.
+func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, reverse bool) error {
+	var (
+		headers []*types.Header
+		unknown bool
+	)
+	for !unknown && len(headers) < amount {
+		origin := p.hc.GetHeaderByNumber(number)
+		if origin == nil {
+			break
+		}
+		if reverse {
+			if number >= uint64(skip+1) {
+				number -= uint64(skip + 1)
+			} else {
+				unknown = true
+			}
+		} else {
+			number += uint64(skip + 1)
+		}
+		headers = append(headers, origin)
+	}
+	p.dl.DeliverHeaders(p.id, headers)
+	return nil
+}
+
+// RequestBodies implements downloader.Peer, returning a batch of block bodies
+// corresponding to the specified block hashes.
+func (p *FakePeer) RequestBodies(hashes []common.Hash) error {
+	var (
+		txs    [][]*types.Transaction
+		uncles [][]*types.Header
+	)
+	for _, hash := range hashes {
+		block := rawdb.ReadBlock(p.db, hash, *p.hc.GetBlockNumber(hash))
+
+		txs = append(txs, block.Transactions())
+		uncles = append(uncles, block.Uncles())
+	}
+	p.dl.DeliverBodies(p.id, txs, uncles)
+	return nil
+}
+
+// RequestReceipts implements downloader.Peer, returning a batch of transaction
+// receipts corresponding to the specified block hashes.
+func (p *FakePeer) RequestReceipts(hashes []common.Hash) error {
+	var receipts [][]*types.Receipt
+	for _, hash := range hashes {
+		receipts = append(receipts, rawdb.ReadReceipts(p.db, hash, *p.hc.GetBlockNumber(hash)))
+	}
+	p.dl.DeliverReceipts(p.id, receipts)
+	return nil
+}
+
+// RequestNodeData implements downloader.Peer, returning a batch of state trie
+// nodes corresponding to the specified trie hashes.
+func (p *FakePeer) RequestNodeData(hashes []common.Hash) error {
+	var data [][]byte
+	for _, hash := range hashes {
+		if entry, err := p.db.Get(hash.Bytes()); err == nil {
+			data = append(data, entry)
+		}
+	}
+	p.dl.DeliverNodeData(p.id, data)
+	return nil
+}
diff --git a/dex/downloader/metrics.go b/dex/downloader/metrics.go
new file mode 100644
index 000000000..0d6041712
--- /dev/null
+++ b/dex/downloader/metrics.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the metrics collected by the downloader.
+
+package downloader
+
+import (
+	"github.com/dexon-foundation/dexon/metrics"
+)
+
+var (
+	headerInMeter      = metrics.NewRegisteredMeter("dex/downloader/headers/in", nil)
+	headerReqTimer     = metrics.NewRegisteredTimer("dex/downloader/headers/req", nil)
+	headerDropMeter    = metrics.NewRegisteredMeter("dex/downloader/headers/drop", nil)
+	headerTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/headers/timeout", nil)
+
+	bodyInMeter      = metrics.NewRegisteredMeter("dex/downloader/bodies/in", nil)
+	bodyReqTimer     = metrics.NewRegisteredTimer("dex/downloader/bodies/req", nil)
+	bodyDropMeter    = metrics.NewRegisteredMeter("dex/downloader/bodies/drop", nil)
+	bodyTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/timeout", nil)
+
+	receiptInMeter      = metrics.NewRegisteredMeter("dex/downloader/receipts/in", nil)
+	receiptReqTimer     = metrics.NewRegisteredTimer("dex/downloader/receipts/req", nil)
+	receiptDropMeter    = metrics.NewRegisteredMeter("dex/downloader/receipts/drop", nil)
+	receiptTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/receipts/timeout", nil)
+
+	stateInMeter   = metrics.NewRegisteredMeter("dex/downloader/states/in", nil)
+	stateDropMeter = metrics.NewRegisteredMeter("dex/downloader/states/drop", nil)
+)
diff --git a/dex/downloader/modes.go b/dex/downloader/modes.go
new file mode 100644
index 000000000..8ecdf91f1
--- /dev/null
+++ b/dex/downloader/modes.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import "fmt"
+
+// SyncMode represents the synchronisation mode of the downloader.
+type SyncMode int
+
+const (
+	FullSync  SyncMode = iota // Synchronise the entire blockchain history from full blocks
+	FastSync                  // Quickly download the headers, full sync only at the chain head
+	LightSync                 // Download only the headers and terminate afterwards
+)
+
+func (mode SyncMode) IsValid() bool {
+	return mode >= FullSync && mode <= LightSync
+}
+
+// String implements the stringer interface.
+func (mode SyncMode) String() string {
+	switch mode {
+	case FullSync:
+		return "full"
+	case FastSync:
+		return "fast"
+	case LightSync:
+		return "light"
+	default:
+		return "unknown"
+	}
+}
+
+func (mode SyncMode) MarshalText() ([]byte, error) {
+	switch mode {
+	case FullSync:
+		return []byte("full"), nil
+	case FastSync:
+		return []byte("fast"), nil
+	case LightSync:
+		return []byte("light"), nil
+	default:
+		return nil, fmt.Errorf("unknown sync mode %d", mode)
+	}
+}
+
+func (mode *SyncMode) UnmarshalText(text []byte) error {
+	switch string(text) {
+	case "full":
+		*mode = FullSync
+	case "fast":
+		*mode = FastSync
+	case "light":
+		*mode = LightSync
+	default:
+		return fmt.Errorf(`unknown sync mode %q, want "full", "fast" or "light"`, text)
+	}
+	return nil
+}
diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go
new file mode 100644
index 000000000..1fd82fbe3
--- /dev/null
+++ b/dex/downloader/peer.go
@@ -0,0 +1,573 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the active peer-set of the downloader, maintaining both failures
+// as well as reputation metrics to prioritize the block retrievals.
+
+package downloader
+
+import (
+	"errors"
+	"fmt"
+	"math"
+	"math/big"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/event"
+	"github.com/dexon-foundation/dexon/log"
+)
+
+const (
+	maxLackingHashes  = 4096 // Maximum number of entries allowed on the list or lacking items
+	measurementImpact = 0.1  // The impact a single measurement has on a peer's final throughput value.
+)
+
+var (
+	errAlreadyFetching   = errors.New("already fetching blocks from peer")
+	errAlreadyRegistered = errors.New("peer is already registered")
+	errNotRegistered     = errors.New("peer is not registered")
+)
+
+// peerConnection represents an active peer from which hashes and blocks are retrieved.
+type peerConnection struct {
+	id string // Unique identifier of the peer
+
+	headerIdle  int32 // Current header activity state of the peer (idle = 0, active = 1)
+	blockIdle   int32 // Current block activity state of the peer (idle = 0, active = 1)
+	receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
+	stateIdle   int32 // Current node data activity state of the peer (idle = 0, active = 1)
+
+	headerThroughput  float64 // Number of headers measured to be retrievable per second
+	blockThroughput   float64 // Number of blocks (bodies) measured to be retrievable per second
+	receiptThroughput float64 // Number of receipts measured to be retrievable per second
+	stateThroughput   float64 // Number of node data pieces measured to be retrievable per second
+
+	rtt time.Duration // Request round trip time to track responsiveness (QoS)
+
+	headerStarted  time.Time // Time instance when the last header fetch was started
+	blockStarted   time.Time // Time instance when the last block (body) fetch was started
+	receiptStarted time.Time // Time instance when the last receipt fetch was started
+	stateStarted   time.Time // Time instance when the last node data fetch was started
+
+	lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
+
+	peer Peer
+
+	version int        // Eth protocol version number to switch strategies
+	log     log.Logger // Contextual logger to add extra infos to peer logs
+	lock    sync.RWMutex
+}
+
+// LightPeer encapsulates the methods required to synchronise with a remote light peer.
+type LightPeer interface {
+	Head() (common.Hash, *big.Int)
+	RequestHeadersByHash(common.Hash, int, int, bool) error
+	RequestHeadersByNumber(uint64, int, int, bool) error
+}
+
+// Peer encapsulates the methods required to synchronise with a remote full peer.
+type Peer interface {
+	LightPeer
+	RequestBodies([]common.Hash) error
+	RequestReceipts([]common.Hash) error
+	RequestNodeData([]common.Hash) error
+}
+
+// lightPeerWrapper wraps a LightPeer struct, stubbing out the Peer-only methods.
+type lightPeerWrapper struct {
+	peer LightPeer
+}
+
+func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }
+func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error {
+	return w.peer.RequestHeadersByHash(h, amount, skip, reverse)
+}
+func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error {
+	return w.peer.RequestHeadersByNumber(i, amount, skip, reverse)
+}
+func (w *lightPeerWrapper) RequestBodies([]common.Hash) error {
+	panic("RequestBodies not supported in light client mode sync")
+}
+func (w *lightPeerWrapper) RequestReceipts([]common.Hash) error {
+	panic("RequestReceipts not supported in light client mode sync")
+}
+func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error {
+	panic("RequestNodeData not supported in light client mode sync")
+}
+
+// newPeerConnection creates a new downloader peer.
+func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *peerConnection {
+	return &peerConnection{
+		id:      id,
+		lacking: make(map[common.Hash]struct{}),
+
+		peer: peer,
+
+		version: version,
+		log:     logger,
+	}
+}
+
+// Reset clears the internal state of a peer entity.
+func (p *peerConnection) Reset() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	atomic.StoreInt32(&p.headerIdle, 0)
+	atomic.StoreInt32(&p.blockIdle, 0)
+	atomic.StoreInt32(&p.receiptIdle, 0)
+	atomic.StoreInt32(&p.stateIdle, 0)
+
+	p.headerThroughput = 0
+	p.blockThroughput = 0
+	p.receiptThroughput = 0
+	p.stateThroughput = 0
+
+	p.lacking = make(map[common.Hash]struct{})
+}
+
+// FetchHeaders sends a header retrieval request to the remote peer.
+func (p *peerConnection) FetchHeaders(from uint64, count int) error {
+	// Sanity check the protocol version
+	if p.version < 62 {
+		panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
+	}
+	// Short circuit if the peer is already fetching
+	if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
+		return errAlreadyFetching
+	}
+	p.headerStarted = time.Now()
+
+	// Issue the header retrieval request (absolut upwards without gaps)
+	go p.peer.RequestHeadersByNumber(from, count, 0, false)
+
+	return nil
+}
+
+// FetchBodies sends a block body retrieval request to the remote peer.
+func (p *peerConnection) FetchBodies(request *fetchRequest) error {
+	// Sanity check the protocol version
+	if p.version < 62 {
+		panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version))
+	}
+	// Short circuit if the peer is already fetching
+	if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
+		return errAlreadyFetching
+	}
+	p.blockStarted = time.Now()
+
+	// Convert the header set to a retrievable slice
+	hashes := make([]common.Hash, 0, len(request.Headers))
+	for _, header := range request.Headers {
+		hashes = append(hashes, header.Hash())
+	}
+	go p.peer.RequestBodies(hashes)
+
+	return nil
+}
+
+// FetchReceipts sends a receipt retrieval request to the remote peer.
+func (p *peerConnection) FetchReceipts(request *fetchRequest) error {
+	// Sanity check the protocol version
+	if p.version < 63 {
+		panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version))
+	}
+	// Short circuit if the peer is already fetching
+	if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
+		return errAlreadyFetching
+	}
+	p.receiptStarted = time.Now()
+
+	// Convert the header set to a retrievable slice
+	hashes := make([]common.Hash, 0, len(request.Headers))
+	for _, header := range request.Headers {
+		hashes = append(hashes, header.Hash())
+	}
+	go p.peer.RequestReceipts(hashes)
+
+	return nil
+}
+
+// FetchNodeData sends a node state data retrieval request to the remote peer.
+func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
+	// Sanity check the protocol version
+	if p.version < 63 {
+		panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
+	}
+	// Short circuit if the peer is already fetching
+	if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) {
+		return errAlreadyFetching
+	}
+	p.stateStarted = time.Now()
+
+	go p.peer.RequestNodeData(hashes)
+
+	return nil
+}
+
+// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
+// requests. Its estimated header retrieval throughput is updated with that measured
+// just now.
+func (p *peerConnection) SetHeadersIdle(delivered int) {
+	p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
+}
+
+// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
+// requests. Its estimated body retrieval throughput is updated with that measured
+// just now.
+func (p *peerConnection) SetBodiesIdle(delivered int) {
+	p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
+}
+
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
+// retrieval requests. Its estimated receipt retrieval throughput is updated
+// with that measured just now.
+func (p *peerConnection) SetReceiptsIdle(delivered int) {
+	p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
+}
+
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
+// data retrieval requests. Its estimated state retrieval throughput is updated
+// with that measured just now.
+func (p *peerConnection) SetNodeDataIdle(delivered int) {
+	p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
+}
+
+// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its estimated retrieval throughput is updated with that measured just now.
+func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+	// Irrelevant of the scaling, make sure the peer ends up idle
+	defer atomic.StoreInt32(idle, 0)
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	// If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
+	if delivered == 0 {
+		*throughput = 0
+		return
+	}
+	// Otherwise update the throughput with a new measurement
+	elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor
+	measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
+
+	*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
+	p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
+
+	p.log.Trace("Peer throughput measurements updated",
+		"hps", p.headerThroughput, "bps", p.blockThroughput,
+		"rps", p.receiptThroughput, "sps", p.stateThroughput,
+		"miss", len(p.lacking), "rtt", p.rtt)
+}
+
+// HeaderCapacity retrieves the peers header download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
+}
+
+// BlockCapacity retrieves the peers block download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch)))
+}
+
+// ReceiptCapacity retrieves the peers receipt download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch)))
+}
+
+// NodeDataCapacity retrieves the peers state download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch)))
+}
+
+// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
+// that a peer is known not to have (i.e. have been requested before). If the
+// set reaches its maximum allowed capacity, items are randomly dropped off.
+func (p *peerConnection) MarkLacking(hash common.Hash) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	for len(p.lacking) >= maxLackingHashes {
+		for drop := range p.lacking {
+			delete(p.lacking, drop)
+			break
+		}
+	}
+	p.lacking[hash] = struct{}{}
+}
+
+// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
+// list (i.e. whether we know that the peer does not have it).
+func (p *peerConnection) Lacks(hash common.Hash) bool {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	_, ok := p.lacking[hash]
+	return ok
+}
+
+// peerSet represents the collection of active peer participating in the chain
+// download procedure.
+type peerSet struct {
+	peers        map[string]*peerConnection
+	newPeerFeed  event.Feed
+	peerDropFeed event.Feed
+	lock         sync.RWMutex
+}
+
+// newPeerSet creates a new peer set top track the active download sources.
+func newPeerSet() *peerSet {
+	return &peerSet{
+		peers: make(map[string]*peerConnection),
+	}
+}
+
+// SubscribeNewPeers subscribes to peer arrival events.
+func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
+	return ps.newPeerFeed.Subscribe(ch)
+}
+
+// SubscribePeerDrops subscribes to peer departure events.
+func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription {
+	return ps.peerDropFeed.Subscribe(ch)
+}
+
+// Reset iterates over the current peer set, and resets each of the known peers
+// to prepare for a next batch of block retrieval.
+func (ps *peerSet) Reset() {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	for _, peer := range ps.peers {
+		peer.Reset()
+	}
+}
+
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known.
+//
+// The method also sets the starting throughput values of the new peer to the
+// average of all existing peers, to give it a realistic chance of being used
+// for data retrievals.
+func (ps *peerSet) Register(p *peerConnection) error {
+	// Retrieve the current median RTT as a sane default
+	p.rtt = ps.medianRTT()
+
+	// Register the new peer with some meaningful defaults
+	ps.lock.Lock()
+	if _, ok := ps.peers[p.id]; ok {
+		ps.lock.Unlock()
+		return errAlreadyRegistered
+	}
+	if len(ps.peers) > 0 {
+		p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0
+
+		for _, peer := range ps.peers {
+			peer.lock.RLock()
+			p.headerThroughput += peer.headerThroughput
+			p.blockThroughput += peer.blockThroughput
+			p.receiptThroughput += peer.receiptThroughput
+			p.stateThroughput += peer.stateThroughput
+			peer.lock.RUnlock()
+		}
+		p.headerThroughput /= float64(len(ps.peers))
+		p.blockThroughput /= float64(len(ps.peers))
+		p.receiptThroughput /= float64(len(ps.peers))
+		p.stateThroughput /= float64(len(ps.peers))
+	}
+	ps.peers[p.id] = p
+	ps.lock.Unlock()
+
+	ps.newPeerFeed.Send(p)
+	return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+	ps.lock.Lock()
+	p, ok := ps.peers[id]
+	if !ok {
+		defer ps.lock.Unlock()
+		return errNotRegistered
+	}
+	delete(ps.peers, id)
+	ps.lock.Unlock()
+
+	ps.peerDropFeed.Send(p)
+	return nil
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peerConnection {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	return ps.peers[id]
+}
+
+// Len returns if the current number of peers in the set.
+func (ps *peerSet) Len() int {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	return len(ps.peers)
+}
+
+// AllPeers retrieves a flat list of all the peers within the set.
+func (ps *peerSet) AllPeers() []*peerConnection {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	list := make([]*peerConnection, 0, len(ps.peers))
+	for _, p := range ps.peers {
+		list = append(list, p)
+	}
+	return list
+}
+
+// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
+// within the active peer set, ordered by their reputation.
+func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
+	idle := func(p *peerConnection) bool {
+		return atomic.LoadInt32(&p.headerIdle) == 0
+	}
+	throughput := func(p *peerConnection) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.headerThroughput
+	}
+	return ps.idlePeers(62, 64, idle, throughput)
+}
+
+// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
+// the active peer set, ordered by their reputation.
+func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
+	idle := func(p *peerConnection) bool {
+		return atomic.LoadInt32(&p.blockIdle) == 0
+	}
+	throughput := func(p *peerConnection) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.blockThroughput
+	}
+	return ps.idlePeers(62, 64, idle, throughput)
+}
+
+// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
+// within the active peer set, ordered by their reputation.
+func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
+	idle := func(p *peerConnection) bool {
+		return atomic.LoadInt32(&p.receiptIdle) == 0
+	}
+	throughput := func(p *peerConnection) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.receiptThroughput
+	}
+	return ps.idlePeers(63, 64, idle, throughput)
+}
+
+// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
+// peers within the active peer set, ordered by their reputation.
+func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
+	idle := func(p *peerConnection) bool {
+		return atomic.LoadInt32(&p.stateIdle) == 0
+	}
+	throughput := func(p *peerConnection) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.stateThroughput
+	}
+	return ps.idlePeers(63, 64, idle, throughput)
+}
+
+// idlePeers retrieves a flat list of all currently idle peers satisfying the
+// protocol version constraints, using the provided function to check idleness.
+// The resulting set of peers are sorted by their measure throughput.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
+	for _, p := range ps.peers {
+		if p.version >= minProtocol && p.version <= maxProtocol {
+			if idleCheck(p) {
+				idle = append(idle, p)
+			}
+			total++
+		}
+	}
+	for i := 0; i < len(idle); i++ {
+		for j := i + 1; j < len(idle); j++ {
+			if throughput(idle[i]) < throughput(idle[j]) {
+				idle[i], idle[j] = idle[j], idle[i]
+			}
+		}
+	}
+	return idle, total
+}
+
+// medianRTT returns the median RTT of the peerset, considering only the tuning
+// peers if there are more peers available.
+func (ps *peerSet) medianRTT() time.Duration {
+	// Gather all the currently measured round trip times
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	rtts := make([]float64, 0, len(ps.peers))
+	for _, p := range ps.peers {
+		p.lock.RLock()
+		rtts = append(rtts, float64(p.rtt))
+		p.lock.RUnlock()
+	}
+	sort.Float64s(rtts)
+
+	median := rttMaxEstimate
+	if qosTuningPeers <= len(rtts) {
+		median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers
+	} else if len(rtts) > 0 {
+		median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
+	}
+	// Restrict the RTT into some QoS defaults, irrelevant of true RTT
+	if median < rttMinEstimate {
+		median = rttMinEstimate
+	}
+	if median > rttMaxEstimate {
+		median = rttMaxEstimate
+	}
+	return median
+}
diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go
new file mode 100644
index 000000000..12c75e793
--- /dev/null
+++ b/dex/downloader/queue.go
@@ -0,0 +1,885 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the block download scheduler to collect download tasks and schedule
+// them in an ordered, and throttled way.
+
+package downloader
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/common/prque"
+	"github.com/dexon-foundation/dexon/core/types"
+	"github.com/dexon-foundation/dexon/log"
+	"github.com/dexon-foundation/dexon/metrics"
+)
+
+var (
+	blockCacheItems      = 8192             // Maximum number of blocks to cache before throttling the download
+	blockCacheMemory     = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
+	blockCacheSizeWeight = 0.1              // Multiplier to approximate the average block size based on past ones
+)
+
+var (
+	errNoFetchesPending = errors.New("no fetches pending")
+	errStaleDelivery    = errors.New("stale delivery")
+)
+
+// fetchRequest is a currently running data retrieval operation.
+type fetchRequest struct {
+	Peer    *peerConnection // Peer to which the request was sent
+	From    uint64          // [eth/62] Requested chain element index (used for skeleton fills only)
+	Headers []*types.Header // [eth/62] Requested headers, sorted by request order
+	Time    time.Time       // Time when the request was made
+}
+
+// fetchResult is a struct collecting partial results from data fetchers until
+// all outstanding pieces complete and the result as a whole can be processed.
+type fetchResult struct {
+	Pending int         // Number of data fetches still pending
+	Hash    common.Hash // Hash of the header to prevent recalculating
+
+	Header       *types.Header
+	Uncles       []*types.Header
+	Transactions types.Transactions
+	Receipts     types.Receipts
+}
+
+// queue represents hashes that are either need fetching or are being fetched
+type queue struct {
+	mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
+
+	// Headers are "special", they download in batches, supported by a skeleton chain
+	headerHead      common.Hash                    // [eth/62] Hash of the last queued header to verify order
+	headerTaskPool  map[uint64]*types.Header       // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
+	headerTaskQueue *prque.Prque                   // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
+	headerPeerMiss  map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
+	headerPendPool  map[string]*fetchRequest       // [eth/62] Currently pending header retrieval operations
+	headerResults   []*types.Header                // [eth/62] Result cache accumulating the completed headers
+	headerProced    int                            // [eth/62] Number of headers already processed from the results
+	headerOffset    uint64                         // [eth/62] Number of the first header in the result cache
+	headerContCh    chan bool                      // [eth/62] Channel to notify when header download finishes
+
+	// All data retrievals below are based on an already assembles header chain
+	blockTaskPool  map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
+	blockTaskQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
+	blockPendPool  map[string]*fetchRequest      // [eth/62] Currently pending block (body) retrieval operations
+	blockDonePool  map[common.Hash]struct{}      // [eth/62] Set of the completed block (body) fetches
+
+	receiptTaskPool  map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
+	receiptTaskQueue *prque.Prque                  // [eth/63] Priority queue of the headers to fetch the receipts for
+	receiptPendPool  map[string]*fetchRequest      // [eth/63] Currently pending receipt retrieval operations
+	receiptDonePool  map[common.Hash]struct{}      // [eth/63] Set of the completed receipt fetches
+
+	resultCache  []*fetchResult     // Downloaded but not yet delivered fetch results
+	resultOffset uint64             // Offset of the first cached fetch result in the block chain
+	resultSize   common.StorageSize // Approximate size of a block (exponential moving average)
+
+	lock   *sync.Mutex
+	active *sync.Cond
+	closed bool
+}
+
+// newQueue creates a new download queue for scheduling block retrieval.
+func newQueue() *queue {
+	lock := new(sync.Mutex)
+	return &queue{
+		headerPendPool:   make(map[string]*fetchRequest),
+		headerContCh:     make(chan bool),
+		blockTaskPool:    make(map[common.Hash]*types.Header),
+		blockTaskQueue:   prque.New(nil),
+		blockPendPool:    make(map[string]*fetchRequest),
+		blockDonePool:    make(map[common.Hash]struct{}),
+		receiptTaskPool:  make(map[common.Hash]*types.Header),
+		receiptTaskQueue: prque.New(nil),
+		receiptPendPool:  make(map[string]*fetchRequest),
+		receiptDonePool:  make(map[common.Hash]struct{}),
+		resultCache:      make([]*fetchResult, blockCacheItems),
+		active:           sync.NewCond(lock),
+		lock:             lock,
+	}
+}
+
+// Reset clears out the queue contents.
+func (q *queue) Reset() {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	q.closed = false
+	q.mode = FullSync
+
+	q.headerHead = common.Hash{}
+	q.headerPendPool = make(map[string]*fetchRequest)
+
+	q.blockTaskPool = make(map[common.Hash]*types.Header)
+	q.blockTaskQueue.Reset()
+	q.blockPendPool = make(map[string]*fetchRequest)
+	q.blockDonePool = make(map[common.Hash]struct{})
+
+	q.receiptTaskPool = make(map[common.Hash]*types.Header)
+	q.receiptTaskQueue.Reset()
+	q.receiptPendPool = make(map[string]*fetchRequest)
+	q.receiptDonePool = make(map[common.Hash]struct{})
+
+	q.resultCache = make([]*fetchResult, blockCacheItems)
+	q.resultOffset = 0
+}
+
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+	q.lock.Lock()
+	q.closed = true
+	q.lock.Unlock()
+	q.active.Broadcast()
+}
+
+// PendingHeaders retrieves the number of header requests pending for retrieval.
+func (q *queue) PendingHeaders() int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.headerTaskQueue.Size()
+}
+
+// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
+func (q *queue) PendingBlocks() int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.blockTaskQueue.Size()
+}
+
+// PendingReceipts retrieves the number of block receipts pending for retrieval.
+func (q *queue) PendingReceipts() int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.receiptTaskQueue.Size()
+}
+
+// InFlightHeaders retrieves whether there are header fetch requests currently
+// in flight.
+func (q *queue) InFlightHeaders() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return len(q.headerPendPool) > 0
+}
+
+// InFlightBlocks retrieves whether there are block fetch requests currently in
+// flight.
+func (q *queue) InFlightBlocks() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return len(q.blockPendPool) > 0
+}
+
+// InFlightReceipts retrieves whether there are receipt fetch requests currently
+// in flight.
+func (q *queue) InFlightReceipts() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return len(q.receiptPendPool) > 0
+}
+
+// Idle returns if the queue is fully idle or has some data still inside.
+func (q *queue) Idle() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
+	pending := len(q.blockPendPool) + len(q.receiptPendPool)
+	cached := len(q.blockDonePool) + len(q.receiptDonePool)
+
+	return (queued + pending + cached) == 0
+}
+
+// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
+// fetches exceed block cache).
+func (q *queue) ShouldThrottleBlocks() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0
+}
+
+// ShouldThrottleReceipts checks if the download should be throttled (active receipt
+// fetches exceed block cache).
+func (q *queue) ShouldThrottleReceipts() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0
+}
+
+// resultSlots calculates the number of results slots available for requests
+// whilst adhering to both the item and the memory limit too of the results
+// cache.
+func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
+	// Calculate the maximum length capped by the memory limit
+	limit := len(q.resultCache)
+	if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) {
+		limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
+	}
+	// Calculate the number of slots already finished
+	finished := 0
+	for _, result := range q.resultCache[:limit] {
+		if result == nil {
+			break
+		}
+		if _, ok := donePool[result.Hash]; ok {
+			finished++
+		}
+	}
+	// Calculate the number of slots currently downloading
+	pending := 0
+	for _, request := range pendPool {
+		for _, header := range request.Headers {
+			if header.Number.Uint64() < q.resultOffset+uint64(limit) {
+				pending++
+			}
+		}
+	}
+	// Return the free slots to distribute
+	return limit - finished - pending
+}
+
+// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
+// up an already retrieved header skeleton.
+func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
+	if q.headerResults != nil {
+		panic("skeleton assembly already in progress")
+	}
+	// Schedule all the header retrieval tasks for the skeleton assembly
+	q.headerTaskPool = make(map[uint64]*types.Header)
+	q.headerTaskQueue = prque.New(nil)
+	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
+	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
+	q.headerProced = 0
+	q.headerOffset = from
+	q.headerContCh = make(chan bool, 1)
+
+	for i, header := range skeleton {
+		index := from + uint64(i*MaxHeaderFetch)
+
+		q.headerTaskPool[index] = header
+		q.headerTaskQueue.Push(index, -int64(index))
+	}
+}
+
+// RetrieveHeaders retrieves the header chain assemble based on the scheduled
+// skeleton.
+func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	headers, proced := q.headerResults, q.headerProced
+	q.headerResults, q.headerProced = nil, 0
+
+	return headers, proced
+}
+
+// Schedule adds a set of headers for the download queue for scheduling, returning
+// the new headers encountered.
+func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Insert all the headers prioritised by the contained block number
+	inserts := make([]*types.Header, 0, len(headers))
+	for _, header := range headers {
+		// Make sure chain order is honoured and preserved throughout
+		hash := header.Hash()
+		if header.Number == nil || header.Number.Uint64() != from {
+			log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
+			break
+		}
+		if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
+			log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
+			break
+		}
+		// Make sure no duplicate requests are executed
+		if _, ok := q.blockTaskPool[hash]; ok {
+			log.Warn("Header  already scheduled for block fetch", "number", header.Number, "hash", hash)
+			continue
+		}
+		if _, ok := q.receiptTaskPool[hash]; ok {
+			log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
+			continue
+		}
+		// Queue the header for content retrieval
+		q.blockTaskPool[hash] = header
+		q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
+
+		if q.mode == FastSync {
+			q.receiptTaskPool[hash] = header
+			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+		}
+		inserts = append(inserts, header)
+		q.headerHead = hash
+		from++
+	}
+	return inserts
+}
+
+// Results retrieves and permanently removes a batch of fetch results from
+// the cache. the result slice will be empty if the queue has been closed.
+func (q *queue) Results(block bool) []*fetchResult {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Count the number of items available for processing
+	nproc := q.countProcessableItems()
+	for nproc == 0 && !q.closed {
+		if !block {
+			return nil
+		}
+		q.active.Wait()
+		nproc = q.countProcessableItems()
+	}
+	// Since we have a batch limit, don't pull more into "dangling" memory
+	if nproc > maxResultsProcess {
+		nproc = maxResultsProcess
+	}
+	results := make([]*fetchResult, nproc)
+	copy(results, q.resultCache[:nproc])
+	if len(results) > 0 {
+		// Mark results as done before dropping them from the cache.
+		for _, result := range results {
+			hash := result.Header.Hash()
+			delete(q.blockDonePool, hash)
+			delete(q.receiptDonePool, hash)
+		}
+		// Delete the results from the cache and clear the tail.
+		copy(q.resultCache, q.resultCache[nproc:])
+		for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+			q.resultCache[i] = nil
+		}
+		// Advance the expected block number of the first cache entry.
+		q.resultOffset += uint64(nproc)
+
+		// Recalculate the result item weights to prevent memory exhaustion
+		for _, result := range results {
+			size := result.Header.Size()
+			for _, uncle := range result.Uncles {
+				size += uncle.Size()
+			}
+			for _, receipt := range result.Receipts {
+				size += receipt.Size()
+			}
+			for _, tx := range result.Transactions {
+				size += tx.Size()
+			}
+			q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
+		}
+	}
+	return results
+}
+
+// countProcessableItems counts the processable items.
+func (q *queue) countProcessableItems() int {
+	for i, result := range q.resultCache {
+		if result == nil || result.Pending > 0 {
+			return i
+		}
+	}
+	return len(q.resultCache)
+}
+
+// ReserveHeaders reserves a set of headers for the given peer, skipping any
+// previously failed batches.
+func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Short circuit if the peer's already downloading something (sanity check to
+	// not corrupt state)
+	if _, ok := q.headerPendPool[p.id]; ok {
+		return nil
+	}
+	// Retrieve a batch of hashes, skipping previously failed ones
+	send, skip := uint64(0), []uint64{}
+	for send == 0 && !q.headerTaskQueue.Empty() {
+		from, _ := q.headerTaskQueue.Pop()
+		if q.headerPeerMiss[p.id] != nil {
+			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
+				skip = append(skip, from.(uint64))
+				continue
+			}
+		}
+		send = from.(uint64)
+	}
+	// Merge all the skipped batches back
+	for _, from := range skip {
+		q.headerTaskQueue.Push(from, -int64(from))
+	}
+	// Assemble and return the block download request
+	if send == 0 {
+		return nil
+	}
+	request := &fetchRequest{
+		Peer: p,
+		From: send,
+		Time: time.Now(),
+	}
+	q.headerPendPool[p.id] = request
+	return request
+}
+
+// ReserveBodies reserves a set of body fetches for the given peer, skipping any
+// previously failed downloads. Beside the next batch of needed fetches, it also
+// returns a flag whether empty blocks were queued requiring processing.
+func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
+	isNoop := func(header *types.Header) bool {
+		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
+	}
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
+}
+
+// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
+// any previously failed downloads. Beside the next batch of needed fetches, it
+// also returns a flag whether empty receipts were queued requiring importing.
+func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
+	isNoop := func(header *types.Header) bool {
+		return header.ReceiptHash == types.EmptyRootHash
+	}
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
+}
+
+// reserveHeaders reserves a set of data download operations for a given peer,
+// skipping any previously failed ones. This method is a generic version used
+// by the individual special reservation functions.
+//
+// Note, this method expects the queue lock to be already held for writing. The
+// reason the lock is not obtained in here is because the parameters already need
+// to access the queue, so they already need a lock anyway.
+func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
+	// Short circuit if the pool has been depleted, or if the peer's already
+	// downloading something (sanity check not to corrupt state)
+	if taskQueue.Empty() {
+		return nil, false, nil
+	}
+	if _, ok := pendPool[p.id]; ok {
+		return nil, false, nil
+	}
+	// Calculate an upper limit on the items we might fetch (i.e. throttling)
+	space := q.resultSlots(pendPool, donePool)
+
+	// Retrieve a batch of tasks, skipping previously failed ones
+	send := make([]*types.Header, 0, count)
+	skip := make([]*types.Header, 0)
+
+	progress := false
+	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
+		header := taskQueue.PopItem().(*types.Header)
+		hash := header.Hash()
+
+		// If we're the first to request this task, initialise the result container
+		index := int(header.Number.Int64() - int64(q.resultOffset))
+		if index >= len(q.resultCache) || index < 0 {
+			common.Report("index allocation went beyond available resultCache space")
+			return nil, false, errInvalidChain
+		}
+		if q.resultCache[index] == nil {
+			components := 1
+			if q.mode == FastSync {
+				components = 2
+			}
+			q.resultCache[index] = &fetchResult{
+				Pending: components,
+				Hash:    hash,
+				Header:  header,
+			}
+		}
+		// If this fetch task is a noop, skip this fetch operation
+		if isNoop(header) {
+			donePool[hash] = struct{}{}
+			delete(taskPool, hash)
+
+			space, proc = space-1, proc-1
+			q.resultCache[index].Pending--
+			progress = true
+			continue
+		}
+		// Otherwise unless the peer is known not to have the data, add to the retrieve list
+		if p.Lacks(hash) {
+			skip = append(skip, header)
+		} else {
+			send = append(send, header)
+		}
+	}
+	// Merge all the skipped headers back
+	for _, header := range skip {
+		taskQueue.Push(header, -int64(header.Number.Uint64()))
+	}
+	if progress {
+		// Wake WaitResults, resultCache was modified
+		q.active.Signal()
+	}
+	// Assemble and return the block download request
+	if len(send) == 0 {
+		return nil, progress, nil
+	}
+	request := &fetchRequest{
+		Peer:    p,
+		Headers: send,
+		Time:    time.Now(),
+	}
+	pendPool[p.id] = request
+
+	return request, progress, nil
+}
+
+// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
+func (q *queue) CancelHeaders(request *fetchRequest) {
+	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
+}
+
+// CancelBodies aborts a body fetch request, returning all pending headers to the
+// task queue.
+func (q *queue) CancelBodies(request *fetchRequest) {
+	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
+}
+
+// CancelReceipts aborts a body fetch request, returning all pending headers to
+// the task queue.
+func (q *queue) CancelReceipts(request *fetchRequest) {
+	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
+}
+
+// Cancel aborts a fetch request, returning all pending hashes to the task queue.
+func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if request.From > 0 {
+		taskQueue.Push(request.From, -int64(request.From))
+	}
+	for _, header := range request.Headers {
+		taskQueue.Push(header, -int64(header.Number.Uint64()))
+	}
+	delete(pendPool, request.Peer.id)
+}
+
+// Revoke cancels all pending requests belonging to a given peer. This method is
+// meant to be called during a peer drop to quickly reassign owned data fetches
+// to remaining nodes.
+func (q *queue) Revoke(peerID string) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if request, ok := q.blockPendPool[peerID]; ok {
+		for _, header := range request.Headers {
+			q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
+		}
+		delete(q.blockPendPool, peerID)
+	}
+	if request, ok := q.receiptPendPool[peerID]; ok {
+		for _, header := range request.Headers {
+			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+		}
+		delete(q.receiptPendPool, peerID)
+	}
+}
+
+// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
+// canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
+}
+
+// ExpireBodies checks for in flight block body requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
+}
+
+// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
+}
+
+// expire is the generic check that move expired tasks from a pending pool back
+// into a task pool, returning all entities caught with expired tasks.
+//
+// Note, this method expects the queue lock to be already held. The
+// reason the lock is not obtained in here is because the parameters already need
+// to access the queue, so they already need a lock anyway.
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
+	// Iterate over the expired requests and return each to the queue
+	expiries := make(map[string]int)
+	for id, request := range pendPool {
+		if time.Since(request.Time) > timeout {
+			// Update the metrics with the timeout
+			timeoutMeter.Mark(1)
+
+			// Return any non satisfied requests to the pool
+			if request.From > 0 {
+				taskQueue.Push(request.From, -int64(request.From))
+			}
+			for _, header := range request.Headers {
+				taskQueue.Push(header, -int64(header.Number.Uint64()))
+			}
+			// Add the peer to the expiry report along the number of failed requests
+			expiries[id] = len(request.Headers)
+
+			// Remove the expired requests from the pending pool directly
+			delete(pendPool, id)
+		}
+	}
+	return expiries
+}
+
+// DeliverHeaders injects a header retrieval response into the header results
+// cache. This method either accepts all headers it received, or none of them
+// if they do not map correctly to the skeleton.
+//
+// If the headers are accepted, the method makes an attempt to deliver the set
+// of ready headers to the processor to keep the pipeline full. However it will
+// not block to prevent stalling other pending deliveries.
+func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Short circuit if the data was never requested
+	request := q.headerPendPool[id]
+	if request == nil {
+		return 0, errNoFetchesPending
+	}
+	headerReqTimer.UpdateSince(request.Time)
+	delete(q.headerPendPool, id)
+
+	// Ensure headers can be mapped onto the skeleton chain
+	target := q.headerTaskPool[request.From].Hash()
+
+	accepted := len(headers) == MaxHeaderFetch
+	if accepted {
+		if headers[0].Number.Uint64() != request.From {
+			log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), request.From)
+			accepted = false
+		} else if headers[len(headers)-1].Hash() != target {
+			log.Trace("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
+			accepted = false
+		}
+	}
+	if accepted {
+		for i, header := range headers[1:] {
+			hash := header.Hash()
+			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
+				log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
+				accepted = false
+				break
+			}
+			if headers[i].Hash() != header.ParentHash {
+				log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
+				accepted = false
+				break
+			}
+		}
+	}
+	// If the batch of headers wasn't accepted, mark as unavailable
+	if !accepted {
+		log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)
+
+		miss := q.headerPeerMiss[id]
+		if miss == nil {
+			q.headerPeerMiss[id] = make(map[uint64]struct{})
+			miss = q.headerPeerMiss[id]
+		}
+		miss[request.From] = struct{}{}
+
+		q.headerTaskQueue.Push(request.From, -int64(request.From))
+		return 0, errors.New("delivery not accepted")
+	}
+	// Clean up a successful fetch and try to deliver any sub-results
+	copy(q.headerResults[request.From-q.headerOffset:], headers)
+	delete(q.headerTaskPool, request.From)
+
+	ready := 0
+	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
+		ready += MaxHeaderFetch
+	}
+	if ready > 0 {
+		// Headers are ready for delivery, gather them and push forward (non blocking)
+		process := make([]*types.Header, ready)
+		copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
+
+		select {
+		case headerProcCh <- process:
+			log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
+			q.headerProced += len(process)
+		default:
+		}
+	}
+	// Check for termination and return
+	if len(q.headerTaskPool) == 0 {
+		q.headerContCh <- false
+	}
+	return len(headers), nil
+}
+
+// DeliverBodies injects a block body retrieval response into the results queue.
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
+			return errInvalidBody
+		}
+		result.Transactions = txLists[index]
+		result.Uncles = uncleLists[index]
+		return nil
+	}
+	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
+}
+
+// DeliverReceipts injects a receipt retrieval response into the results queue.
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
+			return errInvalidReceipt
+		}
+		result.Receipts = receiptList[index]
+		return nil
+	}
+	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
+}
+
+// deliver injects a data retrieval response into the results queue.
+//
+// Note, this method expects the queue lock to be already held for writing. The
+// reason the lock is not obtained in here is because the parameters already need
+// to access the queue, so they already need a lock anyway.
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
+	// Short circuit if the data was never requested
+	request := pendPool[id]
+	if request == nil {
+		return 0, errNoFetchesPending
+	}
+	reqTimer.UpdateSince(request.Time)
+	delete(pendPool, id)
+
+	// If no data items were retrieved, mark them as unavailable for the origin peer
+	if results == 0 {
+		for _, header := range request.Headers {
+			request.Peer.MarkLacking(header.Hash())
+		}
+	}
+	// Assemble each of the results with their headers and retrieved data parts
+	var (
+		accepted int
+		failure  error
+		useful   bool
+	)
+	for i, header := range request.Headers {
+		// Short circuit assembly if no more fetch results are found
+		if i >= results {
+			break
+		}
+		// Reconstruct the next result if contents match up
+		index := int(header.Number.Int64() - int64(q.resultOffset))
+		if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
+			failure = errInvalidChain
+			break
+		}
+		if err := reconstruct(header, i, q.resultCache[index]); err != nil {
+			failure = err
+			break
+		}
+		hash := header.Hash()
+
+		donePool[hash] = struct{}{}
+		q.resultCache[index].Pending--
+		useful = true
+		accepted++
+
+		// Clean up a successful fetch
+		request.Headers[i] = nil
+		delete(taskPool, hash)
+	}
+	// Return all failed or missing fetches to the queue
+	for _, header := range request.Headers {
+		if header != nil {
+			taskQueue.Push(header, -int64(header.Number.Uint64()))
+		}
+	}
+	// Wake up WaitResults
+	if accepted > 0 {
+		q.active.Signal()
+	}
+	// If none of the data was good, it's a stale delivery
+	switch {
+	case failure == nil || failure == errInvalidChain:
+		return accepted, failure
+	case useful:
+		return accepted, fmt.Errorf("partial failure: %v", failure)
+	default:
+		return accepted, errStaleDelivery
+	}
+}
+
+// Prepare configures the result cache to allow accepting and caching inbound
+// fetch results.
+func (q *queue) Prepare(offset uint64, mode SyncMode) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Prepare the queue for sync results
+	if q.resultOffset < offset {
+		q.resultOffset = offset
+	}
+	q.mode = mode
+}
diff --git a/dex/downloader/statesync.go b/dex/downloader/statesync.go
new file mode 100644
index 000000000..49117abbb
--- /dev/null
+++ b/dex/downloader/statesync.go
@@ -0,0 +1,484 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"fmt"
+	"hash"
+	"sync"
+	"time"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/core/rawdb"
+	"github.com/dexon-foundation/dexon/core/state"
+	"github.com/dexon-foundation/dexon/ethdb"
+	"github.com/dexon-foundation/dexon/log"
+	"github.com/dexon-foundation/dexon/trie"
+	"golang.org/x/crypto/sha3"
+)
+
+// stateReq represents a batch of state fetch requests grouped together into
+// a single data retrieval network packet.
+type stateReq struct {
+	items    []common.Hash              // Hashes of the state items to download
+	tasks    map[common.Hash]*stateTask // Download tasks to track previous attempts
+	timeout  time.Duration              // Maximum round trip time for this to complete
+	timer    *time.Timer                // Timer to fire when the RTT timeout expires
+	peer     *peerConnection            // Peer that we're requesting from
+	response [][]byte                   // Response data of the peer (nil for timeouts)
+	dropped  bool                       // Flag whether the peer dropped off early
+}
+
+// timedOut returns if this request timed out.
+func (req *stateReq) timedOut() bool {
+	return req.response == nil
+}
+
+// stateSyncStats is a collection of progress stats to report during a state trie
+// sync to RPC requests as well as to display in user logs.
+type stateSyncStats struct {
+	processed  uint64 // Number of state entries processed
+	duplicate  uint64 // Number of state entries downloaded twice
+	unexpected uint64 // Number of non-requested state entries received
+	pending    uint64 // Number of still pending state entries
+}
+
+// syncState starts downloading state with the given root hash.
+func (d *Downloader) syncState(root common.Hash) *stateSync {
+	s := newStateSync(d, root)
+	select {
+	case d.stateSyncStart <- s:
+	case <-d.quitCh:
+		s.err = errCancelStateFetch
+		close(s.done)
+	}
+	return s
+}
+
+// stateFetcher manages the active state sync and accepts requests
+// on its behalf.
+func (d *Downloader) stateFetcher() {
+	for {
+		select {
+		case s := <-d.stateSyncStart:
+			for next := s; next != nil; {
+				next = d.runStateSync(next)
+			}
+		case <-d.stateCh:
+			// Ignore state responses while no sync is running.
+		case <-d.quitCh:
+			return
+		}
+	}
+}
+
+// runStateSync runs a state synchronisation until it completes or another root
+// hash is requested to be switched over to.
+func (d *Downloader) runStateSync(s *stateSync) *stateSync {
+	var (
+		active   = make(map[string]*stateReq) // Currently in-flight requests
+		finished []*stateReq                  // Completed or failed requests
+		timeout  = make(chan *stateReq)       // Timed out active requests
+	)
+	defer func() {
+		// Cancel active request timers on exit. Also set peers to idle so they're
+		// available for the next sync.
+		for _, req := range active {
+			req.timer.Stop()
+			req.peer.SetNodeDataIdle(len(req.items))
+		}
+	}()
+	// Run the state sync.
+	go s.run()
+	defer s.Cancel()
+
+	// Listen for peer departure events to cancel assigned tasks
+	peerDrop := make(chan *peerConnection, 1024)
+	peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
+	defer peerSub.Unsubscribe()
+
+	for {
+		// Enable sending of the first buffered element if there is one.
+		var (
+			deliverReq   *stateReq
+			deliverReqCh chan *stateReq
+		)
+		if len(finished) > 0 {
+			deliverReq = finished[0]
+			deliverReqCh = s.deliver
+		}
+
+		select {
+		// The stateSync lifecycle:
+		case next := <-d.stateSyncStart:
+			return next
+
+		case <-s.done:
+			return nil
+
+		// Send the next finished request to the current sync:
+		case deliverReqCh <- deliverReq:
+			// Shift out the first request, but also set the emptied slot to nil for GC
+			copy(finished, finished[1:])
+			finished[len(finished)-1] = nil
+			finished = finished[:len(finished)-1]
+
+		// Handle incoming state packs:
+		case pack := <-d.stateCh:
+			// Discard any data not requested (or previously timed out)
+			req := active[pack.PeerId()]
+			if req == nil {
+				log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
+				continue
+			}
+			// Finalize the request and queue up for processing
+			req.timer.Stop()
+			req.response = pack.(*statePack).states
+
+			finished = append(finished, req)
+			delete(active, pack.PeerId())
+
+			// Handle dropped peer connections:
+		case p := <-peerDrop:
+			// Skip if no request is currently pending
+			req := active[p.id]
+			if req == nil {
+				continue
+			}
+			// Finalize the request and queue up for processing
+			req.timer.Stop()
+			req.dropped = true
+
+			finished = append(finished, req)
+			delete(active, p.id)
+
+		// Handle timed-out requests:
+		case req := <-timeout:
+			// If the peer is already requesting something else, ignore the stale timeout.
+			// This can happen when the timeout and the delivery happens simultaneously,
+			// causing both pathways to trigger.
+			if active[req.peer.id] != req {
+				continue
+			}
+			// Move the timed out data back into the download queue
+			finished = append(finished, req)
+			delete(active, req.peer.id)
+
+		// Track outgoing state requests:
+		case req := <-d.trackStateReq:
+			// If an active request already exists for this peer, we have a problem. In
+			// theory the trie node schedule must never assign two requests to the same
+			// peer. In practice however, a peer might receive a request, disconnect and
+			// immediately reconnect before the previous times out. In this case the first
+			// request is never honored, alas we must not silently overwrite it, as that
+			// causes valid requests to go missing and sync to get stuck.
+			if old := active[req.peer.id]; old != nil {
+				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
+
+				// Make sure the previous one doesn't get siletly lost
+				old.timer.Stop()
+				old.dropped = true
+
+				finished = append(finished, old)
+			}
+			// Start a timer to notify the sync loop if the peer stalled.
+			req.timer = time.AfterFunc(req.timeout, func() {
+				select {
+				case timeout <- req:
+				case <-s.done:
+					// Prevent leaking of timer goroutines in the unlikely case where a
+					// timer is fired just before exiting runStateSync.
+				}
+			})
+			active[req.peer.id] = req
+		}
+	}
+}
+
+// stateSync schedules requests for downloading a particular state trie defined
+// by a given state root.
+type stateSync struct {
+	d *Downloader // Downloader instance to access and manage current peerset
+
+	sched  *trie.Sync                 // State trie sync scheduler defining the tasks
+	keccak hash.Hash                  // Keccak256 hasher to verify deliveries with
+	tasks  map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
+
+	numUncommitted   int
+	bytesUncommitted int
+
+	deliver    chan *stateReq // Delivery channel multiplexing peer responses
+	cancel     chan struct{}  // Channel to signal a termination request
+	cancelOnce sync.Once      // Ensures cancel only ever gets called once
+	done       chan struct{}  // Channel to signal termination completion
+	err        error          // Any error hit during sync (set before completion)
+}
+
+// stateTask represents a single trie node download task, containing a set of
+// peers already attempted retrieval from to detect stalled syncs and abort.
+type stateTask struct {
+	attempts map[string]struct{}
+}
+
+// newStateSync creates a new state trie download scheduler. This method does not
+// yet start the sync. The user needs to call run to initiate.
+func newStateSync(d *Downloader, root common.Hash) *stateSync {
+	return &stateSync{
+		d:       d,
+		sched:   state.NewStateSync(root, d.stateDB),
+		keccak:  sha3.NewLegacyKeccak256(),
+		tasks:   make(map[common.Hash]*stateTask),
+		deliver: make(chan *stateReq),
+		cancel:  make(chan struct{}),
+		done:    make(chan struct{}),
+	}
+}
+
+// run starts the task assignment and response processing loop, blocking until
+// it finishes, and finally notifying any goroutines waiting for the loop to
+// finish.
+func (s *stateSync) run() {
+	s.err = s.loop()
+	close(s.done)
+}
+
+// Wait blocks until the sync is done or canceled.
+func (s *stateSync) Wait() error {
+	<-s.done
+	return s.err
+}
+
+// Cancel cancels the sync and waits until it has shut down.
+func (s *stateSync) Cancel() error {
+	s.cancelOnce.Do(func() { close(s.cancel) })
+	return s.Wait()
+}
+
+// loop is the main event loop of a state trie sync. It it responsible for the
+// assignment of new tasks to peers (including sending it to them) as well as
+// for the processing of inbound data. Note, that the loop does not directly
+// receive data from peers, rather those are buffered up in the downloader and
+// pushed here async. The reason is to decouple processing from data receipt
+// and timeouts.
+func (s *stateSync) loop() (err error) {
+	// Listen for new peer events to assign tasks to them
+	newPeer := make(chan *peerConnection, 1024)
+	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
+	defer peerSub.Unsubscribe()
+	defer func() {
+		cerr := s.commit(true)
+		if err == nil {
+			err = cerr
+		}
+	}()
+
+	// Keep assigning new tasks until the sync completes or aborts
+	for s.sched.Pending() > 0 {
+		if err = s.commit(false); err != nil {
+			return err
+		}
+		s.assignTasks()
+		// Tasks assigned, wait for something to happen
+		select {
+		case <-newPeer:
+			// New peer arrived, try to assign it download tasks
+
+		case <-s.cancel:
+			return errCancelStateFetch
+
+		case <-s.d.cancelCh:
+			return errCancelStateFetch
+
+		case req := <-s.deliver:
+			// Response, disconnect or timeout triggered, drop the peer if stalling
+			log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
+			if len(req.items) <= 2 && !req.dropped && req.timedOut() {
+				// 2 items are the minimum requested, if even that times out, we've no use of
+				// this peer at the moment.
+				log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
+				s.d.dropPeer(req.peer.id)
+			}
+			// Process all the received blobs and check for stale delivery
+			delivered, err := s.process(req)
+			if err != nil {
+				log.Warn("Node data write error", "err", err)
+				return err
+			}
+			req.peer.SetNodeDataIdle(delivered)
+		}
+	}
+	return nil
+}
+
+func (s *stateSync) commit(force bool) error {
+	if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
+		return nil
+	}
+	start := time.Now()
+	b := s.d.stateDB.NewBatch()
+	if written, err := s.sched.Commit(b); written == 0 || err != nil {
+		return err
+	}
+	if err := b.Write(); err != nil {
+		return fmt.Errorf("DB write error: %v", err)
+	}
+	s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
+	s.numUncommitted = 0
+	s.bytesUncommitted = 0
+	return nil
+}
+
+// assignTasks attempts to assign new tasks to all idle peers, either from the
+// batch currently being retried, or fetching new data from the trie sync itself.
+func (s *stateSync) assignTasks() {
+	// Iterate over all idle peers and try to assign them state fetches
+	peers, _ := s.d.peers.NodeDataIdlePeers()
+	for _, p := range peers {
+		// Assign a batch of fetches proportional to the estimated latency/bandwidth
+		cap := p.NodeDataCapacity(s.d.requestRTT())
+		req := &stateReq{peer: p, timeout: s.d.requestTTL()}
+		s.fillTasks(cap, req)
+
+		// If the peer was assigned tasks to fetch, send the network request
+		if len(req.items) > 0 {
+			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+			select {
+			case s.d.trackStateReq <- req:
+				req.peer.FetchNodeData(req.items)
+			case <-s.cancel:
+			case <-s.d.cancelCh:
+			}
+		}
+	}
+}
+
+// fillTasks fills the given request object with a maximum of n state download
+// tasks to send to the remote peer.
+func (s *stateSync) fillTasks(n int, req *stateReq) {
+	// Refill available tasks from the scheduler.
+	if len(s.tasks) < n {
+		new := s.sched.Missing(n - len(s.tasks))
+		for _, hash := range new {
+			s.tasks[hash] = &stateTask{make(map[string]struct{})}
+		}
+	}
+	// Find tasks that haven't been tried with the request's peer.
+	req.items = make([]common.Hash, 0, n)
+	req.tasks = make(map[common.Hash]*stateTask, n)
+	for hash, t := range s.tasks {
+		// Stop when we've gathered enough requests
+		if len(req.items) == n {
+			break
+		}
+		// Skip any requests we've already tried from this peer
+		if _, ok := t.attempts[req.peer.id]; ok {
+			continue
+		}
+		// Assign the request to this peer
+		t.attempts[req.peer.id] = struct{}{}
+		req.items = append(req.items, hash)
+		req.tasks[hash] = t
+		delete(s.tasks, hash)
+	}
+}
+
+// process iterates over a batch of delivered state data, injecting each item
+// into a running state sync, re-queuing any items that were requested but not
+// delivered.
+// Returns whether the peer actually managed to deliver anything of value,
+// and any error that occurred
+func (s *stateSync) process(req *stateReq) (int, error) {
+	// Collect processing stats and update progress if valid data was received
+	duplicate, unexpected, successful := 0, 0, 0
+
+	defer func(start time.Time) {
+		if duplicate > 0 || unexpected > 0 {
+			s.updateStats(0, duplicate, unexpected, time.Since(start))
+		}
+	}(time.Now())
+
+	// Iterate over all the delivered data and inject one-by-one into the trie
+	progress := false
+	for _, blob := range req.response {
+		prog, hash, err := s.processNodeData(blob)
+		switch err {
+		case nil:
+			s.numUncommitted++
+			s.bytesUncommitted += len(blob)
+			progress = progress || prog
+			successful++
+		case trie.ErrNotRequested:
+			unexpected++
+		case trie.ErrAlreadyProcessed:
+			duplicate++
+		default:
+			return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
+		}
+		if _, ok := req.tasks[hash]; ok {
+			delete(req.tasks, hash)
+		}
+	}
+	// Put unfulfilled tasks back into the retry queue
+	npeers := s.d.peers.Len()
+	for hash, task := range req.tasks {
+		// If the node did deliver something, missing items may be due to a protocol
+		// limit or a previous timeout + delayed delivery. Both cases should permit
+		// the node to retry the missing items (to avoid single-peer stalls).
+		if len(req.response) > 0 || req.timedOut() {
+			delete(task.attempts, req.peer.id)
+		}
+		// If we've requested the node too many times already, it may be a malicious
+		// sync where nobody has the right data. Abort.
+		if len(task.attempts) >= npeers {
+			return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+		}
+		// Missing item, place into the retry queue.
+		s.tasks[hash] = task
+	}
+	return successful, nil
+}
+
+// processNodeData tries to inject a trie node data blob delivered from a remote
+// peer into the state trie, returning whether anything useful was written or any
+// error occurred.
+func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
+	res := trie.SyncResult{Data: blob}
+	s.keccak.Reset()
+	s.keccak.Write(blob)
+	s.keccak.Sum(res.Hash[:0])
+	committed, _, err := s.sched.Process([]trie.SyncResult{res})
+	return committed, res.Hash, err
+}
+
+// updateStats bumps the various state sync progress counters and displays a log
+// message for the user to see.
+func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) {
+	s.d.syncStatsLock.Lock()
+	defer s.d.syncStatsLock.Unlock()
+
+	s.d.syncStatsState.pending = uint64(s.sched.Pending())
+	s.d.syncStatsState.processed += uint64(written)
+	s.d.syncStatsState.duplicate += uint64(duplicate)
+	s.d.syncStatsState.unexpected += uint64(unexpected)
+
+	if written > 0 || duplicate > 0 || unexpected > 0 {
+		log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
+	}
+	if written > 0 {
+		rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed)
+	}
+}
diff --git a/dex/downloader/testchain_test.go b/dex/downloader/testchain_test.go
new file mode 100644
index 000000000..e73bed513
--- /dev/null
+++ b/dex/downloader/testchain_test.go
@@ -0,0 +1,221 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"fmt"
+	"math/big"
+	"sync"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/consensus/ethash"
+	"github.com/dexon-foundation/dexon/core"
+	"github.com/dexon-foundation/dexon/core/types"
+	"github.com/dexon-foundation/dexon/crypto"
+	"github.com/dexon-foundation/dexon/ethdb"
+	"github.com/dexon-foundation/dexon/params"
+)
+
+// Test chain parameters.
+var (
+	testKey, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+	testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
+	testDB      = ethdb.NewMemDatabase()
+	testGenesis = core.GenesisBlockForTesting(testDB, testAddress, big.NewInt(1000000000))
+)
+
+// The common prefix of all test chains:
+var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
+
+// Different forks on top of the base chain:
+var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
+
+func init() {
+	var forkLen = int(MaxForkAncestry + 50)
+	var wg sync.WaitGroup
+	wg.Add(3)
+	go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()
+	go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }()
+	go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }()
+	wg.Wait()
+}
+
+type testChain struct {
+	genesis  *types.Block
+	chain    []common.Hash
+	headerm  map[common.Hash]*types.Header
+	blockm   map[common.Hash]*types.Block
+	receiptm map[common.Hash][]*types.Receipt
+	tdm      map[common.Hash]*big.Int
+}
+
+// newTestChain creates a blockchain of the given length.
+func newTestChain(length int, genesis *types.Block) *testChain {
+	tc := new(testChain).copy(length)
+	tc.genesis = genesis
+	tc.chain = append(tc.chain, genesis.Hash())
+	tc.headerm[tc.genesis.Hash()] = tc.genesis.Header()
+	tc.tdm[tc.genesis.Hash()] = tc.genesis.Difficulty()
+	tc.blockm[tc.genesis.Hash()] = tc.genesis
+	tc.generate(length-1, 0, genesis, false)
+	return tc
+}
+
+// makeFork creates a fork on top of the test chain.
+func (tc *testChain) makeFork(length int, heavy bool, seed byte) *testChain {
+	fork := tc.copy(tc.len() + length)
+	fork.generate(length, seed, tc.headBlock(), heavy)
+	return fork
+}
+
+// shorten creates a copy of the chain with the given length. It panics if the
+// length is longer than the number of available blocks.
+func (tc *testChain) shorten(length int) *testChain {
+	if length > tc.len() {
+		panic(fmt.Errorf("can't shorten test chain to %d blocks, it's only %d blocks long", length, tc.len()))
+	}
+	return tc.copy(length)
+}
+
+func (tc *testChain) copy(newlen int) *testChain {
+	cpy := &testChain{
+		genesis:  tc.genesis,
+		headerm:  make(map[common.Hash]*types.Header, newlen),
+		blockm:   make(map[common.Hash]*types.Block, newlen),
+		receiptm: make(map[common.Hash][]*types.Receipt, newlen),
+		tdm:      make(map[common.Hash]*big.Int, newlen),
+	}
+	for i := 0; i < len(tc.chain) && i < newlen; i++ {
+		hash := tc.chain[i]
+		cpy.chain = append(cpy.chain, tc.chain[i])
+		cpy.tdm[hash] = tc.tdm[hash]
+		cpy.blockm[hash] = tc.blockm[hash]
+		cpy.headerm[hash] = tc.headerm[hash]
+		cpy.receiptm[hash] = tc.receiptm[hash]
+	}
+	return cpy
+}
+
+// generate creates a chain of n blocks starting at and including parent.
+// the returned hash chain is ordered head->parent. In addition, every 22th block
+// contains a transaction and every 5th an uncle to allow testing correct block
+// reassembly.
+func (tc *testChain) generate(n int, seed byte, parent *types.Block, heavy bool) {
+	// start := time.Now()
+	// defer func() { fmt.Printf("test chain generated in %v\n", time.Since(start)) }()
+
+	blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
+		block.SetCoinbase(common.Address{seed})
+		// If a heavy chain is requested, delay blocks to raise difficulty
+		if heavy {
+			block.OffsetTime(-1)
+		}
+		// Include transactions to the miner to make blocks more interesting.
+		if parent == tc.genesis && i%22 == 0 {
+			signer := types.MakeSigner(params.TestChainConfig, block.Number())
+			tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
+			if err != nil {
+				panic(err)
+			}
+			block.AddTx(tx)
+		}
+		// if the block number is a multiple of 5, add a bonus uncle to the block
+		if i > 0 && i%5 == 0 {
+			block.AddUncle(&types.Header{
+				ParentHash: block.PrevBlock(i - 1).Hash(),
+				Number:     big.NewInt(block.Number().Int64() - 1),
+			})
+		}
+	})
+
+	// Convert the block-chain into a hash-chain and header/block maps
+	td := new(big.Int).Set(tc.td(parent.Hash()))
+	for i, b := range blocks {
+		td := td.Add(td, b.Difficulty())
+		hash := b.Hash()
+		tc.chain = append(tc.chain, hash)
+		tc.blockm[hash] = b
+		tc.headerm[hash] = b.Header()
+		tc.receiptm[hash] = receipts[i]
+		tc.tdm[hash] = new(big.Int).Set(td)
+	}
+}
+
+// len returns the total number of blocks in the chain.
+func (tc *testChain) len() int {
+	return len(tc.chain)
+}
+
+// headBlock returns the head of the chain.
+func (tc *testChain) headBlock() *types.Block {
+	return tc.blockm[tc.chain[len(tc.chain)-1]]
+}
+
+// td returns the total difficulty of the given block.
+func (tc *testChain) td(hash common.Hash) *big.Int {
+	return tc.tdm[hash]
+}
+
+// headersByHash returns headers in ascending order from the given hash.
+func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int) []*types.Header {
+	num, _ := tc.hashToNumber(origin)
+	return tc.headersByNumber(num, amount, skip)
+}
+
+// headersByNumber returns headers in ascending order from the given number.
+func (tc *testChain) headersByNumber(origin uint64, amount int, skip int) []*types.Header {
+	result := make([]*types.Header, 0, amount)
+	for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 {
+		if header, ok := tc.headerm[tc.chain[int(num)]]; ok {
+			result = append(result, header)
+		}
+	}
+	return result
+}
+
+// receipts returns the receipts of the given block hashes.
+func (tc *testChain) receipts(hashes []common.Hash) [][]*types.Receipt {
+	results := make([][]*types.Receipt, 0, len(hashes))
+	for _, hash := range hashes {
+		if receipt, ok := tc.receiptm[hash]; ok {
+			results = append(results, receipt)
+		}
+	}
+	return results
+}
+
+// bodies returns the block bodies of the given block hashes.
+func (tc *testChain) bodies(hashes []common.Hash) ([][]*types.Transaction, [][]*types.Header) {
+	transactions := make([][]*types.Transaction, 0, len(hashes))
+	uncles := make([][]*types.Header, 0, len(hashes))
+	for _, hash := range hashes {
+		if block, ok := tc.blockm[hash]; ok {
+			transactions = append(transactions, block.Transactions())
+			uncles = append(uncles, block.Uncles())
+		}
+	}
+	return transactions, uncles
+}
+
+func (tc *testChain) hashToNumber(target common.Hash) (uint64, bool) {
+	for num, hash := range tc.chain {
+		if hash == target {
+			return uint64(num), true
+		}
+	}
+	return 0, false
+}
diff --git a/dex/downloader/types.go b/dex/downloader/types.go
new file mode 100644
index 000000000..d320b7590
--- /dev/null
+++ b/dex/downloader/types.go
@@ -0,0 +1,79 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"fmt"
+
+	"github.com/dexon-foundation/dexon/core/types"
+)
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
+
+// dataPack is a data message returned by a peer for some query.
+type dataPack interface {
+	PeerId() string
+	Items() int
+	Stats() string
+}
+
+// headerPack is a batch of block headers returned by a peer.
+type headerPack struct {
+	peerID  string
+	headers []*types.Header
+}
+
+func (p *headerPack) PeerId() string { return p.peerID }
+func (p *headerPack) Items() int     { return len(p.headers) }
+func (p *headerPack) Stats() string  { return fmt.Sprintf("%d", len(p.headers)) }
+
+// bodyPack is a batch of block bodies returned by a peer.
+type bodyPack struct {
+	peerID       string
+	transactions [][]*types.Transaction
+	uncles       [][]*types.Header
+}
+
+func (p *bodyPack) PeerId() string { return p.peerID }
+func (p *bodyPack) Items() int {
+	if len(p.transactions) <= len(p.uncles) {
+		return len(p.transactions)
+	}
+	return len(p.uncles)
+}
+func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
+
+// receiptPack is a batch of receipts returned by a peer.
+type receiptPack struct {
+	peerID   string
+	receipts [][]*types.Receipt
+}
+
+func (p *receiptPack) PeerId() string { return p.peerID }
+func (p *receiptPack) Items() int     { return len(p.receipts) }
+func (p *receiptPack) Stats() string  { return fmt.Sprintf("%d", len(p.receipts)) }
+
+// statePack is a batch of states returned by a peer.
+type statePack struct {
+	peerID string
+	states [][]byte
+}
+
+func (p *statePack) PeerId() string { return p.peerID }
+func (p *statePack) Items() int     { return len(p.states) }
+func (p *statePack) Stats() string  { return fmt.Sprintf("%d", len(p.states)) }
diff --git a/dex/fetcher/fetcher.go b/dex/fetcher/fetcher.go
new file mode 100644
index 000000000..f6807b5a5
--- /dev/null
+++ b/dex/fetcher/fetcher.go
@@ -0,0 +1,736 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package fetcher contains the block announcement based synchronisation.
+package fetcher
+
+import (
+	"errors"
+	"math/rand"
+	"time"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/common/prque"
+	"github.com/dexon-foundation/dexon/consensus"
+	"github.com/dexon-foundation/dexon/core/types"
+	"github.com/dexon-foundation/dexon/log"
+)
+
+const (
+	arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+	gatherSlack   = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
+	fetchTimeout  = 5 * time.Second        // Maximum allotted time to return an explicitly requested block
+	maxUncleDist  = 7                      // Maximum allowed backward distance from the chain head
+	maxQueueDist  = 32                     // Maximum allowed distance from the chain head to queue
+	hashLimit     = 256                    // Maximum number of unique blocks a peer may have announced
+	blockLimit    = 64                     // Maximum number of unique blocks a peer may have delivered
+)
+
+var (
+	errTerminated = errors.New("terminated")
+)
+
+// blockRetrievalFn is a callback type for retrieving a block from the local chain.
+type blockRetrievalFn func(common.Hash) *types.Block
+
+// headerRequesterFn is a callback type for sending a header retrieval request.
+type headerRequesterFn func(common.Hash) error
+
+// bodyRequesterFn is a callback type for sending a body retrieval request.
+type bodyRequesterFn func([]common.Hash) error
+
+// headerVerifierFn is a callback type to verify a block's header for fast propagation.
+type headerVerifierFn func(header *types.Header) error
+
+// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
+type blockBroadcasterFn func(block *types.Block, propagate bool)
+
+// chainHeightFn is a callback type to retrieve the current chain height.
+type chainHeightFn func() uint64
+
+// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type chainInsertFn func(types.Blocks) (int, error)
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
+
+// announce is the hash notification of the availability of a new block in the
+// network.
+type announce struct {
+	hash   common.Hash   // Hash of the block being announced
+	number uint64        // Number of the block being announced (0 = unknown | old protocol)
+	header *types.Header // Header of the block partially reassembled (new protocol)
+	time   time.Time     // Timestamp of the announcement
+
+	origin string // Identifier of the peer originating the notification
+
+	fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
+	fetchBodies bodyRequesterFn   // Fetcher function to retrieve the body of an announced block
+}
+
+// headerFilterTask represents a batch of headers needing fetcher filtering.
+type headerFilterTask struct {
+	peer    string          // The source peer of block headers
+	headers []*types.Header // Collection of headers to filter
+	time    time.Time       // Arrival time of the headers
+}
+
+// bodyFilterTask represents a batch of block bodies (transactions and uncles)
+// needing fetcher filtering.
+type bodyFilterTask struct {
+	peer         string                 // The source peer of block bodies
+	transactions [][]*types.Transaction // Collection of transactions per block bodies
+	uncles       [][]*types.Header      // Collection of uncles per block bodies
+	time         time.Time              // Arrival time of the blocks' contents
+}
+
+// inject represents a schedules import operation.
+type inject struct {
+	origin string
+	block  *types.Block
+}
+
+// Fetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type Fetcher struct {
+	// Various event channels
+	notify chan *announce
+	inject chan *inject
+
+	blockFilter  chan chan []*types.Block
+	headerFilter chan chan *headerFilterTask
+	bodyFilter   chan chan *bodyFilterTask
+
+	done chan common.Hash
+	quit chan struct{}
+
+	// Announce states
+	announces  map[string]int              // Per peer announce counts to prevent memory exhaustion
+	announced  map[common.Hash][]*announce // Announced blocks, scheduled for fetching
+	fetching   map[common.Hash]*announce   // Announced blocks, currently fetching
+	fetched    map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
+	completing map[common.Hash]*announce   // Blocks with headers, currently body-completing
+
+	// Block cache
+	queue  *prque.Prque            // Queue containing the import operations (block number sorted)
+	queues map[string]int          // Per peer block counts to prevent memory exhaustion
+	queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
+
+	// Callbacks
+	getBlock       blockRetrievalFn   // Retrieves a block from the local chain
+	verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work
+	broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
+	chainHeight    chainHeightFn      // Retrieves the current chain's height
+	insertChain    chainInsertFn      // Injects a batch of blocks into the chain
+	dropPeer       peerDropFn         // Drops a peer for misbehaving
+
+	// Testing hooks
+	announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
+	queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
+	fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
+	completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
+	importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
+}
+
+// New creates a block fetcher to retrieve blocks based on hash announcements.
+func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
+	return &Fetcher{
+		notify:         make(chan *announce),
+		inject:         make(chan *inject),
+		blockFilter:    make(chan chan []*types.Block),
+		headerFilter:   make(chan chan *headerFilterTask),
+		bodyFilter:     make(chan chan *bodyFilterTask),
+		done:           make(chan common.Hash),
+		quit:           make(chan struct{}),
+		announces:      make(map[string]int),
+		announced:      make(map[common.Hash][]*announce),
+		fetching:       make(map[common.Hash]*announce),
+		fetched:        make(map[common.Hash][]*announce),
+		completing:     make(map[common.Hash]*announce),
+		queue:          prque.New(nil),
+		queues:         make(map[string]int),
+		queued:         make(map[common.Hash]*inject),
+		getBlock:       getBlock,
+		verifyHeader:   verifyHeader,
+		broadcastBlock: broadcastBlock,
+		chainHeight:    chainHeight,
+		insertChain:    insertChain,
+		dropPeer:       dropPeer,
+	}
+}
+
+// Start boots up the announcement based synchroniser, accepting and processing
+// hash notifications and block fetches until termination requested.
+func (f *Fetcher) Start() {
+	go f.loop()
+}
+
+// Stop terminates the announcement based synchroniser, canceling all pending
+// operations.
+func (f *Fetcher) Stop() {
+	close(f.quit)
+}
+
+// Notify announces the fetcher of the potential availability of a new block in
+// the network.
+func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
+	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
+	block := &announce{
+		hash:        hash,
+		number:      number,
+		time:        time,
+		origin:      peer,
+		fetchHeader: headerFetcher,
+		fetchBodies: bodyFetcher,
+	}
+	select {
+	case f.notify <- block:
+		return nil
+	case <-f.quit:
+		return errTerminated
+	}
+}
+
+// Enqueue tries to fill gaps the fetcher's future import queue.
+func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
+	op := &inject{
+		origin: peer,
+		block:  block,
+	}
+	select {
+	case f.inject <- op:
+		return nil
+	case <-f.quit:
+		return errTerminated
+	}
+}
+
+// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
+// returning those that should be handled differently.
+func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
+	log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
+
+	// Send the filter channel to the fetcher
+	filter := make(chan *headerFilterTask)
+
+	select {
+	case f.headerFilter <- filter:
+	case <-f.quit:
+		return nil
+	}
+	// Request the filtering of the header list
+	select {
+	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
+	case <-f.quit:
+		return nil
+	}
+	// Retrieve the headers remaining after filtering
+	select {
+	case task := <-filter:
+		return task.headers
+	case <-f.quit:
+		return nil
+	}
+}
+
+// FilterBodies extracts all the block bodies that were explicitly requested by
+// the fetcher, returning those that should be handled differently.
+func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
+	log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
+
+	// Send the filter channel to the fetcher
+	filter := make(chan *bodyFilterTask)
+
+	select {
+	case f.bodyFilter <- filter:
+	case <-f.quit:
+		return nil, nil
+	}
+	// Request the filtering of the body list
+	select {
+	case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
+	case <-f.quit:
+		return nil, nil
+	}
+	// Retrieve the bodies remaining after filtering
+	select {
+	case task := <-filter:
+		return task.transactions, task.uncles
+	case <-f.quit:
+		return nil, nil
+	}
+}
+
+// Loop is the main fetcher loop, checking and processing various notification
+// events.
+func (f *Fetcher) loop() {
+	// Iterate the block fetching until a quit is requested
+	fetchTimer := time.NewTimer(0)
+	completeTimer := time.NewTimer(0)
+
+	for {
+		// Clean up any expired block fetches
+		for hash, announce := range f.fetching {
+			if time.Since(announce.time) > fetchTimeout {
+				f.forgetHash(hash)
+			}
+		}
+		// Import any queued blocks that could potentially fit
+		height := f.chainHeight()
+		for !f.queue.Empty() {
+			op := f.queue.PopItem().(*inject)
+			hash := op.block.Hash()
+			if f.queueChangeHook != nil {
+				f.queueChangeHook(hash, false)
+			}
+			// If too high up the chain or phase, continue later
+			number := op.block.NumberU64()
+			if number > height+1 {
+				f.queue.Push(op, -int64(number))
+				if f.queueChangeHook != nil {
+					f.queueChangeHook(hash, true)
+				}
+				break
+			}
+			// Otherwise if fresh and still unknown, try and import
+			if number+maxUncleDist < height || f.getBlock(hash) != nil {
+				f.forgetBlock(hash)
+				continue
+			}
+			f.insert(op.origin, op.block)
+		}
+		// Wait for an outside event to occur
+		select {
+		case <-f.quit:
+			// Fetcher terminating, abort all operations
+			return
+
+		case notification := <-f.notify:
+			// A block was announced, make sure the peer isn't DOSing us
+			propAnnounceInMeter.Mark(1)
+
+			count := f.announces[notification.origin] + 1
+			if count > hashLimit {
+				log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
+				propAnnounceDOSMeter.Mark(1)
+				break
+			}
+			// If we have a valid block number, check that it's potentially useful
+			if notification.number > 0 {
+				if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
+					log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
+					propAnnounceDropMeter.Mark(1)
+					break
+				}
+			}
+			// All is well, schedule the announce if block's not yet downloading
+			if _, ok := f.fetching[notification.hash]; ok {
+				break
+			}
+			if _, ok := f.completing[notification.hash]; ok {
+				break
+			}
+			f.announces[notification.origin] = count
+			f.announced[notification.hash] = append(f.announced[notification.hash], notification)
+			if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
+				f.announceChangeHook(notification.hash, true)
+			}
+			if len(f.announced) == 1 {
+				f.rescheduleFetch(fetchTimer)
+			}
+
+		case op := <-f.inject:
+			// A direct block insertion was requested, try and fill any pending gaps
+			propBroadcastInMeter.Mark(1)
+			f.enqueue(op.origin, op.block)
+
+		case hash := <-f.done:
+			// A pending import finished, remove all traces of the notification
+			f.forgetHash(hash)
+			f.forgetBlock(hash)
+
+		case <-fetchTimer.C:
+			// At least one block's timer ran out, check for needing retrieval
+			request := make(map[string][]common.Hash)
+
+			for hash, announces := range f.announced {
+				if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
+					// Pick a random peer to retrieve from, reset all others
+					announce := announces[rand.Intn(len(announces))]
+					f.forgetHash(hash)
+
+					// If the block still didn't arrive, queue for fetching
+					if f.getBlock(hash) == nil {
+						request[announce.origin] = append(request[announce.origin], hash)
+						f.fetching[hash] = announce
+					}
+				}
+			}
+			// Send out all block header requests
+			for peer, hashes := range request {
+				log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
+
+				// Create a closure of the fetch and schedule in on a new thread
+				fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
+				go func() {
+					if f.fetchingHook != nil {
+						f.fetchingHook(hashes)
+					}
+					for _, hash := range hashes {
+						headerFetchMeter.Mark(1)
+						fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
+					}
+				}()
+			}
+			// Schedule the next fetch if blocks are still pending
+			f.rescheduleFetch(fetchTimer)
+
+		case <-completeTimer.C:
+			// At least one header's timer ran out, retrieve everything
+			request := make(map[string][]common.Hash)
+
+			for hash, announces := range f.fetched {
+				// Pick a random peer to retrieve from, reset all others
+				announce := announces[rand.Intn(len(announces))]
+				f.forgetHash(hash)
+
+				// If the block still didn't arrive, queue for completion
+				if f.getBlock(hash) == nil {
+					request[announce.origin] = append(request[announce.origin], hash)
+					f.completing[hash] = announce
+				}
+			}
+			// Send out all block body requests
+			for peer, hashes := range request {
+				log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
+
+				// Create a closure of the fetch and schedule in on a new thread
+				if f.completingHook != nil {
+					f.completingHook(hashes)
+				}
+				bodyFetchMeter.Mark(int64(len(hashes)))
+				go f.completing[hashes[0]].fetchBodies(hashes)
+			}
+			// Schedule the next fetch if blocks are still pending
+			f.rescheduleComplete(completeTimer)
+
+		case filter := <-f.headerFilter:
+			// Headers arrived from a remote peer. Extract those that were explicitly
+			// requested by the fetcher, and return everything else so it's delivered
+			// to other parts of the system.
+			var task *headerFilterTask
+			select {
+			case task = <-filter:
+			case <-f.quit:
+				return
+			}
+			headerFilterInMeter.Mark(int64(len(task.headers)))
+
+			// Split the batch of headers into unknown ones (to return to the caller),
+			// known incomplete ones (requiring body retrievals) and completed blocks.
+			unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
+			for _, header := range task.headers {
+				hash := header.Hash()
+
+				// Filter fetcher-requested headers from other synchronisation algorithms
+				if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
+					// If the delivered header does not match the promised number, drop the announcer
+					if header.Number.Uint64() != announce.number {
+						log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
+						f.dropPeer(announce.origin)
+						f.forgetHash(hash)
+						continue
+					}
+					// Only keep if not imported by other means
+					if f.getBlock(hash) == nil {
+						announce.header = header
+						announce.time = task.time
+
+						// If the block is empty (header only), short circuit into the final import queue
+						if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
+							log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
+
+							block := types.NewBlockWithHeader(header)
+							block.ReceivedAt = task.time
+
+							complete = append(complete, block)
+							f.completing[hash] = announce
+							continue
+						}
+						// Otherwise add to the list of blocks needing completion
+						incomplete = append(incomplete, announce)
+					} else {
+						log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
+						f.forgetHash(hash)
+					}
+				} else {
+					// Fetcher doesn't know about it, add to the return list
+					unknown = append(unknown, header)
+				}
+			}
+			headerFilterOutMeter.Mark(int64(len(unknown)))
+			select {
+			case filter <- &headerFilterTask{headers: unknown, time: task.time}:
+			case <-f.quit:
+				return
+			}
+			// Schedule the retrieved headers for body completion
+			for _, announce := range incomplete {
+				hash := announce.header.Hash()
+				if _, ok := f.completing[hash]; ok {
+					continue
+				}
+				f.fetched[hash] = append(f.fetched[hash], announce)
+				if len(f.fetched) == 1 {
+					f.rescheduleComplete(completeTimer)
+				}
+			}
+			// Schedule the header-only blocks for import
+			for _, block := range complete {
+				if announce := f.completing[block.Hash()]; announce != nil {
+					f.enqueue(announce.origin, block)
+				}
+			}
+
+		case filter := <-f.bodyFilter:
+			// Block bodies arrived, extract any explicitly requested blocks, return the rest
+			var task *bodyFilterTask
+			select {
+			case task = <-filter:
+			case <-f.quit:
+				return
+			}
+			bodyFilterInMeter.Mark(int64(len(task.transactions)))
+
+			blocks := []*types.Block{}
+			for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
+				// Match up a body to any possible completion request
+				matched := false
+
+				for hash, announce := range f.completing {
+					if f.queued[hash] == nil {
+						txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
+						uncleHash := types.CalcUncleHash(task.uncles[i])
+
+						if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
+							// Mark the body matched, reassemble if still unknown
+							matched = true
+
+							if f.getBlock(hash) == nil {
+								block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
+								block.ReceivedAt = task.time
+
+								blocks = append(blocks, block)
+							} else {
+								f.forgetHash(hash)
+							}
+						}
+					}
+				}
+				if matched {
+					task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
+					task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
+					i--
+					continue
+				}
+			}
+
+			bodyFilterOutMeter.Mark(int64(len(task.transactions)))
+			select {
+			case filter <- task:
+			case <-f.quit:
+				return
+			}
+			// Schedule the retrieved blocks for ordered import
+			for _, block := range blocks {
+				if announce := f.completing[block.Hash()]; announce != nil {
+					f.enqueue(announce.origin, block)
+				}
+			}
+		}
+	}
+}
+
+// rescheduleFetch resets the specified fetch timer to the next announce timeout.
+func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
+	// Short circuit if no blocks are announced
+	if len(f.announced) == 0 {
+		return
+	}
+	// Otherwise find the earliest expiring announcement
+	earliest := time.Now()
+	for _, announces := range f.announced {
+		if earliest.After(announces[0].time) {
+			earliest = announces[0].time
+		}
+	}
+	fetch.Reset(arriveTimeout - time.Since(earliest))
+}
+
+// rescheduleComplete resets the specified completion timer to the next fetch timeout.
+func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
+	// Short circuit if no headers are fetched
+	if len(f.fetched) == 0 {
+		return
+	}
+	// Otherwise find the earliest expiring announcement
+	earliest := time.Now()
+	for _, announces := range f.fetched {
+		if earliest.After(announces[0].time) {
+			earliest = announces[0].time
+		}
+	}
+	complete.Reset(gatherSlack - time.Since(earliest))
+}
+
+// enqueue schedules a new future import operation, if the block to be imported
+// has not yet been seen.
+func (f *Fetcher) enqueue(peer string, block *types.Block) {
+	hash := block.Hash()
+
+	// Ensure the peer isn't DOSing us
+	count := f.queues[peer] + 1
+	if count > blockLimit {
+		log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
+		propBroadcastDOSMeter.Mark(1)
+		f.forgetHash(hash)
+		return
+	}
+	// Discard any past or too distant blocks
+	if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
+		log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
+		propBroadcastDropMeter.Mark(1)
+		f.forgetHash(hash)
+		return
+	}
+	// Schedule the block for future importing
+	if _, ok := f.queued[hash]; !ok {
+		op := &inject{
+			origin: peer,
+			block:  block,
+		}
+		f.queues[peer] = count
+		f.queued[hash] = op
+		f.queue.Push(op, -int64(block.NumberU64()))
+		if f.queueChangeHook != nil {
+			f.queueChangeHook(op.block.Hash(), true)
+		}
+		log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
+	}
+}
+
+// insert spawns a new goroutine to run a block insertion into the chain. If the
+// block's number is at the same height as the current import phase, it updates
+// the phase states accordingly.
+func (f *Fetcher) insert(peer string, block *types.Block) {
+	hash := block.Hash()
+
+	// Run the import on a new thread
+	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
+	go func() {
+		defer func() { f.done <- hash }()
+
+		// If the parent's unknown, abort insertion
+		parent := f.getBlock(block.ParentHash())
+		if parent == nil {
+			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
+			return
+		}
+		// Quickly validate the header and propagate the block if it passes
+		switch err := f.verifyHeader(block.Header()); err {
+		case nil:
+			// All ok, quickly propagate to our peers
+			propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
+			go f.broadcastBlock(block, true)
+
+		case consensus.ErrFutureBlock:
+			// Weird future block, don't fail, but neither propagate
+
+		default:
+			// Something went very wrong, drop the peer
+			log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
+			f.dropPeer(peer)
+			return
+		}
+		// Run the actual import and log any issues
+		if _, err := f.insertChain(types.Blocks{block}); err != nil {
+			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
+			return
+		}
+		// If import succeeded, broadcast the block
+		propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
+		go f.broadcastBlock(block, false)
+
+		// Invoke the testing hook if needed
+		if f.importedHook != nil {
+			f.importedHook(block)
+		}
+	}()
+}
+
+// forgetHash removes all traces of a block announcement from the fetcher's
+// internal state.
+func (f *Fetcher) forgetHash(hash common.Hash) {
+	// Remove all pending announces and decrement DOS counters
+	for _, announce := range f.announced[hash] {
+		f.announces[announce.origin]--
+		if f.announces[announce.origin] == 0 {
+			delete(f.announces, announce.origin)
+		}
+	}
+	delete(f.announced, hash)
+	if f.announceChangeHook != nil {
+		f.announceChangeHook(hash, false)
+	}
+	// Remove any pending fetches and decrement the DOS counters
+	if announce := f.fetching[hash]; announce != nil {
+		f.announces[announce.origin]--
+		if f.announces[announce.origin] == 0 {
+			delete(f.announces, announce.origin)
+		}
+		delete(f.fetching, hash)
+	}
+
+	// Remove any pending completion requests and decrement the DOS counters
+	for _, announce := range f.fetched[hash] {
+		f.announces[announce.origin]--
+		if f.announces[announce.origin] == 0 {
+			delete(f.announces, announce.origin)
+		}
+	}
+	delete(f.fetched, hash)
+
+	// Remove any pending completions and decrement the DOS counters
+	if announce := f.completing[hash]; announce != nil {
+		f.announces[announce.origin]--
+		if f.announces[announce.origin] == 0 {
+			delete(f.announces, announce.origin)
+		}
+		delete(f.completing, hash)
+	}
+}
+
+// forgetBlock removes all traces of a queued block from the fetcher's internal
+// state.
+func (f *Fetcher) forgetBlock(hash common.Hash) {
+	if insert := f.queued[hash]; insert != nil {
+		f.queues[insert.origin]--
+		if f.queues[insert.origin] == 0 {
+			delete(f.queues, insert.origin)
+		}
+		delete(f.queued, hash)
+	}
+}
diff --git a/dex/fetcher/fetcher_test.go b/dex/fetcher/fetcher_test.go
new file mode 100644
index 000000000..24611a8a0
--- /dev/null
+++ b/dex/fetcher/fetcher_test.go
@@ -0,0 +1,790 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package fetcher
+
+import (
+	"errors"
+	"math/big"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/consensus/ethash"
+	"github.com/dexon-foundation/dexon/core"
+	"github.com/dexon-foundation/dexon/core/types"
+	"github.com/dexon-foundation/dexon/crypto"
+	"github.com/dexon-foundation/dexon/ethdb"
+	"github.com/dexon-foundation/dexon/params"
+)
+
+var (
+	testdb       = ethdb.NewMemDatabase()
+	testKey, _   = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+	testAddress  = crypto.PubkeyToAddress(testKey.PublicKey)
+	genesis      = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
+	unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil)
+)
+
+// makeChain creates a chain of n blocks starting at and including parent.
+// the returned hash chain is ordered head->parent. In addition, every 3rd block
+// contains a transaction and every 5th an uncle to allow testing correct block
+// reassembly.
+func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
+	blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
+		block.SetCoinbase(common.Address{seed})
+
+		// If the block number is multiple of 3, send a bonus transaction to the miner
+		if parent == genesis && i%3 == 0 {
+			signer := types.MakeSigner(params.TestChainConfig, block.Number())
+			tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
+			if err != nil {
+				panic(err)
+			}
+			block.AddTx(tx)
+		}
+		// If the block number is a multiple of 5, add a bonus uncle to the block
+		if i%5 == 0 {
+			block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
+		}
+	})
+	hashes := make([]common.Hash, n+1)
+	hashes[len(hashes)-1] = parent.Hash()
+	blockm := make(map[common.Hash]*types.Block, n+1)
+	blockm[parent.Hash()] = parent
+	for i, b := range blocks {
+		hashes[len(hashes)-i-2] = b.Hash()
+		blockm[b.Hash()] = b
+	}
+	return hashes, blockm
+}
+
+// fetcherTester is a test simulator for mocking out local block chain.
+type fetcherTester struct {
+	fetcher *Fetcher
+
+	hashes []common.Hash                // Hash chain belonging to the tester
+	blocks map[common.Hash]*types.Block // Blocks belonging to the tester
+	drops  map[string]bool              // Map of peers dropped by the fetcher
+
+	lock sync.RWMutex
+}
+
+// newTester creates a new fetcher test mocker.
+func newTester() *fetcherTester {
+	tester := &fetcherTester{
+		hashes: []common.Hash{genesis.Hash()},
+		blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
+		drops:  make(map[string]bool),
+	}
+	tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
+	tester.fetcher.Start()
+
+	return tester
+}
+
+// getBlock retrieves a block from the tester's block chain.
+func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+
+	return f.blocks[hash]
+}
+
+// verifyHeader is a nop placeholder for the block header verification.
+func (f *fetcherTester) verifyHeader(header *types.Header) error {
+	return nil
+}
+
+// broadcastBlock is a nop placeholder for the block broadcasting.
+func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
+}
+
+// chainHeight retrieves the current height (block number) of the chain.
+func (f *fetcherTester) chainHeight() uint64 {
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+
+	return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
+}
+
+// insertChain injects a new blocks into the simulated chain.
+func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	for i, block := range blocks {
+		// Make sure the parent in known
+		if _, ok := f.blocks[block.ParentHash()]; !ok {
+			return i, errors.New("unknown parent")
+		}
+		// Discard any new blocks if the same height already exists
+		if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
+			return i, nil
+		}
+		// Otherwise build our current chain
+		f.hashes = append(f.hashes, block.Hash())
+		f.blocks[block.Hash()] = block
+	}
+	return 0, nil
+}
+
+// dropPeer is an emulator for the peer removal, simply accumulating the various
+// peers dropped by the fetcher.
+func (f *fetcherTester) dropPeer(peer string) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	f.drops[peer] = true
+}
+
+// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
+func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
+	closure := make(map[common.Hash]*types.Block)
+	for hash, block := range blocks {
+		closure[hash] = block
+	}
+	// Create a function that return a header from the closure
+	return func(hash common.Hash) error {
+		// Gather the blocks to return
+		headers := make([]*types.Header, 0, 1)
+		if block, ok := closure[hash]; ok {
+			headers = append(headers, block.Header())
+		}
+		// Return on a new thread
+		go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
+
+		return nil
+	}
+}
+
+// makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
+func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
+	closure := make(map[common.Hash]*types.Block)
+	for hash, block := range blocks {
+		closure[hash] = block
+	}
+	// Create a function that returns blocks from the closure
+	return func(hashes []common.Hash) error {
+		// Gather the block bodies to return
+		transactions := make([][]*types.Transaction, 0, len(hashes))
+		uncles := make([][]*types.Header, 0, len(hashes))
+
+		for _, hash := range hashes {
+			if block, ok := closure[hash]; ok {
+				transactions = append(transactions, block.Transactions())
+				uncles = append(uncles, block.Uncles())
+			}
+		}
+		// Return on a new thread
+		go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
+
+		return nil
+	}
+}
+
+// verifyFetchingEvent verifies that one single event arrive on a fetching channel.
+func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
+	if arrive {
+		select {
+		case <-fetching:
+		case <-time.After(time.Second):
+			t.Fatalf("fetching timeout")
+		}
+	} else {
+		select {
+		case <-fetching:
+			t.Fatalf("fetching invoked")
+		case <-time.After(10 * time.Millisecond):
+		}
+	}
+}
+
+// verifyCompletingEvent verifies that one single event arrive on an completing channel.
+func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
+	if arrive {
+		select {
+		case <-completing:
+		case <-time.After(time.Second):
+			t.Fatalf("completing timeout")
+		}
+	} else {
+		select {
+		case <-completing:
+			t.Fatalf("completing invoked")
+		case <-time.After(10 * time.Millisecond):
+		}
+	}
+}
+
+// verifyImportEvent verifies that one single event arrive on an import channel.
+func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
+	if arrive {
+		select {
+		case <-imported:
+		case <-time.After(time.Second):
+			t.Fatalf("import timeout")
+		}
+	} else {
+		select {
+		case <-imported:
+			t.Fatalf("import invoked")
+		case <-time.After(10 * time.Millisecond):
+		}
+	}
+}
+
+// verifyImportCount verifies that exactly count number of events arrive on an
+// import hook channel.
+func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
+	for i := 0; i < count; i++ {
+		select {
+		case <-imported:
+		case <-time.After(time.Second):
+			t.Fatalf("block %d: import timeout", i+1)
+		}
+	}
+	verifyImportDone(t, imported)
+}
+
+// verifyImportDone verifies that no more events are arriving on an import channel.
+func verifyImportDone(t *testing.T, imported chan *types.Block) {
+	select {
+	case <-imported:
+		t.Fatalf("extra block imported")
+	case <-time.After(50 * time.Millisecond):
+	}
+}
+
+// Tests that a fetcher accepts block announcements and initiates retrievals for
+// them, successfully importing into the local chain.
+func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
+func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
+func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
+
+func testSequentialAnnouncements(t *testing.T, protocol int) {
+	// Create a chain of blocks to import
+	targetBlocks := 4 * hashLimit
+	hashes, blocks := makeChain(targetBlocks, 0, genesis)
+
+	tester := newTester()
+	headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+	// Iteratively announce blocks until all are imported
+	imported := make(chan *types.Block)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	for i := len(hashes) - 2; i >= 0; i-- {
+		tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+		verifyImportEvent(t, imported, true)
+	}
+	verifyImportDone(t, imported)
+}
+
+// Tests that if blocks are announced by multiple peers (or even the same buggy
+// peer), they will only get downloaded at most once.
+func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
+func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
+func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
+
+func testConcurrentAnnouncements(t *testing.T, protocol int) {
+	// Create a chain of blocks to import
+	targetBlocks := 4 * hashLimit
+	hashes, blocks := makeChain(targetBlocks, 0, genesis)
+
+	// Assemble a tester with a built in counter for the requests
+	tester := newTester()
+	firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
+	firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
+	secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
+	secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
+
+	counter := uint32(0)
+	firstHeaderWrapper := func(hash common.Hash) error {
+		atomic.AddUint32(&counter, 1)
+		return firstHeaderFetcher(hash)
+	}
+	secondHeaderWrapper := func(hash common.Hash) error {
+		atomic.AddUint32(&counter, 1)
+		return secondHeaderFetcher(hash)
+	}
+	// Iteratively announce blocks until all are imported
+	imported := make(chan *types.Block)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	for i := len(hashes) - 2; i >= 0; i-- {
+		tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
+		tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
+		tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
+		verifyImportEvent(t, imported, true)
+	}
+	verifyImportDone(t, imported)
+
+	// Make sure no blocks were retrieved twice
+	if int(counter) != targetBlocks {
+		t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
+	}
+}
+
+// Tests that announcements arriving while a previous is being fetched still
+// results in a valid import.
+func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
+func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
+func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
+
+func testOverlappingAnnouncements(t *testing.T, protocol int) {
+	// Create a chain of blocks to import
+	targetBlocks := 4 * hashLimit
+	hashes, blocks := makeChain(targetBlocks, 0, genesis)
+
+	tester := newTester()
+	headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+	// Iteratively announce blocks, but overlap them continuously
+	overlap := 16
+	imported := make(chan *types.Block, len(hashes)-1)
+	for i := 0; i < overlap; i++ {
+		imported <- nil
+	}
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	for i := len(hashes) - 2; i >= 0; i-- {
+		tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+		select {
+		case <-imported:
+		case <-time.After(time.Second):
+			t.Fatalf("block %d: import timeout", len(hashes)-i)
+		}
+	}
+	// Wait for all the imports to complete and check count
+	verifyImportCount(t, imported, overlap)
+}
+
+// Tests that announces already being retrieved will not be duplicated.
+func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
+func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
+func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
+
+func testPendingDeduplication(t *testing.T, protocol int) {
+	// Create a hash and corresponding block
+	hashes, blocks := makeChain(1, 0, genesis)
+
+	// Assemble a tester with a built in counter and delayed fetcher
+	tester := newTester()
+	headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
+
+	delay := 50 * time.Millisecond
+	counter := uint32(0)
+	headerWrapper := func(hash common.Hash) error {
+		atomic.AddUint32(&counter, 1)
+
+		// Simulate a long running fetch
+		go func() {
+			time.Sleep(delay)
+			headerFetcher(hash)
+		}()
+		return nil
+	}
+	// Announce the same block many times until it's fetched (wait for any pending ops)
+	for tester.getBlock(hashes[0]) == nil {
+		tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
+		time.Sleep(time.Millisecond)
+	}
+	time.Sleep(delay)
+
+	// Check that all blocks were imported and none fetched twice
+	if imported := len(tester.blocks); imported != 2 {
+		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2)
+	}
+	if int(counter) != 1 {
+		t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
+	}
+}
+
+// Tests that announcements retrieved in a random order are cached and eventually
+// imported when all the gaps are filled in.
+func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
+func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
+func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
+
+func testRandomArrivalImport(t *testing.T, protocol int) {
+	// Create a chain of blocks to import, and choose one to delay
+	targetBlocks := maxQueueDist
+	hashes, blocks := makeChain(targetBlocks, 0, genesis)
+	skip := targetBlocks / 2
+
+	tester := newTester()
+	headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+	// Iteratively announce blocks, skipping one entry
+	imported := make(chan *types.Block, len(hashes)-1)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	for i := len(hashes) - 1; i >= 0; i-- {
+		if i != skip {
+			tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+			time.Sleep(time.Millisecond)
+		}
+	}
+	// Finally announce the skipped entry and check full import
+	tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+	verifyImportCount(t, imported, len(hashes)-1)
+}
+
+// Tests that direct block enqueues (due to block propagation vs. hash announce)
+// are correctly schedule, filling and import queue gaps.
+func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
+func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
+func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
+
+func testQueueGapFill(t *testing.T, protocol int) {
+	// Create a chain of blocks to import, and choose one to not announce at all
+	targetBlocks := maxQueueDist
+	hashes, blocks := makeChain(targetBlocks, 0, genesis)
+	skip := targetBlocks / 2
+
+	tester := newTester()
+	headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+	// Iteratively announce blocks, skipping one entry
+	imported := make(chan *types.Block, len(hashes)-1)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	for i := len(hashes) - 1; i >= 0; i-- {
+		if i != skip {
+			tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+			time.Sleep(time.Millisecond)
+		}
+	}
+	// Fill the missing block directly as if propagated
+	tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
+	verifyImportCount(t, imported, len(hashes)-1)
+}
+
+// Tests that blocks arriving from various sources (multiple propagations, hash
+// announces, etc) do not get scheduled for import multiple times.
+func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
+func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
+func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
+
+func testImportDeduplication(t *testing.T, protocol int) {
+	// Create two blocks to import (one for duplication, the other for stalling)
+	hashes, blocks := makeChain(2, 0, genesis)
+
+	// Create the tester and wrap the importer with a counter
+	tester := newTester()
+	headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+	counter := uint32(0)
+	tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
+		atomic.AddUint32(&counter, uint32(len(blocks)))
+		return tester.insertChain(blocks)
+	}
+	// Instrument the fetching and imported events
+	fetching := make(chan []common.Hash)
+	imported := make(chan *types.Block, len(hashes)-1)
+	tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	// Announce the duplicating block, wait for retrieval, and also propagate directly
+	tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+	<-fetching
+
+	tester.fetcher.Enqueue("valid", blocks[hashes[0]])
+	tester.fetcher.Enqueue("valid", blocks[hashes[0]])
+	tester.fetcher.Enqueue("valid", blocks[hashes[0]])
+
+	// Fill the missing block directly as if propagated, and check import uniqueness
+	tester.fetcher.Enqueue("valid", blocks[hashes[1]])
+	verifyImportCount(t, imported, 2)
+
+	if counter != 2 {
+		t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
+	}
+}
+
+// Tests that blocks with numbers much lower or higher than out current head get
+// discarded to prevent wasting resources on useless blocks from faulty peers.
+func TestDistantPropagationDiscarding(t *testing.T) {
+	// Create a long chain to import and define the discard boundaries
+	hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
+	head := hashes[len(hashes)/2]
+
+	low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
+
+	// Create a tester and simulate a head block being the middle of the above chain
+	tester := newTester()
+
+	tester.lock.Lock()
+	tester.hashes = []common.Hash{head}
+	tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
+	tester.lock.Unlock()
+
+	// Ensure that a block with a lower number than the threshold is discarded
+	tester.fetcher.Enqueue("lower", blocks[hashes[low]])
+	time.Sleep(10 * time.Millisecond)
+	if !tester.fetcher.queue.Empty() {
+		t.Fatalf("fetcher queued stale block")
+	}
+	// Ensure that a block with a higher number than the threshold is discarded
+	tester.fetcher.Enqueue("higher", blocks[hashes[high]])
+	time.Sleep(10 * time.Millisecond)
+	if !tester.fetcher.queue.Empty() {
+		t.Fatalf("fetcher queued future block")
+	}
+}
+
+// Tests that announcements with numbers much lower or higher than out current
+// head get discarded to prevent wasting resources on useless blocks from faulty
+// peers.
+func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) }
+func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) }
+func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) }
+
+func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
+	// Create a long chain to import and define the discard boundaries
+	hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
+	head := hashes[len(hashes)/2]
+
+	low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
+
+	// Create a tester and simulate a head block being the middle of the above chain
+	tester := newTester()
+
+	tester.lock.Lock()
+	tester.hashes = []common.Hash{head}
+	tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
+	tester.lock.Unlock()
+
+	headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
+
+	fetching := make(chan struct{}, 2)
+	tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
+
+	// Ensure that a block with a lower number than the threshold is discarded
+	tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+	select {
+	case <-time.After(50 * time.Millisecond):
+	case <-fetching:
+		t.Fatalf("fetcher requested stale header")
+	}
+	// Ensure that a block with a higher number than the threshold is discarded
+	tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+	select {
+	case <-time.After(50 * time.Millisecond):
+	case <-fetching:
+		t.Fatalf("fetcher requested future header")
+	}
+}
+
+// Tests that peers announcing blocks with invalid numbers (i.e. not matching
+// the headers provided afterwards) get dropped as malicious.
+func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) }
+func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) }
+func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) }
+
+func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
+	// Create a single block to import and check numbers against
+	hashes, blocks := makeChain(1, 0, genesis)
+
+	tester := newTester()
+	badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
+	badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
+
+	imported := make(chan *types.Block)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	// Announce a block with a bad number, check for immediate drop
+	tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
+	verifyImportEvent(t, imported, false)
+
+	tester.lock.RLock()
+	dropped := tester.drops["bad"]
+	tester.lock.RUnlock()
+
+	if !dropped {
+		t.Fatalf("peer with invalid numbered announcement not dropped")
+	}
+
+	goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
+	goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
+	// Make sure a good announcement passes without a drop
+	tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
+	verifyImportEvent(t, imported, true)
+
+	tester.lock.RLock()
+	dropped = tester.drops["good"]
+	tester.lock.RUnlock()
+
+	if dropped {
+		t.Fatalf("peer with valid numbered announcement dropped")
+	}
+	verifyImportDone(t, imported)
+}
+
+// Tests that if a block is empty (i.e. header only), no body request should be
+// made, and instead the header should be assembled into a whole block in itself.
+func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
+func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
+func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
+
+func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
+	// Create a chain of blocks to import
+	hashes, blocks := makeChain(32, 0, genesis)
+
+	tester := newTester()
+	headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+	bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+	// Add a monitoring hook for all internal events
+	fetching := make(chan []common.Hash)
+	tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
+
+	completing := make(chan []common.Hash)
+	tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
+
+	imported := make(chan *types.Block)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+	// Iteratively announce blocks until all are imported
+	for i := len(hashes) - 2; i >= 0; i-- {
+		tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+
+		// All announces should fetch the header
+		verifyFetchingEvent(t, fetching, true)
+
+		// Only blocks with data contents should request bodies
+		verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
+
+		// Irrelevant of the construct, import should succeed
+		verifyImportEvent(t, imported, true)
+	}
+	verifyImportDone(t, imported)
+}
+
+// Tests that a peer is unable to use unbounded memory with sending infinite
+// block announcements to a node, but that even in the face of such an attack,
+// the fetcher remains operational.
+func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
+func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
+func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
+
+func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
+	// Create a tester with instrumented import hooks
+	tester := newTester()
+
+	imported, announces := make(chan *types.Block), int32(0)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+	tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
+		if added {
+			atomic.AddInt32(&announces, 1)
+		} else {
+			atomic.AddInt32(&announces, -1)
+		}
+	}
+	// Create a valid chain and an infinite junk chain
+	targetBlocks := hashLimit + 2*maxQueueDist
+	hashes, blocks := makeChain(targetBlocks, 0, genesis)
+	validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+	validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+	attack, _ := makeChain(targetBlocks, 0, unknownBlock)
+	attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
+	attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
+
+	// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
+	for i := 0; i < len(attack); i++ {
+		if i < maxQueueDist {
+			tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
+		}
+		tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
+	}
+	if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
+		t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
+	}
+	// Wait for fetches to complete
+	verifyImportCount(t, imported, maxQueueDist)
+
+	// Feed the remaining valid hashes to ensure DOS protection state remains clean
+	for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
+		tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
+		verifyImportEvent(t, imported, true)
+	}
+	verifyImportDone(t, imported)
+}
+
+// Tests that blocks sent to the fetcher (either through propagation or via hash
+// announces and retrievals) don't pile up indefinitely, exhausting available
+// system memory.
+func TestBlockMemoryExhaustionAttack(t *testing.T) {
+	// Create a tester with instrumented import hooks
+	tester := newTester()
+
+	imported, enqueued := make(chan *types.Block), int32(0)
+	tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+	tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
+		if added {
+			atomic.AddInt32(&enqueued, 1)
+		} else {
+			atomic.AddInt32(&enqueued, -1)
+		}
+	}
+	// Create a valid chain and a batch of dangling (but in range) blocks
+	targetBlocks := hashLimit + 2*maxQueueDist
+	hashes, blocks := makeChain(targetBlocks, 0, genesis)
+	attack := make(map[common.Hash]*types.Block)
+	for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
+		hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
+		for _, hash := range hashes[:maxQueueDist-2] {
+			attack[hash] = blocks[hash]
+		}
+	}
+	// Try to feed all the attacker blocks make sure only a limited batch is accepted
+	for _, block := range attack {
+		tester.fetcher.Enqueue("attacker", block)
+	}
+	time.Sleep(200 * time.Millisecond)
+	if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
+		t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
+	}
+	// Queue up a batch of valid blocks, and check that a new peer is allowed to do so
+	for i := 0; i < maxQueueDist-1; i++ {
+		tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
+	}
+	time.Sleep(100 * time.Millisecond)
+	if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
+		t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
+	}
+	// Insert the missing piece (and sanity check the import)
+	tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
+	verifyImportCount(t, imported, maxQueueDist)
+
+	// Insert the remaining blocks in chunks to ensure clean DOS protection
+	for i := maxQueueDist; i < len(hashes)-1; i++ {
+		tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
+		verifyImportEvent(t, imported, true)
+	}
+	verifyImportDone(t, imported)
+}
diff --git a/dex/fetcher/metrics.go b/dex/fetcher/metrics.go
new file mode 100644
index 000000000..23b670549
--- /dev/null
+++ b/dex/fetcher/metrics.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the metrics collected by the fetcher.
+
+package fetcher
+
+import (
+	"github.com/dexon-foundation/dexon/metrics"
+)
+
+var (
+	propAnnounceInMeter   = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/in", nil)
+	propAnnounceOutTimer  = metrics.NewRegisteredTimer("dex/fetcher/prop/announces/out", nil)
+	propAnnounceDropMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/drop", nil)
+	propAnnounceDOSMeter  = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/dos", nil)
+
+	propBroadcastInMeter   = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/in", nil)
+	propBroadcastOutTimer  = metrics.NewRegisteredTimer("dex/fetcher/prop/broadcasts/out", nil)
+	propBroadcastDropMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/drop", nil)
+	propBroadcastDOSMeter  = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/dos", nil)
+
+	headerFetchMeter = metrics.NewRegisteredMeter("dex/fetcher/fetch/headers", nil)
+	bodyFetchMeter   = metrics.NewRegisteredMeter("dex/fetcher/fetch/bodies", nil)
+
+	headerFilterInMeter  = metrics.NewRegisteredMeter("dex/fetcher/filter/headers/in", nil)
+	headerFilterOutMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/headers/out", nil)
+	bodyFilterInMeter    = metrics.NewRegisteredMeter("dex/fetcher/filter/bodies/in", nil)
+	bodyFilterOutMeter   = metrics.NewRegisteredMeter("dex/fetcher/filter/bodies/out", nil)
+)
-- 
cgit v1.2.3