aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
Diffstat (limited to 'dex')
-rw-r--r--dex/downloader/api.go166
-rw-r--r--dex/downloader/downloader.go1684
-rw-r--r--dex/downloader/downloader_test.go1481
-rw-r--r--dex/downloader/events.go21
-rw-r--r--dex/downloader/fakepeer.go161
-rw-r--r--dex/downloader/metrics.go43
-rw-r--r--dex/downloader/modes.go73
-rw-r--r--dex/downloader/peer.go573
-rw-r--r--dex/downloader/queue.go885
-rw-r--r--dex/downloader/statesync.go484
-rw-r--r--dex/downloader/testchain_test.go221
-rw-r--r--dex/downloader/types.go79
-rw-r--r--dex/fetcher/fetcher.go736
-rw-r--r--dex/fetcher/fetcher_test.go790
-rw-r--r--dex/fetcher/metrics.go43
15 files changed, 7440 insertions, 0 deletions
diff --git a/dex/downloader/api.go b/dex/downloader/api.go
new file mode 100644
index 000000000..721818e75
--- /dev/null
+++ b/dex/downloader/api.go
@@ -0,0 +1,166 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "context"
+ "sync"
+
+ ethereum "github.com/dexon-foundation/dexon"
+ "github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/rpc"
+)
+
+// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
+// It offers only methods that operates on data that can be available to anyone without security risks.
+type PublicDownloaderAPI struct {
+ d *Downloader
+ mux *event.TypeMux
+ installSyncSubscription chan chan interface{}
+ uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
+}
+
+// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
+// listens for events from the downloader through the global event mux. In case it receives one of
+// these events it broadcasts it to all syncing subscriptions that are installed through the
+// installSyncSubscription channel.
+func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
+ api := &PublicDownloaderAPI{
+ d: d,
+ mux: m,
+ installSyncSubscription: make(chan chan interface{}),
+ uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
+ }
+
+ go api.eventLoop()
+
+ return api
+}
+
+// eventLoop runs a loop until the event mux closes. It will install and uninstall new
+// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
+func (api *PublicDownloaderAPI) eventLoop() {
+ var (
+ sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+ syncSubscriptions = make(map[chan interface{}]struct{})
+ )
+
+ for {
+ select {
+ case i := <-api.installSyncSubscription:
+ syncSubscriptions[i] = struct{}{}
+ case u := <-api.uninstallSyncSubscription:
+ delete(syncSubscriptions, u.c)
+ close(u.uninstalled)
+ case event := <-sub.Chan():
+ if event == nil {
+ return
+ }
+
+ var notification interface{}
+ switch event.Data.(type) {
+ case StartEvent:
+ notification = &SyncingResult{
+ Syncing: true,
+ Status: api.d.Progress(),
+ }
+ case DoneEvent, FailedEvent:
+ notification = false
+ }
+ // broadcast
+ for c := range syncSubscriptions {
+ c <- notification
+ }
+ }
+ }
+}
+
+// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
+func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ statuses := make(chan interface{})
+ sub := api.SubscribeSyncStatus(statuses)
+
+ for {
+ select {
+ case status := <-statuses:
+ notifier.Notify(rpcSub.ID, status)
+ case <-rpcSub.Err():
+ sub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ sub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// SyncingResult provides information about the current synchronisation status for this node.
+type SyncingResult struct {
+ Syncing bool `json:"syncing"`
+ Status ethereum.SyncProgress `json:"status"`
+}
+
+// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop.
+type uninstallSyncSubscriptionRequest struct {
+ c chan interface{}
+ uninstalled chan interface{}
+}
+
+// SyncStatusSubscription represents a syncing subscription.
+type SyncStatusSubscription struct {
+ api *PublicDownloaderAPI // register subscription in event loop of this api instance
+ c chan interface{} // channel where events are broadcasted to
+ unsubOnce sync.Once // make sure unsubscribe logic is executed once
+}
+
+// Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
+// The status channel that was passed to subscribeSyncStatus isn't used anymore
+// after this method returns.
+func (s *SyncStatusSubscription) Unsubscribe() {
+ s.unsubOnce.Do(func() {
+ req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
+ s.api.uninstallSyncSubscription <- &req
+
+ for {
+ select {
+ case <-s.c:
+ // drop new status events until uninstall confirmation
+ continue
+ case <-req.uninstalled:
+ return
+ }
+ }
+ })
+}
+
+// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
+// The given channel must receive interface values, the result can either
+func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
+ api.installSyncSubscription <- status
+ return &SyncStatusSubscription{api: api, c: status}
+}
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go
new file mode 100644
index 000000000..0383a3709
--- /dev/null
+++ b/dex/downloader/downloader.go
@@ -0,0 +1,1684 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package downloader contains the manual full chain synchronisation.
+package downloader
+
+import (
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ ethereum "github.com/dexon-foundation/dexon"
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core/rawdb"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/log"
+ "github.com/dexon-foundation/dexon/metrics"
+ "github.com/dexon-foundation/dexon/params"
+)
+
+var (
+ MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
+ MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
+ MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
+ MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
+ MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
+ MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
+ MaxStateFetch = 384 // Amount of node state values to allow fetching per request
+
+ MaxForkAncestry = 3 * params.EpochDuration // Maximum chain reorganisation
+ rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
+ rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
+ rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
+ ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
+ ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
+
+ qosTuningPeers = 5 // Number of peers to tune based on (best peers)
+ qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
+ qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
+
+ maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
+ maxResultsProcess = 2048 // Number of content download results to import at once into the chain
+
+ reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
+ reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
+
+ fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
+ fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
+ fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
+ fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
+ fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
+)
+
+var (
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errNoPeers = errors.New("no peers to keep download active")
+ errTimeout = errors.New("timeout")
+ errEmptyHeaderSet = errors.New("empty header set by peer")
+ errPeersUnavailable = errors.New("no peers available or all tried for download")
+ errInvalidAncestor = errors.New("retrieved ancestor is invalid")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errInvalidBlock = errors.New("retrieved block is invalid")
+ errInvalidBody = errors.New("retrieved block body is invalid")
+ errInvalidReceipt = errors.New("retrieved receipt is invalid")
+ errCancelBlockFetch = errors.New("block download canceled (requested)")
+ errCancelHeaderFetch = errors.New("block header download canceled (requested)")
+ errCancelBodyFetch = errors.New("block body download canceled (requested)")
+ errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
+ errCancelStateFetch = errors.New("state data download canceled (requested)")
+ errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
+ errCancelContentProcessing = errors.New("content processing canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
+ errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
+)
+
+type Downloader struct {
+ mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
+ mux *event.TypeMux // Event multiplexer to announce sync operation events
+
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ stateDB ethdb.Database
+
+ rttEstimate uint64 // Round trip time to target for download requests
+ rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
+
+ // Statistics
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsState stateSyncStats
+ syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
+
+ lightchain LightChain
+ blockchain BlockChain
+
+ // Callbacks
+ dropPeer peerDropFn // Drops a peer for misbehaving
+
+ // Status
+ synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
+ synchronising int32
+ notified int32
+ committed int32
+
+ // Channels
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+
+ // for stateFetcher
+ stateSyncStart chan *stateSync
+ trackStateReq chan *stateReq
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+
+ // Cancellation and termination
+ cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
+ cancelCh chan struct{} // Channel to cancel mid-flight syncs
+ cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers
+ cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited.
+
+ quitCh chan struct{} // Quit channel to signal termination
+ quitLock sync.RWMutex // Lock to prevent double closes
+
+ // Testing hooks
+ syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
+ bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
+ receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
+ chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
+}
+
+// LightChain encapsulates functions required to synchronise a light chain.
+type LightChain interface {
+ // HasHeader verifies a header's presence in the local chain.
+ HasHeader(common.Hash, uint64) bool
+
+ // GetHeaderByHash retrieves a header from the local chain.
+ GetHeaderByHash(common.Hash) *types.Header
+
+ // CurrentHeader retrieves the head header from the local chain.
+ CurrentHeader() *types.Header
+
+ // GetTd returns the total difficulty of a local block.
+ GetTd(common.Hash, uint64) *big.Int
+
+ // InsertHeaderChain inserts a batch of headers into the local chain.
+ InsertHeaderChain([]*types.Header, int) (int, error)
+
+ // Rollback removes a few recently added elements from the local chain.
+ Rollback([]common.Hash)
+}
+
+// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
+type BlockChain interface {
+ LightChain
+
+ // HasBlock verifies a block's presence in the local chain.
+ HasBlock(common.Hash, uint64) bool
+
+ // GetBlockByHash retrieves a block from the local chain.
+ GetBlockByHash(common.Hash) *types.Block
+
+ // CurrentBlock retrieves the head block from the local chain.
+ CurrentBlock() *types.Block
+
+ // CurrentFastBlock retrieves the head fast block from the local chain.
+ CurrentFastBlock() *types.Block
+
+ // FastSyncCommitHead directly commits the head block to a certain entity.
+ FastSyncCommitHead(common.Hash) error
+
+ // InsertChain inserts a batch of blocks into the local chain.
+ InsertChain(types.Blocks) (int, error)
+
+ // InsertReceiptChain inserts a batch of receipts into the local chain.
+ InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
+}
+
+// New creates a new downloader to fetch hashes and blocks from remote peers.
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
+ if lightchain == nil {
+ lightchain = chain
+ }
+
+ dl := &Downloader{
+ mode: mode,
+ stateDB: stateDb,
+ mux: mux,
+ queue: newQueue(),
+ peers: newPeerSet(),
+ rttEstimate: uint64(rttMaxEstimate),
+ rttConfidence: uint64(1000000),
+ blockchain: chain,
+ lightchain: lightchain,
+ dropPeer: dropPeer,
+ headerCh: make(chan dataPack, 1),
+ bodyCh: make(chan dataPack, 1),
+ receiptCh: make(chan dataPack, 1),
+ bodyWakeCh: make(chan bool, 1),
+ receiptWakeCh: make(chan bool, 1),
+ headerProcCh: make(chan []*types.Header, 1),
+ quitCh: make(chan struct{}),
+ stateCh: make(chan dataPack),
+ stateSyncStart: make(chan *stateSync),
+ syncStatsState: stateSyncStats{
+ processed: rawdb.ReadFastTrieProgress(stateDb),
+ },
+ trackStateReq: make(chan *stateReq),
+ }
+ go dl.qosTuner()
+ go dl.stateFetcher()
+ return dl
+}
+
+// Progress retrieves the synchronisation boundaries, specifically the origin
+// block where synchronisation started at (may have failed/suspended); the block
+// or header sync is currently at; and the latest known block which the sync targets.
+//
+// In addition, during the state download phase of fast synchronisation the number
+// of processed and the total number of known states are also returned. Otherwise
+// these are zero.
+func (d *Downloader) Progress() ethereum.SyncProgress {
+ // Lock the current stats and return the progress
+ d.syncStatsLock.RLock()
+ defer d.syncStatsLock.RUnlock()
+
+ current := uint64(0)
+ switch d.mode {
+ case FullSync:
+ current = d.blockchain.CurrentBlock().NumberU64()
+ case FastSync:
+ current = d.blockchain.CurrentFastBlock().NumberU64()
+ case LightSync:
+ current = d.lightchain.CurrentHeader().Number.Uint64()
+ }
+ return ethereum.SyncProgress{
+ StartingBlock: d.syncStatsChainOrigin,
+ CurrentBlock: current,
+ HighestBlock: d.syncStatsChainHeight,
+ PulledStates: d.syncStatsState.processed,
+ KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
+ }
+}
+
+// Synchronising returns whether the downloader is currently retrieving blocks.
+func (d *Downloader) Synchronising() bool {
+ return atomic.LoadInt32(&d.synchronising) > 0
+}
+
+// RegisterPeer injects a new download peer into the set of block source to be
+// used for fetching hashes and blocks from.
+func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
+ logger := log.New("peer", id)
+ logger.Trace("Registering sync peer")
+ if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
+ logger.Error("Failed to register sync peer", "err", err)
+ return err
+ }
+ d.qosReduceConfidence()
+
+ return nil
+}
+
+// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
+func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
+ return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
+}
+
+// UnregisterPeer remove a peer from the known list, preventing any action from
+// the specified peer. An effort is also made to return any pending fetches into
+// the queue.
+func (d *Downloader) UnregisterPeer(id string) error {
+ // Unregister the peer from the active peer set and revoke any fetch tasks
+ logger := log.New("peer", id)
+ logger.Trace("Unregistering sync peer")
+ if err := d.peers.Unregister(id); err != nil {
+ logger.Error("Failed to unregister sync peer", "err", err)
+ return err
+ }
+ d.queue.Revoke(id)
+
+ // If this peer was the master peer, abort sync immediately
+ d.cancelLock.RLock()
+ master := id == d.cancelPeer
+ d.cancelLock.RUnlock()
+
+ if master {
+ d.cancel()
+ }
+ return nil
+}
+
+// Synchronise tries to sync up our local block chain with a remote peer, both
+// adding various sanity checks as well as wrapping it with various log entries.
+func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
+ err := d.synchronise(id, head, td, mode)
+ switch err {
+ case nil:
+ case errBusy:
+
+ case errTimeout, errBadPeer, errStallingPeer,
+ errEmptyHeaderSet, errPeersUnavailable, errTooOld,
+ errInvalidAncestor, errInvalidChain:
+ log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
+ if d.dropPeer == nil {
+ // The dropPeer method is nil when `--copydb` is used for a local copy.
+ // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
+ log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
+ } else {
+ d.dropPeer(id)
+ }
+ default:
+ log.Warn("Synchronisation failed, retrying", "err", err)
+ }
+ return err
+}
+
+// synchronise will select the peer and use it for synchronising. If an empty string is given
+// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
+// checks fail an error will be returned. This method is synchronous
+func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
+ // Mock out the synchronisation if testing
+ if d.synchroniseMock != nil {
+ return d.synchroniseMock(id, hash)
+ }
+ // Make sure only one goroutine is ever allowed past this point at once
+ if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
+ return errBusy
+ }
+ defer atomic.StoreInt32(&d.synchronising, 0)
+
+ // Post a user notification of the sync (only once per session)
+ if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
+ log.Info("Block synchronisation started")
+ }
+ // Reset the queue, peer set and wake channels to clean any internal leftover state
+ d.queue.Reset()
+ d.peers.Reset()
+
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ select {
+ case <-ch:
+ default:
+ }
+ }
+ for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
+ for empty := false; !empty; {
+ select {
+ case <-ch:
+ default:
+ empty = true
+ }
+ }
+ }
+ for empty := false; !empty; {
+ select {
+ case <-d.headerProcCh:
+ default:
+ empty = true
+ }
+ }
+ // Create cancel channel for aborting mid-flight and mark the master peer
+ d.cancelLock.Lock()
+ d.cancelCh = make(chan struct{})
+ d.cancelPeer = id
+ d.cancelLock.Unlock()
+
+ defer d.Cancel() // No matter what, we can't leave the cancel channel open
+
+ // Set the requested sync mode, unless it's forbidden
+ d.mode = mode
+
+ // Retrieve the origin peer and initiate the downloading process
+ p := d.peers.Peer(id)
+ if p == nil {
+ return errUnknownPeer
+ }
+ return d.syncWithPeer(p, hash, td)
+}
+
+// syncWithPeer starts a block synchronization based on the hash chain from the
+// specified peer and head hash.
+func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
+ d.mux.Post(StartEvent{})
+ defer func() {
+ // reset on error
+ if err != nil {
+ d.mux.Post(FailedEvent{err})
+ } else {
+ d.mux.Post(DoneEvent{})
+ }
+ }()
+ if p.version < 62 {
+ return errTooOld
+ }
+
+ log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
+ defer func(start time.Time) {
+ log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
+ }(time.Now())
+
+ // Look up the sync boundaries: the common ancestor and the target block
+ latest, err := d.fetchHeight(p)
+ if err != nil {
+ return err
+ }
+ height := latest.Number.Uint64()
+
+ origin, err := d.findAncestor(p, height)
+ if err != nil {
+ return err
+ }
+ d.syncStatsLock.Lock()
+ if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+ d.syncStatsChainOrigin = origin
+ }
+ d.syncStatsChainHeight = height
+ d.syncStatsLock.Unlock()
+
+ // Ensure our origin point is below any fast sync pivot point
+ pivot := uint64(0)
+ if d.mode == FastSync {
+ if height <= uint64(fsMinFullBlocks) {
+ origin = 0
+ } else {
+ pivot = height - uint64(fsMinFullBlocks)
+ if pivot <= origin {
+ origin = pivot - 1
+ }
+ }
+ }
+ d.committed = 1
+ if d.mode == FastSync && pivot != 0 {
+ d.committed = 0
+ }
+ // Initiate the sync using a concurrent header and content retrieval algorithm
+ d.queue.Prepare(origin+1, d.mode)
+ if d.syncInitHook != nil {
+ d.syncInitHook(origin, height)
+ }
+
+ fetchers := []func() error{
+ func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.processHeaders(origin+1, pivot, td) },
+ }
+ if d.mode == FastSync {
+ fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+ } else if d.mode == FullSync {
+ fetchers = append(fetchers, d.processFullSyncContent)
+ }
+ return d.spawnSync(fetchers)
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers []func() error) error {
+ errc := make(chan error, len(fetchers))
+ d.cancelWg.Add(len(fetchers))
+ for _, fn := range fetchers {
+ fn := fn
+ go func() { defer d.cancelWg.Done(); errc <- fn() }()
+ }
+ // Wait for the first error, then terminate the others.
+ var err error
+ for i := 0; i < len(fetchers); i++ {
+ if i == len(fetchers)-1 {
+ // Close the queue when all fetchers have exited.
+ // This will cause the block processor to end when
+ // it has processed the queue.
+ d.queue.Close()
+ }
+ if err = <-errc; err != nil {
+ break
+ }
+ }
+ d.queue.Close()
+ d.Cancel()
+ return err
+}
+
+// cancel aborts all of the operations and resets the queue. However, cancel does
+// not wait for the running download goroutines to finish. This method should be
+// used when cancelling the downloads from inside the downloader.
+func (d *Downloader) cancel() {
+ // Close the current cancel channel
+ d.cancelLock.Lock()
+ if d.cancelCh != nil {
+ select {
+ case <-d.cancelCh:
+ // Channel was already closed
+ default:
+ close(d.cancelCh)
+ }
+ }
+ d.cancelLock.Unlock()
+}
+
+// Cancel aborts all of the operations and waits for all download goroutines to
+// finish before returning.
+func (d *Downloader) Cancel() {
+ d.cancel()
+ d.cancelWg.Wait()
+}
+
+// Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
+func (d *Downloader) Terminate() {
+ // Close the termination channel (make sure double close is allowed)
+ d.quitLock.Lock()
+ select {
+ case <-d.quitCh:
+ default:
+ close(d.quitCh)
+ }
+ d.quitLock.Unlock()
+
+ // Cancel any pending download requests
+ d.Cancel()
+}
+
+// fetchHeight retrieves the head header of the remote peer to aid in estimating
+// the total time a pending synchronisation would take.
+func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
+ p.log.Debug("Retrieving remote chain height")
+
+ // Request the advertised remote head block and wait for the response
+ head, _ := p.peer.Head()
+ go p.peer.RequestHeadersByHash(head, 1, 0, false)
+
+ ttl := d.requestTTL()
+ timeout := time.After(ttl)
+ for {
+ select {
+ case <-d.cancelCh:
+ return nil, errCancelBlockFetch
+
+ case packet := <-d.headerCh:
+ // Discard anything not from the origin peer
+ if packet.PeerId() != p.id {
+ log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
+ break
+ }
+ // Make sure the peer actually gave something valid
+ headers := packet.(*headerPack).headers
+ if len(headers) != 1 {
+ p.log.Debug("Multiple headers for single request", "headers", len(headers))
+ return nil, errBadPeer
+ }
+ head := headers[0]
+ p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
+ return head, nil
+
+ case <-timeout:
+ p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
+ return nil, errTimeout
+
+ case <-d.bodyCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
+ }
+ }
+}
+
+// findAncestor tries to locate the common ancestor link of the local chain and
+// a remote peers blockchain. In the general case when our node was in sync and
+// on the correct chain, checking the top N links should already get us a match.
+// In the rare scenario when we ended up on a long reorganisation (i.e. none of
+// the head links match), we do a binary search to find the common ancestor.
+func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
+ // Figure out the valid ancestor range to prevent rewrite attacks
+ floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
+
+ if d.mode == FullSync {
+ ceil = d.blockchain.CurrentBlock().NumberU64()
+ } else if d.mode == FastSync {
+ ceil = d.blockchain.CurrentFastBlock().NumberU64()
+ }
+ if ceil >= MaxForkAncestry {
+ floor = int64(ceil - MaxForkAncestry)
+ }
+ p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
+
+ // Request the topmost blocks to short circuit binary ancestor lookup
+ head := ceil
+ if head > height {
+ head = height
+ }
+ from := int64(head) - int64(MaxHeaderFetch)
+ if from < 0 {
+ from = 0
+ }
+ // Span out with 15 block gaps into the future to catch bad head reports
+ limit := 2 * MaxHeaderFetch / 16
+ count := 1 + int((int64(ceil)-from)/16)
+ if count > limit {
+ count = limit
+ }
+ go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
+
+ // Wait for the remote response to the head fetch
+ number, hash := uint64(0), common.Hash{}
+
+ ttl := d.requestTTL()
+ timeout := time.After(ttl)
+
+ for finished := false; !finished; {
+ select {
+ case <-d.cancelCh:
+ return 0, errCancelHeaderFetch
+
+ case packet := <-d.headerCh:
+ // Discard anything not from the origin peer
+ if packet.PeerId() != p.id {
+ log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
+ break
+ }
+ // Make sure the peer actually gave something valid
+ headers := packet.(*headerPack).headers
+ if len(headers) == 0 {
+ p.log.Warn("Empty head header set")
+ return 0, errEmptyHeaderSet
+ }
+ // Make sure the peer's reply conforms to the request
+ for i := 0; i < len(headers); i++ {
+ if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
+ p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
+ return 0, errInvalidChain
+ }
+ }
+ // Check if a common ancestor was found
+ finished = true
+ for i := len(headers) - 1; i >= 0; i-- {
+ // Skip any headers that underflow/overflow our requested set
+ if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
+ continue
+ }
+ // Otherwise check if we already know the header or not
+ h := headers[i].Hash()
+ n := headers[i].Number.Uint64()
+ if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) {
+ number, hash = n, h
+
+ // If every header is known, even future ones, the peer straight out lied about its head
+ if number > height && i == limit-1 {
+ p.log.Warn("Lied about chain head", "reported", height, "found", number)
+ return 0, errStallingPeer
+ }
+ break
+ }
+ }
+
+ case <-timeout:
+ p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
+ return 0, errTimeout
+
+ case <-d.bodyCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
+ }
+ }
+ // If the head fetch already found an ancestor, return
+ if hash != (common.Hash{}) {
+ if int64(number) <= floor {
+ p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
+ return 0, errInvalidAncestor
+ }
+ p.log.Debug("Found common ancestor", "number", number, "hash", hash)
+ return number, nil
+ }
+ // Ancestor not found, we need to binary search over our chain
+ start, end := uint64(0), head
+ if floor > 0 {
+ start = uint64(floor)
+ }
+ for start+1 < end {
+ // Split our chain interval in two, and request the hash to cross check
+ check := (start + end) / 2
+
+ ttl := d.requestTTL()
+ timeout := time.After(ttl)
+
+ go p.peer.RequestHeadersByNumber(check, 1, 0, false)
+
+ // Wait until a reply arrives to this request
+ for arrived := false; !arrived; {
+ select {
+ case <-d.cancelCh:
+ return 0, errCancelHeaderFetch
+
+ case packer := <-d.headerCh:
+ // Discard anything not from the origin peer
+ if packer.PeerId() != p.id {
+ log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
+ break
+ }
+ // Make sure the peer actually gave something valid
+ headers := packer.(*headerPack).headers
+ if len(headers) != 1 {
+ p.log.Debug("Multiple headers for single request", "headers", len(headers))
+ return 0, errBadPeer
+ }
+ arrived = true
+
+ // Modify the search interval based on the response
+ h := headers[0].Hash()
+ n := headers[0].Number.Uint64()
+ if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) {
+ end = check
+ break
+ }
+ header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
+ if header.Number.Uint64() != check {
+ p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
+ return 0, errBadPeer
+ }
+ start = check
+ hash = h
+
+ case <-timeout:
+ p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
+ return 0, errTimeout
+
+ case <-d.bodyCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
+ }
+ }
+ }
+ // Ensure valid ancestry and return
+ if int64(start) <= floor {
+ p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
+ return 0, errInvalidAncestor
+ }
+ p.log.Debug("Found common ancestor", "number", start, "hash", hash)
+ return start, nil
+}
+
+// fetchHeaders keeps retrieving headers concurrently from the number
+// requested, until no more are returned, potentially throttling on the way. To
+// facilitate concurrency but still protect against malicious nodes sending bad
+// headers, we construct a header chain skeleton using the "origin" peer we are
+// syncing with, and fill in the missing headers using anyone else. Headers from
+// other peers are only accepted if they map cleanly to the skeleton. If no one
+// can fill in the skeleton - not even the origin peer - it's assumed invalid and
+// the origin is dropped.
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
+ p.log.Debug("Directing header downloads", "origin", from)
+ defer p.log.Debug("Header download terminated")
+
+ // Create a timeout timer, and the associated header fetcher
+ skeleton := true // Skeleton assembly phase or finishing up
+ request := time.Now() // time of the last skeleton fetch request
+ timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
+ <-timeout.C // timeout channel should be initially empty
+ defer timeout.Stop()
+
+ var ttl time.Duration
+ getHeaders := func(from uint64) {
+ request = time.Now()
+
+ ttl = d.requestTTL()
+ timeout.Reset(ttl)
+
+ if skeleton {
+ p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
+ go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ } else {
+ p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
+ go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
+ }
+ }
+ // Start pulling the header chain skeleton until all is done
+ getHeaders(from)
+
+ for {
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+
+ case packet := <-d.headerCh:
+ // Make sure the active peer is giving us the skeleton headers
+ if packet.PeerId() != p.id {
+ log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
+ break
+ }
+ headerReqTimer.UpdateSince(request)
+ timeout.Stop()
+
+ // If the skeleton's finished, pull any remaining head headers directly from the origin
+ if packet.Items() == 0 && skeleton {
+ skeleton = false
+ getHeaders(from)
+ continue
+ }
+ // If no more headers are inbound, notify the content fetchers and return
+ if packet.Items() == 0 {
+ // Don't abort header fetches while the pivot is downloading
+ if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
+ p.log.Debug("No headers, waiting for pivot commit")
+ select {
+ case <-time.After(fsHeaderContCheck):
+ getHeaders(from)
+ continue
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
+ }
+ // Pivot done (or not in fast sync) and no more headers, terminate the process
+ p.log.Debug("No more headers available")
+ select {
+ case d.headerProcCh <- nil:
+ return nil
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
+ }
+ headers := packet.(*headerPack).headers
+
+ // If we received a skeleton batch, resolve internals concurrently
+ if skeleton {
+ filled, proced, err := d.fillHeaderSkeleton(from, headers)
+ if err != nil {
+ p.log.Debug("Skeleton chain invalid", "err", err)
+ return errInvalidChain
+ }
+ headers = filled[proced:]
+ from += uint64(proced)
+ } else {
+ // If we're closing in on the chain head, but haven't yet reached it, delay
+ // the last few headers so mini reorgs on the head don't cause invalid hash
+ // chain errors.
+ if n := len(headers); n > 0 {
+ // Retrieve the current head we're at
+ head := uint64(0)
+ if d.mode == LightSync {
+ head = d.lightchain.CurrentHeader().Number.Uint64()
+ } else {
+ head = d.blockchain.CurrentFastBlock().NumberU64()
+ if full := d.blockchain.CurrentBlock().NumberU64(); head < full {
+ head = full
+ }
+ }
+ // If the head is way older than this batch, delay the last few headers
+ if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
+ delay := reorgProtHeaderDelay
+ if delay > n {
+ delay = n
+ }
+ headers = headers[:n-delay]
+ }
+ }
+ }
+ // Insert all the new headers and fetch the next batch
+ if len(headers) > 0 {
+ p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
+ select {
+ case d.headerProcCh <- headers:
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
+ from += uint64(len(headers))
+ getHeaders(from)
+ } else {
+ // No headers delivered, or all of them being delayed, sleep a bit and retry
+ p.log.Trace("All headers delayed, waiting")
+ select {
+ case <-time.After(fsHeaderContCheck):
+ getHeaders(from)
+ continue
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
+ }
+
+ case <-timeout.C:
+ if d.dropPeer == nil {
+ // The dropPeer method is nil when `--copydb` is used for a local copy.
+ // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
+ p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id)
+ break
+ }
+ // Header retrieval timed out, consider the peer bad and drop
+ p.log.Debug("Header request timed out", "elapsed", ttl)
+ headerTimeoutMeter.Mark(1)
+ d.dropPeer(p.id)
+
+ // Finish the sync gracefully instead of dumping the gathered data though
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ select {
+ case ch <- false:
+ case <-d.cancelCh:
+ }
+ }
+ select {
+ case d.headerProcCh <- nil:
+ case <-d.cancelCh:
+ }
+ return errBadPeer
+ }
+ }
+}
+
+// fillHeaderSkeleton concurrently retrieves headers from all our available peers
+// and maps them to the provided skeleton header chain.
+//
+// Any partial results from the beginning of the skeleton is (if possible) forwarded
+// immediately to the header processor to keep the rest of the pipeline full even
+// in the case of header stalls.
+//
+// The method returns the entire filled skeleton and also the number of headers
+// already forwarded for processing.
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
+ log.Debug("Filling up skeleton", "from", from)
+ d.queue.ScheduleSkeleton(from, skeleton)
+
+ var (
+ deliver = func(packet dataPack) (int, error) {
+ pack := packet.(*headerPack)
+ return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
+ }
+ expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
+ throttle = func() bool { return false }
+ reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
+ return d.queue.ReserveHeaders(p, count), false, nil
+ }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+ capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
+ )
+ err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
+ d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
+ nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
+
+ log.Debug("Skeleton fill terminated", "err", err)
+
+ filled, proced := d.queue.RetrieveHeaders()
+ return filled, proced, err
+}
+
+// fetchBodies iteratively downloads the scheduled block bodies, taking any
+// available peers, reserving a chunk of blocks for each, waiting for delivery
+// and also periodically checking for timeouts.
+func (d *Downloader) fetchBodies(from uint64) error {
+ log.Debug("Downloading block bodies", "origin", from)
+
+ var (
+ deliver = func(packet dataPack) (int, error) {
+ pack := packet.(*bodyPack)
+ return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
+ }
+ expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
+ capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
+ )
+ err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
+ d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
+ d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
+
+ log.Debug("Block body download terminated", "err", err)
+ return err
+}
+
+// fetchReceipts iteratively downloads the scheduled block receipts, taking any
+// available peers, reserving a chunk of receipts for each, waiting for delivery
+// and also periodically checking for timeouts.
+func (d *Downloader) fetchReceipts(from uint64) error {
+ log.Debug("Downloading transaction receipts", "origin", from)
+
+ var (
+ deliver = func(packet dataPack) (int, error) {
+ pack := packet.(*receiptPack)
+ return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
+ }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
+ capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
+ )
+ err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
+ d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
+ d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
+
+ log.Debug("Transaction receipt download terminated", "err", err)
+ return err
+}
+
+// fetchParts iteratively downloads scheduled block parts, taking any available
+// peers, reserving a chunk of fetch requests for each, waiting for delivery and
+// also periodically checking for timeouts.
+//
+// As the scheduling/timeout logic mostly is the same for all downloaded data
+// types, this method is used by each for data gathering and is instrumented with
+// various callbacks to handle the slight differences between processing them.
+//
+// The instrumentation parameters:
+// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
+// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
+// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
+// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
+// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
+// - pending: task callback for the number of requests still needing download (detect completion/non-completability)
+// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
+// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
+// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
+// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
+// - fetch: network callback to actually send a particular download request to a physical remote peer
+// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
+// - capacity: network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
+// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
+// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
+// - kind: textual label of the type being downloaded to display in log mesages
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+ expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
+ fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
+ idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
+
+ // Create a ticker to detect expired retrieval tasks
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+
+ update := make(chan struct{}, 1)
+
+ // Prepare the queue and fetch block parts until the block header fetcher's done
+ finished := false
+ for {
+ select {
+ case <-d.cancelCh:
+ return errCancel
+
+ case packet := <-deliveryCh:
+ // If the peer was previously banned and failed to deliver its pack
+ // in a reasonable time frame, ignore its message.
+ if peer := d.peers.Peer(packet.PeerId()); peer != nil {
+ // Deliver the received chunk of data and check chain validity
+ accepted, err := deliver(packet)
+ if err == errInvalidChain {
+ return err
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ setIdle(peer, accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && packet.Items() == 0:
+ peer.log.Trace("Requested data not delivered", "type", kind)
+ case err == nil:
+ peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
+ default:
+ peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
+ }
+ }
+ // Blocks assembled, try to update the progress
+ select {
+ case update <- struct{}{}:
+ default:
+ }
+
+ case cont := <-wakeCh:
+ // The header fetcher sent a continuation flag, check if it's done
+ if !cont {
+ finished = true
+ }
+ // Headers arrive, try to update the progress
+ select {
+ case update <- struct{}{}:
+ default:
+ }
+
+ case <-ticker.C:
+ // Sanity check update the progress
+ select {
+ case update <- struct{}{}:
+ default:
+ }
+
+ case <-update:
+ // Short circuit if we lost all our peers
+ if d.peers.Len() == 0 {
+ return errNoPeers
+ }
+ // Check for fetch request timeouts and demote the responsible peers
+ for pid, fails := range expire() {
+ if peer := d.peers.Peer(pid); peer != nil {
+ // If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
+ // ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
+ // out that sync wise we need to get rid of the peer.
+ //
+ // The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
+ // and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
+ // how response times reacts, to it always requests one more than the minimum (i.e. min 2).
+ if fails > 2 {
+ peer.log.Trace("Data delivery timed out", "type", kind)
+ setIdle(peer, 0)
+ } else {
+ peer.log.Debug("Stalling delivery, dropping", "type", kind)
+ if d.dropPeer == nil {
+ // The dropPeer method is nil when `--copydb` is used for a local copy.
+ // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
+ peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
+ } else {
+ d.dropPeer(pid)
+ }
+ }
+ }
+ }
+ // If there's nothing more to fetch, wait or terminate
+ if pending() == 0 {
+ if !inFlight() && finished {
+ log.Debug("Data fetching completed", "type", kind)
+ return nil
+ }
+ break
+ }
+ // Send a download request to all idle peers, until throttled
+ progressed, throttled, running := false, false, inFlight()
+ idles, total := idle()
+
+ for _, peer := range idles {
+ // Short circuit if throttling activated
+ if throttle() {
+ throttled = true
+ break
+ }
+ // Short circuit if there is no more available task.
+ if pending() == 0 {
+ break
+ }
+ // Reserve a chunk of fetches for a peer. A nil can mean either that
+ // no more headers are available, or that the peer is known not to
+ // have them.
+ request, progress, err := reserve(peer, capacity(peer))
+ if err != nil {
+ return err
+ }
+ if progress {
+ progressed = true
+ }
+ if request == nil {
+ continue
+ }
+ if request.From > 0 {
+ peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
+ } else {
+ peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
+ }
+ // Fetch the chunk and make sure any errors return the hashes to the queue
+ if fetchHook != nil {
+ fetchHook(request.Headers)
+ }
+ if err := fetch(peer, request); err != nil {
+ // Although we could try and make an attempt to fix this, this error really
+ // means that we've double allocated a fetch task to a peer. If that is the
+ // case, the internal state of the downloader and the queue is very wrong so
+ // better hard crash and note the error instead of silently accumulating into
+ // a much bigger issue.
+ panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
+ }
+ running = true
+ }
+ // Make sure that we have peers available for fetching. If all peers have been tried
+ // and all failed throw an error
+ if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
+ return errPeersUnavailable
+ }
+ }
+ }
+}
+
+// processHeaders takes batches of retrieved headers from an input channel and
+// keeps processing and scheduling them into the header chain and downloader's
+// queue until the stream ends or a failure occurs.
+func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
+ // Keep a count of uncertain headers to roll back
+ rollback := []*types.Header{}
+ defer func() {
+ if len(rollback) > 0 {
+ // Flatten the headers and roll them back
+ hashes := make([]common.Hash, len(rollback))
+ for i, header := range rollback {
+ hashes[i] = header.Hash()
+ }
+ lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
+ if d.mode != LightSync {
+ lastFastBlock = d.blockchain.CurrentFastBlock().Number()
+ lastBlock = d.blockchain.CurrentBlock().Number()
+ }
+ d.lightchain.Rollback(hashes)
+ curFastBlock, curBlock := common.Big0, common.Big0
+ if d.mode != LightSync {
+ curFastBlock = d.blockchain.CurrentFastBlock().Number()
+ curBlock = d.blockchain.CurrentBlock().Number()
+ }
+ log.Warn("Rolled back headers", "count", len(hashes),
+ "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
+ "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
+ "block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
+ }
+ }()
+
+ // Wait for batches of headers to process
+ gotHeaders := false
+
+ for {
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+
+ case headers := <-d.headerProcCh:
+ // Terminate header processing if we synced up
+ if len(headers) == 0 {
+ // Notify everyone that headers are fully processed
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ select {
+ case ch <- false:
+ case <-d.cancelCh:
+ }
+ }
+ // If no headers were retrieved at all, the peer violated its TD promise that it had a
+ // better chain compared to ours. The only exception is if its promised blocks were
+ // already imported by other means (e.g. fecher):
+ //
+ // R <remote peer>, L <local node>: Both at block 10
+ // R: Mine block 11, and propagate it to L
+ // L: Queue block 11 for import
+ // L: Notice that R's head and TD increased compared to ours, start sync
+ // L: Import of block 11 finishes
+ // L: Sync begins, and finds common ancestor at 11
+ // L: Request new headers up from 11 (R's TD was higher, it must have something)
+ // R: Nothing to give
+ if d.mode != LightSync {
+ head := d.blockchain.CurrentBlock()
+ if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
+ return errStallingPeer
+ }
+ }
+ // If fast or light syncing, ensure promised headers are indeed delivered. This is
+ // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
+ // of delivering the post-pivot blocks that would flag the invalid content.
+ //
+ // This check cannot be executed "as is" for full imports, since blocks may still be
+ // queued for processing when the header download completes. However, as long as the
+ // peer gave us something useful, we're already happy/progressed (above check).
+ if d.mode == FastSync || d.mode == LightSync {
+ head := d.lightchain.CurrentHeader()
+ if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
+ return errStallingPeer
+ }
+ }
+ // Disable any rollback and return
+ rollback = nil
+ return nil
+ }
+ // Otherwise split the chunk of headers into batches and process them
+ gotHeaders = true
+
+ for len(headers) > 0 {
+ // Terminate if something failed in between processing chunks
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+ default:
+ }
+ // Select the next chunk of headers to import
+ limit := maxHeadersProcess
+ if limit > len(headers) {
+ limit = len(headers)
+ }
+ chunk := headers[:limit]
+
+ // In case of header only syncing, validate the chunk immediately
+ if d.mode == FastSync || d.mode == LightSync {
+ // Collect the yet unknown headers to mark them as uncertain
+ unknown := make([]*types.Header, 0, len(headers))
+ for _, header := range chunk {
+ if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
+ unknown = append(unknown, header)
+ }
+ }
+ // If we're importing pure headers, verify based on their recentness
+ frequency := fsHeaderCheckFrequency
+ if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
+ frequency = 1
+ }
+ if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
+ // If some headers were inserted, add them too to the rollback list
+ if n > 0 {
+ rollback = append(rollback, chunk[:n]...)
+ }
+ log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
+ return errInvalidChain
+ }
+ // All verifications passed, store newly found uncertain headers
+ rollback = append(rollback, unknown...)
+ if len(rollback) > fsHeaderSafetyNet {
+ rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
+ }
+ }
+ // Unless we're doing light chains, schedule the headers for associated content retrieval
+ if d.mode == FullSync || d.mode == FastSync {
+ // If we've reached the allowed number of pending headers, stall a bit
+ for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+ case <-time.After(time.Second):
+ }
+ }
+ // Otherwise insert the headers for content retrieval
+ inserts := d.queue.Schedule(chunk, origin)
+ if len(inserts) != len(chunk) {
+ log.Debug("Stale headers")
+ return errBadPeer
+ }
+ }
+ headers = headers[limit:]
+ origin += uint64(limit)
+ }
+
+ // Update the highest block number we know if a higher one is found.
+ d.syncStatsLock.Lock()
+ if d.syncStatsChainHeight < origin {
+ d.syncStatsChainHeight = origin - 1
+ }
+ d.syncStatsLock.Unlock()
+
+ // Signal the content downloaders of the availablility of new tasks
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ select {
+ case ch <- true:
+ default:
+ }
+ }
+ }
+ }
+}
+
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
+ for {
+ results := d.queue.Results(true)
+ if len(results) == 0 {
+ return nil
+ }
+ if d.chainInsertHook != nil {
+ d.chainInsertHook(results)
+ }
+ if err := d.importBlockResults(results); err != nil {
+ return err
+ }
+ }
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+ // Check for any early termination requests
+ if len(results) == 0 {
+ return nil
+ }
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ default:
+ }
+ // Retrieve the a batch of results to import
+ first, last := results[0].Header, results[len(results)-1].Header
+ log.Debug("Inserting downloaded chain", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnum", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, len(results))
+ for i, result := range results {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ }
+ if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
+ }
+ return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+ // Start syncing state of the reported head block. This should get us most of
+ // the state of the pivot block.
+ stateSync := d.syncState(latest.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+ // Figure out the ideal pivot block. Note, that this goalpost may move if the
+ // sync takes long enough for the chain head to move significantly.
+ pivot := uint64(0)
+ if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
+ pivot = height - uint64(fsMinFullBlocks)
+ }
+ // To cater for moving pivot points, track the pivot block and subsequently
+ // accumulated download results separately.
+ var (
+ oldPivot *fetchResult // Locked in pivot block, might change eventually
+ oldTail []*fetchResult // Downloaded content after the pivot
+ )
+ for {
+ // Wait for the next batch of downloaded data to be available, and if the pivot
+ // block became stale, move the goalpost
+ results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
+ if len(results) == 0 {
+ // If pivot sync is done, stop
+ if oldPivot == nil {
+ return stateSync.Cancel()
+ }
+ // If sync failed, stop
+ select {
+ case <-d.cancelCh:
+ return stateSync.Cancel()
+ default:
+ }
+ }
+ if d.chainInsertHook != nil {
+ d.chainInsertHook(results)
+ }
+ if oldPivot != nil {
+ results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
+ }
+ // Split around the pivot block and process the two sides via fast/full sync
+ if atomic.LoadInt32(&d.committed) == 0 {
+ latest = results[len(results)-1].Header
+ if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
+ log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
+ pivot = height - uint64(fsMinFullBlocks)
+ }
+ }
+ P, beforeP, afterP := splitAroundPivot(pivot, results)
+ if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+ return err
+ }
+ if P != nil {
+ // If new pivot block found, cancel old state retrieval and restart
+ if oldPivot != P {
+ stateSync.Cancel()
+
+ stateSync = d.syncState(P.Header.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+ oldPivot = P
+ }
+ // Wait for completion, occasionally checking for pivot staleness
+ select {
+ case <-stateSync.done:
+ if stateSync.err != nil {
+ return stateSync.err
+ }
+ if err := d.commitPivotBlock(P); err != nil {
+ return err
+ }
+ oldPivot = nil
+
+ case <-time.After(time.Second):
+ oldTail = afterP
+ continue
+ }
+ }
+ // Fast sync done, pivot commit done, full import
+ if err := d.importBlockResults(afterP); err != nil {
+ return err
+ }
+ }
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+ for _, result := range results {
+ num := result.Header.Number.Uint64()
+ switch {
+ case num < pivot:
+ before = append(before, result)
+ case num == pivot:
+ p = result
+ default:
+ after = append(after, result)
+ }
+ }
+ return p, before, after
+}
+
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+ // Check for any early termination requests
+ if len(results) == 0 {
+ return nil
+ }
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ case <-stateSync.done:
+ if err := stateSync.Wait(); err != nil {
+ return err
+ }
+ default:
+ }
+ // Retrieve the a batch of results to import
+ first, last := results[0].Header, results[len(results)-1].Header
+ log.Debug("Inserting fast-sync blocks", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnumn", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, len(results))
+ receipts := make([]types.Receipts, len(results))
+ for i, result := range results {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ receipts[i] = result.Receipts
+ }
+ if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
+ }
+ return nil
+}
+
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+ block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
+ if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
+ return err
+ }
+ if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
+ return err
+ }
+ atomic.StoreInt32(&d.committed, 1)
+ return nil
+}
+
+// DeliverHeaders injects a new batch of block headers received from a remote
+// node into the download schedule.
+func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
+ return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
+}
+
+// DeliverBodies injects a new batch of block bodies received from a remote node.
+func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
+ return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
+}
+
+// DeliverReceipts injects a new batch of receipts received from a remote node.
+func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
+ return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
+}
+
+// DeliverNodeData injects a new batch of node state data received from a remote node.
+func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
+ return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
+}
+
+// deliver injects a new batch of data received from a remote node.
+func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
+ // Update the delivery metrics for both good and failed deliveries
+ inMeter.Mark(int64(packet.Items()))
+ defer func() {
+ if err != nil {
+ dropMeter.Mark(int64(packet.Items()))
+ }
+ }()
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
+ if cancel == nil {
+ return errNoSyncActive
+ }
+ select {
+ case destCh <- packet:
+ return nil
+ case <-cancel:
+ return errNoSyncActive
+ }
+}
+
+// qosTuner is the quality of service tuning loop that occasionally gathers the
+// peer latency statistics and updates the estimated request round trip time.
+func (d *Downloader) qosTuner() {
+ for {
+ // Retrieve the current median RTT and integrate into the previoust target RTT
+ rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+ atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
+
+ // A new RTT cycle passed, increase our confidence in the estimated RTT
+ conf := atomic.LoadUint64(&d.rttConfidence)
+ conf = conf + (1000000-conf)/2
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ // Log the new QoS values and sleep until the next RTT
+ log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
+ select {
+ case <-d.quitCh:
+ return
+ case <-time.After(rtt):
+ }
+ }
+}
+
+// qosReduceConfidence is meant to be called when a new peer joins the downloader's
+// peer set, needing to reduce the confidence we have in out QoS estimates.
+func (d *Downloader) qosReduceConfidence() {
+ // If we have a single peer, confidence is always 1
+ peers := uint64(d.peers.Len())
+ if peers == 0 {
+ // Ensure peer connectivity races don't catch us off guard
+ return
+ }
+ if peers == 1 {
+ atomic.StoreUint64(&d.rttConfidence, 1000000)
+ return
+ }
+ // If we have a ton of peers, don't drop confidence)
+ if peers >= uint64(qosConfidenceCap) {
+ return
+ }
+ // Otherwise drop the confidence factor
+ conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
+ if float64(conf)/1000000 < rttMinConfidence {
+ conf = uint64(rttMinConfidence * 1000000)
+ }
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
+}
+
+// requestRTT returns the current target round trip time for a download request
+// to complete in.
+//
+// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
+// the downloader tries to adapt queries to the RTT, so multiple RTT values can
+// be adapted to, but smaller ones are preferred (stabler download stream).
+func (d *Downloader) requestRTT() time.Duration {
+ return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
+}
+
+// requestTTL returns the current timeout allowance for a single download request
+// to finish under.
+func (d *Downloader) requestTTL() time.Duration {
+ var (
+ rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
+ )
+ ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
+ if ttl > ttlLimit {
+ ttl = ttlLimit
+ }
+ return ttl
+}
diff --git a/dex/downloader/downloader_test.go b/dex/downloader/downloader_test.go
new file mode 100644
index 000000000..093b751ca
--- /dev/null
+++ b/dex/downloader/downloader_test.go
@@ -0,0 +1,1481 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ ethereum "github.com/dexon-foundation/dexon"
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/trie"
+)
+
+// Reduce some of the parameters to make the tester faster.
+func init() {
+ MaxForkAncestry = uint64(10000)
+ blockCacheItems = 1024
+ fsHeaderContCheck = 500 * time.Millisecond
+}
+
+// downloadTester is a test simulator for mocking out local block chain.
+type downloadTester struct {
+ downloader *Downloader
+
+ genesis *types.Block // Genesis blocks used by the tester and peers
+ stateDb ethdb.Database // Database used by the tester for syncing from peers
+ peerDb ethdb.Database // Database of the peers containing all data
+ peers map[string]*downloadTesterPeer
+
+ ownHashes []common.Hash // Hash chain belonging to the tester
+ ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
+ ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
+ ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
+ ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
+
+ lock sync.RWMutex
+}
+
+// newTester creates a new downloader test mocker.
+func newTester() *downloadTester {
+ tester := &downloadTester{
+ genesis: testGenesis,
+ peerDb: testDB,
+ peers: make(map[string]*downloadTesterPeer),
+ ownHashes: []common.Hash{testGenesis.Hash()},
+ ownHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
+ ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
+ ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
+ ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
+ }
+ tester.stateDb = ethdb.NewMemDatabase()
+ tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
+ tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
+ return tester
+}
+
+// terminate aborts any operations on the embedded downloader and releases all
+// held resources.
+func (dl *downloadTester) terminate() {
+ dl.downloader.Terminate()
+}
+
+// sync starts synchronizing with a remote peer, blocking until it completes.
+func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
+ dl.lock.RLock()
+ hash := dl.peers[id].chain.headBlock().Hash()
+ // If no particular TD was requested, load from the peer's blockchain
+ if td == nil {
+ td = dl.peers[id].chain.td(hash)
+ }
+ dl.lock.RUnlock()
+
+ // Synchronise with the chosen peer and ensure proper cleanup afterwards
+ err := dl.downloader.synchronise(id, hash, td, mode)
+ select {
+ case <-dl.downloader.cancelCh:
+ // Ok, downloader fully cancelled after sync cycle
+ default:
+ // Downloader is still accepting packets, can block a peer up
+ panic("downloader active post sync cycle") // panic will be caught by tester
+ }
+ return err
+}
+
+// HasHeader checks if a header is present in the testers canonical chain.
+func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
+ return dl.GetHeaderByHash(hash) != nil
+}
+
+// HasBlock checks if a block is present in the testers canonical chain.
+func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool {
+ return dl.GetBlockByHash(hash) != nil
+}
+
+// GetHeader retrieves a header from the testers canonical chain.
+func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ return dl.ownHeaders[hash]
+}
+
+// GetBlock retrieves a block from the testers canonical chain.
+func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ return dl.ownBlocks[hash]
+}
+
+// CurrentHeader retrieves the current head header from the canonical chain.
+func (dl *downloadTester) CurrentHeader() *types.Header {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil {
+ return header
+ }
+ }
+ return dl.genesis.Header()
+}
+
+// CurrentBlock retrieves the current head block from the canonical chain.
+func (dl *downloadTester) CurrentBlock() *types.Block {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
+ if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
+ return block
+ }
+ }
+ }
+ return dl.genesis
+}
+
+// CurrentFastBlock retrieves the current head fast-sync block from the canonical chain.
+func (dl *downloadTester) CurrentFastBlock() *types.Block {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
+ return block
+ }
+ }
+ return dl.genesis
+}
+
+// FastSyncCommitHead manually sets the head block to a given hash.
+func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error {
+ // For now only check that the state trie is correct
+ if block := dl.GetBlockByHash(hash); block != nil {
+ _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb), 0)
+ return err
+ }
+ return fmt.Errorf("non existent block: %x", hash[:4])
+}
+
+// GetTd retrieves the block's total difficulty from the canonical chain.
+func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ return dl.ownChainTd[hash]
+}
+
+// InsertHeaderChain injects a new batch of headers into the simulated chain.
+func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ // Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
+ if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
+ return 0, errors.New("unknown parent")
+ }
+ for i := 1; i < len(headers); i++ {
+ if headers[i].ParentHash != headers[i-1].Hash() {
+ return i, errors.New("unknown parent")
+ }
+ }
+ // Do a full insert if pre-checks passed
+ for i, header := range headers {
+ if _, ok := dl.ownHeaders[header.Hash()]; ok {
+ continue
+ }
+ if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
+ return i, errors.New("unknown parent")
+ }
+ dl.ownHashes = append(dl.ownHashes, header.Hash())
+ dl.ownHeaders[header.Hash()] = header
+ dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
+ }
+ return len(headers), nil
+}
+
+// InsertChain injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ for i, block := range blocks {
+ if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
+ return i, errors.New("unknown parent")
+ } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
+ return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
+ }
+ if _, ok := dl.ownHeaders[block.Hash()]; !ok {
+ dl.ownHashes = append(dl.ownHashes, block.Hash())
+ dl.ownHeaders[block.Hash()] = block.Header()
+ }
+ dl.ownBlocks[block.Hash()] = block
+ dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
+ dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty())
+ }
+ return len(blocks), nil
+}
+
+// InsertReceiptChain injects a new batch of receipts into the simulated chain.
+func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ for i := 0; i < len(blocks) && i < len(receipts); i++ {
+ if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
+ return i, errors.New("unknown owner")
+ }
+ if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
+ return i, errors.New("unknown parent")
+ }
+ dl.ownBlocks[blocks[i].Hash()] = blocks[i]
+ dl.ownReceipts[blocks[i].Hash()] = receipts[i]
+ }
+ return len(blocks), nil
+}
+
+// Rollback removes some recently added elements from the chain.
+func (dl *downloadTester) Rollback(hashes []common.Hash) {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ for i := len(hashes) - 1; i >= 0; i-- {
+ if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] {
+ dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1]
+ }
+ delete(dl.ownChainTd, hashes[i])
+ delete(dl.ownHeaders, hashes[i])
+ delete(dl.ownReceipts, hashes[i])
+ delete(dl.ownBlocks, hashes[i])
+ }
+}
+
+// newPeer registers a new block download source into the downloader.
+func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ peer := &downloadTesterPeer{dl: dl, id: id, chain: chain}
+ dl.peers[id] = peer
+ return dl.downloader.RegisterPeer(id, version, peer)
+}
+
+// dropPeer simulates a hard peer removal from the connection pool.
+func (dl *downloadTester) dropPeer(id string) {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ delete(dl.peers, id)
+ dl.downloader.UnregisterPeer(id)
+}
+
+type downloadTesterPeer struct {
+ dl *downloadTester
+ id string
+ lock sync.RWMutex
+ chain *testChain
+ missingStates map[common.Hash]bool // State entries that fast sync should not return
+}
+
+// Head constructs a function to retrieve a peer's current head hash
+// and total difficulty.
+func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
+ b := dlp.chain.headBlock()
+ return b.Hash(), dlp.chain.td(b.Hash())
+}
+
+// RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
+// origin; associated with a particular peer in the download tester. The returned
+// function can be used to retrieve batches of headers from the particular peer.
+func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
+ if reverse {
+ panic("reverse header requests not supported")
+ }
+
+ result := dlp.chain.headersByHash(origin, amount, skip)
+ go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
+ return nil
+}
+
+// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
+// origin; associated with a particular peer in the download tester. The returned
+// function can be used to retrieve batches of headers from the particular peer.
+func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
+ if reverse {
+ panic("reverse header requests not supported")
+ }
+
+ result := dlp.chain.headersByNumber(origin, amount, skip)
+ go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
+ return nil
+}
+
+// RequestBodies constructs a getBlockBodies method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of block bodies from the particularly requested peer.
+func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
+ txs, uncles := dlp.chain.bodies(hashes)
+ go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles)
+ return nil
+}
+
+// RequestReceipts constructs a getReceipts method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of block receipts from the particularly requested peer.
+func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
+ receipts := dlp.chain.receipts(hashes)
+ go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts)
+ return nil
+}
+
+// RequestNodeData constructs a getNodeData method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of node state data from the particularly requested peer.
+func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
+ dlp.dl.lock.RLock()
+ defer dlp.dl.lock.RUnlock()
+
+ results := make([][]byte, 0, len(hashes))
+ for _, hash := range hashes {
+ if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
+ if !dlp.missingStates[hash] {
+ results = append(results, data)
+ }
+ }
+ }
+ go dlp.dl.downloader.DeliverNodeData(dlp.id, results)
+ return nil
+}
+
+// assertOwnChain checks if the local chain contains the correct number of items
+// of the various chain components.
+func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
+ assertOwnForkedChain(t, tester, 1, []int{length})
+}
+
+// assertOwnForkedChain checks if the local forked chain contains the correct
+// number of items of the various chain components.
+func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
+ // Initialize the counters for the first fork
+ headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks
+
+ if receipts < 0 {
+ receipts = 1
+ }
+ // Update the counters for each subsequent fork
+ for _, length := range lengths[1:] {
+ headers += length - common
+ blocks += length - common
+ receipts += length - common - fsMinFullBlocks
+ }
+ switch tester.downloader.mode {
+ case FullSync:
+ receipts = 1
+ case LightSync:
+ blocks, receipts = 1, 1
+ }
+ if hs := len(tester.ownHeaders); hs != headers {
+ t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
+ }
+ if bs := len(tester.ownBlocks); bs != blocks {
+ t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
+ }
+ if rs := len(tester.ownReceipts); rs != receipts {
+ t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
+ }
+}
+
+// Tests that simple synchronization against a canonical chain works correctly.
+// In this test common ancestor lookup should be short circuited and not require
+// binary searching.
+func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62, FullSync) }
+func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) }
+func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) }
+func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) }
+func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) }
+func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
+
+func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Create a small enough block chain to download
+ chain := testChainBase.shorten(blockCacheItems - 15)
+ tester.newPeer("peer", protocol, chain)
+
+ // Synchronise with the peer and make sure all relevant data was retrieved
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that if a large batch of blocks are being downloaded, it is throttled
+// until the cached blocks are retrieved.
+func TestThrottling62(t *testing.T) { testThrottling(t, 62, FullSync) }
+func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
+func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
+func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
+func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
+
+func testThrottling(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
+ // Create a long block chain to download and the tester
+ targetBlocks := testChainBase.len() - 1
+ tester.newPeer("peer", protocol, testChainBase)
+
+ // Wrap the importer to allow stepping
+ blocked, proceed := uint32(0), make(chan struct{})
+ tester.downloader.chainInsertHook = func(results []*fetchResult) {
+ atomic.StoreUint32(&blocked, uint32(len(results)))
+ <-proceed
+ }
+ // Start a synchronisation concurrently
+ errc := make(chan error)
+ go func() {
+ errc <- tester.sync("peer", nil, mode)
+ }()
+ // Iteratively take some blocks, always checking the retrieval count
+ for {
+ // Check the retrieval count synchronously (! reason for this ugly block)
+ tester.lock.RLock()
+ retrieved := len(tester.ownBlocks)
+ tester.lock.RUnlock()
+ if retrieved >= targetBlocks+1 {
+ break
+ }
+ // Wait a bit for sync to throttle itself
+ var cached, frozen int
+ for start := time.Now(); time.Since(start) < 3*time.Second; {
+ time.Sleep(25 * time.Millisecond)
+
+ tester.lock.Lock()
+ tester.downloader.queue.lock.Lock()
+ cached = len(tester.downloader.queue.blockDonePool)
+ if mode == FastSync {
+ if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
+ cached = receipts
+ }
+ }
+ frozen = int(atomic.LoadUint32(&blocked))
+ retrieved = len(tester.ownBlocks)
+ tester.downloader.queue.lock.Unlock()
+ tester.lock.Unlock()
+
+ if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
+ break
+ }
+ }
+ // Make sure we filled up the cache, then exhaust it
+ time.Sleep(25 * time.Millisecond) // give it a chance to screw up
+
+ tester.lock.RLock()
+ retrieved = len(tester.ownBlocks)
+ tester.lock.RUnlock()
+ if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
+ t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
+ }
+ // Permit the blocked blocks to import
+ if atomic.LoadUint32(&blocked) > 0 {
+ atomic.StoreUint32(&blocked, uint32(0))
+ proceed <- struct{}{}
+ }
+ }
+ // Check that we haven't pulled more blocks than available
+ assertOwnChain(t, tester, targetBlocks+1)
+ if err := <-errc; err != nil {
+ t.Fatalf("block synchronization failed: %v", err)
+ }
+}
+
+// Tests that simple synchronization against a forked chain works correctly. In
+// this test common ancestor lookup should *not* be short circuited, and a full
+// binary search should be executed.
+func TestForkedSync62(t *testing.T) { testForkedSync(t, 62, FullSync) }
+func TestForkedSync63Full(t *testing.T) { testForkedSync(t, 63, FullSync) }
+func TestForkedSync63Fast(t *testing.T) { testForkedSync(t, 63, FastSync) }
+func TestForkedSync64Full(t *testing.T) { testForkedSync(t, 64, FullSync) }
+func TestForkedSync64Fast(t *testing.T) { testForkedSync(t, 64, FastSync) }
+func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
+
+func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
+ chainB := testChainForkLightB.shorten(testChainBase.len() + 80)
+ tester.newPeer("fork A", protocol, chainA)
+ tester.newPeer("fork B", protocol, chainB)
+
+ // Synchronise with the peer and make sure all blocks were retrieved
+ if err := tester.sync("fork A", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chainA.len())
+
+ // Synchronise with the second peer and make sure that fork is pulled too
+ if err := tester.sync("fork B", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
+}
+
+// Tests that synchronising against a much shorter but much heavyer fork works
+// corrently and is not dropped.
+func TestHeavyForkedSync62(t *testing.T) { testHeavyForkedSync(t, 62, FullSync) }
+func TestHeavyForkedSync63Full(t *testing.T) { testHeavyForkedSync(t, 63, FullSync) }
+func TestHeavyForkedSync63Fast(t *testing.T) { testHeavyForkedSync(t, 63, FastSync) }
+func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSync) }
+func TestHeavyForkedSync64Fast(t *testing.T) { testHeavyForkedSync(t, 64, FastSync) }
+func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, LightSync) }
+
+func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
+ chainB := testChainForkHeavy.shorten(testChainBase.len() + 80)
+ tester.newPeer("light", protocol, chainA)
+ tester.newPeer("heavy", protocol, chainB)
+
+ // Synchronise with the peer and make sure all blocks were retrieved
+ if err := tester.sync("light", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chainA.len())
+
+ // Synchronise with the second peer and make sure that fork is pulled too
+ if err := tester.sync("heavy", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
+}
+
+// Tests that chain forks are contained within a certain interval of the current
+// chain head, ensuring that malicious peers cannot waste resources by feeding
+// long dead chains.
+func TestBoundedForkedSync62(t *testing.T) { testBoundedForkedSync(t, 62, FullSync) }
+func TestBoundedForkedSync63Full(t *testing.T) { testBoundedForkedSync(t, 63, FullSync) }
+func TestBoundedForkedSync63Fast(t *testing.T) { testBoundedForkedSync(t, 63, FastSync) }
+func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, FullSync) }
+func TestBoundedForkedSync64Fast(t *testing.T) { testBoundedForkedSync(t, 64, FastSync) }
+func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, LightSync) }
+
+func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ chainA := testChainForkLightA
+ chainB := testChainForkLightB
+ tester.newPeer("original", protocol, chainA)
+ tester.newPeer("rewriter", protocol, chainB)
+
+ // Synchronise with the peer and make sure all blocks were retrieved
+ if err := tester.sync("original", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chainA.len())
+
+ // Synchronise with the second peer and ensure that the fork is rejected to being too old
+ if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor {
+ t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
+ }
+}
+
+// Tests that chain forks are contained within a certain interval of the current
+// chain head for short but heavy forks too. These are a bit special because they
+// take different ancestor lookup paths.
+func TestBoundedHeavyForkedSync62(t *testing.T) { testBoundedHeavyForkedSync(t, 62, FullSync) }
+func TestBoundedHeavyForkedSync63Full(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FullSync) }
+func TestBoundedHeavyForkedSync63Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FastSync) }
+func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FullSync) }
+func TestBoundedHeavyForkedSync64Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FastSync) }
+func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSync(t, 64, LightSync) }
+
+func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Create a long enough forked chain
+ chainA := testChainForkLightA
+ chainB := testChainForkHeavy
+ tester.newPeer("original", protocol, chainA)
+ tester.newPeer("heavy-rewriter", protocol, chainB)
+
+ // Synchronise with the peer and make sure all blocks were retrieved
+ if err := tester.sync("original", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chainA.len())
+
+ // Synchronise with the second peer and ensure that the fork is rejected to being too old
+ if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
+ t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
+ }
+}
+
+// Tests that an inactive downloader will not accept incoming block headers and
+// bodies.
+func TestInactiveDownloader62(t *testing.T) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Check that neither block headers nor bodies are accepted
+ if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+ if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+}
+
+// Tests that an inactive downloader will not accept incoming block headers,
+// bodies and receipts.
+func TestInactiveDownloader63(t *testing.T) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Check that neither block headers nor bodies are accepted
+ if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+ if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+ if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+}
+
+// Tests that a canceled download wipes all previously accumulated state.
+func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) }
+func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) }
+func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) }
+func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) }
+func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
+func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
+
+func testCancel(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ chain := testChainBase.shorten(MaxHeaderFetch)
+ tester.newPeer("peer", protocol, chain)
+
+ // Make sure canceling works with a pristine downloader
+ tester.downloader.Cancel()
+ if !tester.downloader.queue.Idle() {
+ t.Errorf("download queue not idle")
+ }
+ // Synchronise with the peer, but cancel afterwards
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ tester.downloader.Cancel()
+ if !tester.downloader.queue.Idle() {
+ t.Errorf("download queue not idle")
+ }
+}
+
+// Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
+func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) }
+func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) }
+func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) }
+func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) }
+func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) }
+func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
+
+func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Create various peers with various parts of the chain
+ targetPeers := 8
+ chain := testChainBase.shorten(targetPeers * 100)
+
+ for i := 0; i < targetPeers; i++ {
+ id := fmt.Sprintf("peer #%d", i)
+ tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1)))
+ }
+ if err := tester.sync("peer #0", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that synchronisations behave well in multi-version protocol environments
+// and not wreak havoc on other nodes in the network.
+func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) }
+func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) }
+func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) }
+func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) }
+func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) }
+func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
+
+func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Create a small enough block chain to download
+ chain := testChainBase.shorten(blockCacheItems - 15)
+
+ // Create peers of every type
+ tester.newPeer("peer 62", 62, chain)
+ tester.newPeer("peer 63", 63, chain)
+ tester.newPeer("peer 64", 64, chain)
+
+ // Synchronise with the requested peer and make sure all blocks were retrieved
+ if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chain.len())
+
+ // Check that no peers have been dropped off
+ for _, version := range []int{62, 63, 64} {
+ peer := fmt.Sprintf("peer %d", version)
+ if _, ok := tester.peers[peer]; !ok {
+ t.Errorf("%s dropped", peer)
+ }
+ }
+}
+
+// Tests that if a block is empty (e.g. header only), no body request should be
+// made, and instead the header should be assembled into a whole block in itself.
+func TestEmptyShortCircuit62(t *testing.T) { testEmptyShortCircuit(t, 62, FullSync) }
+func TestEmptyShortCircuit63Full(t *testing.T) { testEmptyShortCircuit(t, 63, FullSync) }
+func TestEmptyShortCircuit63Fast(t *testing.T) { testEmptyShortCircuit(t, 63, FastSync) }
+func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) }
+func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) }
+func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
+
+func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Create a block chain to download
+ chain := testChainBase
+ tester.newPeer("peer", protocol, chain)
+
+ // Instrument the downloader to signal body requests
+ bodiesHave, receiptsHave := int32(0), int32(0)
+ tester.downloader.bodyFetchHook = func(headers []*types.Header) {
+ atomic.AddInt32(&bodiesHave, int32(len(headers)))
+ }
+ tester.downloader.receiptFetchHook = func(headers []*types.Header) {
+ atomic.AddInt32(&receiptsHave, int32(len(headers)))
+ }
+ // Synchronise with the peer and make sure all blocks were retrieved
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chain.len())
+
+ // Validate the number of block bodies that should have been requested
+ bodiesNeeded, receiptsNeeded := 0, 0
+ for _, block := range chain.blockm {
+ if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
+ bodiesNeeded++
+ }
+ }
+ for _, receipt := range chain.receiptm {
+ if mode == FastSync && len(receipt) > 0 {
+ receiptsNeeded++
+ }
+ }
+ if int(bodiesHave) != bodiesNeeded {
+ t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
+ }
+ if int(receiptsHave) != receiptsNeeded {
+ t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
+ }
+}
+
+// Tests that headers are enqueued continuously, preventing malicious nodes from
+// stalling the downloader by feeding gapped header chains.
+func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62, FullSync) }
+func TestMissingHeaderAttack63Full(t *testing.T) { testMissingHeaderAttack(t, 63, FullSync) }
+func TestMissingHeaderAttack63Fast(t *testing.T) { testMissingHeaderAttack(t, 63, FastSync) }
+func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) }
+func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) }
+func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
+
+func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ chain := testChainBase.shorten(blockCacheItems - 15)
+ brokenChain := chain.shorten(chain.len())
+ delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
+ tester.newPeer("attack", protocol, brokenChain)
+
+ if err := tester.sync("attack", nil, mode); err == nil {
+ t.Fatalf("succeeded attacker synchronisation")
+ }
+ // Synchronise with the valid peer and make sure sync succeeds
+ tester.newPeer("valid", protocol, chain)
+ if err := tester.sync("valid", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that if requested headers are shifted (i.e. first is missing), the queue
+// detects the invalid numbering.
+func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62, FullSync) }
+func TestShiftedHeaderAttack63Full(t *testing.T) { testShiftedHeaderAttack(t, 63, FullSync) }
+func TestShiftedHeaderAttack63Fast(t *testing.T) { testShiftedHeaderAttack(t, 63, FastSync) }
+func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) }
+func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) }
+func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
+
+func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ chain := testChainBase.shorten(blockCacheItems - 15)
+
+ // Attempt a full sync with an attacker feeding shifted headers
+ brokenChain := chain.shorten(chain.len())
+ delete(brokenChain.headerm, brokenChain.chain[1])
+ delete(brokenChain.blockm, brokenChain.chain[1])
+ delete(brokenChain.receiptm, brokenChain.chain[1])
+ tester.newPeer("attack", protocol, brokenChain)
+ if err := tester.sync("attack", nil, mode); err == nil {
+ t.Fatalf("succeeded attacker synchronisation")
+ }
+
+ // Synchronise with the valid peer and make sure sync succeeds
+ tester.newPeer("valid", protocol, chain)
+ if err := tester.sync("valid", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ assertOwnChain(t, tester, chain.len())
+}
+
+// Tests that upon detecting an invalid header, the recent ones are rolled back
+// for various failure scenarios. Afterwards a full sync is attempted to make
+// sure no state was corrupted.
+func TestInvalidHeaderRollback63Fast(t *testing.T) { testInvalidHeaderRollback(t, 63, FastSync) }
+func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(t, 64, FastSync) }
+func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }
+
+func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ // Create a small enough block chain to download
+ targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
+ chain := testChainBase.shorten(targetBlocks)
+
+ // Attempt to sync with an attacker that feeds junk during the fast sync phase.
+ // This should result in the last fsHeaderSafetyNet headers being rolled back.
+ missing := fsHeaderSafetyNet + MaxHeaderFetch + 1
+ fastAttackChain := chain.shorten(chain.len())
+ delete(fastAttackChain.headerm, fastAttackChain.chain[missing])
+ tester.newPeer("fast-attack", protocol, fastAttackChain)
+
+ if err := tester.sync("fast-attack", nil, mode); err == nil {
+ t.Fatalf("succeeded fast attacker synchronisation")
+ }
+ if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
+ t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
+ }
+
+ // Attempt to sync with an attacker that feeds junk during the block import phase.
+ // This should result in both the last fsHeaderSafetyNet number of headers being
+ // rolled back, and also the pivot point being reverted to a non-block status.
+ missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
+ blockAttackChain := chain.shorten(chain.len())
+ delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in
+ delete(blockAttackChain.headerm, blockAttackChain.chain[missing])
+ tester.newPeer("block-attack", protocol, blockAttackChain)
+
+ if err := tester.sync("block-attack", nil, mode); err == nil {
+ t.Fatalf("succeeded block attacker synchronisation")
+ }
+ if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
+ t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
+ }
+ if mode == FastSync {
+ if head := tester.CurrentBlock().NumberU64(); head != 0 {
+ t.Errorf("fast sync pivot block #%d not rolled back", head)
+ }
+ }
+
+ // Attempt to sync with an attacker that withholds promised blocks after the
+ // fast sync pivot point. This could be a trial to leave the node with a bad
+ // but already imported pivot block.
+ withholdAttackChain := chain.shorten(chain.len())
+ tester.newPeer("withhold-attack", protocol, withholdAttackChain)
+ tester.downloader.syncInitHook = func(uint64, uint64) {
+ for i := missing; i < withholdAttackChain.len(); i++ {
+ delete(withholdAttackChain.headerm, withholdAttackChain.chain[i])
+ }
+ tester.downloader.syncInitHook = nil
+ }
+ if err := tester.sync("withhold-attack", nil, mode); err == nil {
+ t.Fatalf("succeeded withholding attacker synchronisation")
+ }
+ if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
+ t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
+ }
+ if mode == FastSync {
+ if head := tester.CurrentBlock().NumberU64(); head != 0 {
+ t.Errorf("fast sync pivot block #%d not rolled back", head)
+ }
+ }
+
+ // synchronise with the valid peer and make sure sync succeeds. Since the last rollback
+ // should also disable fast syncing for this process, verify that we did a fresh full
+ // sync. Note, we can't assert anything about the receipts since we won't purge the
+ // database of them, hence we can't use assertOwnChain.
+ tester.newPeer("valid", protocol, chain)
+ if err := tester.sync("valid", nil, mode); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ if hs := len(tester.ownHeaders); hs != chain.len() {
+ t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len())
+ }
+ if mode != LightSync {
+ if bs := len(tester.ownBlocks); bs != chain.len() {
+ t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
+ }
+ }
+}
+
+// Tests that a peer advertising an high TD doesn't get to stall the downloader
+// afterwards by not sending any useful hashes.
+func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) }
+func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) }
+func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) }
+func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) }
+func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) }
+func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
+
+func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+
+ chain := testChainBase.shorten(1)
+ tester.newPeer("attack", protocol, chain)
+ if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
+ }
+}
+
+// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
+func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) }
+func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
+func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
+
+func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
+ t.Parallel()
+
+ // Define the disconnection requirement for individual hash fetch errors
+ tests := []struct {
+ result error
+ drop bool
+ }{
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ }
+ // Run the tests and check disconnection status
+ tester := newTester()
+ defer tester.terminate()
+ chain := testChainBase.shorten(1)
+
+ for i, tt := range tests {
+ // Register a new peer and ensure it's presence
+ id := fmt.Sprintf("test %d", i)
+ if err := tester.newPeer(id, protocol, chain); err != nil {
+ t.Fatalf("test %d: failed to register new peer: %v", i, err)
+ }
+ if _, ok := tester.peers[id]; !ok {
+ t.Fatalf("test %d: registered peer not found", i)
+ }
+ // Simulate a synchronisation and check the required result
+ tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
+
+ tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync)
+ if _, ok := tester.peers[id]; !ok != tt.drop {
+ t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
+ }
+ }
+}
+
+// Tests that synchronisation progress (origin block number, current block number
+// and highest block number) is tracked and updated correctly.
+func TestSyncProgress62(t *testing.T) { testSyncProgress(t, 62, FullSync) }
+func TestSyncProgress63Full(t *testing.T) { testSyncProgress(t, 63, FullSync) }
+func TestSyncProgress63Fast(t *testing.T) { testSyncProgress(t, 63, FastSync) }
+func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) }
+func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
+func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
+
+func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+ chain := testChainBase.shorten(blockCacheItems - 15)
+
+ // Set a sync init hook to catch progress changes
+ starting := make(chan struct{})
+ progress := make(chan struct{})
+
+ tester.downloader.syncInitHook = func(origin, latest uint64) {
+ starting <- struct{}{}
+ <-progress
+ }
+ checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+ // Synchronise half the blocks and check initial progress
+ tester.newPeer("peer-half", protocol, chain.shorten(chain.len()/2))
+ pending := new(sync.WaitGroup)
+ pending.Add(1)
+
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("peer-half", nil, mode); err != nil {
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+ }
+ }()
+ <-starting
+ checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+ HighestBlock: uint64(chain.len()/2 - 1),
+ })
+ progress <- struct{}{}
+ pending.Wait()
+
+ // Synchronise all the blocks and check continuation progress
+ tester.newPeer("peer-full", protocol, chain)
+ pending.Add(1)
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("peer-full", nil, mode); err != nil {
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+ }
+ }()
+ <-starting
+ checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
+ StartingBlock: uint64(chain.len()/2 - 1),
+ CurrentBlock: uint64(chain.len()/2 - 1),
+ HighestBlock: uint64(chain.len() - 1),
+ })
+
+ // Check final progress after successful sync
+ progress <- struct{}{}
+ pending.Wait()
+ checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+ StartingBlock: uint64(chain.len()/2 - 1),
+ CurrentBlock: uint64(chain.len() - 1),
+ HighestBlock: uint64(chain.len() - 1),
+ })
+}
+
+func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) {
+ t.Helper()
+ p := d.Progress()
+ p.KnownStates, p.PulledStates = 0, 0
+ want.KnownStates, want.PulledStates = 0, 0
+ if p != want {
+ t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want)
+ }
+}
+
+// Tests that synchronisation progress (origin block number and highest block
+// number) is tracked and updated correctly in case of a fork (or manual head
+// revertal).
+func TestForkedSyncProgress62(t *testing.T) { testForkedSyncProgress(t, 62, FullSync) }
+func TestForkedSyncProgress63Full(t *testing.T) { testForkedSyncProgress(t, 63, FullSync) }
+func TestForkedSyncProgress63Fast(t *testing.T) { testForkedSyncProgress(t, 63, FastSync) }
+func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64, FullSync) }
+func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, FastSync) }
+func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
+
+func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+ chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch)
+ chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch)
+
+ // Set a sync init hook to catch progress changes
+ starting := make(chan struct{})
+ progress := make(chan struct{})
+
+ tester.downloader.syncInitHook = func(origin, latest uint64) {
+ starting <- struct{}{}
+ <-progress
+ }
+ checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+ // Synchronise with one of the forks and check progress
+ tester.newPeer("fork A", protocol, chainA)
+ pending := new(sync.WaitGroup)
+ pending.Add(1)
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("fork A", nil, mode); err != nil {
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+ }
+ }()
+ <-starting
+
+ checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+ HighestBlock: uint64(chainA.len() - 1),
+ })
+ progress <- struct{}{}
+ pending.Wait()
+
+ // Simulate a successful sync above the fork
+ tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight
+
+ // Synchronise with the second fork and check progress resets
+ tester.newPeer("fork B", protocol, chainB)
+ pending.Add(1)
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("fork B", nil, mode); err != nil {
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+ }
+ }()
+ <-starting
+ checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{
+ StartingBlock: uint64(testChainBase.len()) - 1,
+ CurrentBlock: uint64(chainA.len() - 1),
+ HighestBlock: uint64(chainB.len() - 1),
+ })
+
+ // Check final progress after successful sync
+ progress <- struct{}{}
+ pending.Wait()
+ checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+ StartingBlock: uint64(testChainBase.len()) - 1,
+ CurrentBlock: uint64(chainB.len() - 1),
+ HighestBlock: uint64(chainB.len() - 1),
+ })
+}
+
+// Tests that if synchronisation is aborted due to some failure, then the progress
+// origin is not updated in the next sync cycle, as it should be considered the
+// continuation of the previous sync and not a new instance.
+func TestFailedSyncProgress62(t *testing.T) { testFailedSyncProgress(t, 62, FullSync) }
+func TestFailedSyncProgress63Full(t *testing.T) { testFailedSyncProgress(t, 63, FullSync) }
+func TestFailedSyncProgress63Fast(t *testing.T) { testFailedSyncProgress(t, 63, FastSync) }
+func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64, FullSync) }
+func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, FastSync) }
+func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
+
+func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+ chain := testChainBase.shorten(blockCacheItems - 15)
+
+ // Set a sync init hook to catch progress changes
+ starting := make(chan struct{})
+ progress := make(chan struct{})
+
+ tester.downloader.syncInitHook = func(origin, latest uint64) {
+ starting <- struct{}{}
+ <-progress
+ }
+ checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+ // Attempt a full sync with a faulty peer
+ brokenChain := chain.shorten(chain.len())
+ missing := brokenChain.len() / 2
+ delete(brokenChain.headerm, brokenChain.chain[missing])
+ delete(brokenChain.blockm, brokenChain.chain[missing])
+ delete(brokenChain.receiptm, brokenChain.chain[missing])
+ tester.newPeer("faulty", protocol, brokenChain)
+
+ pending := new(sync.WaitGroup)
+ pending.Add(1)
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("faulty", nil, mode); err == nil {
+ panic("succeeded faulty synchronisation")
+ }
+ }()
+ <-starting
+ checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+ HighestBlock: uint64(brokenChain.len() - 1),
+ })
+ progress <- struct{}{}
+ pending.Wait()
+ afterFailedSync := tester.downloader.Progress()
+
+ // Synchronise with a good peer and check that the progress origin remind the same
+ // after a failure
+ tester.newPeer("valid", protocol, chain)
+ pending.Add(1)
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("valid", nil, mode); err != nil {
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+ }
+ }()
+ <-starting
+ checkProgress(t, tester.downloader, "completing", afterFailedSync)
+
+ // Check final progress after successful sync
+ progress <- struct{}{}
+ pending.Wait()
+ checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+ CurrentBlock: uint64(chain.len() - 1),
+ HighestBlock: uint64(chain.len() - 1),
+ })
+}
+
+// Tests that if an attacker fakes a chain height, after the attack is detected,
+// the progress height is successfully reduced at the next sync invocation.
+func TestFakedSyncProgress62(t *testing.T) { testFakedSyncProgress(t, 62, FullSync) }
+func TestFakedSyncProgress63Full(t *testing.T) { testFakedSyncProgress(t, 63, FullSync) }
+func TestFakedSyncProgress63Fast(t *testing.T) { testFakedSyncProgress(t, 63, FastSync) }
+func TestFakedSyncProgress64Full(t *testing.T) { testFakedSyncProgress(t, 64, FullSync) }
+func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, FastSync) }
+func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
+
+func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
+ tester := newTester()
+ defer tester.terminate()
+ chain := testChainBase.shorten(blockCacheItems - 15)
+
+ // Set a sync init hook to catch progress changes
+ starting := make(chan struct{})
+ progress := make(chan struct{})
+ tester.downloader.syncInitHook = func(origin, latest uint64) {
+ starting <- struct{}{}
+ <-progress
+ }
+ checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
+
+ // Create and sync with an attacker that promises a higher chain than available.
+ brokenChain := chain.shorten(chain.len())
+ numMissing := 5
+ for i := brokenChain.len() - 2; i > brokenChain.len()-numMissing; i-- {
+ delete(brokenChain.headerm, brokenChain.chain[i])
+ }
+ tester.newPeer("attack", protocol, brokenChain)
+
+ pending := new(sync.WaitGroup)
+ pending.Add(1)
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("attack", nil, mode); err == nil {
+ panic("succeeded attacker synchronisation")
+ }
+ }()
+ <-starting
+ checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
+ HighestBlock: uint64(brokenChain.len() - 1),
+ })
+ progress <- struct{}{}
+ pending.Wait()
+ afterFailedSync := tester.downloader.Progress()
+
+ // Synchronise with a good peer and check that the progress height has been reduced to
+ // the true value.
+ validChain := chain.shorten(chain.len() - numMissing)
+ tester.newPeer("valid", protocol, validChain)
+ pending.Add(1)
+
+ go func() {
+ defer pending.Done()
+ if err := tester.sync("valid", nil, mode); err != nil {
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
+ }
+ }()
+ <-starting
+ checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
+ CurrentBlock: afterFailedSync.CurrentBlock,
+ HighestBlock: uint64(validChain.len() - 1),
+ })
+
+ // Check final progress after successful sync.
+ progress <- struct{}{}
+ pending.Wait()
+ checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
+ CurrentBlock: uint64(validChain.len() - 1),
+ HighestBlock: uint64(validChain.len() - 1),
+ })
+}
+
+// This test reproduces an issue where unexpected deliveries would
+// block indefinitely if they arrived at the right time.
+func TestDeliverHeadersHang(t *testing.T) {
+ t.Parallel()
+
+ testCases := []struct {
+ protocol int
+ syncMode SyncMode
+ }{
+ {62, FullSync},
+ {63, FullSync},
+ {63, FastSync},
+ {64, FullSync},
+ {64, FastSync},
+ {64, LightSync},
+ }
+ for _, tc := range testCases {
+ t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) {
+ t.Parallel()
+ testDeliverHeadersHang(t, tc.protocol, tc.syncMode)
+ })
+ }
+}
+
+func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
+ master := newTester()
+ defer master.terminate()
+ chain := testChainBase.shorten(15)
+
+ for i := 0; i < 200; i++ {
+ tester := newTester()
+ tester.peerDb = master.peerDb
+ tester.newPeer("peer", protocol, chain)
+
+ // Whenever the downloader requests headers, flood it with
+ // a lot of unrequested header deliveries.
+ tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
+ peer: tester.downloader.peers.peers["peer"].peer,
+ tester: tester,
+ }
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Errorf("test %d: sync failed: %v", i, err)
+ }
+ tester.terminate()
+ }
+}
+
+type floodingTestPeer struct {
+ peer Peer
+ tester *downloadTester
+}
+
+func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }
+func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error {
+ return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse)
+}
+func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error {
+ return ftp.peer.RequestBodies(hashes)
+}
+func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error {
+ return ftp.peer.RequestReceipts(hashes)
+}
+func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error {
+ return ftp.peer.RequestNodeData(hashes)
+}
+
+func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error {
+ deliveriesDone := make(chan struct{}, 500)
+ for i := 0; i < cap(deliveriesDone)-1; i++ {
+ peer := fmt.Sprintf("fake-peer%d", i)
+ go func() {
+ ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
+ deliveriesDone <- struct{}{}
+ }()
+ }
+
+ // None of the extra deliveries should block.
+ timeout := time.After(60 * time.Second)
+ launched := false
+ for i := 0; i < cap(deliveriesDone); i++ {
+ select {
+ case <-deliveriesDone:
+ if !launched {
+ // Start delivering the requested headers
+ // after one of the flooding responses has arrived.
+ go func() {
+ ftp.peer.RequestHeadersByNumber(from, count, skip, reverse)
+ deliveriesDone <- struct{}{}
+ }()
+ launched = true
+ }
+ case <-timeout:
+ panic("blocked")
+ }
+ }
+ return nil
+}
diff --git a/dex/downloader/events.go b/dex/downloader/events.go
new file mode 100644
index 000000000..64905b8f2
--- /dev/null
+++ b/dex/downloader/events.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+type DoneEvent struct{}
+type StartEvent struct{}
+type FailedEvent struct{ Err error }
diff --git a/dex/downloader/fakepeer.go b/dex/downloader/fakepeer.go
new file mode 100644
index 000000000..3e29357ba
--- /dev/null
+++ b/dex/downloader/fakepeer.go
@@ -0,0 +1,161 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "math/big"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/core/rawdb"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/ethdb"
+)
+
+// FakePeer is a mock downloader peer that operates on a local database instance
+// instead of being an actual live node. It's useful for testing and to implement
+// sync commands from an existing local database.
+type FakePeer struct {
+ id string
+ db ethdb.Database
+ hc *core.HeaderChain
+ dl *Downloader
+}
+
+// NewFakePeer creates a new mock downloader peer with the given data sources.
+func NewFakePeer(id string, db ethdb.Database, hc *core.HeaderChain, dl *Downloader) *FakePeer {
+ return &FakePeer{id: id, db: db, hc: hc, dl: dl}
+}
+
+// Head implements downloader.Peer, returning the current head hash and number
+// of the best known header.
+func (p *FakePeer) Head() (common.Hash, *big.Int) {
+ header := p.hc.CurrentHeader()
+ return header.Hash(), header.Number
+}
+
+// RequestHeadersByHash implements downloader.Peer, returning a batch of headers
+// defined by the origin hash and the associated query parameters.
+func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, reverse bool) error {
+ var (
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < amount {
+ origin := p.hc.GetHeaderByHash(hash)
+ if origin == nil {
+ break
+ }
+ number := origin.Number.Uint64()
+ headers = append(headers, origin)
+ if reverse {
+ for i := 0; i <= skip; i++ {
+ if header := p.hc.GetHeader(hash, number); header != nil {
+ hash = header.ParentHash
+ number--
+ } else {
+ unknown = true
+ break
+ }
+ }
+ } else {
+ var (
+ current = origin.Number.Uint64()
+ next = current + uint64(skip) + 1
+ )
+ if header := p.hc.GetHeaderByNumber(next); header != nil {
+ if p.hc.GetBlockHashesFromHash(header.Hash(), uint64(skip+1))[skip] == hash {
+ hash = header.Hash()
+ } else {
+ unknown = true
+ }
+ } else {
+ unknown = true
+ }
+ }
+ }
+ p.dl.DeliverHeaders(p.id, headers)
+ return nil
+}
+
+// RequestHeadersByNumber implements downloader.Peer, returning a batch of headers
+// defined by the origin number and the associated query parameters.
+func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, reverse bool) error {
+ var (
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < amount {
+ origin := p.hc.GetHeaderByNumber(number)
+ if origin == nil {
+ break
+ }
+ if reverse {
+ if number >= uint64(skip+1) {
+ number -= uint64(skip + 1)
+ } else {
+ unknown = true
+ }
+ } else {
+ number += uint64(skip + 1)
+ }
+ headers = append(headers, origin)
+ }
+ p.dl.DeliverHeaders(p.id, headers)
+ return nil
+}
+
+// RequestBodies implements downloader.Peer, returning a batch of block bodies
+// corresponding to the specified block hashes.
+func (p *FakePeer) RequestBodies(hashes []common.Hash) error {
+ var (
+ txs [][]*types.Transaction
+ uncles [][]*types.Header
+ )
+ for _, hash := range hashes {
+ block := rawdb.ReadBlock(p.db, hash, *p.hc.GetBlockNumber(hash))
+
+ txs = append(txs, block.Transactions())
+ uncles = append(uncles, block.Uncles())
+ }
+ p.dl.DeliverBodies(p.id, txs, uncles)
+ return nil
+}
+
+// RequestReceipts implements downloader.Peer, returning a batch of transaction
+// receipts corresponding to the specified block hashes.
+func (p *FakePeer) RequestReceipts(hashes []common.Hash) error {
+ var receipts [][]*types.Receipt
+ for _, hash := range hashes {
+ receipts = append(receipts, rawdb.ReadReceipts(p.db, hash, *p.hc.GetBlockNumber(hash)))
+ }
+ p.dl.DeliverReceipts(p.id, receipts)
+ return nil
+}
+
+// RequestNodeData implements downloader.Peer, returning a batch of state trie
+// nodes corresponding to the specified trie hashes.
+func (p *FakePeer) RequestNodeData(hashes []common.Hash) error {
+ var data [][]byte
+ for _, hash := range hashes {
+ if entry, err := p.db.Get(hash.Bytes()); err == nil {
+ data = append(data, entry)
+ }
+ }
+ p.dl.DeliverNodeData(p.id, data)
+ return nil
+}
diff --git a/dex/downloader/metrics.go b/dex/downloader/metrics.go
new file mode 100644
index 000000000..0d6041712
--- /dev/null
+++ b/dex/downloader/metrics.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the metrics collected by the downloader.
+
+package downloader
+
+import (
+ "github.com/dexon-foundation/dexon/metrics"
+)
+
+var (
+ headerInMeter = metrics.NewRegisteredMeter("dex/downloader/headers/in", nil)
+ headerReqTimer = metrics.NewRegisteredTimer("dex/downloader/headers/req", nil)
+ headerDropMeter = metrics.NewRegisteredMeter("dex/downloader/headers/drop", nil)
+ headerTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/headers/timeout", nil)
+
+ bodyInMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/in", nil)
+ bodyReqTimer = metrics.NewRegisteredTimer("dex/downloader/bodies/req", nil)
+ bodyDropMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/drop", nil)
+ bodyTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/timeout", nil)
+
+ receiptInMeter = metrics.NewRegisteredMeter("dex/downloader/receipts/in", nil)
+ receiptReqTimer = metrics.NewRegisteredTimer("dex/downloader/receipts/req", nil)
+ receiptDropMeter = metrics.NewRegisteredMeter("dex/downloader/receipts/drop", nil)
+ receiptTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/receipts/timeout", nil)
+
+ stateInMeter = metrics.NewRegisteredMeter("dex/downloader/states/in", nil)
+ stateDropMeter = metrics.NewRegisteredMeter("dex/downloader/states/drop", nil)
+)
diff --git a/dex/downloader/modes.go b/dex/downloader/modes.go
new file mode 100644
index 000000000..8ecdf91f1
--- /dev/null
+++ b/dex/downloader/modes.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import "fmt"
+
+// SyncMode represents the synchronisation mode of the downloader.
+type SyncMode int
+
+const (
+ FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
+ FastSync // Quickly download the headers, full sync only at the chain head
+ LightSync // Download only the headers and terminate afterwards
+)
+
+func (mode SyncMode) IsValid() bool {
+ return mode >= FullSync && mode <= LightSync
+}
+
+// String implements the stringer interface.
+func (mode SyncMode) String() string {
+ switch mode {
+ case FullSync:
+ return "full"
+ case FastSync:
+ return "fast"
+ case LightSync:
+ return "light"
+ default:
+ return "unknown"
+ }
+}
+
+func (mode SyncMode) MarshalText() ([]byte, error) {
+ switch mode {
+ case FullSync:
+ return []byte("full"), nil
+ case FastSync:
+ return []byte("fast"), nil
+ case LightSync:
+ return []byte("light"), nil
+ default:
+ return nil, fmt.Errorf("unknown sync mode %d", mode)
+ }
+}
+
+func (mode *SyncMode) UnmarshalText(text []byte) error {
+ switch string(text) {
+ case "full":
+ *mode = FullSync
+ case "fast":
+ *mode = FastSync
+ case "light":
+ *mode = LightSync
+ default:
+ return fmt.Errorf(`unknown sync mode %q, want "full", "fast" or "light"`, text)
+ }
+ return nil
+}
diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go
new file mode 100644
index 000000000..1fd82fbe3
--- /dev/null
+++ b/dex/downloader/peer.go
@@ -0,0 +1,573 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the active peer-set of the downloader, maintaining both failures
+// as well as reputation metrics to prioritize the block retrievals.
+
+package downloader
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/log"
+)
+
+const (
+ maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
+ measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
+)
+
+var (
+ errAlreadyFetching = errors.New("already fetching blocks from peer")
+ errAlreadyRegistered = errors.New("peer is already registered")
+ errNotRegistered = errors.New("peer is not registered")
+)
+
+// peerConnection represents an active peer from which hashes and blocks are retrieved.
+type peerConnection struct {
+ id string // Unique identifier of the peer
+
+ headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
+ blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
+ receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
+ stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
+
+ headerThroughput float64 // Number of headers measured to be retrievable per second
+ blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
+ receiptThroughput float64 // Number of receipts measured to be retrievable per second
+ stateThroughput float64 // Number of node data pieces measured to be retrievable per second
+
+ rtt time.Duration // Request round trip time to track responsiveness (QoS)
+
+ headerStarted time.Time // Time instance when the last header fetch was started
+ blockStarted time.Time // Time instance when the last block (body) fetch was started
+ receiptStarted time.Time // Time instance when the last receipt fetch was started
+ stateStarted time.Time // Time instance when the last node data fetch was started
+
+ lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
+
+ peer Peer
+
+ version int // Eth protocol version number to switch strategies
+ log log.Logger // Contextual logger to add extra infos to peer logs
+ lock sync.RWMutex
+}
+
+// LightPeer encapsulates the methods required to synchronise with a remote light peer.
+type LightPeer interface {
+ Head() (common.Hash, *big.Int)
+ RequestHeadersByHash(common.Hash, int, int, bool) error
+ RequestHeadersByNumber(uint64, int, int, bool) error
+}
+
+// Peer encapsulates the methods required to synchronise with a remote full peer.
+type Peer interface {
+ LightPeer
+ RequestBodies([]common.Hash) error
+ RequestReceipts([]common.Hash) error
+ RequestNodeData([]common.Hash) error
+}
+
+// lightPeerWrapper wraps a LightPeer struct, stubbing out the Peer-only methods.
+type lightPeerWrapper struct {
+ peer LightPeer
+}
+
+func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }
+func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error {
+ return w.peer.RequestHeadersByHash(h, amount, skip, reverse)
+}
+func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error {
+ return w.peer.RequestHeadersByNumber(i, amount, skip, reverse)
+}
+func (w *lightPeerWrapper) RequestBodies([]common.Hash) error {
+ panic("RequestBodies not supported in light client mode sync")
+}
+func (w *lightPeerWrapper) RequestReceipts([]common.Hash) error {
+ panic("RequestReceipts not supported in light client mode sync")
+}
+func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error {
+ panic("RequestNodeData not supported in light client mode sync")
+}
+
+// newPeerConnection creates a new downloader peer.
+func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *peerConnection {
+ return &peerConnection{
+ id: id,
+ lacking: make(map[common.Hash]struct{}),
+
+ peer: peer,
+
+ version: version,
+ log: logger,
+ }
+}
+
+// Reset clears the internal state of a peer entity.
+func (p *peerConnection) Reset() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ atomic.StoreInt32(&p.headerIdle, 0)
+ atomic.StoreInt32(&p.blockIdle, 0)
+ atomic.StoreInt32(&p.receiptIdle, 0)
+ atomic.StoreInt32(&p.stateIdle, 0)
+
+ p.headerThroughput = 0
+ p.blockThroughput = 0
+ p.receiptThroughput = 0
+ p.stateThroughput = 0
+
+ p.lacking = make(map[common.Hash]struct{})
+}
+
+// FetchHeaders sends a header retrieval request to the remote peer.
+func (p *peerConnection) FetchHeaders(from uint64, count int) error {
+ // Sanity check the protocol version
+ if p.version < 62 {
+ panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
+ }
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.headerStarted = time.Now()
+
+ // Issue the header retrieval request (absolut upwards without gaps)
+ go p.peer.RequestHeadersByNumber(from, count, 0, false)
+
+ return nil
+}
+
+// FetchBodies sends a block body retrieval request to the remote peer.
+func (p *peerConnection) FetchBodies(request *fetchRequest) error {
+ // Sanity check the protocol version
+ if p.version < 62 {
+ panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version))
+ }
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.blockStarted = time.Now()
+
+ // Convert the header set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Headers))
+ for _, header := range request.Headers {
+ hashes = append(hashes, header.Hash())
+ }
+ go p.peer.RequestBodies(hashes)
+
+ return nil
+}
+
+// FetchReceipts sends a receipt retrieval request to the remote peer.
+func (p *peerConnection) FetchReceipts(request *fetchRequest) error {
+ // Sanity check the protocol version
+ if p.version < 63 {
+ panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version))
+ }
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.receiptStarted = time.Now()
+
+ // Convert the header set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Headers))
+ for _, header := range request.Headers {
+ hashes = append(hashes, header.Hash())
+ }
+ go p.peer.RequestReceipts(hashes)
+
+ return nil
+}
+
+// FetchNodeData sends a node state data retrieval request to the remote peer.
+func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
+ // Sanity check the protocol version
+ if p.version < 63 {
+ panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
+ }
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.stateStarted = time.Now()
+
+ go p.peer.RequestNodeData(hashes)
+
+ return nil
+}
+
+// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
+// requests. Its estimated header retrieval throughput is updated with that measured
+// just now.
+func (p *peerConnection) SetHeadersIdle(delivered int) {
+ p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
+}
+
+// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
+// requests. Its estimated body retrieval throughput is updated with that measured
+// just now.
+func (p *peerConnection) SetBodiesIdle(delivered int) {
+ p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
+}
+
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
+// retrieval requests. Its estimated receipt retrieval throughput is updated
+// with that measured just now.
+func (p *peerConnection) SetReceiptsIdle(delivered int) {
+ p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
+}
+
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
+// data retrieval requests. Its estimated state retrieval throughput is updated
+// with that measured just now.
+func (p *peerConnection) SetNodeDataIdle(delivered int) {
+ p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
+}
+
+// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its estimated retrieval throughput is updated with that measured just now.
+func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+ // Irrelevant of the scaling, make sure the peer ends up idle
+ defer atomic.StoreInt32(idle, 0)
+
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
+ if delivered == 0 {
+ *throughput = 0
+ return
+ }
+ // Otherwise update the throughput with a new measurement
+ elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor
+ measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
+
+ *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
+ p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
+
+ p.log.Trace("Peer throughput measurements updated",
+ "hps", p.headerThroughput, "bps", p.blockThroughput,
+ "rps", p.receiptThroughput, "sps", p.stateThroughput,
+ "miss", len(p.lacking), "rtt", p.rtt)
+}
+
+// HeaderCapacity retrieves the peers header download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
+}
+
+// BlockCapacity retrieves the peers block download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch)))
+}
+
+// ReceiptCapacity retrieves the peers receipt download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch)))
+}
+
+// NodeDataCapacity retrieves the peers state download allowance based on its
+// previously discovered throughput.
+func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch)))
+}
+
+// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
+// that a peer is known not to have (i.e. have been requested before). If the
+// set reaches its maximum allowed capacity, items are randomly dropped off.
+func (p *peerConnection) MarkLacking(hash common.Hash) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ for len(p.lacking) >= maxLackingHashes {
+ for drop := range p.lacking {
+ delete(p.lacking, drop)
+ break
+ }
+ }
+ p.lacking[hash] = struct{}{}
+}
+
+// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
+// list (i.e. whether we know that the peer does not have it).
+func (p *peerConnection) Lacks(hash common.Hash) bool {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ _, ok := p.lacking[hash]
+ return ok
+}
+
+// peerSet represents the collection of active peer participating in the chain
+// download procedure.
+type peerSet struct {
+ peers map[string]*peerConnection
+ newPeerFeed event.Feed
+ peerDropFeed event.Feed
+ lock sync.RWMutex
+}
+
+// newPeerSet creates a new peer set top track the active download sources.
+func newPeerSet() *peerSet {
+ return &peerSet{
+ peers: make(map[string]*peerConnection),
+ }
+}
+
+// SubscribeNewPeers subscribes to peer arrival events.
+func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
+ return ps.newPeerFeed.Subscribe(ch)
+}
+
+// SubscribePeerDrops subscribes to peer departure events.
+func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription {
+ return ps.peerDropFeed.Subscribe(ch)
+}
+
+// Reset iterates over the current peer set, and resets each of the known peers
+// to prepare for a next batch of block retrieval.
+func (ps *peerSet) Reset() {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ for _, peer := range ps.peers {
+ peer.Reset()
+ }
+}
+
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known.
+//
+// The method also sets the starting throughput values of the new peer to the
+// average of all existing peers, to give it a realistic chance of being used
+// for data retrievals.
+func (ps *peerSet) Register(p *peerConnection) error {
+ // Retrieve the current median RTT as a sane default
+ p.rtt = ps.medianRTT()
+
+ // Register the new peer with some meaningful defaults
+ ps.lock.Lock()
+ if _, ok := ps.peers[p.id]; ok {
+ ps.lock.Unlock()
+ return errAlreadyRegistered
+ }
+ if len(ps.peers) > 0 {
+ p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0
+
+ for _, peer := range ps.peers {
+ peer.lock.RLock()
+ p.headerThroughput += peer.headerThroughput
+ p.blockThroughput += peer.blockThroughput
+ p.receiptThroughput += peer.receiptThroughput
+ p.stateThroughput += peer.stateThroughput
+ peer.lock.RUnlock()
+ }
+ p.headerThroughput /= float64(len(ps.peers))
+ p.blockThroughput /= float64(len(ps.peers))
+ p.receiptThroughput /= float64(len(ps.peers))
+ p.stateThroughput /= float64(len(ps.peers))
+ }
+ ps.peers[p.id] = p
+ ps.lock.Unlock()
+
+ ps.newPeerFeed.Send(p)
+ return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+ ps.lock.Lock()
+ p, ok := ps.peers[id]
+ if !ok {
+ defer ps.lock.Unlock()
+ return errNotRegistered
+ }
+ delete(ps.peers, id)
+ ps.lock.Unlock()
+
+ ps.peerDropFeed.Send(p)
+ return nil
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peerConnection {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return ps.peers[id]
+}
+
+// Len returns if the current number of peers in the set.
+func (ps *peerSet) Len() int {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return len(ps.peers)
+}
+
+// AllPeers retrieves a flat list of all the peers within the set.
+func (ps *peerSet) AllPeers() []*peerConnection {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peerConnection, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ list = append(list, p)
+ }
+ return list
+}
+
+// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
+// within the active peer set, ordered by their reputation.
+func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
+ return atomic.LoadInt32(&p.headerIdle) == 0
+ }
+ throughput := func(p *peerConnection) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.headerThroughput
+ }
+ return ps.idlePeers(62, 64, idle, throughput)
+}
+
+// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
+// the active peer set, ordered by their reputation.
+func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
+ return atomic.LoadInt32(&p.blockIdle) == 0
+ }
+ throughput := func(p *peerConnection) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.blockThroughput
+ }
+ return ps.idlePeers(62, 64, idle, throughput)
+}
+
+// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
+// within the active peer set, ordered by their reputation.
+func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
+ return atomic.LoadInt32(&p.receiptIdle) == 0
+ }
+ throughput := func(p *peerConnection) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.receiptThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
+}
+
+// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
+// peers within the active peer set, ordered by their reputation.
+func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
+ return atomic.LoadInt32(&p.stateIdle) == 0
+ }
+ throughput := func(p *peerConnection) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.stateThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
+}
+
+// idlePeers retrieves a flat list of all currently idle peers satisfying the
+// protocol version constraints, using the provided function to check idleness.
+// The resulting set of peers are sorted by their measure throughput.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
+ for _, p := range ps.peers {
+ if p.version >= minProtocol && p.version <= maxProtocol {
+ if idleCheck(p) {
+ idle = append(idle, p)
+ }
+ total++
+ }
+ }
+ for i := 0; i < len(idle); i++ {
+ for j := i + 1; j < len(idle); j++ {
+ if throughput(idle[i]) < throughput(idle[j]) {
+ idle[i], idle[j] = idle[j], idle[i]
+ }
+ }
+ }
+ return idle, total
+}
+
+// medianRTT returns the median RTT of the peerset, considering only the tuning
+// peers if there are more peers available.
+func (ps *peerSet) medianRTT() time.Duration {
+ // Gather all the currently measured round trip times
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ rtts := make([]float64, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ p.lock.RLock()
+ rtts = append(rtts, float64(p.rtt))
+ p.lock.RUnlock()
+ }
+ sort.Float64s(rtts)
+
+ median := rttMaxEstimate
+ if qosTuningPeers <= len(rtts) {
+ median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers
+ } else if len(rtts) > 0 {
+ median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
+ }
+ // Restrict the RTT into some QoS defaults, irrelevant of true RTT
+ if median < rttMinEstimate {
+ median = rttMinEstimate
+ }
+ if median > rttMaxEstimate {
+ median = rttMaxEstimate
+ }
+ return median
+}
diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go
new file mode 100644
index 000000000..12c75e793
--- /dev/null
+++ b/dex/downloader/queue.go
@@ -0,0 +1,885 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the block download scheduler to collect download tasks and schedule
+// them in an ordered, and throttled way.
+
+package downloader
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/common/prque"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/log"
+ "github.com/dexon-foundation/dexon/metrics"
+)
+
+var (
+ blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download
+ blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
+ blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
+)
+
+var (
+ errNoFetchesPending = errors.New("no fetches pending")
+ errStaleDelivery = errors.New("stale delivery")
+)
+
+// fetchRequest is a currently running data retrieval operation.
+type fetchRequest struct {
+ Peer *peerConnection // Peer to which the request was sent
+ From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
+ Headers []*types.Header // [eth/62] Requested headers, sorted by request order
+ Time time.Time // Time when the request was made
+}
+
+// fetchResult is a struct collecting partial results from data fetchers until
+// all outstanding pieces complete and the result as a whole can be processed.
+type fetchResult struct {
+ Pending int // Number of data fetches still pending
+ Hash common.Hash // Hash of the header to prevent recalculating
+
+ Header *types.Header
+ Uncles []*types.Header
+ Transactions types.Transactions
+ Receipts types.Receipts
+}
+
+// queue represents hashes that are either need fetching or are being fetched
+type queue struct {
+ mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
+
+ // Headers are "special", they download in batches, supported by a skeleton chain
+ headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
+ headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
+ headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
+ headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
+ headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
+ headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
+ headerProced int // [eth/62] Number of headers already processed from the results
+ headerOffset uint64 // [eth/62] Number of the first header in the result cache
+ headerContCh chan bool // [eth/62] Channel to notify when header download finishes
+
+ // All data retrievals below are based on an already assembles header chain
+ blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
+ blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
+ blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations
+ blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches
+
+ receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
+ receiptTaskQueue *prque.Prque // [eth/63] Priority queue of the headers to fetch the receipts for
+ receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
+ receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
+
+ resultCache []*fetchResult // Downloaded but not yet delivered fetch results
+ resultOffset uint64 // Offset of the first cached fetch result in the block chain
+ resultSize common.StorageSize // Approximate size of a block (exponential moving average)
+
+ lock *sync.Mutex
+ active *sync.Cond
+ closed bool
+}
+
+// newQueue creates a new download queue for scheduling block retrieval.
+func newQueue() *queue {
+ lock := new(sync.Mutex)
+ return &queue{
+ headerPendPool: make(map[string]*fetchRequest),
+ headerContCh: make(chan bool),
+ blockTaskPool: make(map[common.Hash]*types.Header),
+ blockTaskQueue: prque.New(nil),
+ blockPendPool: make(map[string]*fetchRequest),
+ blockDonePool: make(map[common.Hash]struct{}),
+ receiptTaskPool: make(map[common.Hash]*types.Header),
+ receiptTaskQueue: prque.New(nil),
+ receiptPendPool: make(map[string]*fetchRequest),
+ receiptDonePool: make(map[common.Hash]struct{}),
+ resultCache: make([]*fetchResult, blockCacheItems),
+ active: sync.NewCond(lock),
+ lock: lock,
+ }
+}
+
+// Reset clears out the queue contents.
+func (q *queue) Reset() {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ q.closed = false
+ q.mode = FullSync
+
+ q.headerHead = common.Hash{}
+ q.headerPendPool = make(map[string]*fetchRequest)
+
+ q.blockTaskPool = make(map[common.Hash]*types.Header)
+ q.blockTaskQueue.Reset()
+ q.blockPendPool = make(map[string]*fetchRequest)
+ q.blockDonePool = make(map[common.Hash]struct{})
+
+ q.receiptTaskPool = make(map[common.Hash]*types.Header)
+ q.receiptTaskQueue.Reset()
+ q.receiptPendPool = make(map[string]*fetchRequest)
+ q.receiptDonePool = make(map[common.Hash]struct{})
+
+ q.resultCache = make([]*fetchResult, blockCacheItems)
+ q.resultOffset = 0
+}
+
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+ q.lock.Lock()
+ q.closed = true
+ q.lock.Unlock()
+ q.active.Broadcast()
+}
+
+// PendingHeaders retrieves the number of header requests pending for retrieval.
+func (q *queue) PendingHeaders() int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.headerTaskQueue.Size()
+}
+
+// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
+func (q *queue) PendingBlocks() int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.blockTaskQueue.Size()
+}
+
+// PendingReceipts retrieves the number of block receipts pending for retrieval.
+func (q *queue) PendingReceipts() int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.receiptTaskQueue.Size()
+}
+
+// InFlightHeaders retrieves whether there are header fetch requests currently
+// in flight.
+func (q *queue) InFlightHeaders() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return len(q.headerPendPool) > 0
+}
+
+// InFlightBlocks retrieves whether there are block fetch requests currently in
+// flight.
+func (q *queue) InFlightBlocks() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return len(q.blockPendPool) > 0
+}
+
+// InFlightReceipts retrieves whether there are receipt fetch requests currently
+// in flight.
+func (q *queue) InFlightReceipts() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return len(q.receiptPendPool) > 0
+}
+
+// Idle returns if the queue is fully idle or has some data still inside.
+func (q *queue) Idle() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
+ pending := len(q.blockPendPool) + len(q.receiptPendPool)
+ cached := len(q.blockDonePool) + len(q.receiptDonePool)
+
+ return (queued + pending + cached) == 0
+}
+
+// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
+// fetches exceed block cache).
+func (q *queue) ShouldThrottleBlocks() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0
+}
+
+// ShouldThrottleReceipts checks if the download should be throttled (active receipt
+// fetches exceed block cache).
+func (q *queue) ShouldThrottleReceipts() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0
+}
+
+// resultSlots calculates the number of results slots available for requests
+// whilst adhering to both the item and the memory limit too of the results
+// cache.
+func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
+ // Calculate the maximum length capped by the memory limit
+ limit := len(q.resultCache)
+ if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) {
+ limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
+ }
+ // Calculate the number of slots already finished
+ finished := 0
+ for _, result := range q.resultCache[:limit] {
+ if result == nil {
+ break
+ }
+ if _, ok := donePool[result.Hash]; ok {
+ finished++
+ }
+ }
+ // Calculate the number of slots currently downloading
+ pending := 0
+ for _, request := range pendPool {
+ for _, header := range request.Headers {
+ if header.Number.Uint64() < q.resultOffset+uint64(limit) {
+ pending++
+ }
+ }
+ }
+ // Return the free slots to distribute
+ return limit - finished - pending
+}
+
+// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
+// up an already retrieved header skeleton.
+func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
+ if q.headerResults != nil {
+ panic("skeleton assembly already in progress")
+ }
+ // Schedule all the header retrieval tasks for the skeleton assembly
+ q.headerTaskPool = make(map[uint64]*types.Header)
+ q.headerTaskQueue = prque.New(nil)
+ q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
+ q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
+ q.headerProced = 0
+ q.headerOffset = from
+ q.headerContCh = make(chan bool, 1)
+
+ for i, header := range skeleton {
+ index := from + uint64(i*MaxHeaderFetch)
+
+ q.headerTaskPool[index] = header
+ q.headerTaskQueue.Push(index, -int64(index))
+ }
+}
+
+// RetrieveHeaders retrieves the header chain assemble based on the scheduled
+// skeleton.
+func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ headers, proced := q.headerResults, q.headerProced
+ q.headerResults, q.headerProced = nil, 0
+
+ return headers, proced
+}
+
+// Schedule adds a set of headers for the download queue for scheduling, returning
+// the new headers encountered.
+func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Insert all the headers prioritised by the contained block number
+ inserts := make([]*types.Header, 0, len(headers))
+ for _, header := range headers {
+ // Make sure chain order is honoured and preserved throughout
+ hash := header.Hash()
+ if header.Number == nil || header.Number.Uint64() != from {
+ log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
+ break
+ }
+ if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
+ log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
+ break
+ }
+ // Make sure no duplicate requests are executed
+ if _, ok := q.blockTaskPool[hash]; ok {
+ log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
+ continue
+ }
+ if _, ok := q.receiptTaskPool[hash]; ok {
+ log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
+ continue
+ }
+ // Queue the header for content retrieval
+ q.blockTaskPool[hash] = header
+ q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
+
+ if q.mode == FastSync {
+ q.receiptTaskPool[hash] = header
+ q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+ }
+ inserts = append(inserts, header)
+ q.headerHead = hash
+ from++
+ }
+ return inserts
+}
+
+// Results retrieves and permanently removes a batch of fetch results from
+// the cache. the result slice will be empty if the queue has been closed.
+func (q *queue) Results(block bool) []*fetchResult {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Count the number of items available for processing
+ nproc := q.countProcessableItems()
+ for nproc == 0 && !q.closed {
+ if !block {
+ return nil
+ }
+ q.active.Wait()
+ nproc = q.countProcessableItems()
+ }
+ // Since we have a batch limit, don't pull more into "dangling" memory
+ if nproc > maxResultsProcess {
+ nproc = maxResultsProcess
+ }
+ results := make([]*fetchResult, nproc)
+ copy(results, q.resultCache[:nproc])
+ if len(results) > 0 {
+ // Mark results as done before dropping them from the cache.
+ for _, result := range results {
+ hash := result.Header.Hash()
+ delete(q.blockDonePool, hash)
+ delete(q.receiptDonePool, hash)
+ }
+ // Delete the results from the cache and clear the tail.
+ copy(q.resultCache, q.resultCache[nproc:])
+ for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+ q.resultCache[i] = nil
+ }
+ // Advance the expected block number of the first cache entry.
+ q.resultOffset += uint64(nproc)
+
+ // Recalculate the result item weights to prevent memory exhaustion
+ for _, result := range results {
+ size := result.Header.Size()
+ for _, uncle := range result.Uncles {
+ size += uncle.Size()
+ }
+ for _, receipt := range result.Receipts {
+ size += receipt.Size()
+ }
+ for _, tx := range result.Transactions {
+ size += tx.Size()
+ }
+ q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
+ }
+ }
+ return results
+}
+
+// countProcessableItems counts the processable items.
+func (q *queue) countProcessableItems() int {
+ for i, result := range q.resultCache {
+ if result == nil || result.Pending > 0 {
+ return i
+ }
+ }
+ return len(q.resultCache)
+}
+
+// ReserveHeaders reserves a set of headers for the given peer, skipping any
+// previously failed batches.
+func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Short circuit if the peer's already downloading something (sanity check to
+ // not corrupt state)
+ if _, ok := q.headerPendPool[p.id]; ok {
+ return nil
+ }
+ // Retrieve a batch of hashes, skipping previously failed ones
+ send, skip := uint64(0), []uint64{}
+ for send == 0 && !q.headerTaskQueue.Empty() {
+ from, _ := q.headerTaskQueue.Pop()
+ if q.headerPeerMiss[p.id] != nil {
+ if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
+ skip = append(skip, from.(uint64))
+ continue
+ }
+ }
+ send = from.(uint64)
+ }
+ // Merge all the skipped batches back
+ for _, from := range skip {
+ q.headerTaskQueue.Push(from, -int64(from))
+ }
+ // Assemble and return the block download request
+ if send == 0 {
+ return nil
+ }
+ request := &fetchRequest{
+ Peer: p,
+ From: send,
+ Time: time.Now(),
+ }
+ q.headerPendPool[p.id] = request
+ return request
+}
+
+// ReserveBodies reserves a set of body fetches for the given peer, skipping any
+// previously failed downloads. Beside the next batch of needed fetches, it also
+// returns a flag whether empty blocks were queued requiring processing.
+func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
+ isNoop := func(header *types.Header) bool {
+ return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
+ }
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
+}
+
+// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
+// any previously failed downloads. Beside the next batch of needed fetches, it
+// also returns a flag whether empty receipts were queued requiring importing.
+func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
+ isNoop := func(header *types.Header) bool {
+ return header.ReceiptHash == types.EmptyRootHash
+ }
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
+}
+
+// reserveHeaders reserves a set of data download operations for a given peer,
+// skipping any previously failed ones. This method is a generic version used
+// by the individual special reservation functions.
+//
+// Note, this method expects the queue lock to be already held for writing. The
+// reason the lock is not obtained in here is because the parameters already need
+// to access the queue, so they already need a lock anyway.
+func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
+ // Short circuit if the pool has been depleted, or if the peer's already
+ // downloading something (sanity check not to corrupt state)
+ if taskQueue.Empty() {
+ return nil, false, nil
+ }
+ if _, ok := pendPool[p.id]; ok {
+ return nil, false, nil
+ }
+ // Calculate an upper limit on the items we might fetch (i.e. throttling)
+ space := q.resultSlots(pendPool, donePool)
+
+ // Retrieve a batch of tasks, skipping previously failed ones
+ send := make([]*types.Header, 0, count)
+ skip := make([]*types.Header, 0)
+
+ progress := false
+ for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
+ header := taskQueue.PopItem().(*types.Header)
+ hash := header.Hash()
+
+ // If we're the first to request this task, initialise the result container
+ index := int(header.Number.Int64() - int64(q.resultOffset))
+ if index >= len(q.resultCache) || index < 0 {
+ common.Report("index allocation went beyond available resultCache space")
+ return nil, false, errInvalidChain
+ }
+ if q.resultCache[index] == nil {
+ components := 1
+ if q.mode == FastSync {
+ components = 2
+ }
+ q.resultCache[index] = &fetchResult{
+ Pending: components,
+ Hash: hash,
+ Header: header,
+ }
+ }
+ // If this fetch task is a noop, skip this fetch operation
+ if isNoop(header) {
+ donePool[hash] = struct{}{}
+ delete(taskPool, hash)
+
+ space, proc = space-1, proc-1
+ q.resultCache[index].Pending--
+ progress = true
+ continue
+ }
+ // Otherwise unless the peer is known not to have the data, add to the retrieve list
+ if p.Lacks(hash) {
+ skip = append(skip, header)
+ } else {
+ send = append(send, header)
+ }
+ }
+ // Merge all the skipped headers back
+ for _, header := range skip {
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
+ }
+ if progress {
+ // Wake WaitResults, resultCache was modified
+ q.active.Signal()
+ }
+ // Assemble and return the block download request
+ if len(send) == 0 {
+ return nil, progress, nil
+ }
+ request := &fetchRequest{
+ Peer: p,
+ Headers: send,
+ Time: time.Now(),
+ }
+ pendPool[p.id] = request
+
+ return request, progress, nil
+}
+
+// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
+func (q *queue) CancelHeaders(request *fetchRequest) {
+ q.cancel(request, q.headerTaskQueue, q.headerPendPool)
+}
+
+// CancelBodies aborts a body fetch request, returning all pending headers to the
+// task queue.
+func (q *queue) CancelBodies(request *fetchRequest) {
+ q.cancel(request, q.blockTaskQueue, q.blockPendPool)
+}
+
+// CancelReceipts aborts a body fetch request, returning all pending headers to
+// the task queue.
+func (q *queue) CancelReceipts(request *fetchRequest) {
+ q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
+}
+
+// Cancel aborts a fetch request, returning all pending hashes to the task queue.
+func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ if request.From > 0 {
+ taskQueue.Push(request.From, -int64(request.From))
+ }
+ for _, header := range request.Headers {
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
+ }
+ delete(pendPool, request.Peer.id)
+}
+
+// Revoke cancels all pending requests belonging to a given peer. This method is
+// meant to be called during a peer drop to quickly reassign owned data fetches
+// to remaining nodes.
+func (q *queue) Revoke(peerID string) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ if request, ok := q.blockPendPool[peerID]; ok {
+ for _, header := range request.Headers {
+ q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
+ }
+ delete(q.blockPendPool, peerID)
+ }
+ if request, ok := q.receiptPendPool[peerID]; ok {
+ for _, header := range request.Headers {
+ q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+ }
+ delete(q.receiptPendPool, peerID)
+ }
+}
+
+// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
+// canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
+}
+
+// ExpireBodies checks for in flight block body requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
+}
+
+// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
+}
+
+// expire is the generic check that move expired tasks from a pending pool back
+// into a task pool, returning all entities caught with expired tasks.
+//
+// Note, this method expects the queue lock to be already held. The
+// reason the lock is not obtained in here is because the parameters already need
+// to access the queue, so they already need a lock anyway.
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
+ // Iterate over the expired requests and return each to the queue
+ expiries := make(map[string]int)
+ for id, request := range pendPool {
+ if time.Since(request.Time) > timeout {
+ // Update the metrics with the timeout
+ timeoutMeter.Mark(1)
+
+ // Return any non satisfied requests to the pool
+ if request.From > 0 {
+ taskQueue.Push(request.From, -int64(request.From))
+ }
+ for _, header := range request.Headers {
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
+ }
+ // Add the peer to the expiry report along the number of failed requests
+ expiries[id] = len(request.Headers)
+
+ // Remove the expired requests from the pending pool directly
+ delete(pendPool, id)
+ }
+ }
+ return expiries
+}
+
+// DeliverHeaders injects a header retrieval response into the header results
+// cache. This method either accepts all headers it received, or none of them
+// if they do not map correctly to the skeleton.
+//
+// If the headers are accepted, the method makes an attempt to deliver the set
+// of ready headers to the processor to keep the pipeline full. However it will
+// not block to prevent stalling other pending deliveries.
+func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Short circuit if the data was never requested
+ request := q.headerPendPool[id]
+ if request == nil {
+ return 0, errNoFetchesPending
+ }
+ headerReqTimer.UpdateSince(request.Time)
+ delete(q.headerPendPool, id)
+
+ // Ensure headers can be mapped onto the skeleton chain
+ target := q.headerTaskPool[request.From].Hash()
+
+ accepted := len(headers) == MaxHeaderFetch
+ if accepted {
+ if headers[0].Number.Uint64() != request.From {
+ log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), request.From)
+ accepted = false
+ } else if headers[len(headers)-1].Hash() != target {
+ log.Trace("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
+ accepted = false
+ }
+ }
+ if accepted {
+ for i, header := range headers[1:] {
+ hash := header.Hash()
+ if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
+ log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
+ accepted = false
+ break
+ }
+ if headers[i].Hash() != header.ParentHash {
+ log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
+ accepted = false
+ break
+ }
+ }
+ }
+ // If the batch of headers wasn't accepted, mark as unavailable
+ if !accepted {
+ log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)
+
+ miss := q.headerPeerMiss[id]
+ if miss == nil {
+ q.headerPeerMiss[id] = make(map[uint64]struct{})
+ miss = q.headerPeerMiss[id]
+ }
+ miss[request.From] = struct{}{}
+
+ q.headerTaskQueue.Push(request.From, -int64(request.From))
+ return 0, errors.New("delivery not accepted")
+ }
+ // Clean up a successful fetch and try to deliver any sub-results
+ copy(q.headerResults[request.From-q.headerOffset:], headers)
+ delete(q.headerTaskPool, request.From)
+
+ ready := 0
+ for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
+ ready += MaxHeaderFetch
+ }
+ if ready > 0 {
+ // Headers are ready for delivery, gather them and push forward (non blocking)
+ process := make([]*types.Header, ready)
+ copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
+
+ select {
+ case headerProcCh <- process:
+ log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
+ q.headerProced += len(process)
+ default:
+ }
+ }
+ // Check for termination and return
+ if len(q.headerTaskPool) == 0 {
+ q.headerContCh <- false
+ }
+ return len(headers), nil
+}
+
+// DeliverBodies injects a block body retrieval response into the results queue.
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+ if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
+ return errInvalidBody
+ }
+ result.Transactions = txLists[index]
+ result.Uncles = uncleLists[index]
+ return nil
+ }
+ return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
+}
+
+// DeliverReceipts injects a receipt retrieval response into the results queue.
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+ if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
+ return errInvalidReceipt
+ }
+ result.Receipts = receiptList[index]
+ return nil
+ }
+ return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
+}
+
+// deliver injects a data retrieval response into the results queue.
+//
+// Note, this method expects the queue lock to be already held for writing. The
+// reason the lock is not obtained in here is because the parameters already need
+// to access the queue, so they already need a lock anyway.
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+ results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
+ // Short circuit if the data was never requested
+ request := pendPool[id]
+ if request == nil {
+ return 0, errNoFetchesPending
+ }
+ reqTimer.UpdateSince(request.Time)
+ delete(pendPool, id)
+
+ // If no data items were retrieved, mark them as unavailable for the origin peer
+ if results == 0 {
+ for _, header := range request.Headers {
+ request.Peer.MarkLacking(header.Hash())
+ }
+ }
+ // Assemble each of the results with their headers and retrieved data parts
+ var (
+ accepted int
+ failure error
+ useful bool
+ )
+ for i, header := range request.Headers {
+ // Short circuit assembly if no more fetch results are found
+ if i >= results {
+ break
+ }
+ // Reconstruct the next result if contents match up
+ index := int(header.Number.Int64() - int64(q.resultOffset))
+ if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
+ failure = errInvalidChain
+ break
+ }
+ if err := reconstruct(header, i, q.resultCache[index]); err != nil {
+ failure = err
+ break
+ }
+ hash := header.Hash()
+
+ donePool[hash] = struct{}{}
+ q.resultCache[index].Pending--
+ useful = true
+ accepted++
+
+ // Clean up a successful fetch
+ request.Headers[i] = nil
+ delete(taskPool, hash)
+ }
+ // Return all failed or missing fetches to the queue
+ for _, header := range request.Headers {
+ if header != nil {
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
+ }
+ }
+ // Wake up WaitResults
+ if accepted > 0 {
+ q.active.Signal()
+ }
+ // If none of the data was good, it's a stale delivery
+ switch {
+ case failure == nil || failure == errInvalidChain:
+ return accepted, failure
+ case useful:
+ return accepted, fmt.Errorf("partial failure: %v", failure)
+ default:
+ return accepted, errStaleDelivery
+ }
+}
+
+// Prepare configures the result cache to allow accepting and caching inbound
+// fetch results.
+func (q *queue) Prepare(offset uint64, mode SyncMode) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Prepare the queue for sync results
+ if q.resultOffset < offset {
+ q.resultOffset = offset
+ }
+ q.mode = mode
+}
diff --git a/dex/downloader/statesync.go b/dex/downloader/statesync.go
new file mode 100644
index 000000000..49117abbb
--- /dev/null
+++ b/dex/downloader/statesync.go
@@ -0,0 +1,484 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+ "hash"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core/rawdb"
+ "github.com/dexon-foundation/dexon/core/state"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/log"
+ "github.com/dexon-foundation/dexon/trie"
+ "golang.org/x/crypto/sha3"
+)
+
+// stateReq represents a batch of state fetch requests grouped together into
+// a single data retrieval network packet.
+type stateReq struct {
+ items []common.Hash // Hashes of the state items to download
+ tasks map[common.Hash]*stateTask // Download tasks to track previous attempts
+ timeout time.Duration // Maximum round trip time for this to complete
+ timer *time.Timer // Timer to fire when the RTT timeout expires
+ peer *peerConnection // Peer that we're requesting from
+ response [][]byte // Response data of the peer (nil for timeouts)
+ dropped bool // Flag whether the peer dropped off early
+}
+
+// timedOut returns if this request timed out.
+func (req *stateReq) timedOut() bool {
+ return req.response == nil
+}
+
+// stateSyncStats is a collection of progress stats to report during a state trie
+// sync to RPC requests as well as to display in user logs.
+type stateSyncStats struct {
+ processed uint64 // Number of state entries processed
+ duplicate uint64 // Number of state entries downloaded twice
+ unexpected uint64 // Number of non-requested state entries received
+ pending uint64 // Number of still pending state entries
+}
+
+// syncState starts downloading state with the given root hash.
+func (d *Downloader) syncState(root common.Hash) *stateSync {
+ s := newStateSync(d, root)
+ select {
+ case d.stateSyncStart <- s:
+ case <-d.quitCh:
+ s.err = errCancelStateFetch
+ close(s.done)
+ }
+ return s
+}
+
+// stateFetcher manages the active state sync and accepts requests
+// on its behalf.
+func (d *Downloader) stateFetcher() {
+ for {
+ select {
+ case s := <-d.stateSyncStart:
+ for next := s; next != nil; {
+ next = d.runStateSync(next)
+ }
+ case <-d.stateCh:
+ // Ignore state responses while no sync is running.
+ case <-d.quitCh:
+ return
+ }
+ }
+}
+
+// runStateSync runs a state synchronisation until it completes or another root
+// hash is requested to be switched over to.
+func (d *Downloader) runStateSync(s *stateSync) *stateSync {
+ var (
+ active = make(map[string]*stateReq) // Currently in-flight requests
+ finished []*stateReq // Completed or failed requests
+ timeout = make(chan *stateReq) // Timed out active requests
+ )
+ defer func() {
+ // Cancel active request timers on exit. Also set peers to idle so they're
+ // available for the next sync.
+ for _, req := range active {
+ req.timer.Stop()
+ req.peer.SetNodeDataIdle(len(req.items))
+ }
+ }()
+ // Run the state sync.
+ go s.run()
+ defer s.Cancel()
+
+ // Listen for peer departure events to cancel assigned tasks
+ peerDrop := make(chan *peerConnection, 1024)
+ peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
+ defer peerSub.Unsubscribe()
+
+ for {
+ // Enable sending of the first buffered element if there is one.
+ var (
+ deliverReq *stateReq
+ deliverReqCh chan *stateReq
+ )
+ if len(finished) > 0 {
+ deliverReq = finished[0]
+ deliverReqCh = s.deliver
+ }
+
+ select {
+ // The stateSync lifecycle:
+ case next := <-d.stateSyncStart:
+ return next
+
+ case <-s.done:
+ return nil
+
+ // Send the next finished request to the current sync:
+ case deliverReqCh <- deliverReq:
+ // Shift out the first request, but also set the emptied slot to nil for GC
+ copy(finished, finished[1:])
+ finished[len(finished)-1] = nil
+ finished = finished[:len(finished)-1]
+
+ // Handle incoming state packs:
+ case pack := <-d.stateCh:
+ // Discard any data not requested (or previously timed out)
+ req := active[pack.PeerId()]
+ if req == nil {
+ log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
+ continue
+ }
+ // Finalize the request and queue up for processing
+ req.timer.Stop()
+ req.response = pack.(*statePack).states
+
+ finished = append(finished, req)
+ delete(active, pack.PeerId())
+
+ // Handle dropped peer connections:
+ case p := <-peerDrop:
+ // Skip if no request is currently pending
+ req := active[p.id]
+ if req == nil {
+ continue
+ }
+ // Finalize the request and queue up for processing
+ req.timer.Stop()
+ req.dropped = true
+
+ finished = append(finished, req)
+ delete(active, p.id)
+
+ // Handle timed-out requests:
+ case req := <-timeout:
+ // If the peer is already requesting something else, ignore the stale timeout.
+ // This can happen when the timeout and the delivery happens simultaneously,
+ // causing both pathways to trigger.
+ if active[req.peer.id] != req {
+ continue
+ }
+ // Move the timed out data back into the download queue
+ finished = append(finished, req)
+ delete(active, req.peer.id)
+
+ // Track outgoing state requests:
+ case req := <-d.trackStateReq:
+ // If an active request already exists for this peer, we have a problem. In
+ // theory the trie node schedule must never assign two requests to the same
+ // peer. In practice however, a peer might receive a request, disconnect and
+ // immediately reconnect before the previous times out. In this case the first
+ // request is never honored, alas we must not silently overwrite it, as that
+ // causes valid requests to go missing and sync to get stuck.
+ if old := active[req.peer.id]; old != nil {
+ log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
+
+ // Make sure the previous one doesn't get siletly lost
+ old.timer.Stop()
+ old.dropped = true
+
+ finished = append(finished, old)
+ }
+ // Start a timer to notify the sync loop if the peer stalled.
+ req.timer = time.AfterFunc(req.timeout, func() {
+ select {
+ case timeout <- req:
+ case <-s.done:
+ // Prevent leaking of timer goroutines in the unlikely case where a
+ // timer is fired just before exiting runStateSync.
+ }
+ })
+ active[req.peer.id] = req
+ }
+ }
+}
+
+// stateSync schedules requests for downloading a particular state trie defined
+// by a given state root.
+type stateSync struct {
+ d *Downloader // Downloader instance to access and manage current peerset
+
+ sched *trie.Sync // State trie sync scheduler defining the tasks
+ keccak hash.Hash // Keccak256 hasher to verify deliveries with
+ tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
+
+ numUncommitted int
+ bytesUncommitted int
+
+ deliver chan *stateReq // Delivery channel multiplexing peer responses
+ cancel chan struct{} // Channel to signal a termination request
+ cancelOnce sync.Once // Ensures cancel only ever gets called once
+ done chan struct{} // Channel to signal termination completion
+ err error // Any error hit during sync (set before completion)
+}
+
+// stateTask represents a single trie node download task, containing a set of
+// peers already attempted retrieval from to detect stalled syncs and abort.
+type stateTask struct {
+ attempts map[string]struct{}
+}
+
+// newStateSync creates a new state trie download scheduler. This method does not
+// yet start the sync. The user needs to call run to initiate.
+func newStateSync(d *Downloader, root common.Hash) *stateSync {
+ return &stateSync{
+ d: d,
+ sched: state.NewStateSync(root, d.stateDB),
+ keccak: sha3.NewLegacyKeccak256(),
+ tasks: make(map[common.Hash]*stateTask),
+ deliver: make(chan *stateReq),
+ cancel: make(chan struct{}),
+ done: make(chan struct{}),
+ }
+}
+
+// run starts the task assignment and response processing loop, blocking until
+// it finishes, and finally notifying any goroutines waiting for the loop to
+// finish.
+func (s *stateSync) run() {
+ s.err = s.loop()
+ close(s.done)
+}
+
+// Wait blocks until the sync is done or canceled.
+func (s *stateSync) Wait() error {
+ <-s.done
+ return s.err
+}
+
+// Cancel cancels the sync and waits until it has shut down.
+func (s *stateSync) Cancel() error {
+ s.cancelOnce.Do(func() { close(s.cancel) })
+ return s.Wait()
+}
+
+// loop is the main event loop of a state trie sync. It it responsible for the
+// assignment of new tasks to peers (including sending it to them) as well as
+// for the processing of inbound data. Note, that the loop does not directly
+// receive data from peers, rather those are buffered up in the downloader and
+// pushed here async. The reason is to decouple processing from data receipt
+// and timeouts.
+func (s *stateSync) loop() (err error) {
+ // Listen for new peer events to assign tasks to them
+ newPeer := make(chan *peerConnection, 1024)
+ peerSub := s.d.peers.SubscribeNewPeers(newPeer)
+ defer peerSub.Unsubscribe()
+ defer func() {
+ cerr := s.commit(true)
+ if err == nil {
+ err = cerr
+ }
+ }()
+
+ // Keep assigning new tasks until the sync completes or aborts
+ for s.sched.Pending() > 0 {
+ if err = s.commit(false); err != nil {
+ return err
+ }
+ s.assignTasks()
+ // Tasks assigned, wait for something to happen
+ select {
+ case <-newPeer:
+ // New peer arrived, try to assign it download tasks
+
+ case <-s.cancel:
+ return errCancelStateFetch
+
+ case <-s.d.cancelCh:
+ return errCancelStateFetch
+
+ case req := <-s.deliver:
+ // Response, disconnect or timeout triggered, drop the peer if stalling
+ log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
+ if len(req.items) <= 2 && !req.dropped && req.timedOut() {
+ // 2 items are the minimum requested, if even that times out, we've no use of
+ // this peer at the moment.
+ log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
+ s.d.dropPeer(req.peer.id)
+ }
+ // Process all the received blobs and check for stale delivery
+ delivered, err := s.process(req)
+ if err != nil {
+ log.Warn("Node data write error", "err", err)
+ return err
+ }
+ req.peer.SetNodeDataIdle(delivered)
+ }
+ }
+ return nil
+}
+
+func (s *stateSync) commit(force bool) error {
+ if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
+ return nil
+ }
+ start := time.Now()
+ b := s.d.stateDB.NewBatch()
+ if written, err := s.sched.Commit(b); written == 0 || err != nil {
+ return err
+ }
+ if err := b.Write(); err != nil {
+ return fmt.Errorf("DB write error: %v", err)
+ }
+ s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
+ s.numUncommitted = 0
+ s.bytesUncommitted = 0
+ return nil
+}
+
+// assignTasks attempts to assign new tasks to all idle peers, either from the
+// batch currently being retried, or fetching new data from the trie sync itself.
+func (s *stateSync) assignTasks() {
+ // Iterate over all idle peers and try to assign them state fetches
+ peers, _ := s.d.peers.NodeDataIdlePeers()
+ for _, p := range peers {
+ // Assign a batch of fetches proportional to the estimated latency/bandwidth
+ cap := p.NodeDataCapacity(s.d.requestRTT())
+ req := &stateReq{peer: p, timeout: s.d.requestTTL()}
+ s.fillTasks(cap, req)
+
+ // If the peer was assigned tasks to fetch, send the network request
+ if len(req.items) > 0 {
+ req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+ select {
+ case s.d.trackStateReq <- req:
+ req.peer.FetchNodeData(req.items)
+ case <-s.cancel:
+ case <-s.d.cancelCh:
+ }
+ }
+ }
+}
+
+// fillTasks fills the given request object with a maximum of n state download
+// tasks to send to the remote peer.
+func (s *stateSync) fillTasks(n int, req *stateReq) {
+ // Refill available tasks from the scheduler.
+ if len(s.tasks) < n {
+ new := s.sched.Missing(n - len(s.tasks))
+ for _, hash := range new {
+ s.tasks[hash] = &stateTask{make(map[string]struct{})}
+ }
+ }
+ // Find tasks that haven't been tried with the request's peer.
+ req.items = make([]common.Hash, 0, n)
+ req.tasks = make(map[common.Hash]*stateTask, n)
+ for hash, t := range s.tasks {
+ // Stop when we've gathered enough requests
+ if len(req.items) == n {
+ break
+ }
+ // Skip any requests we've already tried from this peer
+ if _, ok := t.attempts[req.peer.id]; ok {
+ continue
+ }
+ // Assign the request to this peer
+ t.attempts[req.peer.id] = struct{}{}
+ req.items = append(req.items, hash)
+ req.tasks[hash] = t
+ delete(s.tasks, hash)
+ }
+}
+
+// process iterates over a batch of delivered state data, injecting each item
+// into a running state sync, re-queuing any items that were requested but not
+// delivered.
+// Returns whether the peer actually managed to deliver anything of value,
+// and any error that occurred
+func (s *stateSync) process(req *stateReq) (int, error) {
+ // Collect processing stats and update progress if valid data was received
+ duplicate, unexpected, successful := 0, 0, 0
+
+ defer func(start time.Time) {
+ if duplicate > 0 || unexpected > 0 {
+ s.updateStats(0, duplicate, unexpected, time.Since(start))
+ }
+ }(time.Now())
+
+ // Iterate over all the delivered data and inject one-by-one into the trie
+ progress := false
+ for _, blob := range req.response {
+ prog, hash, err := s.processNodeData(blob)
+ switch err {
+ case nil:
+ s.numUncommitted++
+ s.bytesUncommitted += len(blob)
+ progress = progress || prog
+ successful++
+ case trie.ErrNotRequested:
+ unexpected++
+ case trie.ErrAlreadyProcessed:
+ duplicate++
+ default:
+ return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
+ }
+ if _, ok := req.tasks[hash]; ok {
+ delete(req.tasks, hash)
+ }
+ }
+ // Put unfulfilled tasks back into the retry queue
+ npeers := s.d.peers.Len()
+ for hash, task := range req.tasks {
+ // If the node did deliver something, missing items may be due to a protocol
+ // limit or a previous timeout + delayed delivery. Both cases should permit
+ // the node to retry the missing items (to avoid single-peer stalls).
+ if len(req.response) > 0 || req.timedOut() {
+ delete(task.attempts, req.peer.id)
+ }
+ // If we've requested the node too many times already, it may be a malicious
+ // sync where nobody has the right data. Abort.
+ if len(task.attempts) >= npeers {
+ return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+ }
+ // Missing item, place into the retry queue.
+ s.tasks[hash] = task
+ }
+ return successful, nil
+}
+
+// processNodeData tries to inject a trie node data blob delivered from a remote
+// peer into the state trie, returning whether anything useful was written or any
+// error occurred.
+func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
+ res := trie.SyncResult{Data: blob}
+ s.keccak.Reset()
+ s.keccak.Write(blob)
+ s.keccak.Sum(res.Hash[:0])
+ committed, _, err := s.sched.Process([]trie.SyncResult{res})
+ return committed, res.Hash, err
+}
+
+// updateStats bumps the various state sync progress counters and displays a log
+// message for the user to see.
+func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) {
+ s.d.syncStatsLock.Lock()
+ defer s.d.syncStatsLock.Unlock()
+
+ s.d.syncStatsState.pending = uint64(s.sched.Pending())
+ s.d.syncStatsState.processed += uint64(written)
+ s.d.syncStatsState.duplicate += uint64(duplicate)
+ s.d.syncStatsState.unexpected += uint64(unexpected)
+
+ if written > 0 || duplicate > 0 || unexpected > 0 {
+ log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
+ }
+ if written > 0 {
+ rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed)
+ }
+}
diff --git a/dex/downloader/testchain_test.go b/dex/downloader/testchain_test.go
new file mode 100644
index 000000000..e73bed513
--- /dev/null
+++ b/dex/downloader/testchain_test.go
@@ -0,0 +1,221 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+ "math/big"
+ "sync"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/consensus/ethash"
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/params"
+)
+
+// Test chain parameters.
+var (
+ testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
+ testDB = ethdb.NewMemDatabase()
+ testGenesis = core.GenesisBlockForTesting(testDB, testAddress, big.NewInt(1000000000))
+)
+
+// The common prefix of all test chains:
+var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
+
+// Different forks on top of the base chain:
+var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
+
+func init() {
+ var forkLen = int(MaxForkAncestry + 50)
+ var wg sync.WaitGroup
+ wg.Add(3)
+ go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()
+ go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }()
+ go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }()
+ wg.Wait()
+}
+
+type testChain struct {
+ genesis *types.Block
+ chain []common.Hash
+ headerm map[common.Hash]*types.Header
+ blockm map[common.Hash]*types.Block
+ receiptm map[common.Hash][]*types.Receipt
+ tdm map[common.Hash]*big.Int
+}
+
+// newTestChain creates a blockchain of the given length.
+func newTestChain(length int, genesis *types.Block) *testChain {
+ tc := new(testChain).copy(length)
+ tc.genesis = genesis
+ tc.chain = append(tc.chain, genesis.Hash())
+ tc.headerm[tc.genesis.Hash()] = tc.genesis.Header()
+ tc.tdm[tc.genesis.Hash()] = tc.genesis.Difficulty()
+ tc.blockm[tc.genesis.Hash()] = tc.genesis
+ tc.generate(length-1, 0, genesis, false)
+ return tc
+}
+
+// makeFork creates a fork on top of the test chain.
+func (tc *testChain) makeFork(length int, heavy bool, seed byte) *testChain {
+ fork := tc.copy(tc.len() + length)
+ fork.generate(length, seed, tc.headBlock(), heavy)
+ return fork
+}
+
+// shorten creates a copy of the chain with the given length. It panics if the
+// length is longer than the number of available blocks.
+func (tc *testChain) shorten(length int) *testChain {
+ if length > tc.len() {
+ panic(fmt.Errorf("can't shorten test chain to %d blocks, it's only %d blocks long", length, tc.len()))
+ }
+ return tc.copy(length)
+}
+
+func (tc *testChain) copy(newlen int) *testChain {
+ cpy := &testChain{
+ genesis: tc.genesis,
+ headerm: make(map[common.Hash]*types.Header, newlen),
+ blockm: make(map[common.Hash]*types.Block, newlen),
+ receiptm: make(map[common.Hash][]*types.Receipt, newlen),
+ tdm: make(map[common.Hash]*big.Int, newlen),
+ }
+ for i := 0; i < len(tc.chain) && i < newlen; i++ {
+ hash := tc.chain[i]
+ cpy.chain = append(cpy.chain, tc.chain[i])
+ cpy.tdm[hash] = tc.tdm[hash]
+ cpy.blockm[hash] = tc.blockm[hash]
+ cpy.headerm[hash] = tc.headerm[hash]
+ cpy.receiptm[hash] = tc.receiptm[hash]
+ }
+ return cpy
+}
+
+// generate creates a chain of n blocks starting at and including parent.
+// the returned hash chain is ordered head->parent. In addition, every 22th block
+// contains a transaction and every 5th an uncle to allow testing correct block
+// reassembly.
+func (tc *testChain) generate(n int, seed byte, parent *types.Block, heavy bool) {
+ // start := time.Now()
+ // defer func() { fmt.Printf("test chain generated in %v\n", time.Since(start)) }()
+
+ blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
+ block.SetCoinbase(common.Address{seed})
+ // If a heavy chain is requested, delay blocks to raise difficulty
+ if heavy {
+ block.OffsetTime(-1)
+ }
+ // Include transactions to the miner to make blocks more interesting.
+ if parent == tc.genesis && i%22 == 0 {
+ signer := types.MakeSigner(params.TestChainConfig, block.Number())
+ tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
+ if err != nil {
+ panic(err)
+ }
+ block.AddTx(tx)
+ }
+ // if the block number is a multiple of 5, add a bonus uncle to the block
+ if i > 0 && i%5 == 0 {
+ block.AddUncle(&types.Header{
+ ParentHash: block.PrevBlock(i - 1).Hash(),
+ Number: big.NewInt(block.Number().Int64() - 1),
+ })
+ }
+ })
+
+ // Convert the block-chain into a hash-chain and header/block maps
+ td := new(big.Int).Set(tc.td(parent.Hash()))
+ for i, b := range blocks {
+ td := td.Add(td, b.Difficulty())
+ hash := b.Hash()
+ tc.chain = append(tc.chain, hash)
+ tc.blockm[hash] = b
+ tc.headerm[hash] = b.Header()
+ tc.receiptm[hash] = receipts[i]
+ tc.tdm[hash] = new(big.Int).Set(td)
+ }
+}
+
+// len returns the total number of blocks in the chain.
+func (tc *testChain) len() int {
+ return len(tc.chain)
+}
+
+// headBlock returns the head of the chain.
+func (tc *testChain) headBlock() *types.Block {
+ return tc.blockm[tc.chain[len(tc.chain)-1]]
+}
+
+// td returns the total difficulty of the given block.
+func (tc *testChain) td(hash common.Hash) *big.Int {
+ return tc.tdm[hash]
+}
+
+// headersByHash returns headers in ascending order from the given hash.
+func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int) []*types.Header {
+ num, _ := tc.hashToNumber(origin)
+ return tc.headersByNumber(num, amount, skip)
+}
+
+// headersByNumber returns headers in ascending order from the given number.
+func (tc *testChain) headersByNumber(origin uint64, amount int, skip int) []*types.Header {
+ result := make([]*types.Header, 0, amount)
+ for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 {
+ if header, ok := tc.headerm[tc.chain[int(num)]]; ok {
+ result = append(result, header)
+ }
+ }
+ return result
+}
+
+// receipts returns the receipts of the given block hashes.
+func (tc *testChain) receipts(hashes []common.Hash) [][]*types.Receipt {
+ results := make([][]*types.Receipt, 0, len(hashes))
+ for _, hash := range hashes {
+ if receipt, ok := tc.receiptm[hash]; ok {
+ results = append(results, receipt)
+ }
+ }
+ return results
+}
+
+// bodies returns the block bodies of the given block hashes.
+func (tc *testChain) bodies(hashes []common.Hash) ([][]*types.Transaction, [][]*types.Header) {
+ transactions := make([][]*types.Transaction, 0, len(hashes))
+ uncles := make([][]*types.Header, 0, len(hashes))
+ for _, hash := range hashes {
+ if block, ok := tc.blockm[hash]; ok {
+ transactions = append(transactions, block.Transactions())
+ uncles = append(uncles, block.Uncles())
+ }
+ }
+ return transactions, uncles
+}
+
+func (tc *testChain) hashToNumber(target common.Hash) (uint64, bool) {
+ for num, hash := range tc.chain {
+ if hash == target {
+ return uint64(num), true
+ }
+ }
+ return 0, false
+}
diff --git a/dex/downloader/types.go b/dex/downloader/types.go
new file mode 100644
index 000000000..d320b7590
--- /dev/null
+++ b/dex/downloader/types.go
@@ -0,0 +1,79 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+
+ "github.com/dexon-foundation/dexon/core/types"
+)
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
+
+// dataPack is a data message returned by a peer for some query.
+type dataPack interface {
+ PeerId() string
+ Items() int
+ Stats() string
+}
+
+// headerPack is a batch of block headers returned by a peer.
+type headerPack struct {
+ peerID string
+ headers []*types.Header
+}
+
+func (p *headerPack) PeerId() string { return p.peerID }
+func (p *headerPack) Items() int { return len(p.headers) }
+func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) }
+
+// bodyPack is a batch of block bodies returned by a peer.
+type bodyPack struct {
+ peerID string
+ transactions [][]*types.Transaction
+ uncles [][]*types.Header
+}
+
+func (p *bodyPack) PeerId() string { return p.peerID }
+func (p *bodyPack) Items() int {
+ if len(p.transactions) <= len(p.uncles) {
+ return len(p.transactions)
+ }
+ return len(p.uncles)
+}
+func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
+
+// receiptPack is a batch of receipts returned by a peer.
+type receiptPack struct {
+ peerID string
+ receipts [][]*types.Receipt
+}
+
+func (p *receiptPack) PeerId() string { return p.peerID }
+func (p *receiptPack) Items() int { return len(p.receipts) }
+func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
+
+// statePack is a batch of states returned by a peer.
+type statePack struct {
+ peerID string
+ states [][]byte
+}
+
+func (p *statePack) PeerId() string { return p.peerID }
+func (p *statePack) Items() int { return len(p.states) }
+func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) }
diff --git a/dex/fetcher/fetcher.go b/dex/fetcher/fetcher.go
new file mode 100644
index 000000000..f6807b5a5
--- /dev/null
+++ b/dex/fetcher/fetcher.go
@@ -0,0 +1,736 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package fetcher contains the block announcement based synchronisation.
+package fetcher
+
+import (
+ "errors"
+ "math/rand"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/common/prque"
+ "github.com/dexon-foundation/dexon/consensus"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/log"
+)
+
+const (
+ arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+ gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
+ fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block
+ maxUncleDist = 7 // Maximum allowed backward distance from the chain head
+ maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
+ hashLimit = 256 // Maximum number of unique blocks a peer may have announced
+ blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
+)
+
+var (
+ errTerminated = errors.New("terminated")
+)
+
+// blockRetrievalFn is a callback type for retrieving a block from the local chain.
+type blockRetrievalFn func(common.Hash) *types.Block
+
+// headerRequesterFn is a callback type for sending a header retrieval request.
+type headerRequesterFn func(common.Hash) error
+
+// bodyRequesterFn is a callback type for sending a body retrieval request.
+type bodyRequesterFn func([]common.Hash) error
+
+// headerVerifierFn is a callback type to verify a block's header for fast propagation.
+type headerVerifierFn func(header *types.Header) error
+
+// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
+type blockBroadcasterFn func(block *types.Block, propagate bool)
+
+// chainHeightFn is a callback type to retrieve the current chain height.
+type chainHeightFn func() uint64
+
+// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type chainInsertFn func(types.Blocks) (int, error)
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
+
+// announce is the hash notification of the availability of a new block in the
+// network.
+type announce struct {
+ hash common.Hash // Hash of the block being announced
+ number uint64 // Number of the block being announced (0 = unknown | old protocol)
+ header *types.Header // Header of the block partially reassembled (new protocol)
+ time time.Time // Timestamp of the announcement
+
+ origin string // Identifier of the peer originating the notification
+
+ fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
+ fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
+}
+
+// headerFilterTask represents a batch of headers needing fetcher filtering.
+type headerFilterTask struct {
+ peer string // The source peer of block headers
+ headers []*types.Header // Collection of headers to filter
+ time time.Time // Arrival time of the headers
+}
+
+// bodyFilterTask represents a batch of block bodies (transactions and uncles)
+// needing fetcher filtering.
+type bodyFilterTask struct {
+ peer string // The source peer of block bodies
+ transactions [][]*types.Transaction // Collection of transactions per block bodies
+ uncles [][]*types.Header // Collection of uncles per block bodies
+ time time.Time // Arrival time of the blocks' contents
+}
+
+// inject represents a schedules import operation.
+type inject struct {
+ origin string
+ block *types.Block
+}
+
+// Fetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type Fetcher struct {
+ // Various event channels
+ notify chan *announce
+ inject chan *inject
+
+ blockFilter chan chan []*types.Block
+ headerFilter chan chan *headerFilterTask
+ bodyFilter chan chan *bodyFilterTask
+
+ done chan common.Hash
+ quit chan struct{}
+
+ // Announce states
+ announces map[string]int // Per peer announce counts to prevent memory exhaustion
+ announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
+ fetching map[common.Hash]*announce // Announced blocks, currently fetching
+ fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
+ completing map[common.Hash]*announce // Blocks with headers, currently body-completing
+
+ // Block cache
+ queue *prque.Prque // Queue containing the import operations (block number sorted)
+ queues map[string]int // Per peer block counts to prevent memory exhaustion
+ queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
+
+ // Callbacks
+ getBlock blockRetrievalFn // Retrieves a block from the local chain
+ verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
+ broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
+ chainHeight chainHeightFn // Retrieves the current chain's height
+ insertChain chainInsertFn // Injects a batch of blocks into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
+
+ // Testing hooks
+ announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
+ queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
+ fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
+ completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
+ importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
+}
+
+// New creates a block fetcher to retrieve blocks based on hash announcements.
+func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
+ return &Fetcher{
+ notify: make(chan *announce),
+ inject: make(chan *inject),
+ blockFilter: make(chan chan []*types.Block),
+ headerFilter: make(chan chan *headerFilterTask),
+ bodyFilter: make(chan chan *bodyFilterTask),
+ done: make(chan common.Hash),
+ quit: make(chan struct{}),
+ announces: make(map[string]int),
+ announced: make(map[common.Hash][]*announce),
+ fetching: make(map[common.Hash]*announce),
+ fetched: make(map[common.Hash][]*announce),
+ completing: make(map[common.Hash]*announce),
+ queue: prque.New(nil),
+ queues: make(map[string]int),
+ queued: make(map[common.Hash]*inject),
+ getBlock: getBlock,
+ verifyHeader: verifyHeader,
+ broadcastBlock: broadcastBlock,
+ chainHeight: chainHeight,
+ insertChain: insertChain,
+ dropPeer: dropPeer,
+ }
+}
+
+// Start boots up the announcement based synchroniser, accepting and processing
+// hash notifications and block fetches until termination requested.
+func (f *Fetcher) Start() {
+ go f.loop()
+}
+
+// Stop terminates the announcement based synchroniser, canceling all pending
+// operations.
+func (f *Fetcher) Stop() {
+ close(f.quit)
+}
+
+// Notify announces the fetcher of the potential availability of a new block in
+// the network.
+func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
+ headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
+ block := &announce{
+ hash: hash,
+ number: number,
+ time: time,
+ origin: peer,
+ fetchHeader: headerFetcher,
+ fetchBodies: bodyFetcher,
+ }
+ select {
+ case f.notify <- block:
+ return nil
+ case <-f.quit:
+ return errTerminated
+ }
+}
+
+// Enqueue tries to fill gaps the fetcher's future import queue.
+func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
+ op := &inject{
+ origin: peer,
+ block: block,
+ }
+ select {
+ case f.inject <- op:
+ return nil
+ case <-f.quit:
+ return errTerminated
+ }
+}
+
+// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
+// returning those that should be handled differently.
+func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
+ log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
+
+ // Send the filter channel to the fetcher
+ filter := make(chan *headerFilterTask)
+
+ select {
+ case f.headerFilter <- filter:
+ case <-f.quit:
+ return nil
+ }
+ // Request the filtering of the header list
+ select {
+ case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
+ case <-f.quit:
+ return nil
+ }
+ // Retrieve the headers remaining after filtering
+ select {
+ case task := <-filter:
+ return task.headers
+ case <-f.quit:
+ return nil
+ }
+}
+
+// FilterBodies extracts all the block bodies that were explicitly requested by
+// the fetcher, returning those that should be handled differently.
+func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
+ log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
+
+ // Send the filter channel to the fetcher
+ filter := make(chan *bodyFilterTask)
+
+ select {
+ case f.bodyFilter <- filter:
+ case <-f.quit:
+ return nil, nil
+ }
+ // Request the filtering of the body list
+ select {
+ case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
+ case <-f.quit:
+ return nil, nil
+ }
+ // Retrieve the bodies remaining after filtering
+ select {
+ case task := <-filter:
+ return task.transactions, task.uncles
+ case <-f.quit:
+ return nil, nil
+ }
+}
+
+// Loop is the main fetcher loop, checking and processing various notification
+// events.
+func (f *Fetcher) loop() {
+ // Iterate the block fetching until a quit is requested
+ fetchTimer := time.NewTimer(0)
+ completeTimer := time.NewTimer(0)
+
+ for {
+ // Clean up any expired block fetches
+ for hash, announce := range f.fetching {
+ if time.Since(announce.time) > fetchTimeout {
+ f.forgetHash(hash)
+ }
+ }
+ // Import any queued blocks that could potentially fit
+ height := f.chainHeight()
+ for !f.queue.Empty() {
+ op := f.queue.PopItem().(*inject)
+ hash := op.block.Hash()
+ if f.queueChangeHook != nil {
+ f.queueChangeHook(hash, false)
+ }
+ // If too high up the chain or phase, continue later
+ number := op.block.NumberU64()
+ if number > height+1 {
+ f.queue.Push(op, -int64(number))
+ if f.queueChangeHook != nil {
+ f.queueChangeHook(hash, true)
+ }
+ break
+ }
+ // Otherwise if fresh and still unknown, try and import
+ if number+maxUncleDist < height || f.getBlock(hash) != nil {
+ f.forgetBlock(hash)
+ continue
+ }
+ f.insert(op.origin, op.block)
+ }
+ // Wait for an outside event to occur
+ select {
+ case <-f.quit:
+ // Fetcher terminating, abort all operations
+ return
+
+ case notification := <-f.notify:
+ // A block was announced, make sure the peer isn't DOSing us
+ propAnnounceInMeter.Mark(1)
+
+ count := f.announces[notification.origin] + 1
+ if count > hashLimit {
+ log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
+ propAnnounceDOSMeter.Mark(1)
+ break
+ }
+ // If we have a valid block number, check that it's potentially useful
+ if notification.number > 0 {
+ if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
+ log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
+ propAnnounceDropMeter.Mark(1)
+ break
+ }
+ }
+ // All is well, schedule the announce if block's not yet downloading
+ if _, ok := f.fetching[notification.hash]; ok {
+ break
+ }
+ if _, ok := f.completing[notification.hash]; ok {
+ break
+ }
+ f.announces[notification.origin] = count
+ f.announced[notification.hash] = append(f.announced[notification.hash], notification)
+ if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
+ f.announceChangeHook(notification.hash, true)
+ }
+ if len(f.announced) == 1 {
+ f.rescheduleFetch(fetchTimer)
+ }
+
+ case op := <-f.inject:
+ // A direct block insertion was requested, try and fill any pending gaps
+ propBroadcastInMeter.Mark(1)
+ f.enqueue(op.origin, op.block)
+
+ case hash := <-f.done:
+ // A pending import finished, remove all traces of the notification
+ f.forgetHash(hash)
+ f.forgetBlock(hash)
+
+ case <-fetchTimer.C:
+ // At least one block's timer ran out, check for needing retrieval
+ request := make(map[string][]common.Hash)
+
+ for hash, announces := range f.announced {
+ if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
+ // Pick a random peer to retrieve from, reset all others
+ announce := announces[rand.Intn(len(announces))]
+ f.forgetHash(hash)
+
+ // If the block still didn't arrive, queue for fetching
+ if f.getBlock(hash) == nil {
+ request[announce.origin] = append(request[announce.origin], hash)
+ f.fetching[hash] = announce
+ }
+ }
+ }
+ // Send out all block header requests
+ for peer, hashes := range request {
+ log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
+
+ // Create a closure of the fetch and schedule in on a new thread
+ fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
+ go func() {
+ if f.fetchingHook != nil {
+ f.fetchingHook(hashes)
+ }
+ for _, hash := range hashes {
+ headerFetchMeter.Mark(1)
+ fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
+ }
+ }()
+ }
+ // Schedule the next fetch if blocks are still pending
+ f.rescheduleFetch(fetchTimer)
+
+ case <-completeTimer.C:
+ // At least one header's timer ran out, retrieve everything
+ request := make(map[string][]common.Hash)
+
+ for hash, announces := range f.fetched {
+ // Pick a random peer to retrieve from, reset all others
+ announce := announces[rand.Intn(len(announces))]
+ f.forgetHash(hash)
+
+ // If the block still didn't arrive, queue for completion
+ if f.getBlock(hash) == nil {
+ request[announce.origin] = append(request[announce.origin], hash)
+ f.completing[hash] = announce
+ }
+ }
+ // Send out all block body requests
+ for peer, hashes := range request {
+ log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
+
+ // Create a closure of the fetch and schedule in on a new thread
+ if f.completingHook != nil {
+ f.completingHook(hashes)
+ }
+ bodyFetchMeter.Mark(int64(len(hashes)))
+ go f.completing[hashes[0]].fetchBodies(hashes)
+ }
+ // Schedule the next fetch if blocks are still pending
+ f.rescheduleComplete(completeTimer)
+
+ case filter := <-f.headerFilter:
+ // Headers arrived from a remote peer. Extract those that were explicitly
+ // requested by the fetcher, and return everything else so it's delivered
+ // to other parts of the system.
+ var task *headerFilterTask
+ select {
+ case task = <-filter:
+ case <-f.quit:
+ return
+ }
+ headerFilterInMeter.Mark(int64(len(task.headers)))
+
+ // Split the batch of headers into unknown ones (to return to the caller),
+ // known incomplete ones (requiring body retrievals) and completed blocks.
+ unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
+ for _, header := range task.headers {
+ hash := header.Hash()
+
+ // Filter fetcher-requested headers from other synchronisation algorithms
+ if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
+ // If the delivered header does not match the promised number, drop the announcer
+ if header.Number.Uint64() != announce.number {
+ log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
+ f.dropPeer(announce.origin)
+ f.forgetHash(hash)
+ continue
+ }
+ // Only keep if not imported by other means
+ if f.getBlock(hash) == nil {
+ announce.header = header
+ announce.time = task.time
+
+ // If the block is empty (header only), short circuit into the final import queue
+ if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
+ log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
+
+ block := types.NewBlockWithHeader(header)
+ block.ReceivedAt = task.time
+
+ complete = append(complete, block)
+ f.completing[hash] = announce
+ continue
+ }
+ // Otherwise add to the list of blocks needing completion
+ incomplete = append(incomplete, announce)
+ } else {
+ log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
+ f.forgetHash(hash)
+ }
+ } else {
+ // Fetcher doesn't know about it, add to the return list
+ unknown = append(unknown, header)
+ }
+ }
+ headerFilterOutMeter.Mark(int64(len(unknown)))
+ select {
+ case filter <- &headerFilterTask{headers: unknown, time: task.time}:
+ case <-f.quit:
+ return
+ }
+ // Schedule the retrieved headers for body completion
+ for _, announce := range incomplete {
+ hash := announce.header.Hash()
+ if _, ok := f.completing[hash]; ok {
+ continue
+ }
+ f.fetched[hash] = append(f.fetched[hash], announce)
+ if len(f.fetched) == 1 {
+ f.rescheduleComplete(completeTimer)
+ }
+ }
+ // Schedule the header-only blocks for import
+ for _, block := range complete {
+ if announce := f.completing[block.Hash()]; announce != nil {
+ f.enqueue(announce.origin, block)
+ }
+ }
+
+ case filter := <-f.bodyFilter:
+ // Block bodies arrived, extract any explicitly requested blocks, return the rest
+ var task *bodyFilterTask
+ select {
+ case task = <-filter:
+ case <-f.quit:
+ return
+ }
+ bodyFilterInMeter.Mark(int64(len(task.transactions)))
+
+ blocks := []*types.Block{}
+ for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
+ // Match up a body to any possible completion request
+ matched := false
+
+ for hash, announce := range f.completing {
+ if f.queued[hash] == nil {
+ txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
+ uncleHash := types.CalcUncleHash(task.uncles[i])
+
+ if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
+ // Mark the body matched, reassemble if still unknown
+ matched = true
+
+ if f.getBlock(hash) == nil {
+ block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
+ block.ReceivedAt = task.time
+
+ blocks = append(blocks, block)
+ } else {
+ f.forgetHash(hash)
+ }
+ }
+ }
+ }
+ if matched {
+ task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
+ task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
+ i--
+ continue
+ }
+ }
+
+ bodyFilterOutMeter.Mark(int64(len(task.transactions)))
+ select {
+ case filter <- task:
+ case <-f.quit:
+ return
+ }
+ // Schedule the retrieved blocks for ordered import
+ for _, block := range blocks {
+ if announce := f.completing[block.Hash()]; announce != nil {
+ f.enqueue(announce.origin, block)
+ }
+ }
+ }
+ }
+}
+
+// rescheduleFetch resets the specified fetch timer to the next announce timeout.
+func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
+ // Short circuit if no blocks are announced
+ if len(f.announced) == 0 {
+ return
+ }
+ // Otherwise find the earliest expiring announcement
+ earliest := time.Now()
+ for _, announces := range f.announced {
+ if earliest.After(announces[0].time) {
+ earliest = announces[0].time
+ }
+ }
+ fetch.Reset(arriveTimeout - time.Since(earliest))
+}
+
+// rescheduleComplete resets the specified completion timer to the next fetch timeout.
+func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
+ // Short circuit if no headers are fetched
+ if len(f.fetched) == 0 {
+ return
+ }
+ // Otherwise find the earliest expiring announcement
+ earliest := time.Now()
+ for _, announces := range f.fetched {
+ if earliest.After(announces[0].time) {
+ earliest = announces[0].time
+ }
+ }
+ complete.Reset(gatherSlack - time.Since(earliest))
+}
+
+// enqueue schedules a new future import operation, if the block to be imported
+// has not yet been seen.
+func (f *Fetcher) enqueue(peer string, block *types.Block) {
+ hash := block.Hash()
+
+ // Ensure the peer isn't DOSing us
+ count := f.queues[peer] + 1
+ if count > blockLimit {
+ log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
+ propBroadcastDOSMeter.Mark(1)
+ f.forgetHash(hash)
+ return
+ }
+ // Discard any past or too distant blocks
+ if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
+ log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
+ propBroadcastDropMeter.Mark(1)
+ f.forgetHash(hash)
+ return
+ }
+ // Schedule the block for future importing
+ if _, ok := f.queued[hash]; !ok {
+ op := &inject{
+ origin: peer,
+ block: block,
+ }
+ f.queues[peer] = count
+ f.queued[hash] = op
+ f.queue.Push(op, -int64(block.NumberU64()))
+ if f.queueChangeHook != nil {
+ f.queueChangeHook(op.block.Hash(), true)
+ }
+ log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
+ }
+}
+
+// insert spawns a new goroutine to run a block insertion into the chain. If the
+// block's number is at the same height as the current import phase, it updates
+// the phase states accordingly.
+func (f *Fetcher) insert(peer string, block *types.Block) {
+ hash := block.Hash()
+
+ // Run the import on a new thread
+ log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
+ go func() {
+ defer func() { f.done <- hash }()
+
+ // If the parent's unknown, abort insertion
+ parent := f.getBlock(block.ParentHash())
+ if parent == nil {
+ log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
+ return
+ }
+ // Quickly validate the header and propagate the block if it passes
+ switch err := f.verifyHeader(block.Header()); err {
+ case nil:
+ // All ok, quickly propagate to our peers
+ propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
+ go f.broadcastBlock(block, true)
+
+ case consensus.ErrFutureBlock:
+ // Weird future block, don't fail, but neither propagate
+
+ default:
+ // Something went very wrong, drop the peer
+ log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
+ f.dropPeer(peer)
+ return
+ }
+ // Run the actual import and log any issues
+ if _, err := f.insertChain(types.Blocks{block}); err != nil {
+ log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
+ return
+ }
+ // If import succeeded, broadcast the block
+ propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
+ go f.broadcastBlock(block, false)
+
+ // Invoke the testing hook if needed
+ if f.importedHook != nil {
+ f.importedHook(block)
+ }
+ }()
+}
+
+// forgetHash removes all traces of a block announcement from the fetcher's
+// internal state.
+func (f *Fetcher) forgetHash(hash common.Hash) {
+ // Remove all pending announces and decrement DOS counters
+ for _, announce := range f.announced[hash] {
+ f.announces[announce.origin]--
+ if f.announces[announce.origin] == 0 {
+ delete(f.announces, announce.origin)
+ }
+ }
+ delete(f.announced, hash)
+ if f.announceChangeHook != nil {
+ f.announceChangeHook(hash, false)
+ }
+ // Remove any pending fetches and decrement the DOS counters
+ if announce := f.fetching[hash]; announce != nil {
+ f.announces[announce.origin]--
+ if f.announces[announce.origin] == 0 {
+ delete(f.announces, announce.origin)
+ }
+ delete(f.fetching, hash)
+ }
+
+ // Remove any pending completion requests and decrement the DOS counters
+ for _, announce := range f.fetched[hash] {
+ f.announces[announce.origin]--
+ if f.announces[announce.origin] == 0 {
+ delete(f.announces, announce.origin)
+ }
+ }
+ delete(f.fetched, hash)
+
+ // Remove any pending completions and decrement the DOS counters
+ if announce := f.completing[hash]; announce != nil {
+ f.announces[announce.origin]--
+ if f.announces[announce.origin] == 0 {
+ delete(f.announces, announce.origin)
+ }
+ delete(f.completing, hash)
+ }
+}
+
+// forgetBlock removes all traces of a queued block from the fetcher's internal
+// state.
+func (f *Fetcher) forgetBlock(hash common.Hash) {
+ if insert := f.queued[hash]; insert != nil {
+ f.queues[insert.origin]--
+ if f.queues[insert.origin] == 0 {
+ delete(f.queues, insert.origin)
+ }
+ delete(f.queued, hash)
+ }
+}
diff --git a/dex/fetcher/fetcher_test.go b/dex/fetcher/fetcher_test.go
new file mode 100644
index 000000000..24611a8a0
--- /dev/null
+++ b/dex/fetcher/fetcher_test.go
@@ -0,0 +1,790 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package fetcher
+
+import (
+ "errors"
+ "math/big"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/consensus/ethash"
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/params"
+)
+
+var (
+ testdb = ethdb.NewMemDatabase()
+ testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
+ genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
+ unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil)
+)
+
+// makeChain creates a chain of n blocks starting at and including parent.
+// the returned hash chain is ordered head->parent. In addition, every 3rd block
+// contains a transaction and every 5th an uncle to allow testing correct block
+// reassembly.
+func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
+ blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
+ block.SetCoinbase(common.Address{seed})
+
+ // If the block number is multiple of 3, send a bonus transaction to the miner
+ if parent == genesis && i%3 == 0 {
+ signer := types.MakeSigner(params.TestChainConfig, block.Number())
+ tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
+ if err != nil {
+ panic(err)
+ }
+ block.AddTx(tx)
+ }
+ // If the block number is a multiple of 5, add a bonus uncle to the block
+ if i%5 == 0 {
+ block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
+ }
+ })
+ hashes := make([]common.Hash, n+1)
+ hashes[len(hashes)-1] = parent.Hash()
+ blockm := make(map[common.Hash]*types.Block, n+1)
+ blockm[parent.Hash()] = parent
+ for i, b := range blocks {
+ hashes[len(hashes)-i-2] = b.Hash()
+ blockm[b.Hash()] = b
+ }
+ return hashes, blockm
+}
+
+// fetcherTester is a test simulator for mocking out local block chain.
+type fetcherTester struct {
+ fetcher *Fetcher
+
+ hashes []common.Hash // Hash chain belonging to the tester
+ blocks map[common.Hash]*types.Block // Blocks belonging to the tester
+ drops map[string]bool // Map of peers dropped by the fetcher
+
+ lock sync.RWMutex
+}
+
+// newTester creates a new fetcher test mocker.
+func newTester() *fetcherTester {
+ tester := &fetcherTester{
+ hashes: []common.Hash{genesis.Hash()},
+ blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
+ drops: make(map[string]bool),
+ }
+ tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
+ tester.fetcher.Start()
+
+ return tester
+}
+
+// getBlock retrieves a block from the tester's block chain.
+func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+
+ return f.blocks[hash]
+}
+
+// verifyHeader is a nop placeholder for the block header verification.
+func (f *fetcherTester) verifyHeader(header *types.Header) error {
+ return nil
+}
+
+// broadcastBlock is a nop placeholder for the block broadcasting.
+func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
+}
+
+// chainHeight retrieves the current height (block number) of the chain.
+func (f *fetcherTester) chainHeight() uint64 {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+
+ return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
+}
+
+// insertChain injects a new blocks into the simulated chain.
+func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ for i, block := range blocks {
+ // Make sure the parent in known
+ if _, ok := f.blocks[block.ParentHash()]; !ok {
+ return i, errors.New("unknown parent")
+ }
+ // Discard any new blocks if the same height already exists
+ if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
+ return i, nil
+ }
+ // Otherwise build our current chain
+ f.hashes = append(f.hashes, block.Hash())
+ f.blocks[block.Hash()] = block
+ }
+ return 0, nil
+}
+
+// dropPeer is an emulator for the peer removal, simply accumulating the various
+// peers dropped by the fetcher.
+func (f *fetcherTester) dropPeer(peer string) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ f.drops[peer] = true
+}
+
+// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
+func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
+ closure := make(map[common.Hash]*types.Block)
+ for hash, block := range blocks {
+ closure[hash] = block
+ }
+ // Create a function that return a header from the closure
+ return func(hash common.Hash) error {
+ // Gather the blocks to return
+ headers := make([]*types.Header, 0, 1)
+ if block, ok := closure[hash]; ok {
+ headers = append(headers, block.Header())
+ }
+ // Return on a new thread
+ go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
+
+ return nil
+ }
+}
+
+// makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
+func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
+ closure := make(map[common.Hash]*types.Block)
+ for hash, block := range blocks {
+ closure[hash] = block
+ }
+ // Create a function that returns blocks from the closure
+ return func(hashes []common.Hash) error {
+ // Gather the block bodies to return
+ transactions := make([][]*types.Transaction, 0, len(hashes))
+ uncles := make([][]*types.Header, 0, len(hashes))
+
+ for _, hash := range hashes {
+ if block, ok := closure[hash]; ok {
+ transactions = append(transactions, block.Transactions())
+ uncles = append(uncles, block.Uncles())
+ }
+ }
+ // Return on a new thread
+ go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
+
+ return nil
+ }
+}
+
+// verifyFetchingEvent verifies that one single event arrive on a fetching channel.
+func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
+ if arrive {
+ select {
+ case <-fetching:
+ case <-time.After(time.Second):
+ t.Fatalf("fetching timeout")
+ }
+ } else {
+ select {
+ case <-fetching:
+ t.Fatalf("fetching invoked")
+ case <-time.After(10 * time.Millisecond):
+ }
+ }
+}
+
+// verifyCompletingEvent verifies that one single event arrive on an completing channel.
+func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
+ if arrive {
+ select {
+ case <-completing:
+ case <-time.After(time.Second):
+ t.Fatalf("completing timeout")
+ }
+ } else {
+ select {
+ case <-completing:
+ t.Fatalf("completing invoked")
+ case <-time.After(10 * time.Millisecond):
+ }
+ }
+}
+
+// verifyImportEvent verifies that one single event arrive on an import channel.
+func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
+ if arrive {
+ select {
+ case <-imported:
+ case <-time.After(time.Second):
+ t.Fatalf("import timeout")
+ }
+ } else {
+ select {
+ case <-imported:
+ t.Fatalf("import invoked")
+ case <-time.After(10 * time.Millisecond):
+ }
+ }
+}
+
+// verifyImportCount verifies that exactly count number of events arrive on an
+// import hook channel.
+func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
+ for i := 0; i < count; i++ {
+ select {
+ case <-imported:
+ case <-time.After(time.Second):
+ t.Fatalf("block %d: import timeout", i+1)
+ }
+ }
+ verifyImportDone(t, imported)
+}
+
+// verifyImportDone verifies that no more events are arriving on an import channel.
+func verifyImportDone(t *testing.T, imported chan *types.Block) {
+ select {
+ case <-imported:
+ t.Fatalf("extra block imported")
+ case <-time.After(50 * time.Millisecond):
+ }
+}
+
+// Tests that a fetcher accepts block announcements and initiates retrievals for
+// them, successfully importing into the local chain.
+func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
+func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
+func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
+
+func testSequentialAnnouncements(t *testing.T, protocol int) {
+ // Create a chain of blocks to import
+ targetBlocks := 4 * hashLimit
+ hashes, blocks := makeChain(targetBlocks, 0, genesis)
+
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+ // Iteratively announce blocks until all are imported
+ imported := make(chan *types.Block)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ for i := len(hashes) - 2; i >= 0; i-- {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ verifyImportEvent(t, imported, true)
+ }
+ verifyImportDone(t, imported)
+}
+
+// Tests that if blocks are announced by multiple peers (or even the same buggy
+// peer), they will only get downloaded at most once.
+func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
+func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
+func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
+
+func testConcurrentAnnouncements(t *testing.T, protocol int) {
+ // Create a chain of blocks to import
+ targetBlocks := 4 * hashLimit
+ hashes, blocks := makeChain(targetBlocks, 0, genesis)
+
+ // Assemble a tester with a built in counter for the requests
+ tester := newTester()
+ firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
+ firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
+ secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
+ secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
+
+ counter := uint32(0)
+ firstHeaderWrapper := func(hash common.Hash) error {
+ atomic.AddUint32(&counter, 1)
+ return firstHeaderFetcher(hash)
+ }
+ secondHeaderWrapper := func(hash common.Hash) error {
+ atomic.AddUint32(&counter, 1)
+ return secondHeaderFetcher(hash)
+ }
+ // Iteratively announce blocks until all are imported
+ imported := make(chan *types.Block)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ for i := len(hashes) - 2; i >= 0; i-- {
+ tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
+ tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
+ tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
+ verifyImportEvent(t, imported, true)
+ }
+ verifyImportDone(t, imported)
+
+ // Make sure no blocks were retrieved twice
+ if int(counter) != targetBlocks {
+ t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
+ }
+}
+
+// Tests that announcements arriving while a previous is being fetched still
+// results in a valid import.
+func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
+func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
+func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
+
+func testOverlappingAnnouncements(t *testing.T, protocol int) {
+ // Create a chain of blocks to import
+ targetBlocks := 4 * hashLimit
+ hashes, blocks := makeChain(targetBlocks, 0, genesis)
+
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+ // Iteratively announce blocks, but overlap them continuously
+ overlap := 16
+ imported := make(chan *types.Block, len(hashes)-1)
+ for i := 0; i < overlap; i++ {
+ imported <- nil
+ }
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ for i := len(hashes) - 2; i >= 0; i-- {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ select {
+ case <-imported:
+ case <-time.After(time.Second):
+ t.Fatalf("block %d: import timeout", len(hashes)-i)
+ }
+ }
+ // Wait for all the imports to complete and check count
+ verifyImportCount(t, imported, overlap)
+}
+
+// Tests that announces already being retrieved will not be duplicated.
+func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
+func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
+func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
+
+func testPendingDeduplication(t *testing.T, protocol int) {
+ // Create a hash and corresponding block
+ hashes, blocks := makeChain(1, 0, genesis)
+
+ // Assemble a tester with a built in counter and delayed fetcher
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
+
+ delay := 50 * time.Millisecond
+ counter := uint32(0)
+ headerWrapper := func(hash common.Hash) error {
+ atomic.AddUint32(&counter, 1)
+
+ // Simulate a long running fetch
+ go func() {
+ time.Sleep(delay)
+ headerFetcher(hash)
+ }()
+ return nil
+ }
+ // Announce the same block many times until it's fetched (wait for any pending ops)
+ for tester.getBlock(hashes[0]) == nil {
+ tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
+ time.Sleep(time.Millisecond)
+ }
+ time.Sleep(delay)
+
+ // Check that all blocks were imported and none fetched twice
+ if imported := len(tester.blocks); imported != 2 {
+ t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2)
+ }
+ if int(counter) != 1 {
+ t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
+ }
+}
+
+// Tests that announcements retrieved in a random order are cached and eventually
+// imported when all the gaps are filled in.
+func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
+func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
+func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
+
+func testRandomArrivalImport(t *testing.T, protocol int) {
+ // Create a chain of blocks to import, and choose one to delay
+ targetBlocks := maxQueueDist
+ hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ skip := targetBlocks / 2
+
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+ // Iteratively announce blocks, skipping one entry
+ imported := make(chan *types.Block, len(hashes)-1)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ for i := len(hashes) - 1; i >= 0; i-- {
+ if i != skip {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ time.Sleep(time.Millisecond)
+ }
+ }
+ // Finally announce the skipped entry and check full import
+ tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ verifyImportCount(t, imported, len(hashes)-1)
+}
+
+// Tests that direct block enqueues (due to block propagation vs. hash announce)
+// are correctly schedule, filling and import queue gaps.
+func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
+func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
+func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
+
+func testQueueGapFill(t *testing.T, protocol int) {
+ // Create a chain of blocks to import, and choose one to not announce at all
+ targetBlocks := maxQueueDist
+ hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ skip := targetBlocks / 2
+
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+ // Iteratively announce blocks, skipping one entry
+ imported := make(chan *types.Block, len(hashes)-1)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ for i := len(hashes) - 1; i >= 0; i-- {
+ if i != skip {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ time.Sleep(time.Millisecond)
+ }
+ }
+ // Fill the missing block directly as if propagated
+ tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
+ verifyImportCount(t, imported, len(hashes)-1)
+}
+
+// Tests that blocks arriving from various sources (multiple propagations, hash
+// announces, etc) do not get scheduled for import multiple times.
+func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
+func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
+func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
+
+func testImportDeduplication(t *testing.T, protocol int) {
+ // Create two blocks to import (one for duplication, the other for stalling)
+ hashes, blocks := makeChain(2, 0, genesis)
+
+ // Create the tester and wrap the importer with a counter
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+ counter := uint32(0)
+ tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
+ atomic.AddUint32(&counter, uint32(len(blocks)))
+ return tester.insertChain(blocks)
+ }
+ // Instrument the fetching and imported events
+ fetching := make(chan []common.Hash)
+ imported := make(chan *types.Block, len(hashes)-1)
+ tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ // Announce the duplicating block, wait for retrieval, and also propagate directly
+ tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ <-fetching
+
+ tester.fetcher.Enqueue("valid", blocks[hashes[0]])
+ tester.fetcher.Enqueue("valid", blocks[hashes[0]])
+ tester.fetcher.Enqueue("valid", blocks[hashes[0]])
+
+ // Fill the missing block directly as if propagated, and check import uniqueness
+ tester.fetcher.Enqueue("valid", blocks[hashes[1]])
+ verifyImportCount(t, imported, 2)
+
+ if counter != 2 {
+ t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
+ }
+}
+
+// Tests that blocks with numbers much lower or higher than out current head get
+// discarded to prevent wasting resources on useless blocks from faulty peers.
+func TestDistantPropagationDiscarding(t *testing.T) {
+ // Create a long chain to import and define the discard boundaries
+ hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
+ head := hashes[len(hashes)/2]
+
+ low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
+
+ // Create a tester and simulate a head block being the middle of the above chain
+ tester := newTester()
+
+ tester.lock.Lock()
+ tester.hashes = []common.Hash{head}
+ tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
+ tester.lock.Unlock()
+
+ // Ensure that a block with a lower number than the threshold is discarded
+ tester.fetcher.Enqueue("lower", blocks[hashes[low]])
+ time.Sleep(10 * time.Millisecond)
+ if !tester.fetcher.queue.Empty() {
+ t.Fatalf("fetcher queued stale block")
+ }
+ // Ensure that a block with a higher number than the threshold is discarded
+ tester.fetcher.Enqueue("higher", blocks[hashes[high]])
+ time.Sleep(10 * time.Millisecond)
+ if !tester.fetcher.queue.Empty() {
+ t.Fatalf("fetcher queued future block")
+ }
+}
+
+// Tests that announcements with numbers much lower or higher than out current
+// head get discarded to prevent wasting resources on useless blocks from faulty
+// peers.
+func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) }
+func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) }
+func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) }
+
+func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
+ // Create a long chain to import and define the discard boundaries
+ hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
+ head := hashes[len(hashes)/2]
+
+ low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
+
+ // Create a tester and simulate a head block being the middle of the above chain
+ tester := newTester()
+
+ tester.lock.Lock()
+ tester.hashes = []common.Hash{head}
+ tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
+ tester.lock.Unlock()
+
+ headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
+
+ fetching := make(chan struct{}, 2)
+ tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
+
+ // Ensure that a block with a lower number than the threshold is discarded
+ tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ select {
+ case <-time.After(50 * time.Millisecond):
+ case <-fetching:
+ t.Fatalf("fetcher requested stale header")
+ }
+ // Ensure that a block with a higher number than the threshold is discarded
+ tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ select {
+ case <-time.After(50 * time.Millisecond):
+ case <-fetching:
+ t.Fatalf("fetcher requested future header")
+ }
+}
+
+// Tests that peers announcing blocks with invalid numbers (i.e. not matching
+// the headers provided afterwards) get dropped as malicious.
+func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) }
+func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) }
+func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) }
+
+func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
+ // Create a single block to import and check numbers against
+ hashes, blocks := makeChain(1, 0, genesis)
+
+ tester := newTester()
+ badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
+ badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
+
+ imported := make(chan *types.Block)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ // Announce a block with a bad number, check for immediate drop
+ tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
+ verifyImportEvent(t, imported, false)
+
+ tester.lock.RLock()
+ dropped := tester.drops["bad"]
+ tester.lock.RUnlock()
+
+ if !dropped {
+ t.Fatalf("peer with invalid numbered announcement not dropped")
+ }
+
+ goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
+ goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
+ // Make sure a good announcement passes without a drop
+ tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
+ verifyImportEvent(t, imported, true)
+
+ tester.lock.RLock()
+ dropped = tester.drops["good"]
+ tester.lock.RUnlock()
+
+ if dropped {
+ t.Fatalf("peer with valid numbered announcement dropped")
+ }
+ verifyImportDone(t, imported)
+}
+
+// Tests that if a block is empty (i.e. header only), no body request should be
+// made, and instead the header should be assembled into a whole block in itself.
+func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
+func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
+func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
+
+func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
+ // Create a chain of blocks to import
+ hashes, blocks := makeChain(32, 0, genesis)
+
+ tester := newTester()
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+ // Add a monitoring hook for all internal events
+ fetching := make(chan []common.Hash)
+ tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
+
+ completing := make(chan []common.Hash)
+ tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
+
+ imported := make(chan *types.Block)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+
+ // Iteratively announce blocks until all are imported
+ for i := len(hashes) - 2; i >= 0; i-- {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+
+ // All announces should fetch the header
+ verifyFetchingEvent(t, fetching, true)
+
+ // Only blocks with data contents should request bodies
+ verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
+
+ // Irrelevant of the construct, import should succeed
+ verifyImportEvent(t, imported, true)
+ }
+ verifyImportDone(t, imported)
+}
+
+// Tests that a peer is unable to use unbounded memory with sending infinite
+// block announcements to a node, but that even in the face of such an attack,
+// the fetcher remains operational.
+func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
+func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
+func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
+
+func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
+ // Create a tester with instrumented import hooks
+ tester := newTester()
+
+ imported, announces := make(chan *types.Block), int32(0)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+ tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
+ if added {
+ atomic.AddInt32(&announces, 1)
+ } else {
+ atomic.AddInt32(&announces, -1)
+ }
+ }
+ // Create a valid chain and an infinite junk chain
+ targetBlocks := hashLimit + 2*maxQueueDist
+ hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
+
+ attack, _ := makeChain(targetBlocks, 0, unknownBlock)
+ attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
+ attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
+
+ // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
+ for i := 0; i < len(attack); i++ {
+ if i < maxQueueDist {
+ tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
+ }
+ tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
+ }
+ if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
+ t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
+ }
+ // Wait for fetches to complete
+ verifyImportCount(t, imported, maxQueueDist)
+
+ // Feed the remaining valid hashes to ensure DOS protection state remains clean
+ for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
+ tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
+ verifyImportEvent(t, imported, true)
+ }
+ verifyImportDone(t, imported)
+}
+
+// Tests that blocks sent to the fetcher (either through propagation or via hash
+// announces and retrievals) don't pile up indefinitely, exhausting available
+// system memory.
+func TestBlockMemoryExhaustionAttack(t *testing.T) {
+ // Create a tester with instrumented import hooks
+ tester := newTester()
+
+ imported, enqueued := make(chan *types.Block), int32(0)
+ tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
+ tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
+ if added {
+ atomic.AddInt32(&enqueued, 1)
+ } else {
+ atomic.AddInt32(&enqueued, -1)
+ }
+ }
+ // Create a valid chain and a batch of dangling (but in range) blocks
+ targetBlocks := hashLimit + 2*maxQueueDist
+ hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ attack := make(map[common.Hash]*types.Block)
+ for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
+ hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
+ for _, hash := range hashes[:maxQueueDist-2] {
+ attack[hash] = blocks[hash]
+ }
+ }
+ // Try to feed all the attacker blocks make sure only a limited batch is accepted
+ for _, block := range attack {
+ tester.fetcher.Enqueue("attacker", block)
+ }
+ time.Sleep(200 * time.Millisecond)
+ if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
+ t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
+ }
+ // Queue up a batch of valid blocks, and check that a new peer is allowed to do so
+ for i := 0; i < maxQueueDist-1; i++ {
+ tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
+ }
+ time.Sleep(100 * time.Millisecond)
+ if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
+ t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
+ }
+ // Insert the missing piece (and sanity check the import)
+ tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
+ verifyImportCount(t, imported, maxQueueDist)
+
+ // Insert the remaining blocks in chunks to ensure clean DOS protection
+ for i := maxQueueDist; i < len(hashes)-1; i++ {
+ tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
+ verifyImportEvent(t, imported, true)
+ }
+ verifyImportDone(t, imported)
+}
diff --git a/dex/fetcher/metrics.go b/dex/fetcher/metrics.go
new file mode 100644
index 000000000..23b670549
--- /dev/null
+++ b/dex/fetcher/metrics.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the metrics collected by the fetcher.
+
+package fetcher
+
+import (
+ "github.com/dexon-foundation/dexon/metrics"
+)
+
+var (
+ propAnnounceInMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/in", nil)
+ propAnnounceOutTimer = metrics.NewRegisteredTimer("dex/fetcher/prop/announces/out", nil)
+ propAnnounceDropMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/drop", nil)
+ propAnnounceDOSMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/dos", nil)
+
+ propBroadcastInMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/in", nil)
+ propBroadcastOutTimer = metrics.NewRegisteredTimer("dex/fetcher/prop/broadcasts/out", nil)
+ propBroadcastDropMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/drop", nil)
+ propBroadcastDOSMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/dos", nil)
+
+ headerFetchMeter = metrics.NewRegisteredMeter("dex/fetcher/fetch/headers", nil)
+ bodyFetchMeter = metrics.NewRegisteredMeter("dex/fetcher/fetch/bodies", nil)
+
+ headerFilterInMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/headers/in", nil)
+ headerFilterOutMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/headers/out", nil)
+ bodyFilterInMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/bodies/in", nil)
+ bodyFilterOutMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/bodies/out", nil)
+)