diff options
Diffstat (limited to 'dex')
-rw-r--r-- | dex/downloader/api.go | 166 | ||||
-rw-r--r-- | dex/downloader/downloader.go | 1684 | ||||
-rw-r--r-- | dex/downloader/downloader_test.go | 1481 | ||||
-rw-r--r-- | dex/downloader/events.go | 21 | ||||
-rw-r--r-- | dex/downloader/fakepeer.go | 161 | ||||
-rw-r--r-- | dex/downloader/metrics.go | 43 | ||||
-rw-r--r-- | dex/downloader/modes.go | 73 | ||||
-rw-r--r-- | dex/downloader/peer.go | 573 | ||||
-rw-r--r-- | dex/downloader/queue.go | 885 | ||||
-rw-r--r-- | dex/downloader/statesync.go | 484 | ||||
-rw-r--r-- | dex/downloader/testchain_test.go | 221 | ||||
-rw-r--r-- | dex/downloader/types.go | 79 | ||||
-rw-r--r-- | dex/fetcher/fetcher.go | 736 | ||||
-rw-r--r-- | dex/fetcher/fetcher_test.go | 790 | ||||
-rw-r--r-- | dex/fetcher/metrics.go | 43 |
15 files changed, 7440 insertions, 0 deletions
diff --git a/dex/downloader/api.go b/dex/downloader/api.go new file mode 100644 index 000000000..721818e75 --- /dev/null +++ b/dex/downloader/api.go @@ -0,0 +1,166 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "context" + "sync" + + ethereum "github.com/dexon-foundation/dexon" + "github.com/dexon-foundation/dexon/event" + "github.com/dexon-foundation/dexon/rpc" +) + +// PublicDownloaderAPI provides an API which gives information about the current synchronisation status. +// It offers only methods that operates on data that can be available to anyone without security risks. +type PublicDownloaderAPI struct { + d *Downloader + mux *event.TypeMux + installSyncSubscription chan chan interface{} + uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest +} + +// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that +// listens for events from the downloader through the global event mux. In case it receives one of +// these events it broadcasts it to all syncing subscriptions that are installed through the +// installSyncSubscription channel. +func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI { + api := &PublicDownloaderAPI{ + d: d, + mux: m, + installSyncSubscription: make(chan chan interface{}), + uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), + } + + go api.eventLoop() + + return api +} + +// eventLoop runs a loop until the event mux closes. It will install and uninstall new +// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions. +func (api *PublicDownloaderAPI) eventLoop() { + var ( + sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) + syncSubscriptions = make(map[chan interface{}]struct{}) + ) + + for { + select { + case i := <-api.installSyncSubscription: + syncSubscriptions[i] = struct{}{} + case u := <-api.uninstallSyncSubscription: + delete(syncSubscriptions, u.c) + close(u.uninstalled) + case event := <-sub.Chan(): + if event == nil { + return + } + + var notification interface{} + switch event.Data.(type) { + case StartEvent: + notification = &SyncingResult{ + Syncing: true, + Status: api.d.Progress(), + } + case DoneEvent, FailedEvent: + notification = false + } + // broadcast + for c := range syncSubscriptions { + c <- notification + } + } + } +} + +// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. +func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + statuses := make(chan interface{}) + sub := api.SubscribeSyncStatus(statuses) + + for { + select { + case status := <-statuses: + notifier.Notify(rpcSub.ID, status) + case <-rpcSub.Err(): + sub.Unsubscribe() + return + case <-notifier.Closed(): + sub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// SyncingResult provides information about the current synchronisation status for this node. +type SyncingResult struct { + Syncing bool `json:"syncing"` + Status ethereum.SyncProgress `json:"status"` +} + +// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop. +type uninstallSyncSubscriptionRequest struct { + c chan interface{} + uninstalled chan interface{} +} + +// SyncStatusSubscription represents a syncing subscription. +type SyncStatusSubscription struct { + api *PublicDownloaderAPI // register subscription in event loop of this api instance + c chan interface{} // channel where events are broadcasted to + unsubOnce sync.Once // make sure unsubscribe logic is executed once +} + +// Unsubscribe uninstalls the subscription from the DownloadAPI event loop. +// The status channel that was passed to subscribeSyncStatus isn't used anymore +// after this method returns. +func (s *SyncStatusSubscription) Unsubscribe() { + s.unsubOnce.Do(func() { + req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})} + s.api.uninstallSyncSubscription <- &req + + for { + select { + case <-s.c: + // drop new status events until uninstall confirmation + continue + case <-req.uninstalled: + return + } + } + }) +} + +// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates. +// The given channel must receive interface values, the result can either +func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription { + api.installSyncSubscription <- status + return &SyncStatusSubscription{api: api, c: status} +} diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go new file mode 100644 index 000000000..0383a3709 --- /dev/null +++ b/dex/downloader/downloader.go @@ -0,0 +1,1684 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package downloader contains the manual full chain synchronisation. +package downloader + +import ( + "errors" + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + ethereum "github.com/dexon-foundation/dexon" + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core/rawdb" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/event" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/metrics" + "github.com/dexon-foundation/dexon/params" +) + +var ( + MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request + MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly + MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request + MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request + MaxStateFetch = 384 // Amount of node state values to allow fetching per request + + MaxForkAncestry = 3 * params.EpochDuration // Maximum chain reorganisation + rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests + rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests + rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value + ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion + ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts + + qosTuningPeers = 5 // Number of peers to tune based on (best peers) + qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence + qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value + + maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxHeadersProcess = 2048 // Number of header download results to import at once into the chain + maxResultsProcess = 2048 // Number of content download results to import at once into the chain + + reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection + reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs + + fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync +) + +var ( + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errNoPeers = errors.New("no peers to keep download active") + errTimeout = errors.New("timeout") + errEmptyHeaderSet = errors.New("empty header set by peer") + errPeersUnavailable = errors.New("no peers available or all tried for download") + errInvalidAncestor = errors.New("retrieved ancestor is invalid") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errInvalidBlock = errors.New("retrieved block is invalid") + errInvalidBody = errors.New("retrieved block body is invalid") + errInvalidReceipt = errors.New("retrieved receipt is invalid") + errCancelBlockFetch = errors.New("block download canceled (requested)") + errCancelHeaderFetch = errors.New("block header download canceled (requested)") + errCancelBodyFetch = errors.New("block body download canceled (requested)") + errCancelReceiptFetch = errors.New("receipt download canceled (requested)") + errCancelStateFetch = errors.New("state data download canceled (requested)") + errCancelHeaderProcessing = errors.New("header processing canceled (requested)") + errCancelContentProcessing = errors.New("content processing canceled (requested)") + errNoSyncActive = errors.New("no sync active") + errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)") +) + +type Downloader struct { + mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) + mux *event.TypeMux // Event multiplexer to announce sync operation events + + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + stateDB ethdb.Database + + rttEstimate uint64 // Round trip time to target for download requests + rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) + + // Statistics + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsState stateSyncStats + syncStatsLock sync.RWMutex // Lock protecting the sync stats fields + + lightchain LightChain + blockchain BlockChain + + // Callbacks + dropPeer peerDropFn // Drops a peer for misbehaving + + // Status + synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing + synchronising int32 + notified int32 + committed int32 + + // Channels + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + + // for stateFetcher + stateSyncStart chan *stateSync + trackStateReq chan *stateReq + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + + // Cancellation and termination + cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) + cancelCh chan struct{} // Channel to cancel mid-flight syncs + cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers + cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited. + + quitCh chan struct{} // Quit channel to signal termination + quitLock sync.RWMutex // Lock to prevent double closes + + // Testing hooks + syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run + bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch + receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch + chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) +} + +// LightChain encapsulates functions required to synchronise a light chain. +type LightChain interface { + // HasHeader verifies a header's presence in the local chain. + HasHeader(common.Hash, uint64) bool + + // GetHeaderByHash retrieves a header from the local chain. + GetHeaderByHash(common.Hash) *types.Header + + // CurrentHeader retrieves the head header from the local chain. + CurrentHeader() *types.Header + + // GetTd returns the total difficulty of a local block. + GetTd(common.Hash, uint64) *big.Int + + // InsertHeaderChain inserts a batch of headers into the local chain. + InsertHeaderChain([]*types.Header, int) (int, error) + + // Rollback removes a few recently added elements from the local chain. + Rollback([]common.Hash) +} + +// BlockChain encapsulates functions required to sync a (full or fast) blockchain. +type BlockChain interface { + LightChain + + // HasBlock verifies a block's presence in the local chain. + HasBlock(common.Hash, uint64) bool + + // GetBlockByHash retrieves a block from the local chain. + GetBlockByHash(common.Hash) *types.Block + + // CurrentBlock retrieves the head block from the local chain. + CurrentBlock() *types.Block + + // CurrentFastBlock retrieves the head fast block from the local chain. + CurrentFastBlock() *types.Block + + // FastSyncCommitHead directly commits the head block to a certain entity. + FastSyncCommitHead(common.Hash) error + + // InsertChain inserts a batch of blocks into the local chain. + InsertChain(types.Blocks) (int, error) + + // InsertReceiptChain inserts a batch of receipts into the local chain. + InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) +} + +// New creates a new downloader to fetch hashes and blocks from remote peers. +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { + if lightchain == nil { + lightchain = chain + } + + dl := &Downloader{ + mode: mode, + stateDB: stateDb, + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + rttEstimate: uint64(rttMaxEstimate), + rttConfidence: uint64(1000000), + blockchain: chain, + lightchain: lightchain, + dropPeer: dropPeer, + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + headerProcCh: make(chan []*types.Header, 1), + quitCh: make(chan struct{}), + stateCh: make(chan dataPack), + stateSyncStart: make(chan *stateSync), + syncStatsState: stateSyncStats{ + processed: rawdb.ReadFastTrieProgress(stateDb), + }, + trackStateReq: make(chan *stateReq), + } + go dl.qosTuner() + go dl.stateFetcher() + return dl +} + +// Progress retrieves the synchronisation boundaries, specifically the origin +// block where synchronisation started at (may have failed/suspended); the block +// or header sync is currently at; and the latest known block which the sync targets. +// +// In addition, during the state download phase of fast synchronisation the number +// of processed and the total number of known states are also returned. Otherwise +// these are zero. +func (d *Downloader) Progress() ethereum.SyncProgress { + // Lock the current stats and return the progress + d.syncStatsLock.RLock() + defer d.syncStatsLock.RUnlock() + + current := uint64(0) + switch d.mode { + case FullSync: + current = d.blockchain.CurrentBlock().NumberU64() + case FastSync: + current = d.blockchain.CurrentFastBlock().NumberU64() + case LightSync: + current = d.lightchain.CurrentHeader().Number.Uint64() + } + return ethereum.SyncProgress{ + StartingBlock: d.syncStatsChainOrigin, + CurrentBlock: current, + HighestBlock: d.syncStatsChainHeight, + PulledStates: d.syncStatsState.processed, + KnownStates: d.syncStatsState.processed + d.syncStatsState.pending, + } +} + +// Synchronising returns whether the downloader is currently retrieving blocks. +func (d *Downloader) Synchronising() bool { + return atomic.LoadInt32(&d.synchronising) > 0 +} + +// RegisterPeer injects a new download peer into the set of block source to be +// used for fetching hashes and blocks from. +func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error { + logger := log.New("peer", id) + logger.Trace("Registering sync peer") + if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil { + logger.Error("Failed to register sync peer", "err", err) + return err + } + d.qosReduceConfidence() + + return nil +} + +// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer. +func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error { + return d.RegisterPeer(id, version, &lightPeerWrapper{peer}) +} + +// UnregisterPeer remove a peer from the known list, preventing any action from +// the specified peer. An effort is also made to return any pending fetches into +// the queue. +func (d *Downloader) UnregisterPeer(id string) error { + // Unregister the peer from the active peer set and revoke any fetch tasks + logger := log.New("peer", id) + logger.Trace("Unregistering sync peer") + if err := d.peers.Unregister(id); err != nil { + logger.Error("Failed to unregister sync peer", "err", err) + return err + } + d.queue.Revoke(id) + + // If this peer was the master peer, abort sync immediately + d.cancelLock.RLock() + master := id == d.cancelPeer + d.cancelLock.RUnlock() + + if master { + d.cancel() + } + return nil +} + +// Synchronise tries to sync up our local block chain with a remote peer, both +// adding various sanity checks as well as wrapping it with various log entries. +func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { + err := d.synchronise(id, head, td, mode) + switch err { + case nil: + case errBusy: + + case errTimeout, errBadPeer, errStallingPeer, + errEmptyHeaderSet, errPeersUnavailable, errTooOld, + errInvalidAncestor, errInvalidChain: + log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id) + } else { + d.dropPeer(id) + } + default: + log.Warn("Synchronisation failed, retrying", "err", err) + } + return err +} + +// synchronise will select the peer and use it for synchronising. If an empty string is given +// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the +// checks fail an error will be returned. This method is synchronous +func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { + // Mock out the synchronisation if testing + if d.synchroniseMock != nil { + return d.synchroniseMock(id, hash) + } + // Make sure only one goroutine is ever allowed past this point at once + if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { + return errBusy + } + defer atomic.StoreInt32(&d.synchronising, 0) + + // Post a user notification of the sync (only once per session) + if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { + log.Info("Block synchronisation started") + } + // Reset the queue, peer set and wake channels to clean any internal leftover state + d.queue.Reset() + d.peers.Reset() + + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case <-ch: + default: + } + } + for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { + for empty := false; !empty; { + select { + case <-ch: + default: + empty = true + } + } + } + for empty := false; !empty; { + select { + case <-d.headerProcCh: + default: + empty = true + } + } + // Create cancel channel for aborting mid-flight and mark the master peer + d.cancelLock.Lock() + d.cancelCh = make(chan struct{}) + d.cancelPeer = id + d.cancelLock.Unlock() + + defer d.Cancel() // No matter what, we can't leave the cancel channel open + + // Set the requested sync mode, unless it's forbidden + d.mode = mode + + // Retrieve the origin peer and initiate the downloading process + p := d.peers.Peer(id) + if p == nil { + return errUnknownPeer + } + return d.syncWithPeer(p, hash, td) +} + +// syncWithPeer starts a block synchronization based on the hash chain from the +// specified peer and head hash. +func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { + d.mux.Post(StartEvent{}) + defer func() { + // reset on error + if err != nil { + d.mux.Post(FailedEvent{err}) + } else { + d.mux.Post(DoneEvent{}) + } + }() + if p.version < 62 { + return errTooOld + } + + log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode) + defer func(start time.Time) { + log.Debug("Synchronisation terminated", "elapsed", time.Since(start)) + }(time.Now()) + + // Look up the sync boundaries: the common ancestor and the target block + latest, err := d.fetchHeight(p) + if err != nil { + return err + } + height := latest.Number.Uint64() + + origin, err := d.findAncestor(p, height) + if err != nil { + return err + } + d.syncStatsLock.Lock() + if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { + d.syncStatsChainOrigin = origin + } + d.syncStatsChainHeight = height + d.syncStatsLock.Unlock() + + // Ensure our origin point is below any fast sync pivot point + pivot := uint64(0) + if d.mode == FastSync { + if height <= uint64(fsMinFullBlocks) { + origin = 0 + } else { + pivot = height - uint64(fsMinFullBlocks) + if pivot <= origin { + origin = pivot - 1 + } + } + } + d.committed = 1 + if d.mode == FastSync && pivot != 0 { + d.committed = 0 + } + // Initiate the sync using a concurrent header and content retrieval algorithm + d.queue.Prepare(origin+1, d.mode) + if d.syncInitHook != nil { + d.syncInitHook(origin, height) + } + + fetchers := []func() error{ + func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, pivot, td) }, + } + if d.mode == FastSync { + fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) + } else if d.mode == FullSync { + fetchers = append(fetchers, d.processFullSyncContent) + } + return d.spawnSync(fetchers) +} + +// spawnSync runs d.process and all given fetcher functions to completion in +// separate goroutines, returning the first error that appears. +func (d *Downloader) spawnSync(fetchers []func() error) error { + errc := make(chan error, len(fetchers)) + d.cancelWg.Add(len(fetchers)) + for _, fn := range fetchers { + fn := fn + go func() { defer d.cancelWg.Done(); errc <- fn() }() + } + // Wait for the first error, then terminate the others. + var err error + for i := 0; i < len(fetchers); i++ { + if i == len(fetchers)-1 { + // Close the queue when all fetchers have exited. + // This will cause the block processor to end when + // it has processed the queue. + d.queue.Close() + } + if err = <-errc; err != nil { + break + } + } + d.queue.Close() + d.Cancel() + return err +} + +// cancel aborts all of the operations and resets the queue. However, cancel does +// not wait for the running download goroutines to finish. This method should be +// used when cancelling the downloads from inside the downloader. +func (d *Downloader) cancel() { + // Close the current cancel channel + d.cancelLock.Lock() + if d.cancelCh != nil { + select { + case <-d.cancelCh: + // Channel was already closed + default: + close(d.cancelCh) + } + } + d.cancelLock.Unlock() +} + +// Cancel aborts all of the operations and waits for all download goroutines to +// finish before returning. +func (d *Downloader) Cancel() { + d.cancel() + d.cancelWg.Wait() +} + +// Terminate interrupts the downloader, canceling all pending operations. +// The downloader cannot be reused after calling Terminate. +func (d *Downloader) Terminate() { + // Close the termination channel (make sure double close is allowed) + d.quitLock.Lock() + select { + case <-d.quitCh: + default: + close(d.quitCh) + } + d.quitLock.Unlock() + + // Cancel any pending download requests + d.Cancel() +} + +// fetchHeight retrieves the head header of the remote peer to aid in estimating +// the total time a pending synchronisation would take. +func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { + p.log.Debug("Retrieving remote chain height") + + // Request the advertised remote head block and wait for the response + head, _ := p.peer.Head() + go p.peer.RequestHeadersByHash(head, 1, 0, false) + + ttl := d.requestTTL() + timeout := time.After(ttl) + for { + select { + case <-d.cancelCh: + return nil, errCancelBlockFetch + + case packet := <-d.headerCh: + // Discard anything not from the origin peer + if packet.PeerId() != p.id { + log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) + break + } + // Make sure the peer actually gave something valid + headers := packet.(*headerPack).headers + if len(headers) != 1 { + p.log.Debug("Multiple headers for single request", "headers", len(headers)) + return nil, errBadPeer + } + head := headers[0] + p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) + return head, nil + + case <-timeout: + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) + return nil, errTimeout + + case <-d.bodyCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore + } + } +} + +// findAncestor tries to locate the common ancestor link of the local chain and +// a remote peers blockchain. In the general case when our node was in sync and +// on the correct chain, checking the top N links should already get us a match. +// In the rare scenario when we ended up on a long reorganisation (i.e. none of +// the head links match), we do a binary search to find the common ancestor. +func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) { + // Figure out the valid ancestor range to prevent rewrite attacks + floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() + + if d.mode == FullSync { + ceil = d.blockchain.CurrentBlock().NumberU64() + } else if d.mode == FastSync { + ceil = d.blockchain.CurrentFastBlock().NumberU64() + } + if ceil >= MaxForkAncestry { + floor = int64(ceil - MaxForkAncestry) + } + p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) + + // Request the topmost blocks to short circuit binary ancestor lookup + head := ceil + if head > height { + head = height + } + from := int64(head) - int64(MaxHeaderFetch) + if from < 0 { + from = 0 + } + // Span out with 15 block gaps into the future to catch bad head reports + limit := 2 * MaxHeaderFetch / 16 + count := 1 + int((int64(ceil)-from)/16) + if count > limit { + count = limit + } + go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) + + // Wait for the remote response to the head fetch + number, hash := uint64(0), common.Hash{} + + ttl := d.requestTTL() + timeout := time.After(ttl) + + for finished := false; !finished; { + select { + case <-d.cancelCh: + return 0, errCancelHeaderFetch + + case packet := <-d.headerCh: + // Discard anything not from the origin peer + if packet.PeerId() != p.id { + log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) + break + } + // Make sure the peer actually gave something valid + headers := packet.(*headerPack).headers + if len(headers) == 0 { + p.log.Warn("Empty head header set") + return 0, errEmptyHeaderSet + } + // Make sure the peer's reply conforms to the request + for i := 0; i < len(headers); i++ { + if number := headers[i].Number.Int64(); number != from+int64(i)*16 { + p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number) + return 0, errInvalidChain + } + } + // Check if a common ancestor was found + finished = true + for i := len(headers) - 1; i >= 0; i-- { + // Skip any headers that underflow/overflow our requested set + if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil { + continue + } + // Otherwise check if we already know the header or not + h := headers[i].Hash() + n := headers[i].Number.Uint64() + if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) { + number, hash = n, h + + // If every header is known, even future ones, the peer straight out lied about its head + if number > height && i == limit-1 { + p.log.Warn("Lied about chain head", "reported", height, "found", number) + return 0, errStallingPeer + } + break + } + } + + case <-timeout: + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) + return 0, errTimeout + + case <-d.bodyCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore + } + } + // If the head fetch already found an ancestor, return + if hash != (common.Hash{}) { + if int64(number) <= floor { + p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor) + return 0, errInvalidAncestor + } + p.log.Debug("Found common ancestor", "number", number, "hash", hash) + return number, nil + } + // Ancestor not found, we need to binary search over our chain + start, end := uint64(0), head + if floor > 0 { + start = uint64(floor) + } + for start+1 < end { + // Split our chain interval in two, and request the hash to cross check + check := (start + end) / 2 + + ttl := d.requestTTL() + timeout := time.After(ttl) + + go p.peer.RequestHeadersByNumber(check, 1, 0, false) + + // Wait until a reply arrives to this request + for arrived := false; !arrived; { + select { + case <-d.cancelCh: + return 0, errCancelHeaderFetch + + case packer := <-d.headerCh: + // Discard anything not from the origin peer + if packer.PeerId() != p.id { + log.Debug("Received headers from incorrect peer", "peer", packer.PeerId()) + break + } + // Make sure the peer actually gave something valid + headers := packer.(*headerPack).headers + if len(headers) != 1 { + p.log.Debug("Multiple headers for single request", "headers", len(headers)) + return 0, errBadPeer + } + arrived = true + + // Modify the search interval based on the response + h := headers[0].Hash() + n := headers[0].Number.Uint64() + if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) { + end = check + break + } + header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists + if header.Number.Uint64() != check { + p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) + return 0, errBadPeer + } + start = check + hash = h + + case <-timeout: + p.log.Debug("Waiting for search header timed out", "elapsed", ttl) + return 0, errTimeout + + case <-d.bodyCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore + } + } + } + // Ensure valid ancestry and return + if int64(start) <= floor { + p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor) + return 0, errInvalidAncestor + } + p.log.Debug("Found common ancestor", "number", start, "hash", hash) + return start, nil +} + +// fetchHeaders keeps retrieving headers concurrently from the number +// requested, until no more are returned, potentially throttling on the way. To +// facilitate concurrency but still protect against malicious nodes sending bad +// headers, we construct a header chain skeleton using the "origin" peer we are +// syncing with, and fill in the missing headers using anyone else. Headers from +// other peers are only accepted if they map cleanly to the skeleton. If no one +// can fill in the skeleton - not even the origin peer - it's assumed invalid and +// the origin is dropped. +func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error { + p.log.Debug("Directing header downloads", "origin", from) + defer p.log.Debug("Header download terminated") + + // Create a timeout timer, and the associated header fetcher + skeleton := true // Skeleton assembly phase or finishing up + request := time.Now() // time of the last skeleton fetch request + timeout := time.NewTimer(0) // timer to dump a non-responsive active peer + <-timeout.C // timeout channel should be initially empty + defer timeout.Stop() + + var ttl time.Duration + getHeaders := func(from uint64) { + request = time.Now() + + ttl = d.requestTTL() + timeout.Reset(ttl) + + if skeleton { + p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) + go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + } else { + p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) + go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) + } + } + // Start pulling the header chain skeleton until all is done + getHeaders(from) + + for { + select { + case <-d.cancelCh: + return errCancelHeaderFetch + + case packet := <-d.headerCh: + // Make sure the active peer is giving us the skeleton headers + if packet.PeerId() != p.id { + log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId()) + break + } + headerReqTimer.UpdateSince(request) + timeout.Stop() + + // If the skeleton's finished, pull any remaining head headers directly from the origin + if packet.Items() == 0 && skeleton { + skeleton = false + getHeaders(from) + continue + } + // If no more headers are inbound, notify the content fetchers and return + if packet.Items() == 0 { + // Don't abort header fetches while the pivot is downloading + if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { + p.log.Debug("No headers, waiting for pivot commit") + select { + case <-time.After(fsHeaderContCheck): + getHeaders(from) + continue + case <-d.cancelCh: + return errCancelHeaderFetch + } + } + // Pivot done (or not in fast sync) and no more headers, terminate the process + p.log.Debug("No more headers available") + select { + case d.headerProcCh <- nil: + return nil + case <-d.cancelCh: + return errCancelHeaderFetch + } + } + headers := packet.(*headerPack).headers + + // If we received a skeleton batch, resolve internals concurrently + if skeleton { + filled, proced, err := d.fillHeaderSkeleton(from, headers) + if err != nil { + p.log.Debug("Skeleton chain invalid", "err", err) + return errInvalidChain + } + headers = filled[proced:] + from += uint64(proced) + } else { + // If we're closing in on the chain head, but haven't yet reached it, delay + // the last few headers so mini reorgs on the head don't cause invalid hash + // chain errors. + if n := len(headers); n > 0 { + // Retrieve the current head we're at + head := uint64(0) + if d.mode == LightSync { + head = d.lightchain.CurrentHeader().Number.Uint64() + } else { + head = d.blockchain.CurrentFastBlock().NumberU64() + if full := d.blockchain.CurrentBlock().NumberU64(); head < full { + head = full + } + } + // If the head is way older than this batch, delay the last few headers + if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() { + delay := reorgProtHeaderDelay + if delay > n { + delay = n + } + headers = headers[:n-delay] + } + } + } + // Insert all the new headers and fetch the next batch + if len(headers) > 0 { + p.log.Trace("Scheduling new headers", "count", len(headers), "from", from) + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCancelHeaderFetch + } + from += uint64(len(headers)) + getHeaders(from) + } else { + // No headers delivered, or all of them being delayed, sleep a bit and retry + p.log.Trace("All headers delayed, waiting") + select { + case <-time.After(fsHeaderContCheck): + getHeaders(from) + continue + case <-d.cancelCh: + return errCancelHeaderFetch + } + } + + case <-timeout.C: + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id) + break + } + // Header retrieval timed out, consider the peer bad and drop + p.log.Debug("Header request timed out", "elapsed", ttl) + headerTimeoutMeter.Mark(1) + d.dropPeer(p.id) + + // Finish the sync gracefully instead of dumping the gathered data though + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } + } + select { + case d.headerProcCh <- nil: + case <-d.cancelCh: + } + return errBadPeer + } + } +} + +// fillHeaderSkeleton concurrently retrieves headers from all our available peers +// and maps them to the provided skeleton header chain. +// +// Any partial results from the beginning of the skeleton is (if possible) forwarded +// immediately to the header processor to keep the rest of the pipeline full even +// in the case of header stalls. +// +// The method returns the entire filled skeleton and also the number of headers +// already forwarded for processing. +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { + log.Debug("Filling up skeleton", "from", from) + d.queue.ScheduleSkeleton(from, skeleton) + + var ( + deliver = func(packet dataPack) (int, error) { + pack := packet.(*headerPack) + return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh) + } + expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } + throttle = func() bool { return false } + reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) { + return d.queue.ReserveHeaders(p, count), false, nil + } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } + capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) } + ) + err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, + d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, + nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers") + + log.Debug("Skeleton fill terminated", "err", err) + + filled, proced := d.queue.RetrieveHeaders() + return filled, proced, err +} + +// fetchBodies iteratively downloads the scheduled block bodies, taking any +// available peers, reserving a chunk of blocks for each, waiting for delivery +// and also periodically checking for timeouts. +func (d *Downloader) fetchBodies(from uint64) error { + log.Debug("Downloading block bodies", "origin", from) + + var ( + deliver = func(packet dataPack) (int, error) { + pack := packet.(*bodyPack) + return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles) + } + expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) } + capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) } + ) + err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, + d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies") + + log.Debug("Block body download terminated", "err", err) + return err +} + +// fetchReceipts iteratively downloads the scheduled block receipts, taking any +// available peers, reserving a chunk of receipts for each, waiting for delivery +// and also periodically checking for timeouts. +func (d *Downloader) fetchReceipts(from uint64) error { + log.Debug("Downloading transaction receipts", "origin", from) + + var ( + deliver = func(packet dataPack) (int, error) { + pack := packet.(*receiptPack) + return d.queue.DeliverReceipts(pack.peerID, pack.receipts) + } + expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) } + capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) } + ) + err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, + d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts") + + log.Debug("Transaction receipt download terminated", "err", err) + return err +} + +// fetchParts iteratively downloads scheduled block parts, taking any available +// peers, reserving a chunk of fetch requests for each, waiting for delivery and +// also periodically checking for timeouts. +// +// As the scheduling/timeout logic mostly is the same for all downloaded data +// types, this method is used by each for data gathering and is instrumented with +// various callbacks to handle the slight differences between processing them. +// +// The instrumentation parameters: +// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer) +// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers) +// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`) +// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed) +// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping) +// - pending: task callback for the number of requests still needing download (detect completion/non-completability) +// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish) +// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use) +// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions) +// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic) +// - fetch: network callback to actually send a particular download request to a physical remote peer +// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer) +// - capacity: network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping) +// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks +// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) +// - kind: textual label of the type being downloaded to display in log mesages +func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, + expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error), + fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, + idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error { + + // Create a ticker to detect expired retrieval tasks + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + update := make(chan struct{}, 1) + + // Prepare the queue and fetch block parts until the block header fetcher's done + finished := false + for { + select { + case <-d.cancelCh: + return errCancel + + case packet := <-deliveryCh: + // If the peer was previously banned and failed to deliver its pack + // in a reasonable time frame, ignore its message. + if peer := d.peers.Peer(packet.PeerId()); peer != nil { + // Deliver the received chunk of data and check chain validity + accepted, err := deliver(packet) + if err == errInvalidChain { + return err + } + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if err != errStaleDelivery { + setIdle(peer, accepted) + } + // Issue a log to the user to see what's going on + switch { + case err == nil && packet.Items() == 0: + peer.log.Trace("Requested data not delivered", "type", kind) + case err == nil: + peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats()) + default: + peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err) + } + } + // Blocks assembled, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case cont := <-wakeCh: + // The header fetcher sent a continuation flag, check if it's done + if !cont { + finished = true + } + // Headers arrive, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case <-ticker.C: + // Sanity check update the progress + select { + case update <- struct{}{}: + default: + } + + case <-update: + // Short circuit if we lost all our peers + if d.peers.Len() == 0 { + return errNoPeers + } + // Check for fetch request timeouts and demote the responsible peers + for pid, fails := range expire() { + if peer := d.peers.Peer(pid); peer != nil { + // If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps + // ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times + // out that sync wise we need to get rid of the peer. + // + // The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth + // and latency of a peer separately, which requires pushing the measures capacity a bit and seeing + // how response times reacts, to it always requests one more than the minimum (i.e. min 2). + if fails > 2 { + peer.log.Trace("Data delivery timed out", "type", kind) + setIdle(peer, 0) + } else { + peer.log.Debug("Stalling delivery, dropping", "type", kind) + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid) + } else { + d.dropPeer(pid) + } + } + } + } + // If there's nothing more to fetch, wait or terminate + if pending() == 0 { + if !inFlight() && finished { + log.Debug("Data fetching completed", "type", kind) + return nil + } + break + } + // Send a download request to all idle peers, until throttled + progressed, throttled, running := false, false, inFlight() + idles, total := idle() + + for _, peer := range idles { + // Short circuit if throttling activated + if throttle() { + throttled = true + break + } + // Short circuit if there is no more available task. + if pending() == 0 { + break + } + // Reserve a chunk of fetches for a peer. A nil can mean either that + // no more headers are available, or that the peer is known not to + // have them. + request, progress, err := reserve(peer, capacity(peer)) + if err != nil { + return err + } + if progress { + progressed = true + } + if request == nil { + continue + } + if request.From > 0 { + peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From) + } else { + peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) + } + // Fetch the chunk and make sure any errors return the hashes to the queue + if fetchHook != nil { + fetchHook(request.Headers) + } + if err := fetch(peer, request); err != nil { + // Although we could try and make an attempt to fix this, this error really + // means that we've double allocated a fetch task to a peer. If that is the + // case, the internal state of the downloader and the queue is very wrong so + // better hard crash and note the error instead of silently accumulating into + // a much bigger issue. + panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind)) + } + running = true + } + // Make sure that we have peers available for fetching. If all peers have been tried + // and all failed throw an error + if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { + return errPeersUnavailable + } + } + } +} + +// processHeaders takes batches of retrieved headers from an input channel and +// keeps processing and scheduling them into the header chain and downloader's +// queue until the stream ends or a failure occurs. +func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { + // Keep a count of uncertain headers to roll back + rollback := []*types.Header{} + defer func() { + if len(rollback) > 0 { + // Flatten the headers and roll them back + hashes := make([]common.Hash, len(rollback)) + for i, header := range rollback { + hashes[i] = header.Hash() + } + lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0 + if d.mode != LightSync { + lastFastBlock = d.blockchain.CurrentFastBlock().Number() + lastBlock = d.blockchain.CurrentBlock().Number() + } + d.lightchain.Rollback(hashes) + curFastBlock, curBlock := common.Big0, common.Big0 + if d.mode != LightSync { + curFastBlock = d.blockchain.CurrentFastBlock().Number() + curBlock = d.blockchain.CurrentBlock().Number() + } + log.Warn("Rolled back headers", "count", len(hashes), + "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number), + "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), + "block", fmt.Sprintf("%d->%d", lastBlock, curBlock)) + } + }() + + // Wait for batches of headers to process + gotHeaders := false + + for { + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + + case headers := <-d.headerProcCh: + // Terminate header processing if we synced up + if len(headers) == 0 { + // Notify everyone that headers are fully processed + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } + } + // If no headers were retrieved at all, the peer violated its TD promise that it had a + // better chain compared to ours. The only exception is if its promised blocks were + // already imported by other means (e.g. fecher): + // + // R <remote peer>, L <local node>: Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + if d.mode != LightSync { + head := d.blockchain.CurrentBlock() + if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { + return errStallingPeer + } + } + // If fast or light syncing, ensure promised headers are indeed delivered. This is + // needed to detect scenarios where an attacker feeds a bad pivot and then bails out + // of delivering the post-pivot blocks that would flag the invalid content. + // + // This check cannot be executed "as is" for full imports, since blocks may still be + // queued for processing when the header download completes. However, as long as the + // peer gave us something useful, we're already happy/progressed (above check). + if d.mode == FastSync || d.mode == LightSync { + head := d.lightchain.CurrentHeader() + if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { + return errStallingPeer + } + } + // Disable any rollback and return + rollback = nil + return nil + } + // Otherwise split the chunk of headers into batches and process them + gotHeaders = true + + for len(headers) > 0 { + // Terminate if something failed in between processing chunks + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + default: + } + // Select the next chunk of headers to import + limit := maxHeadersProcess + if limit > len(headers) { + limit = len(headers) + } + chunk := headers[:limit] + + // In case of header only syncing, validate the chunk immediately + if d.mode == FastSync || d.mode == LightSync { + // Collect the yet unknown headers to mark them as uncertain + unknown := make([]*types.Header, 0, len(headers)) + for _, header := range chunk { + if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { + unknown = append(unknown, header) + } + } + // If we're importing pure headers, verify based on their recentness + frequency := fsHeaderCheckFrequency + if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { + frequency = 1 + } + if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { + // If some headers were inserted, add them too to the rollback list + if n > 0 { + rollback = append(rollback, chunk[:n]...) + } + log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) + return errInvalidChain + } + // All verifications passed, store newly found uncertain headers + rollback = append(rollback, unknown...) + if len(rollback) > fsHeaderSafetyNet { + rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) + } + } + // Unless we're doing light chains, schedule the headers for associated content retrieval + if d.mode == FullSync || d.mode == FastSync { + // If we've reached the allowed number of pending headers, stall a bit + for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + case <-time.After(time.Second): + } + } + // Otherwise insert the headers for content retrieval + inserts := d.queue.Schedule(chunk, origin) + if len(inserts) != len(chunk) { + log.Debug("Stale headers") + return errBadPeer + } + } + headers = headers[limit:] + origin += uint64(limit) + } + + // Update the highest block number we know if a higher one is found. + d.syncStatsLock.Lock() + if d.syncStatsChainHeight < origin { + d.syncStatsChainHeight = origin - 1 + } + d.syncStatsLock.Unlock() + + // Signal the content downloaders of the availablility of new tasks + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case ch <- true: + default: + } + } + } + } +} + +// processFullSyncContent takes fetch results from the queue and imports them into the chain. +func (d *Downloader) processFullSyncContent() error { + for { + results := d.queue.Results(true) + if len(results) == 0 { + return nil + } + if d.chainInsertHook != nil { + d.chainInsertHook(results) + } + if err := d.importBlockResults(results); err != nil { + return err + } + } +} + +func (d *Downloader) importBlockResults(results []*fetchResult) error { + // Check for any early termination requests + if len(results) == 0 { + return nil + } + select { + case <-d.quitCh: + return errCancelContentProcessing + default: + } + // Retrieve the a batch of results to import + first, last := results[0].Header, results[len(results)-1].Header + log.Debug("Inserting downloaded chain", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnum", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, len(results)) + for i, result := range results { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + } + if index, err := d.blockchain.InsertChain(blocks); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + return nil +} + +// processFastSyncContent takes fetch results from the queue and writes them to the +// database. It also controls the synchronisation of state nodes of the pivot block. +func (d *Downloader) processFastSyncContent(latest *types.Header) error { + // Start syncing state of the reported head block. This should get us most of + // the state of the pivot block. + stateSync := d.syncState(latest.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + d.queue.Close() // wake up WaitResults + } + }() + // Figure out the ideal pivot block. Note, that this goalpost may move if the + // sync takes long enough for the chain head to move significantly. + pivot := uint64(0) + if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { + pivot = height - uint64(fsMinFullBlocks) + } + // To cater for moving pivot points, track the pivot block and subsequently + // accumulated download results separately. + var ( + oldPivot *fetchResult // Locked in pivot block, might change eventually + oldTail []*fetchResult // Downloaded content after the pivot + ) + for { + // Wait for the next batch of downloaded data to be available, and if the pivot + // block became stale, move the goalpost + results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness + if len(results) == 0 { + // If pivot sync is done, stop + if oldPivot == nil { + return stateSync.Cancel() + } + // If sync failed, stop + select { + case <-d.cancelCh: + return stateSync.Cancel() + default: + } + } + if d.chainInsertHook != nil { + d.chainInsertHook(results) + } + if oldPivot != nil { + results = append(append([]*fetchResult{oldPivot}, oldTail...), results...) + } + // Split around the pivot block and process the two sides via fast/full sync + if atomic.LoadInt32(&d.committed) == 0 { + latest = results[len(results)-1].Header + if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) { + log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks)) + pivot = height - uint64(fsMinFullBlocks) + } + } + P, beforeP, afterP := splitAroundPivot(pivot, results) + if err := d.commitFastSyncData(beforeP, stateSync); err != nil { + return err + } + if P != nil { + // If new pivot block found, cancel old state retrieval and restart + if oldPivot != P { + stateSync.Cancel() + + stateSync = d.syncState(P.Header.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + d.queue.Close() // wake up WaitResults + } + }() + oldPivot = P + } + // Wait for completion, occasionally checking for pivot staleness + select { + case <-stateSync.done: + if stateSync.err != nil { + return stateSync.err + } + if err := d.commitPivotBlock(P); err != nil { + return err + } + oldPivot = nil + + case <-time.After(time.Second): + oldTail = afterP + continue + } + } + // Fast sync done, pivot commit done, full import + if err := d.importBlockResults(afterP); err != nil { + return err + } + } +} + +func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) { + for _, result := range results { + num := result.Header.Number.Uint64() + switch { + case num < pivot: + before = append(before, result) + case num == pivot: + p = result + default: + after = append(after, result) + } + } + return p, before, after +} + +func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error { + // Check for any early termination requests + if len(results) == 0 { + return nil + } + select { + case <-d.quitCh: + return errCancelContentProcessing + case <-stateSync.done: + if err := stateSync.Wait(); err != nil { + return err + } + default: + } + // Retrieve the a batch of results to import + first, last := results[0].Header, results[len(results)-1].Header + log.Debug("Inserting fast-sync blocks", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnumn", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, len(results)) + receipts := make([]types.Receipts, len(results)) + for i, result := range results { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + receipts[i] = result.Receipts + } + if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + return nil +} + +func (d *Downloader) commitPivotBlock(result *fetchResult) error { + block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash()) + if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil { + return err + } + if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil { + return err + } + atomic.StoreInt32(&d.committed, 1) + return nil +} + +// DeliverHeaders injects a new batch of block headers received from a remote +// node into the download schedule. +func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { + return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) +} + +// DeliverBodies injects a new batch of block bodies received from a remote node. +func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { + return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) +} + +// DeliverReceipts injects a new batch of receipts received from a remote node. +func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) { + return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter) +} + +// DeliverNodeData injects a new batch of node state data received from a remote node. +func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) { + return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter) +} + +// deliver injects a new batch of data received from a remote node. +func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) { + // Update the delivery metrics for both good and failed deliveries + inMeter.Mark(int64(packet.Items())) + defer func() { + if err != nil { + dropMeter.Mark(int64(packet.Items())) + } + }() + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + if cancel == nil { + return errNoSyncActive + } + select { + case destCh <- packet: + return nil + case <-cancel: + return errNoSyncActive + } +} + +// qosTuner is the quality of service tuning loop that occasionally gathers the +// peer latency statistics and updates the estimated request round trip time. +func (d *Downloader) qosTuner() { + for { + // Retrieve the current median RTT and integrate into the previoust target RTT + rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT())) + atomic.StoreUint64(&d.rttEstimate, uint64(rtt)) + + // A new RTT cycle passed, increase our confidence in the estimated RTT + conf := atomic.LoadUint64(&d.rttConfidence) + conf = conf + (1000000-conf)/2 + atomic.StoreUint64(&d.rttConfidence, conf) + + // Log the new QoS values and sleep until the next RTT + log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL()) + select { + case <-d.quitCh: + return + case <-time.After(rtt): + } + } +} + +// qosReduceConfidence is meant to be called when a new peer joins the downloader's +// peer set, needing to reduce the confidence we have in out QoS estimates. +func (d *Downloader) qosReduceConfidence() { + // If we have a single peer, confidence is always 1 + peers := uint64(d.peers.Len()) + if peers == 0 { + // Ensure peer connectivity races don't catch us off guard + return + } + if peers == 1 { + atomic.StoreUint64(&d.rttConfidence, 1000000) + return + } + // If we have a ton of peers, don't drop confidence) + if peers >= uint64(qosConfidenceCap) { + return + } + // Otherwise drop the confidence factor + conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers + if float64(conf)/1000000 < rttMinConfidence { + conf = uint64(rttMinConfidence * 1000000) + } + atomic.StoreUint64(&d.rttConfidence, conf) + + rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate)) + log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL()) +} + +// requestRTT returns the current target round trip time for a download request +// to complete in. +// +// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that +// the downloader tries to adapt queries to the RTT, so multiple RTT values can +// be adapted to, but smaller ones are preferred (stabler download stream). +func (d *Downloader) requestRTT() time.Duration { + return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10 +} + +// requestTTL returns the current timeout allowance for a single download request +// to finish under. +func (d *Downloader) requestTTL() time.Duration { + var ( + rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate)) + conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0 + ) + ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf) + if ttl > ttlLimit { + ttl = ttlLimit + } + return ttl +} diff --git a/dex/downloader/downloader_test.go b/dex/downloader/downloader_test.go new file mode 100644 index 000000000..093b751ca --- /dev/null +++ b/dex/downloader/downloader_test.go @@ -0,0 +1,1481 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "errors" + "fmt" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + ethereum "github.com/dexon-foundation/dexon" + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/event" + "github.com/dexon-foundation/dexon/trie" +) + +// Reduce some of the parameters to make the tester faster. +func init() { + MaxForkAncestry = uint64(10000) + blockCacheItems = 1024 + fsHeaderContCheck = 500 * time.Millisecond +} + +// downloadTester is a test simulator for mocking out local block chain. +type downloadTester struct { + downloader *Downloader + + genesis *types.Block // Genesis blocks used by the tester and peers + stateDb ethdb.Database // Database used by the tester for syncing from peers + peerDb ethdb.Database // Database of the peers containing all data + peers map[string]*downloadTesterPeer + + ownHashes []common.Hash // Hash chain belonging to the tester + ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester + ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester + ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester + ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain + + lock sync.RWMutex +} + +// newTester creates a new downloader test mocker. +func newTester() *downloadTester { + tester := &downloadTester{ + genesis: testGenesis, + peerDb: testDB, + peers: make(map[string]*downloadTesterPeer), + ownHashes: []common.Hash{testGenesis.Hash()}, + ownHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()}, + ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis}, + ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil}, + ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()}, + } + tester.stateDb = ethdb.NewMemDatabase() + tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) + tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) + return tester +} + +// terminate aborts any operations on the embedded downloader and releases all +// held resources. +func (dl *downloadTester) terminate() { + dl.downloader.Terminate() +} + +// sync starts synchronizing with a remote peer, blocking until it completes. +func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { + dl.lock.RLock() + hash := dl.peers[id].chain.headBlock().Hash() + // If no particular TD was requested, load from the peer's blockchain + if td == nil { + td = dl.peers[id].chain.td(hash) + } + dl.lock.RUnlock() + + // Synchronise with the chosen peer and ensure proper cleanup afterwards + err := dl.downloader.synchronise(id, hash, td, mode) + select { + case <-dl.downloader.cancelCh: + // Ok, downloader fully cancelled after sync cycle + default: + // Downloader is still accepting packets, can block a peer up + panic("downloader active post sync cycle") // panic will be caught by tester + } + return err +} + +// HasHeader checks if a header is present in the testers canonical chain. +func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool { + return dl.GetHeaderByHash(hash) != nil +} + +// HasBlock checks if a block is present in the testers canonical chain. +func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool { + return dl.GetBlockByHash(hash) != nil +} + +// GetHeader retrieves a header from the testers canonical chain. +func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.ownHeaders[hash] +} + +// GetBlock retrieves a block from the testers canonical chain. +func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.ownBlocks[hash] +} + +// CurrentHeader retrieves the current head header from the canonical chain. +func (dl *downloadTester) CurrentHeader() *types.Header { + dl.lock.RLock() + defer dl.lock.RUnlock() + + for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil { + return header + } + } + return dl.genesis.Header() +} + +// CurrentBlock retrieves the current head block from the canonical chain. +func (dl *downloadTester) CurrentBlock() *types.Block { + dl.lock.RLock() + defer dl.lock.RUnlock() + + for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil { + if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil { + return block + } + } + } + return dl.genesis +} + +// CurrentFastBlock retrieves the current head fast-sync block from the canonical chain. +func (dl *downloadTester) CurrentFastBlock() *types.Block { + dl.lock.RLock() + defer dl.lock.RUnlock() + + for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil { + return block + } + } + return dl.genesis +} + +// FastSyncCommitHead manually sets the head block to a given hash. +func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error { + // For now only check that the state trie is correct + if block := dl.GetBlockByHash(hash); block != nil { + _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb), 0) + return err + } + return fmt.Errorf("non existent block: %x", hash[:4]) +} + +// GetTd retrieves the block's total difficulty from the canonical chain. +func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.ownChainTd[hash] +} + +// InsertHeaderChain injects a new batch of headers into the simulated chain. +func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) { + dl.lock.Lock() + defer dl.lock.Unlock() + + // Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors + if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok { + return 0, errors.New("unknown parent") + } + for i := 1; i < len(headers); i++ { + if headers[i].ParentHash != headers[i-1].Hash() { + return i, errors.New("unknown parent") + } + } + // Do a full insert if pre-checks passed + for i, header := range headers { + if _, ok := dl.ownHeaders[header.Hash()]; ok { + continue + } + if _, ok := dl.ownHeaders[header.ParentHash]; !ok { + return i, errors.New("unknown parent") + } + dl.ownHashes = append(dl.ownHashes, header.Hash()) + dl.ownHeaders[header.Hash()] = header + dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty) + } + return len(headers), nil +} + +// InsertChain injects a new batch of blocks into the simulated chain. +func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) { + dl.lock.Lock() + defer dl.lock.Unlock() + + for i, block := range blocks { + if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok { + return i, errors.New("unknown parent") + } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil { + return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err) + } + if _, ok := dl.ownHeaders[block.Hash()]; !ok { + dl.ownHashes = append(dl.ownHashes, block.Hash()) + dl.ownHeaders[block.Hash()] = block.Header() + } + dl.ownBlocks[block.Hash()] = block + dl.stateDb.Put(block.Root().Bytes(), []byte{0x00}) + dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty()) + } + return len(blocks), nil +} + +// InsertReceiptChain injects a new batch of receipts into the simulated chain. +func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) { + dl.lock.Lock() + defer dl.lock.Unlock() + + for i := 0; i < len(blocks) && i < len(receipts); i++ { + if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok { + return i, errors.New("unknown owner") + } + if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { + return i, errors.New("unknown parent") + } + dl.ownBlocks[blocks[i].Hash()] = blocks[i] + dl.ownReceipts[blocks[i].Hash()] = receipts[i] + } + return len(blocks), nil +} + +// Rollback removes some recently added elements from the chain. +func (dl *downloadTester) Rollback(hashes []common.Hash) { + dl.lock.Lock() + defer dl.lock.Unlock() + + for i := len(hashes) - 1; i >= 0; i-- { + if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] { + dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1] + } + delete(dl.ownChainTd, hashes[i]) + delete(dl.ownHeaders, hashes[i]) + delete(dl.ownReceipts, hashes[i]) + delete(dl.ownBlocks, hashes[i]) + } +} + +// newPeer registers a new block download source into the downloader. +func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error { + dl.lock.Lock() + defer dl.lock.Unlock() + + peer := &downloadTesterPeer{dl: dl, id: id, chain: chain} + dl.peers[id] = peer + return dl.downloader.RegisterPeer(id, version, peer) +} + +// dropPeer simulates a hard peer removal from the connection pool. +func (dl *downloadTester) dropPeer(id string) { + dl.lock.Lock() + defer dl.lock.Unlock() + + delete(dl.peers, id) + dl.downloader.UnregisterPeer(id) +} + +type downloadTesterPeer struct { + dl *downloadTester + id string + lock sync.RWMutex + chain *testChain + missingStates map[common.Hash]bool // State entries that fast sync should not return +} + +// Head constructs a function to retrieve a peer's current head hash +// and total difficulty. +func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { + b := dlp.chain.headBlock() + return b.Hash(), dlp.chain.td(b.Hash()) +} + +// RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed +// origin; associated with a particular peer in the download tester. The returned +// function can be used to retrieve batches of headers from the particular peer. +func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { + if reverse { + panic("reverse header requests not supported") + } + + result := dlp.chain.headersByHash(origin, amount, skip) + go dlp.dl.downloader.DeliverHeaders(dlp.id, result) + return nil +} + +// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered +// origin; associated with a particular peer in the download tester. The returned +// function can be used to retrieve batches of headers from the particular peer. +func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { + if reverse { + panic("reverse header requests not supported") + } + + result := dlp.chain.headersByNumber(origin, amount, skip) + go dlp.dl.downloader.DeliverHeaders(dlp.id, result) + return nil +} + +// RequestBodies constructs a getBlockBodies method associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of block bodies from the particularly requested peer. +func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error { + txs, uncles := dlp.chain.bodies(hashes) + go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles) + return nil +} + +// RequestReceipts constructs a getReceipts method associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of block receipts from the particularly requested peer. +func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error { + receipts := dlp.chain.receipts(hashes) + go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts) + return nil +} + +// RequestNodeData constructs a getNodeData method associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of node state data from the particularly requested peer. +func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error { + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() + + results := make([][]byte, 0, len(hashes)) + for _, hash := range hashes { + if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil { + if !dlp.missingStates[hash] { + results = append(results, data) + } + } + } + go dlp.dl.downloader.DeliverNodeData(dlp.id, results) + return nil +} + +// assertOwnChain checks if the local chain contains the correct number of items +// of the various chain components. +func assertOwnChain(t *testing.T, tester *downloadTester, length int) { + assertOwnForkedChain(t, tester, 1, []int{length}) +} + +// assertOwnForkedChain checks if the local forked chain contains the correct +// number of items of the various chain components. +func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) { + // Initialize the counters for the first fork + headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks + + if receipts < 0 { + receipts = 1 + } + // Update the counters for each subsequent fork + for _, length := range lengths[1:] { + headers += length - common + blocks += length - common + receipts += length - common - fsMinFullBlocks + } + switch tester.downloader.mode { + case FullSync: + receipts = 1 + case LightSync: + blocks, receipts = 1, 1 + } + if hs := len(tester.ownHeaders); hs != headers { + t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) + } + if bs := len(tester.ownBlocks); bs != blocks { + t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks) + } + if rs := len(tester.ownReceipts); rs != receipts { + t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) + } +} + +// Tests that simple synchronization against a canonical chain works correctly. +// In this test common ancestor lookup should be short circuited and not require +// binary searching. +func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62, FullSync) } +func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) } +func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) } +func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) } +func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) } +func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) } + +func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Create a small enough block chain to download + chain := testChainBase.shorten(blockCacheItems - 15) + tester.newPeer("peer", protocol, chain) + + // Synchronise with the peer and make sure all relevant data was retrieved + if err := tester.sync("peer", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chain.len()) +} + +// Tests that if a large batch of blocks are being downloaded, it is throttled +// until the cached blocks are retrieved. +func TestThrottling62(t *testing.T) { testThrottling(t, 62, FullSync) } +func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) } +func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) } +func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) } +func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } + +func testThrottling(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + tester := newTester() + defer tester.terminate() + + // Create a long block chain to download and the tester + targetBlocks := testChainBase.len() - 1 + tester.newPeer("peer", protocol, testChainBase) + + // Wrap the importer to allow stepping + blocked, proceed := uint32(0), make(chan struct{}) + tester.downloader.chainInsertHook = func(results []*fetchResult) { + atomic.StoreUint32(&blocked, uint32(len(results))) + <-proceed + } + // Start a synchronisation concurrently + errc := make(chan error) + go func() { + errc <- tester.sync("peer", nil, mode) + }() + // Iteratively take some blocks, always checking the retrieval count + for { + // Check the retrieval count synchronously (! reason for this ugly block) + tester.lock.RLock() + retrieved := len(tester.ownBlocks) + tester.lock.RUnlock() + if retrieved >= targetBlocks+1 { + break + } + // Wait a bit for sync to throttle itself + var cached, frozen int + for start := time.Now(); time.Since(start) < 3*time.Second; { + time.Sleep(25 * time.Millisecond) + + tester.lock.Lock() + tester.downloader.queue.lock.Lock() + cached = len(tester.downloader.queue.blockDonePool) + if mode == FastSync { + if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { + cached = receipts + } + } + frozen = int(atomic.LoadUint32(&blocked)) + retrieved = len(tester.ownBlocks) + tester.downloader.queue.lock.Unlock() + tester.lock.Unlock() + + if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay { + break + } + } + // Make sure we filled up the cache, then exhaust it + time.Sleep(25 * time.Millisecond) // give it a chance to screw up + + tester.lock.RLock() + retrieved = len(tester.ownBlocks) + tester.lock.RUnlock() + if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { + t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) + } + // Permit the blocked blocks to import + if atomic.LoadUint32(&blocked) > 0 { + atomic.StoreUint32(&blocked, uint32(0)) + proceed <- struct{}{} + } + } + // Check that we haven't pulled more blocks than available + assertOwnChain(t, tester, targetBlocks+1) + if err := <-errc; err != nil { + t.Fatalf("block synchronization failed: %v", err) + } +} + +// Tests that simple synchronization against a forked chain works correctly. In +// this test common ancestor lookup should *not* be short circuited, and a full +// binary search should be executed. +func TestForkedSync62(t *testing.T) { testForkedSync(t, 62, FullSync) } +func TestForkedSync63Full(t *testing.T) { testForkedSync(t, 63, FullSync) } +func TestForkedSync63Fast(t *testing.T) { testForkedSync(t, 63, FastSync) } +func TestForkedSync64Full(t *testing.T) { testForkedSync(t, 64, FullSync) } +func TestForkedSync64Fast(t *testing.T) { testForkedSync(t, 64, FastSync) } +func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) } + +func testForkedSync(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + chainA := testChainForkLightA.shorten(testChainBase.len() + 80) + chainB := testChainForkLightB.shorten(testChainBase.len() + 80) + tester.newPeer("fork A", protocol, chainA) + tester.newPeer("fork B", protocol, chainB) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("fork A", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chainA.len()) + + // Synchronise with the second peer and make sure that fork is pulled too + if err := tester.sync("fork B", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()}) +} + +// Tests that synchronising against a much shorter but much heavyer fork works +// corrently and is not dropped. +func TestHeavyForkedSync62(t *testing.T) { testHeavyForkedSync(t, 62, FullSync) } +func TestHeavyForkedSync63Full(t *testing.T) { testHeavyForkedSync(t, 63, FullSync) } +func TestHeavyForkedSync63Fast(t *testing.T) { testHeavyForkedSync(t, 63, FastSync) } +func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSync) } +func TestHeavyForkedSync64Fast(t *testing.T) { testHeavyForkedSync(t, 64, FastSync) } +func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, LightSync) } + +func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + chainA := testChainForkLightA.shorten(testChainBase.len() + 80) + chainB := testChainForkHeavy.shorten(testChainBase.len() + 80) + tester.newPeer("light", protocol, chainA) + tester.newPeer("heavy", protocol, chainB) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("light", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chainA.len()) + + // Synchronise with the second peer and make sure that fork is pulled too + if err := tester.sync("heavy", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()}) +} + +// Tests that chain forks are contained within a certain interval of the current +// chain head, ensuring that malicious peers cannot waste resources by feeding +// long dead chains. +func TestBoundedForkedSync62(t *testing.T) { testBoundedForkedSync(t, 62, FullSync) } +func TestBoundedForkedSync63Full(t *testing.T) { testBoundedForkedSync(t, 63, FullSync) } +func TestBoundedForkedSync63Fast(t *testing.T) { testBoundedForkedSync(t, 63, FastSync) } +func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, FullSync) } +func TestBoundedForkedSync64Fast(t *testing.T) { testBoundedForkedSync(t, 64, FastSync) } +func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, LightSync) } + +func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + chainA := testChainForkLightA + chainB := testChainForkLightB + tester.newPeer("original", protocol, chainA) + tester.newPeer("rewriter", protocol, chainB) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("original", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chainA.len()) + + // Synchronise with the second peer and ensure that the fork is rejected to being too old + if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor { + t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor) + } +} + +// Tests that chain forks are contained within a certain interval of the current +// chain head for short but heavy forks too. These are a bit special because they +// take different ancestor lookup paths. +func TestBoundedHeavyForkedSync62(t *testing.T) { testBoundedHeavyForkedSync(t, 62, FullSync) } +func TestBoundedHeavyForkedSync63Full(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FullSync) } +func TestBoundedHeavyForkedSync63Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FastSync) } +func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FullSync) } +func TestBoundedHeavyForkedSync64Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FastSync) } +func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSync(t, 64, LightSync) } + +func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Create a long enough forked chain + chainA := testChainForkLightA + chainB := testChainForkHeavy + tester.newPeer("original", protocol, chainA) + tester.newPeer("heavy-rewriter", protocol, chainB) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("original", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chainA.len()) + + // Synchronise with the second peer and ensure that the fork is rejected to being too old + if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor { + t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor) + } +} + +// Tests that an inactive downloader will not accept incoming block headers and +// bodies. +func TestInactiveDownloader62(t *testing.T) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Check that neither block headers nor bodies are accepted + if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } +} + +// Tests that an inactive downloader will not accept incoming block headers, +// bodies and receipts. +func TestInactiveDownloader63(t *testing.T) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Check that neither block headers nor bodies are accepted + if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } +} + +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) } +func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) } +func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) } +func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) } +func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) } +func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) } + +func testCancel(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + chain := testChainBase.shorten(MaxHeaderFetch) + tester.newPeer("peer", protocol, chain) + + // Make sure canceling works with a pristine downloader + tester.downloader.Cancel() + if !tester.downloader.queue.Idle() { + t.Errorf("download queue not idle") + } + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.Cancel() + if !tester.downloader.queue.Idle() { + t.Errorf("download queue not idle") + } +} + +// Tests that synchronisation from multiple peers works as intended (multi thread sanity test). +func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) } +func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) } +func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) } +func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) } +func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) } +func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) } + +func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Create various peers with various parts of the chain + targetPeers := 8 + chain := testChainBase.shorten(targetPeers * 100) + + for i := 0; i < targetPeers; i++ { + id := fmt.Sprintf("peer #%d", i) + tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1))) + } + if err := tester.sync("peer #0", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chain.len()) +} + +// Tests that synchronisations behave well in multi-version protocol environments +// and not wreak havoc on other nodes in the network. +func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) } +func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) } +func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) } +func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) } +func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) } +func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) } + +func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Create a small enough block chain to download + chain := testChainBase.shorten(blockCacheItems - 15) + + // Create peers of every type + tester.newPeer("peer 62", 62, chain) + tester.newPeer("peer 63", 63, chain) + tester.newPeer("peer 64", 64, chain) + + // Synchronise with the requested peer and make sure all blocks were retrieved + if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chain.len()) + + // Check that no peers have been dropped off + for _, version := range []int{62, 63, 64} { + peer := fmt.Sprintf("peer %d", version) + if _, ok := tester.peers[peer]; !ok { + t.Errorf("%s dropped", peer) + } + } +} + +// Tests that if a block is empty (e.g. header only), no body request should be +// made, and instead the header should be assembled into a whole block in itself. +func TestEmptyShortCircuit62(t *testing.T) { testEmptyShortCircuit(t, 62, FullSync) } +func TestEmptyShortCircuit63Full(t *testing.T) { testEmptyShortCircuit(t, 63, FullSync) } +func TestEmptyShortCircuit63Fast(t *testing.T) { testEmptyShortCircuit(t, 63, FastSync) } +func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) } +func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) } +func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) } + +func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Create a block chain to download + chain := testChainBase + tester.newPeer("peer", protocol, chain) + + // Instrument the downloader to signal body requests + bodiesHave, receiptsHave := int32(0), int32(0) + tester.downloader.bodyFetchHook = func(headers []*types.Header) { + atomic.AddInt32(&bodiesHave, int32(len(headers))) + } + tester.downloader.receiptFetchHook = func(headers []*types.Header) { + atomic.AddInt32(&receiptsHave, int32(len(headers))) + } + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("peer", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chain.len()) + + // Validate the number of block bodies that should have been requested + bodiesNeeded, receiptsNeeded := 0, 0 + for _, block := range chain.blockm { + if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { + bodiesNeeded++ + } + } + for _, receipt := range chain.receiptm { + if mode == FastSync && len(receipt) > 0 { + receiptsNeeded++ + } + } + if int(bodiesHave) != bodiesNeeded { + t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded) + } + if int(receiptsHave) != receiptsNeeded { + t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded) + } +} + +// Tests that headers are enqueued continuously, preventing malicious nodes from +// stalling the downloader by feeding gapped header chains. +func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62, FullSync) } +func TestMissingHeaderAttack63Full(t *testing.T) { testMissingHeaderAttack(t, 63, FullSync) } +func TestMissingHeaderAttack63Fast(t *testing.T) { testMissingHeaderAttack(t, 63, FastSync) } +func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) } +func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) } +func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) } + +func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + chain := testChainBase.shorten(blockCacheItems - 15) + brokenChain := chain.shorten(chain.len()) + delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2]) + tester.newPeer("attack", protocol, brokenChain) + + if err := tester.sync("attack", nil, mode); err == nil { + t.Fatalf("succeeded attacker synchronisation") + } + // Synchronise with the valid peer and make sure sync succeeds + tester.newPeer("valid", protocol, chain) + if err := tester.sync("valid", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chain.len()) +} + +// Tests that if requested headers are shifted (i.e. first is missing), the queue +// detects the invalid numbering. +func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62, FullSync) } +func TestShiftedHeaderAttack63Full(t *testing.T) { testShiftedHeaderAttack(t, 63, FullSync) } +func TestShiftedHeaderAttack63Fast(t *testing.T) { testShiftedHeaderAttack(t, 63, FastSync) } +func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) } +func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) } +func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) } + +func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + chain := testChainBase.shorten(blockCacheItems - 15) + + // Attempt a full sync with an attacker feeding shifted headers + brokenChain := chain.shorten(chain.len()) + delete(brokenChain.headerm, brokenChain.chain[1]) + delete(brokenChain.blockm, brokenChain.chain[1]) + delete(brokenChain.receiptm, brokenChain.chain[1]) + tester.newPeer("attack", protocol, brokenChain) + if err := tester.sync("attack", nil, mode); err == nil { + t.Fatalf("succeeded attacker synchronisation") + } + + // Synchronise with the valid peer and make sure sync succeeds + tester.newPeer("valid", protocol, chain) + if err := tester.sync("valid", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + assertOwnChain(t, tester, chain.len()) +} + +// Tests that upon detecting an invalid header, the recent ones are rolled back +// for various failure scenarios. Afterwards a full sync is attempted to make +// sure no state was corrupted. +func TestInvalidHeaderRollback63Fast(t *testing.T) { testInvalidHeaderRollback(t, 63, FastSync) } +func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(t, 64, FastSync) } +func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) } + +func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + // Create a small enough block chain to download + targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks + chain := testChainBase.shorten(targetBlocks) + + // Attempt to sync with an attacker that feeds junk during the fast sync phase. + // This should result in the last fsHeaderSafetyNet headers being rolled back. + missing := fsHeaderSafetyNet + MaxHeaderFetch + 1 + fastAttackChain := chain.shorten(chain.len()) + delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) + tester.newPeer("fast-attack", protocol, fastAttackChain) + + if err := tester.sync("fast-attack", nil, mode); err == nil { + t.Fatalf("succeeded fast attacker synchronisation") + } + if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch { + t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch) + } + + // Attempt to sync with an attacker that feeds junk during the block import phase. + // This should result in both the last fsHeaderSafetyNet number of headers being + // rolled back, and also the pivot point being reverted to a non-block status. + missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 + blockAttackChain := chain.shorten(chain.len()) + delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in + delete(blockAttackChain.headerm, blockAttackChain.chain[missing]) + tester.newPeer("block-attack", protocol, blockAttackChain) + + if err := tester.sync("block-attack", nil, mode); err == nil { + t.Fatalf("succeeded block attacker synchronisation") + } + if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { + t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch) + } + if mode == FastSync { + if head := tester.CurrentBlock().NumberU64(); head != 0 { + t.Errorf("fast sync pivot block #%d not rolled back", head) + } + } + + // Attempt to sync with an attacker that withholds promised blocks after the + // fast sync pivot point. This could be a trial to leave the node with a bad + // but already imported pivot block. + withholdAttackChain := chain.shorten(chain.len()) + tester.newPeer("withhold-attack", protocol, withholdAttackChain) + tester.downloader.syncInitHook = func(uint64, uint64) { + for i := missing; i < withholdAttackChain.len(); i++ { + delete(withholdAttackChain.headerm, withholdAttackChain.chain[i]) + } + tester.downloader.syncInitHook = nil + } + if err := tester.sync("withhold-attack", nil, mode); err == nil { + t.Fatalf("succeeded withholding attacker synchronisation") + } + if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { + t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch) + } + if mode == FastSync { + if head := tester.CurrentBlock().NumberU64(); head != 0 { + t.Errorf("fast sync pivot block #%d not rolled back", head) + } + } + + // synchronise with the valid peer and make sure sync succeeds. Since the last rollback + // should also disable fast syncing for this process, verify that we did a fresh full + // sync. Note, we can't assert anything about the receipts since we won't purge the + // database of them, hence we can't use assertOwnChain. + tester.newPeer("valid", protocol, chain) + if err := tester.sync("valid", nil, mode); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if hs := len(tester.ownHeaders); hs != chain.len() { + t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len()) + } + if mode != LightSync { + if bs := len(tester.ownBlocks); bs != chain.len() { + t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len()) + } + } +} + +// Tests that a peer advertising an high TD doesn't get to stall the downloader +// afterwards by not sending any useful hashes. +func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) } +func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) } +func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) } +func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) } +func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) } +func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) } + +func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + + chain := testChainBase.shorten(1) + tester.newPeer("attack", protocol, chain) + if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) + } +} + +// Tests that misbehaving peers are disconnected, whilst behaving ones are not. +func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) } +func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) } +func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) } + +func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { + t.Parallel() + + // Define the disconnection requirement for individual hash fetch errors + tests := []struct { + result error + drop bool + }{ + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin + {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop + } + // Run the tests and check disconnection status + tester := newTester() + defer tester.terminate() + chain := testChainBase.shorten(1) + + for i, tt := range tests { + // Register a new peer and ensure it's presence + id := fmt.Sprintf("test %d", i) + if err := tester.newPeer(id, protocol, chain); err != nil { + t.Fatalf("test %d: failed to register new peer: %v", i, err) + } + if _, ok := tester.peers[id]; !ok { + t.Fatalf("test %d: registered peer not found", i) + } + // Simulate a synchronisation and check the required result + tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } + + tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync) + if _, ok := tester.peers[id]; !ok != tt.drop { + t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) + } + } +} + +// Tests that synchronisation progress (origin block number, current block number +// and highest block number) is tracked and updated correctly. +func TestSyncProgress62(t *testing.T) { testSyncProgress(t, 62, FullSync) } +func TestSyncProgress63Full(t *testing.T) { testSyncProgress(t, 63, FullSync) } +func TestSyncProgress63Fast(t *testing.T) { testSyncProgress(t, 63, FastSync) } +func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) } +func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) } +func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) } + +func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + chain := testChainBase.shorten(blockCacheItems - 15) + + // Set a sync init hook to catch progress changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + + // Synchronise half the blocks and check initial progress + tester.newPeer("peer-half", protocol, chain.shorten(chain.len()/2)) + pending := new(sync.WaitGroup) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("peer-half", nil, mode); err != nil { + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) + } + }() + <-starting + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(chain.len()/2 - 1), + }) + progress <- struct{}{} + pending.Wait() + + // Synchronise all the blocks and check continuation progress + tester.newPeer("peer-full", protocol, chain) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.sync("peer-full", nil, mode); err != nil { + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) + } + }() + <-starting + checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{ + StartingBlock: uint64(chain.len()/2 - 1), + CurrentBlock: uint64(chain.len()/2 - 1), + HighestBlock: uint64(chain.len() - 1), + }) + + // Check final progress after successful sync + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + StartingBlock: uint64(chain.len()/2 - 1), + CurrentBlock: uint64(chain.len() - 1), + HighestBlock: uint64(chain.len() - 1), + }) +} + +func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) { + t.Helper() + p := d.Progress() + p.KnownStates, p.PulledStates = 0, 0 + want.KnownStates, want.PulledStates = 0, 0 + if p != want { + t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want) + } +} + +// Tests that synchronisation progress (origin block number and highest block +// number) is tracked and updated correctly in case of a fork (or manual head +// revertal). +func TestForkedSyncProgress62(t *testing.T) { testForkedSyncProgress(t, 62, FullSync) } +func TestForkedSyncProgress63Full(t *testing.T) { testForkedSyncProgress(t, 63, FullSync) } +func TestForkedSyncProgress63Fast(t *testing.T) { testForkedSyncProgress(t, 63, FastSync) } +func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64, FullSync) } +func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, FastSync) } +func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) } + +func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch) + chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch) + + // Set a sync init hook to catch progress changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + + // Synchronise with one of the forks and check progress + tester.newPeer("fork A", protocol, chainA) + pending := new(sync.WaitGroup) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.sync("fork A", nil, mode); err != nil { + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) + } + }() + <-starting + + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(chainA.len() - 1), + }) + progress <- struct{}{} + pending.Wait() + + // Simulate a successful sync above the fork + tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight + + // Synchronise with the second fork and check progress resets + tester.newPeer("fork B", protocol, chainB) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.sync("fork B", nil, mode); err != nil { + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) + } + }() + <-starting + checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{ + StartingBlock: uint64(testChainBase.len()) - 1, + CurrentBlock: uint64(chainA.len() - 1), + HighestBlock: uint64(chainB.len() - 1), + }) + + // Check final progress after successful sync + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + StartingBlock: uint64(testChainBase.len()) - 1, + CurrentBlock: uint64(chainB.len() - 1), + HighestBlock: uint64(chainB.len() - 1), + }) +} + +// Tests that if synchronisation is aborted due to some failure, then the progress +// origin is not updated in the next sync cycle, as it should be considered the +// continuation of the previous sync and not a new instance. +func TestFailedSyncProgress62(t *testing.T) { testFailedSyncProgress(t, 62, FullSync) } +func TestFailedSyncProgress63Full(t *testing.T) { testFailedSyncProgress(t, 63, FullSync) } +func TestFailedSyncProgress63Fast(t *testing.T) { testFailedSyncProgress(t, 63, FastSync) } +func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64, FullSync) } +func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, FastSync) } +func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) } + +func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + chain := testChainBase.shorten(blockCacheItems - 15) + + // Set a sync init hook to catch progress changes + starting := make(chan struct{}) + progress := make(chan struct{}) + + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + + // Attempt a full sync with a faulty peer + brokenChain := chain.shorten(chain.len()) + missing := brokenChain.len() / 2 + delete(brokenChain.headerm, brokenChain.chain[missing]) + delete(brokenChain.blockm, brokenChain.chain[missing]) + delete(brokenChain.receiptm, brokenChain.chain[missing]) + tester.newPeer("faulty", protocol, brokenChain) + + pending := new(sync.WaitGroup) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.sync("faulty", nil, mode); err == nil { + panic("succeeded faulty synchronisation") + } + }() + <-starting + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(brokenChain.len() - 1), + }) + progress <- struct{}{} + pending.Wait() + afterFailedSync := tester.downloader.Progress() + + // Synchronise with a good peer and check that the progress origin remind the same + // after a failure + tester.newPeer("valid", protocol, chain) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.sync("valid", nil, mode); err != nil { + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) + } + }() + <-starting + checkProgress(t, tester.downloader, "completing", afterFailedSync) + + // Check final progress after successful sync + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + CurrentBlock: uint64(chain.len() - 1), + HighestBlock: uint64(chain.len() - 1), + }) +} + +// Tests that if an attacker fakes a chain height, after the attack is detected, +// the progress height is successfully reduced at the next sync invocation. +func TestFakedSyncProgress62(t *testing.T) { testFakedSyncProgress(t, 62, FullSync) } +func TestFakedSyncProgress63Full(t *testing.T) { testFakedSyncProgress(t, 63, FullSync) } +func TestFakedSyncProgress63Fast(t *testing.T) { testFakedSyncProgress(t, 63, FastSync) } +func TestFakedSyncProgress64Full(t *testing.T) { testFakedSyncProgress(t, 64, FullSync) } +func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, FastSync) } +func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) } + +func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + tester := newTester() + defer tester.terminate() + chain := testChainBase.shorten(blockCacheItems - 15) + + // Set a sync init hook to catch progress changes + starting := make(chan struct{}) + progress := make(chan struct{}) + tester.downloader.syncInitHook = func(origin, latest uint64) { + starting <- struct{}{} + <-progress + } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + + // Create and sync with an attacker that promises a higher chain than available. + brokenChain := chain.shorten(chain.len()) + numMissing := 5 + for i := brokenChain.len() - 2; i > brokenChain.len()-numMissing; i-- { + delete(brokenChain.headerm, brokenChain.chain[i]) + } + tester.newPeer("attack", protocol, brokenChain) + + pending := new(sync.WaitGroup) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.sync("attack", nil, mode); err == nil { + panic("succeeded attacker synchronisation") + } + }() + <-starting + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(brokenChain.len() - 1), + }) + progress <- struct{}{} + pending.Wait() + afterFailedSync := tester.downloader.Progress() + + // Synchronise with a good peer and check that the progress height has been reduced to + // the true value. + validChain := chain.shorten(chain.len() - numMissing) + tester.newPeer("valid", protocol, validChain) + pending.Add(1) + + go func() { + defer pending.Done() + if err := tester.sync("valid", nil, mode); err != nil { + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) + } + }() + <-starting + checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{ + CurrentBlock: afterFailedSync.CurrentBlock, + HighestBlock: uint64(validChain.len() - 1), + }) + + // Check final progress after successful sync. + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + CurrentBlock: uint64(validChain.len() - 1), + HighestBlock: uint64(validChain.len() - 1), + }) +} + +// This test reproduces an issue where unexpected deliveries would +// block indefinitely if they arrived at the right time. +func TestDeliverHeadersHang(t *testing.T) { + t.Parallel() + + testCases := []struct { + protocol int + syncMode SyncMode + }{ + {62, FullSync}, + {63, FullSync}, + {63, FastSync}, + {64, FullSync}, + {64, FastSync}, + {64, LightSync}, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) { + t.Parallel() + testDeliverHeadersHang(t, tc.protocol, tc.syncMode) + }) + } +} + +func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { + master := newTester() + defer master.terminate() + chain := testChainBase.shorten(15) + + for i := 0; i < 200; i++ { + tester := newTester() + tester.peerDb = master.peerDb + tester.newPeer("peer", protocol, chain) + + // Whenever the downloader requests headers, flood it with + // a lot of unrequested header deliveries. + tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{ + peer: tester.downloader.peers.peers["peer"].peer, + tester: tester, + } + if err := tester.sync("peer", nil, mode); err != nil { + t.Errorf("test %d: sync failed: %v", i, err) + } + tester.terminate() + } +} + +type floodingTestPeer struct { + peer Peer + tester *downloadTester +} + +func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() } +func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error { + return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse) +} +func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error { + return ftp.peer.RequestBodies(hashes) +} +func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error { + return ftp.peer.RequestReceipts(hashes) +} +func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error { + return ftp.peer.RequestNodeData(hashes) +} + +func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error { + deliveriesDone := make(chan struct{}, 500) + for i := 0; i < cap(deliveriesDone)-1; i++ { + peer := fmt.Sprintf("fake-peer%d", i) + go func() { + ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}}) + deliveriesDone <- struct{}{} + }() + } + + // None of the extra deliveries should block. + timeout := time.After(60 * time.Second) + launched := false + for i := 0; i < cap(deliveriesDone); i++ { + select { + case <-deliveriesDone: + if !launched { + // Start delivering the requested headers + // after one of the flooding responses has arrived. + go func() { + ftp.peer.RequestHeadersByNumber(from, count, skip, reverse) + deliveriesDone <- struct{}{} + }() + launched = true + } + case <-timeout: + panic("blocked") + } + } + return nil +} diff --git a/dex/downloader/events.go b/dex/downloader/events.go new file mode 100644 index 000000000..64905b8f2 --- /dev/null +++ b/dex/downloader/events.go @@ -0,0 +1,21 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +type DoneEvent struct{} +type StartEvent struct{} +type FailedEvent struct{ Err error } diff --git a/dex/downloader/fakepeer.go b/dex/downloader/fakepeer.go new file mode 100644 index 000000000..3e29357ba --- /dev/null +++ b/dex/downloader/fakepeer.go @@ -0,0 +1,161 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "math/big" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core" + "github.com/dexon-foundation/dexon/core/rawdb" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/ethdb" +) + +// FakePeer is a mock downloader peer that operates on a local database instance +// instead of being an actual live node. It's useful for testing and to implement +// sync commands from an existing local database. +type FakePeer struct { + id string + db ethdb.Database + hc *core.HeaderChain + dl *Downloader +} + +// NewFakePeer creates a new mock downloader peer with the given data sources. +func NewFakePeer(id string, db ethdb.Database, hc *core.HeaderChain, dl *Downloader) *FakePeer { + return &FakePeer{id: id, db: db, hc: hc, dl: dl} +} + +// Head implements downloader.Peer, returning the current head hash and number +// of the best known header. +func (p *FakePeer) Head() (common.Hash, *big.Int) { + header := p.hc.CurrentHeader() + return header.Hash(), header.Number +} + +// RequestHeadersByHash implements downloader.Peer, returning a batch of headers +// defined by the origin hash and the associated query parameters. +func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, reverse bool) error { + var ( + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < amount { + origin := p.hc.GetHeaderByHash(hash) + if origin == nil { + break + } + number := origin.Number.Uint64() + headers = append(headers, origin) + if reverse { + for i := 0; i <= skip; i++ { + if header := p.hc.GetHeader(hash, number); header != nil { + hash = header.ParentHash + number-- + } else { + unknown = true + break + } + } + } else { + var ( + current = origin.Number.Uint64() + next = current + uint64(skip) + 1 + ) + if header := p.hc.GetHeaderByNumber(next); header != nil { + if p.hc.GetBlockHashesFromHash(header.Hash(), uint64(skip+1))[skip] == hash { + hash = header.Hash() + } else { + unknown = true + } + } else { + unknown = true + } + } + } + p.dl.DeliverHeaders(p.id, headers) + return nil +} + +// RequestHeadersByNumber implements downloader.Peer, returning a batch of headers +// defined by the origin number and the associated query parameters. +func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, reverse bool) error { + var ( + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < amount { + origin := p.hc.GetHeaderByNumber(number) + if origin == nil { + break + } + if reverse { + if number >= uint64(skip+1) { + number -= uint64(skip + 1) + } else { + unknown = true + } + } else { + number += uint64(skip + 1) + } + headers = append(headers, origin) + } + p.dl.DeliverHeaders(p.id, headers) + return nil +} + +// RequestBodies implements downloader.Peer, returning a batch of block bodies +// corresponding to the specified block hashes. +func (p *FakePeer) RequestBodies(hashes []common.Hash) error { + var ( + txs [][]*types.Transaction + uncles [][]*types.Header + ) + for _, hash := range hashes { + block := rawdb.ReadBlock(p.db, hash, *p.hc.GetBlockNumber(hash)) + + txs = append(txs, block.Transactions()) + uncles = append(uncles, block.Uncles()) + } + p.dl.DeliverBodies(p.id, txs, uncles) + return nil +} + +// RequestReceipts implements downloader.Peer, returning a batch of transaction +// receipts corresponding to the specified block hashes. +func (p *FakePeer) RequestReceipts(hashes []common.Hash) error { + var receipts [][]*types.Receipt + for _, hash := range hashes { + receipts = append(receipts, rawdb.ReadReceipts(p.db, hash, *p.hc.GetBlockNumber(hash))) + } + p.dl.DeliverReceipts(p.id, receipts) + return nil +} + +// RequestNodeData implements downloader.Peer, returning a batch of state trie +// nodes corresponding to the specified trie hashes. +func (p *FakePeer) RequestNodeData(hashes []common.Hash) error { + var data [][]byte + for _, hash := range hashes { + if entry, err := p.db.Get(hash.Bytes()); err == nil { + data = append(data, entry) + } + } + p.dl.DeliverNodeData(p.id, data) + return nil +} diff --git a/dex/downloader/metrics.go b/dex/downloader/metrics.go new file mode 100644 index 000000000..0d6041712 --- /dev/null +++ b/dex/downloader/metrics.go @@ -0,0 +1,43 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the metrics collected by the downloader. + +package downloader + +import ( + "github.com/dexon-foundation/dexon/metrics" +) + +var ( + headerInMeter = metrics.NewRegisteredMeter("dex/downloader/headers/in", nil) + headerReqTimer = metrics.NewRegisteredTimer("dex/downloader/headers/req", nil) + headerDropMeter = metrics.NewRegisteredMeter("dex/downloader/headers/drop", nil) + headerTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/headers/timeout", nil) + + bodyInMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/in", nil) + bodyReqTimer = metrics.NewRegisteredTimer("dex/downloader/bodies/req", nil) + bodyDropMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/drop", nil) + bodyTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/timeout", nil) + + receiptInMeter = metrics.NewRegisteredMeter("dex/downloader/receipts/in", nil) + receiptReqTimer = metrics.NewRegisteredTimer("dex/downloader/receipts/req", nil) + receiptDropMeter = metrics.NewRegisteredMeter("dex/downloader/receipts/drop", nil) + receiptTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/receipts/timeout", nil) + + stateInMeter = metrics.NewRegisteredMeter("dex/downloader/states/in", nil) + stateDropMeter = metrics.NewRegisteredMeter("dex/downloader/states/drop", nil) +) diff --git a/dex/downloader/modes.go b/dex/downloader/modes.go new file mode 100644 index 000000000..8ecdf91f1 --- /dev/null +++ b/dex/downloader/modes.go @@ -0,0 +1,73 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import "fmt" + +// SyncMode represents the synchronisation mode of the downloader. +type SyncMode int + +const ( + FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks + FastSync // Quickly download the headers, full sync only at the chain head + LightSync // Download only the headers and terminate afterwards +) + +func (mode SyncMode) IsValid() bool { + return mode >= FullSync && mode <= LightSync +} + +// String implements the stringer interface. +func (mode SyncMode) String() string { + switch mode { + case FullSync: + return "full" + case FastSync: + return "fast" + case LightSync: + return "light" + default: + return "unknown" + } +} + +func (mode SyncMode) MarshalText() ([]byte, error) { + switch mode { + case FullSync: + return []byte("full"), nil + case FastSync: + return []byte("fast"), nil + case LightSync: + return []byte("light"), nil + default: + return nil, fmt.Errorf("unknown sync mode %d", mode) + } +} + +func (mode *SyncMode) UnmarshalText(text []byte) error { + switch string(text) { + case "full": + *mode = FullSync + case "fast": + *mode = FastSync + case "light": + *mode = LightSync + default: + return fmt.Errorf(`unknown sync mode %q, want "full", "fast" or "light"`, text) + } + return nil +} diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go new file mode 100644 index 000000000..1fd82fbe3 --- /dev/null +++ b/dex/downloader/peer.go @@ -0,0 +1,573 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the active peer-set of the downloader, maintaining both failures +// as well as reputation metrics to prioritize the block retrievals. + +package downloader + +import ( + "errors" + "fmt" + "math" + "math/big" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/event" + "github.com/dexon-foundation/dexon/log" +) + +const ( + maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items + measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. +) + +var ( + errAlreadyFetching = errors.New("already fetching blocks from peer") + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) + +// peerConnection represents an active peer from which hashes and blocks are retrieved. +type peerConnection struct { + id string // Unique identifier of the peer + + headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) + blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) + receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) + stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) + + headerThroughput float64 // Number of headers measured to be retrievable per second + blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second + receiptThroughput float64 // Number of receipts measured to be retrievable per second + stateThroughput float64 // Number of node data pieces measured to be retrievable per second + + rtt time.Duration // Request round trip time to track responsiveness (QoS) + + headerStarted time.Time // Time instance when the last header fetch was started + blockStarted time.Time // Time instance when the last block (body) fetch was started + receiptStarted time.Time // Time instance when the last receipt fetch was started + stateStarted time.Time // Time instance when the last node data fetch was started + + lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) + + peer Peer + + version int // Eth protocol version number to switch strategies + log log.Logger // Contextual logger to add extra infos to peer logs + lock sync.RWMutex +} + +// LightPeer encapsulates the methods required to synchronise with a remote light peer. +type LightPeer interface { + Head() (common.Hash, *big.Int) + RequestHeadersByHash(common.Hash, int, int, bool) error + RequestHeadersByNumber(uint64, int, int, bool) error +} + +// Peer encapsulates the methods required to synchronise with a remote full peer. +type Peer interface { + LightPeer + RequestBodies([]common.Hash) error + RequestReceipts([]common.Hash) error + RequestNodeData([]common.Hash) error +} + +// lightPeerWrapper wraps a LightPeer struct, stubbing out the Peer-only methods. +type lightPeerWrapper struct { + peer LightPeer +} + +func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() } +func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error { + return w.peer.RequestHeadersByHash(h, amount, skip, reverse) +} +func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error { + return w.peer.RequestHeadersByNumber(i, amount, skip, reverse) +} +func (w *lightPeerWrapper) RequestBodies([]common.Hash) error { + panic("RequestBodies not supported in light client mode sync") +} +func (w *lightPeerWrapper) RequestReceipts([]common.Hash) error { + panic("RequestReceipts not supported in light client mode sync") +} +func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error { + panic("RequestNodeData not supported in light client mode sync") +} + +// newPeerConnection creates a new downloader peer. +func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *peerConnection { + return &peerConnection{ + id: id, + lacking: make(map[common.Hash]struct{}), + + peer: peer, + + version: version, + log: logger, + } +} + +// Reset clears the internal state of a peer entity. +func (p *peerConnection) Reset() { + p.lock.Lock() + defer p.lock.Unlock() + + atomic.StoreInt32(&p.headerIdle, 0) + atomic.StoreInt32(&p.blockIdle, 0) + atomic.StoreInt32(&p.receiptIdle, 0) + atomic.StoreInt32(&p.stateIdle, 0) + + p.headerThroughput = 0 + p.blockThroughput = 0 + p.receiptThroughput = 0 + p.stateThroughput = 0 + + p.lacking = make(map[common.Hash]struct{}) +} + +// FetchHeaders sends a header retrieval request to the remote peer. +func (p *peerConnection) FetchHeaders(from uint64, count int) error { + // Sanity check the protocol version + if p.version < 62 { + panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version)) + } + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) { + return errAlreadyFetching + } + p.headerStarted = time.Now() + + // Issue the header retrieval request (absolut upwards without gaps) + go p.peer.RequestHeadersByNumber(from, count, 0, false) + + return nil +} + +// FetchBodies sends a block body retrieval request to the remote peer. +func (p *peerConnection) FetchBodies(request *fetchRequest) error { + // Sanity check the protocol version + if p.version < 62 { + panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version)) + } + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) { + return errAlreadyFetching + } + p.blockStarted = time.Now() + + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) + } + go p.peer.RequestBodies(hashes) + + return nil +} + +// FetchReceipts sends a receipt retrieval request to the remote peer. +func (p *peerConnection) FetchReceipts(request *fetchRequest) error { + // Sanity check the protocol version + if p.version < 63 { + panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version)) + } + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) { + return errAlreadyFetching + } + p.receiptStarted = time.Now() + + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) + } + go p.peer.RequestReceipts(hashes) + + return nil +} + +// FetchNodeData sends a node state data retrieval request to the remote peer. +func (p *peerConnection) FetchNodeData(hashes []common.Hash) error { + // Sanity check the protocol version + if p.version < 63 { + panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version)) + } + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) { + return errAlreadyFetching + } + p.stateStarted = time.Now() + + go p.peer.RequestNodeData(hashes) + + return nil +} + +// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval +// requests. Its estimated header retrieval throughput is updated with that measured +// just now. +func (p *peerConnection) SetHeadersIdle(delivered int) { + p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle) +} + +// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval +// requests. Its estimated body retrieval throughput is updated with that measured +// just now. +func (p *peerConnection) SetBodiesIdle(delivered int) { + p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) +} + +// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt +// retrieval requests. Its estimated receipt retrieval throughput is updated +// with that measured just now. +func (p *peerConnection) SetReceiptsIdle(delivered int) { + p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle) +} + +// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie +// data retrieval requests. Its estimated state retrieval throughput is updated +// with that measured just now. +func (p *peerConnection) SetNodeDataIdle(delivered int) { + p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle) +} + +// setIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its estimated retrieval throughput is updated with that measured just now. +func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { + // Irrelevant of the scaling, make sure the peer ends up idle + defer atomic.StoreInt32(idle, 0) + + p.lock.Lock() + defer p.lock.Unlock() + + // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum + if delivered == 0 { + *throughput = 0 + return + } + // Otherwise update the throughput with a new measurement + elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor + measured := float64(delivered) / (float64(elapsed) / float64(time.Second)) + + *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured + p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed)) + + p.log.Trace("Peer throughput measurements updated", + "hps", p.headerThroughput, "bps", p.blockThroughput, + "rps", p.receiptThroughput, "sps", p.stateThroughput, + "miss", len(p.lacking), "rtt", p.rtt) +} + +// HeaderCapacity retrieves the peers header download allowance based on its +// previously discovered throughput. +func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch))) +} + +// BlockCapacity retrieves the peers block download allowance based on its +// previously discovered throughput. +func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int { + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch))) +} + +// ReceiptCapacity retrieves the peers receipt download allowance based on its +// previously discovered throughput. +func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int { + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch))) +} + +// NodeDataCapacity retrieves the peers state download allowance based on its +// previously discovered throughput. +func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int { + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch))) +} + +// MarkLacking appends a new entity to the set of items (blocks, receipts, states) +// that a peer is known not to have (i.e. have been requested before). If the +// set reaches its maximum allowed capacity, items are randomly dropped off. +func (p *peerConnection) MarkLacking(hash common.Hash) { + p.lock.Lock() + defer p.lock.Unlock() + + for len(p.lacking) >= maxLackingHashes { + for drop := range p.lacking { + delete(p.lacking, drop) + break + } + } + p.lacking[hash] = struct{}{} +} + +// Lacks retrieves whether the hash of a blockchain item is on the peers lacking +// list (i.e. whether we know that the peer does not have it). +func (p *peerConnection) Lacks(hash common.Hash) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + _, ok := p.lacking[hash] + return ok +} + +// peerSet represents the collection of active peer participating in the chain +// download procedure. +type peerSet struct { + peers map[string]*peerConnection + newPeerFeed event.Feed + peerDropFeed event.Feed + lock sync.RWMutex +} + +// newPeerSet creates a new peer set top track the active download sources. +func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peerConnection), + } +} + +// SubscribeNewPeers subscribes to peer arrival events. +func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription { + return ps.newPeerFeed.Subscribe(ch) +} + +// SubscribePeerDrops subscribes to peer departure events. +func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription { + return ps.peerDropFeed.Subscribe(ch) +} + +// Reset iterates over the current peer set, and resets each of the known peers +// to prepare for a next batch of block retrieval. +func (ps *peerSet) Reset() { + ps.lock.RLock() + defer ps.lock.RUnlock() + + for _, peer := range ps.peers { + peer.Reset() + } +} + +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. +// +// The method also sets the starting throughput values of the new peer to the +// average of all existing peers, to give it a realistic chance of being used +// for data retrievals. +func (ps *peerSet) Register(p *peerConnection) error { + // Retrieve the current median RTT as a sane default + p.rtt = ps.medianRTT() + + // Register the new peer with some meaningful defaults + ps.lock.Lock() + if _, ok := ps.peers[p.id]; ok { + ps.lock.Unlock() + return errAlreadyRegistered + } + if len(ps.peers) > 0 { + p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0 + + for _, peer := range ps.peers { + peer.lock.RLock() + p.headerThroughput += peer.headerThroughput + p.blockThroughput += peer.blockThroughput + p.receiptThroughput += peer.receiptThroughput + p.stateThroughput += peer.stateThroughput + peer.lock.RUnlock() + } + p.headerThroughput /= float64(len(ps.peers)) + p.blockThroughput /= float64(len(ps.peers)) + p.receiptThroughput /= float64(len(ps.peers)) + p.stateThroughput /= float64(len(ps.peers)) + } + ps.peers[p.id] = p + ps.lock.Unlock() + + ps.newPeerFeed.Send(p) + return nil +} + +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + p, ok := ps.peers[id] + if !ok { + defer ps.lock.Unlock() + return errNotRegistered + } + delete(ps.peers, id) + ps.lock.Unlock() + + ps.peerDropFeed.Send(p) + return nil +} + +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peerConnection { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Len returns if the current number of peers in the set. +func (ps *peerSet) Len() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.peers) +} + +// AllPeers retrieves a flat list of all the peers within the set. +func (ps *peerSet) AllPeers() []*peerConnection { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peerConnection, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + +// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers +// within the active peer set, ordered by their reputation. +func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { + return atomic.LoadInt32(&p.headerIdle) == 0 + } + throughput := func(p *peerConnection) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.headerThroughput + } + return ps.idlePeers(62, 64, idle, throughput) +} + +// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within +// the active peer set, ordered by their reputation. +func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 + } + throughput := func(p *peerConnection) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.blockThroughput + } + return ps.idlePeers(62, 64, idle, throughput) +} + +// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers +// within the active peer set, ordered by their reputation. +func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { + return atomic.LoadInt32(&p.receiptIdle) == 0 + } + throughput := func(p *peerConnection) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.receiptThroughput + } + return ps.idlePeers(63, 64, idle, throughput) +} + +// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle +// peers within the active peer set, ordered by their reputation. +func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { + return atomic.LoadInt32(&p.stateIdle) == 0 + } + throughput := func(p *peerConnection) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.stateThroughput + } + return ps.idlePeers(63, 64, idle, throughput) +} + +// idlePeers retrieves a flat list of all currently idle peers satisfying the +// protocol version constraints, using the provided function to check idleness. +// The resulting set of peers are sorted by their measure throughput. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) { + ps.lock.RLock() + defer ps.lock.RUnlock() + + idle, total := make([]*peerConnection, 0, len(ps.peers)), 0 + for _, p := range ps.peers { + if p.version >= minProtocol && p.version <= maxProtocol { + if idleCheck(p) { + idle = append(idle, p) + } + total++ + } + } + for i := 0; i < len(idle); i++ { + for j := i + 1; j < len(idle); j++ { + if throughput(idle[i]) < throughput(idle[j]) { + idle[i], idle[j] = idle[j], idle[i] + } + } + } + return idle, total +} + +// medianRTT returns the median RTT of the peerset, considering only the tuning +// peers if there are more peers available. +func (ps *peerSet) medianRTT() time.Duration { + // Gather all the currently measured round trip times + ps.lock.RLock() + defer ps.lock.RUnlock() + + rtts := make([]float64, 0, len(ps.peers)) + for _, p := range ps.peers { + p.lock.RLock() + rtts = append(rtts, float64(p.rtt)) + p.lock.RUnlock() + } + sort.Float64s(rtts) + + median := rttMaxEstimate + if qosTuningPeers <= len(rtts) { + median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers + } else if len(rtts) > 0 { + median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos) + } + // Restrict the RTT into some QoS defaults, irrelevant of true RTT + if median < rttMinEstimate { + median = rttMinEstimate + } + if median > rttMaxEstimate { + median = rttMaxEstimate + } + return median +} diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go new file mode 100644 index 000000000..12c75e793 --- /dev/null +++ b/dex/downloader/queue.go @@ -0,0 +1,885 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the block download scheduler to collect download tasks and schedule +// them in an ordered, and throttled way. + +package downloader + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/common/prque" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/metrics" +) + +var ( + blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download + blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching + blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones +) + +var ( + errNoFetchesPending = errors.New("no fetches pending") + errStaleDelivery = errors.New("stale delivery") +) + +// fetchRequest is a currently running data retrieval operation. +type fetchRequest struct { + Peer *peerConnection // Peer to which the request was sent + From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) + Headers []*types.Header // [eth/62] Requested headers, sorted by request order + Time time.Time // Time when the request was made +} + +// fetchResult is a struct collecting partial results from data fetchers until +// all outstanding pieces complete and the result as a whole can be processed. +type fetchResult struct { + Pending int // Number of data fetches still pending + Hash common.Hash // Hash of the header to prevent recalculating + + Header *types.Header + Uncles []*types.Header + Transactions types.Transactions + Receipts types.Receipts +} + +// queue represents hashes that are either need fetching or are being fetched +type queue struct { + mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching + + // Headers are "special", they download in batches, supported by a skeleton chain + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers + headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for + headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable + headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations + headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers + headerProced int // [eth/62] Number of headers already processed from the results + headerOffset uint64 // [eth/62] Number of the first header in the result cache + headerContCh chan bool // [eth/62] Channel to notify when header download finishes + + // All data retrievals below are based on an already assembles header chain + blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers + blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for + blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations + blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches + + receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers + receiptTaskQueue *prque.Prque // [eth/63] Priority queue of the headers to fetch the receipts for + receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations + receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches + + resultCache []*fetchResult // Downloaded but not yet delivered fetch results + resultOffset uint64 // Offset of the first cached fetch result in the block chain + resultSize common.StorageSize // Approximate size of a block (exponential moving average) + + lock *sync.Mutex + active *sync.Cond + closed bool +} + +// newQueue creates a new download queue for scheduling block retrieval. +func newQueue() *queue { + lock := new(sync.Mutex) + return &queue{ + headerPendPool: make(map[string]*fetchRequest), + headerContCh: make(chan bool), + blockTaskPool: make(map[common.Hash]*types.Header), + blockTaskQueue: prque.New(nil), + blockPendPool: make(map[string]*fetchRequest), + blockDonePool: make(map[common.Hash]struct{}), + receiptTaskPool: make(map[common.Hash]*types.Header), + receiptTaskQueue: prque.New(nil), + receiptPendPool: make(map[string]*fetchRequest), + receiptDonePool: make(map[common.Hash]struct{}), + resultCache: make([]*fetchResult, blockCacheItems), + active: sync.NewCond(lock), + lock: lock, + } +} + +// Reset clears out the queue contents. +func (q *queue) Reset() { + q.lock.Lock() + defer q.lock.Unlock() + + q.closed = false + q.mode = FullSync + + q.headerHead = common.Hash{} + q.headerPendPool = make(map[string]*fetchRequest) + + q.blockTaskPool = make(map[common.Hash]*types.Header) + q.blockTaskQueue.Reset() + q.blockPendPool = make(map[string]*fetchRequest) + q.blockDonePool = make(map[common.Hash]struct{}) + + q.receiptTaskPool = make(map[common.Hash]*types.Header) + q.receiptTaskQueue.Reset() + q.receiptPendPool = make(map[string]*fetchRequest) + q.receiptDonePool = make(map[common.Hash]struct{}) + + q.resultCache = make([]*fetchResult, blockCacheItems) + q.resultOffset = 0 +} + +// Close marks the end of the sync, unblocking WaitResults. +// It may be called even if the queue is already closed. +func (q *queue) Close() { + q.lock.Lock() + q.closed = true + q.lock.Unlock() + q.active.Broadcast() +} + +// PendingHeaders retrieves the number of header requests pending for retrieval. +func (q *queue) PendingHeaders() int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.headerTaskQueue.Size() +} + +// PendingBlocks retrieves the number of block (body) requests pending for retrieval. +func (q *queue) PendingBlocks() int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.blockTaskQueue.Size() +} + +// PendingReceipts retrieves the number of block receipts pending for retrieval. +func (q *queue) PendingReceipts() int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.receiptTaskQueue.Size() +} + +// InFlightHeaders retrieves whether there are header fetch requests currently +// in flight. +func (q *queue) InFlightHeaders() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return len(q.headerPendPool) > 0 +} + +// InFlightBlocks retrieves whether there are block fetch requests currently in +// flight. +func (q *queue) InFlightBlocks() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return len(q.blockPendPool) > 0 +} + +// InFlightReceipts retrieves whether there are receipt fetch requests currently +// in flight. +func (q *queue) InFlightReceipts() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return len(q.receiptPendPool) > 0 +} + +// Idle returns if the queue is fully idle or has some data still inside. +func (q *queue) Idle() bool { + q.lock.Lock() + defer q.lock.Unlock() + + queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) + cached := len(q.blockDonePool) + len(q.receiptDonePool) + + return (queued + pending + cached) == 0 +} + +// ShouldThrottleBlocks checks if the download should be throttled (active block (body) +// fetches exceed block cache). +func (q *queue) ShouldThrottleBlocks() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0 +} + +// ShouldThrottleReceipts checks if the download should be throttled (active receipt +// fetches exceed block cache). +func (q *queue) ShouldThrottleReceipts() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0 +} + +// resultSlots calculates the number of results slots available for requests +// whilst adhering to both the item and the memory limit too of the results +// cache. +func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int { + // Calculate the maximum length capped by the memory limit + limit := len(q.resultCache) + if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) { + limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) + } + // Calculate the number of slots already finished + finished := 0 + for _, result := range q.resultCache[:limit] { + if result == nil { + break + } + if _, ok := donePool[result.Hash]; ok { + finished++ + } + } + // Calculate the number of slots currently downloading + pending := 0 + for _, request := range pendPool { + for _, header := range request.Headers { + if header.Number.Uint64() < q.resultOffset+uint64(limit) { + pending++ + } + } + } + // Return the free slots to distribute + return limit - finished - pending +} + +// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill +// up an already retrieved header skeleton. +func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { + q.lock.Lock() + defer q.lock.Unlock() + + // No skeleton retrieval can be in progress, fail hard if so (huge implementation bug) + if q.headerResults != nil { + panic("skeleton assembly already in progress") + } + // Schedule all the header retrieval tasks for the skeleton assembly + q.headerTaskPool = make(map[uint64]*types.Header) + q.headerTaskQueue = prque.New(nil) + q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains + q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerProced = 0 + q.headerOffset = from + q.headerContCh = make(chan bool, 1) + + for i, header := range skeleton { + index := from + uint64(i*MaxHeaderFetch) + + q.headerTaskPool[index] = header + q.headerTaskQueue.Push(index, -int64(index)) + } +} + +// RetrieveHeaders retrieves the header chain assemble based on the scheduled +// skeleton. +func (q *queue) RetrieveHeaders() ([]*types.Header, int) { + q.lock.Lock() + defer q.lock.Unlock() + + headers, proced := q.headerResults, q.headerProced + q.headerResults, q.headerProced = nil, 0 + + return headers, proced +} + +// Schedule adds a set of headers for the download queue for scheduling, returning +// the new headers encountered. +func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { + q.lock.Lock() + defer q.lock.Unlock() + + // Insert all the headers prioritised by the contained block number + inserts := make([]*types.Header, 0, len(headers)) + for _, header := range headers { + // Make sure chain order is honoured and preserved throughout + hash := header.Hash() + if header.Number == nil || header.Number.Uint64() != from { + log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from) + break + } + if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash { + log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash) + break + } + // Make sure no duplicate requests are executed + if _, ok := q.blockTaskPool[hash]; ok { + log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash) + continue + } + if _, ok := q.receiptTaskPool[hash]; ok { + log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash) + continue + } + // Queue the header for content retrieval + q.blockTaskPool[hash] = header + q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + + if q.mode == FastSync { + q.receiptTaskPool[hash] = header + q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + } + inserts = append(inserts, header) + q.headerHead = hash + from++ + } + return inserts +} + +// Results retrieves and permanently removes a batch of fetch results from +// the cache. the result slice will be empty if the queue has been closed. +func (q *queue) Results(block bool) []*fetchResult { + q.lock.Lock() + defer q.lock.Unlock() + + // Count the number of items available for processing + nproc := q.countProcessableItems() + for nproc == 0 && !q.closed { + if !block { + return nil + } + q.active.Wait() + nproc = q.countProcessableItems() + } + // Since we have a batch limit, don't pull more into "dangling" memory + if nproc > maxResultsProcess { + nproc = maxResultsProcess + } + results := make([]*fetchResult, nproc) + copy(results, q.resultCache[:nproc]) + if len(results) > 0 { + // Mark results as done before dropping them from the cache. + for _, result := range results { + hash := result.Header.Hash() + delete(q.blockDonePool, hash) + delete(q.receiptDonePool, hash) + } + // Delete the results from the cache and clear the tail. + copy(q.resultCache, q.resultCache[nproc:]) + for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ { + q.resultCache[i] = nil + } + // Advance the expected block number of the first cache entry. + q.resultOffset += uint64(nproc) + + // Recalculate the result item weights to prevent memory exhaustion + for _, result := range results { + size := result.Header.Size() + for _, uncle := range result.Uncles { + size += uncle.Size() + } + for _, receipt := range result.Receipts { + size += receipt.Size() + } + for _, tx := range result.Transactions { + size += tx.Size() + } + q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize + } + } + return results +} + +// countProcessableItems counts the processable items. +func (q *queue) countProcessableItems() int { + for i, result := range q.resultCache { + if result == nil || result.Pending > 0 { + return i + } + } + return len(q.resultCache) +} + +// ReserveHeaders reserves a set of headers for the given peer, skipping any +// previously failed batches. +func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the peer's already downloading something (sanity check to + // not corrupt state) + if _, ok := q.headerPendPool[p.id]; ok { + return nil + } + // Retrieve a batch of hashes, skipping previously failed ones + send, skip := uint64(0), []uint64{} + for send == 0 && !q.headerTaskQueue.Empty() { + from, _ := q.headerTaskQueue.Pop() + if q.headerPeerMiss[p.id] != nil { + if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { + skip = append(skip, from.(uint64)) + continue + } + } + send = from.(uint64) + } + // Merge all the skipped batches back + for _, from := range skip { + q.headerTaskQueue.Push(from, -int64(from)) + } + // Assemble and return the block download request + if send == 0 { + return nil + } + request := &fetchRequest{ + Peer: p, + From: send, + Time: time.Now(), + } + q.headerPendPool[p.id] = request + return request +} + +// ReserveBodies reserves a set of body fetches for the given peer, skipping any +// previously failed downloads. Beside the next batch of needed fetches, it also +// returns a flag whether empty blocks were queued requiring processing. +func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) { + isNoop := func(header *types.Header) bool { + return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash + } + q.lock.Lock() + defer q.lock.Unlock() + + return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop) +} + +// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping +// any previously failed downloads. Beside the next batch of needed fetches, it +// also returns a flag whether empty receipts were queued requiring importing. +func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) { + isNoop := func(header *types.Header) bool { + return header.ReceiptHash == types.EmptyRootHash + } + q.lock.Lock() + defer q.lock.Unlock() + + return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop) +} + +// reserveHeaders reserves a set of data download operations for a given peer, +// skipping any previously failed ones. This method is a generic version used +// by the individual special reservation functions. +// +// Note, this method expects the queue lock to be already held for writing. The +// reason the lock is not obtained in here is because the parameters already need +// to access the queue, so they already need a lock anyway. +func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) { + // Short circuit if the pool has been depleted, or if the peer's already + // downloading something (sanity check not to corrupt state) + if taskQueue.Empty() { + return nil, false, nil + } + if _, ok := pendPool[p.id]; ok { + return nil, false, nil + } + // Calculate an upper limit on the items we might fetch (i.e. throttling) + space := q.resultSlots(pendPool, donePool) + + // Retrieve a batch of tasks, skipping previously failed ones + send := make([]*types.Header, 0, count) + skip := make([]*types.Header, 0) + + progress := false + for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { + header := taskQueue.PopItem().(*types.Header) + hash := header.Hash() + + // If we're the first to request this task, initialise the result container + index := int(header.Number.Int64() - int64(q.resultOffset)) + if index >= len(q.resultCache) || index < 0 { + common.Report("index allocation went beyond available resultCache space") + return nil, false, errInvalidChain + } + if q.resultCache[index] == nil { + components := 1 + if q.mode == FastSync { + components = 2 + } + q.resultCache[index] = &fetchResult{ + Pending: components, + Hash: hash, + Header: header, + } + } + // If this fetch task is a noop, skip this fetch operation + if isNoop(header) { + donePool[hash] = struct{}{} + delete(taskPool, hash) + + space, proc = space-1, proc-1 + q.resultCache[index].Pending-- + progress = true + continue + } + // Otherwise unless the peer is known not to have the data, add to the retrieve list + if p.Lacks(hash) { + skip = append(skip, header) + } else { + send = append(send, header) + } + } + // Merge all the skipped headers back + for _, header := range skip { + taskQueue.Push(header, -int64(header.Number.Uint64())) + } + if progress { + // Wake WaitResults, resultCache was modified + q.active.Signal() + } + // Assemble and return the block download request + if len(send) == 0 { + return nil, progress, nil + } + request := &fetchRequest{ + Peer: p, + Headers: send, + Time: time.Now(), + } + pendPool[p.id] = request + + return request, progress, nil +} + +// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue. +func (q *queue) CancelHeaders(request *fetchRequest) { + q.cancel(request, q.headerTaskQueue, q.headerPendPool) +} + +// CancelBodies aborts a body fetch request, returning all pending headers to the +// task queue. +func (q *queue) CancelBodies(request *fetchRequest) { + q.cancel(request, q.blockTaskQueue, q.blockPendPool) +} + +// CancelReceipts aborts a body fetch request, returning all pending headers to +// the task queue. +func (q *queue) CancelReceipts(request *fetchRequest) { + q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) +} + +// Cancel aborts a fetch request, returning all pending hashes to the task queue. +func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { + q.lock.Lock() + defer q.lock.Unlock() + + if request.From > 0 { + taskQueue.Push(request.From, -int64(request.From)) + } + for _, header := range request.Headers { + taskQueue.Push(header, -int64(header.Number.Uint64())) + } + delete(pendPool, request.Peer.id) +} + +// Revoke cancels all pending requests belonging to a given peer. This method is +// meant to be called during a peer drop to quickly reassign owned data fetches +// to remaining nodes. +func (q *queue) Revoke(peerID string) { + q.lock.Lock() + defer q.lock.Unlock() + + if request, ok := q.blockPendPool[peerID]; ok { + for _, header := range request.Headers { + q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + } + delete(q.blockPendPool, peerID) + } + if request, ok := q.receiptPendPool[peerID]; ok { + for _, header := range request.Headers { + q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + } + delete(q.receiptPendPool, peerID) + } +} + +// ExpireHeaders checks for in flight requests that exceeded a timeout allowance, +// canceling them and returning the responsible peers for penalisation. +func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter) +} + +// ExpireBodies checks for in flight block body requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalisation. +func (q *queue) ExpireBodies(timeout time.Duration) map[string]int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter) +} + +// ExpireReceipts checks for in flight receipt requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalisation. +func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) +} + +// expire is the generic check that move expired tasks from a pending pool back +// into a task pool, returning all entities caught with expired tasks. +// +// Note, this method expects the queue lock to be already held. The +// reason the lock is not obtained in here is because the parameters already need +// to access the queue, so they already need a lock anyway. +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int { + // Iterate over the expired requests and return each to the queue + expiries := make(map[string]int) + for id, request := range pendPool { + if time.Since(request.Time) > timeout { + // Update the metrics with the timeout + timeoutMeter.Mark(1) + + // Return any non satisfied requests to the pool + if request.From > 0 { + taskQueue.Push(request.From, -int64(request.From)) + } + for _, header := range request.Headers { + taskQueue.Push(header, -int64(header.Number.Uint64())) + } + // Add the peer to the expiry report along the number of failed requests + expiries[id] = len(request.Headers) + + // Remove the expired requests from the pending pool directly + delete(pendPool, id) + } + } + return expiries +} + +// DeliverHeaders injects a header retrieval response into the header results +// cache. This method either accepts all headers it received, or none of them +// if they do not map correctly to the skeleton. +// +// If the headers are accepted, the method makes an attempt to deliver the set +// of ready headers to the processor to keep the pipeline full. However it will +// not block to prevent stalling other pending deliveries. +func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the data was never requested + request := q.headerPendPool[id] + if request == nil { + return 0, errNoFetchesPending + } + headerReqTimer.UpdateSince(request.Time) + delete(q.headerPendPool, id) + + // Ensure headers can be mapped onto the skeleton chain + target := q.headerTaskPool[request.From].Hash() + + accepted := len(headers) == MaxHeaderFetch + if accepted { + if headers[0].Number.Uint64() != request.From { + log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), request.From) + accepted = false + } else if headers[len(headers)-1].Hash() != target { + log.Trace("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target) + accepted = false + } + } + if accepted { + for i, header := range headers[1:] { + hash := header.Hash() + if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { + log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want) + accepted = false + break + } + if headers[i].Hash() != header.ParentHash { + log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash) + accepted = false + break + } + } + } + // If the batch of headers wasn't accepted, mark as unavailable + if !accepted { + log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From) + + miss := q.headerPeerMiss[id] + if miss == nil { + q.headerPeerMiss[id] = make(map[uint64]struct{}) + miss = q.headerPeerMiss[id] + } + miss[request.From] = struct{}{} + + q.headerTaskQueue.Push(request.From, -int64(request.From)) + return 0, errors.New("delivery not accepted") + } + // Clean up a successful fetch and try to deliver any sub-results + copy(q.headerResults[request.From-q.headerOffset:], headers) + delete(q.headerTaskPool, request.From) + + ready := 0 + for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil { + ready += MaxHeaderFetch + } + if ready > 0 { + // Headers are ready for delivery, gather them and push forward (non blocking) + process := make([]*types.Header, ready) + copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) + + select { + case headerProcCh <- process: + log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number) + q.headerProced += len(process) + default: + } + } + // Check for termination and return + if len(q.headerTaskPool) == 0 { + q.headerContCh <- false + } + return len(headers), nil +} + +// DeliverBodies injects a block body retrieval response into the results queue. +// The method returns the number of blocks bodies accepted from the delivery and +// also wakes any threads waiting for data delivery. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + reconstruct := func(header *types.Header, index int, result *fetchResult) error { + if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash { + return errInvalidBody + } + result.Transactions = txLists[index] + result.Uncles = uncleLists[index] + return nil + } + return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct) +} + +// DeliverReceipts injects a receipt retrieval response into the results queue. +// The method returns the number of transaction receipts accepted from the delivery +// and also wakes any threads waiting for data delivery. +func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + reconstruct := func(header *types.Header, index int, result *fetchResult) error { + if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash { + return errInvalidReceipt + } + result.Receipts = receiptList[index] + return nil + } + return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct) +} + +// deliver injects a data retrieval response into the results queue. +// +// Note, this method expects the queue lock to be already held for writing. The +// reason the lock is not obtained in here is because the parameters already need +// to access the queue, so they already need a lock anyway. +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer, + results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) { + + // Short circuit if the data was never requested + request := pendPool[id] + if request == nil { + return 0, errNoFetchesPending + } + reqTimer.UpdateSince(request.Time) + delete(pendPool, id) + + // If no data items were retrieved, mark them as unavailable for the origin peer + if results == 0 { + for _, header := range request.Headers { + request.Peer.MarkLacking(header.Hash()) + } + } + // Assemble each of the results with their headers and retrieved data parts + var ( + accepted int + failure error + useful bool + ) + for i, header := range request.Headers { + // Short circuit assembly if no more fetch results are found + if i >= results { + break + } + // Reconstruct the next result if contents match up + index := int(header.Number.Int64() - int64(q.resultOffset)) + if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil { + failure = errInvalidChain + break + } + if err := reconstruct(header, i, q.resultCache[index]); err != nil { + failure = err + break + } + hash := header.Hash() + + donePool[hash] = struct{}{} + q.resultCache[index].Pending-- + useful = true + accepted++ + + // Clean up a successful fetch + request.Headers[i] = nil + delete(taskPool, hash) + } + // Return all failed or missing fetches to the queue + for _, header := range request.Headers { + if header != nil { + taskQueue.Push(header, -int64(header.Number.Uint64())) + } + } + // Wake up WaitResults + if accepted > 0 { + q.active.Signal() + } + // If none of the data was good, it's a stale delivery + switch { + case failure == nil || failure == errInvalidChain: + return accepted, failure + case useful: + return accepted, fmt.Errorf("partial failure: %v", failure) + default: + return accepted, errStaleDelivery + } +} + +// Prepare configures the result cache to allow accepting and caching inbound +// fetch results. +func (q *queue) Prepare(offset uint64, mode SyncMode) { + q.lock.Lock() + defer q.lock.Unlock() + + // Prepare the queue for sync results + if q.resultOffset < offset { + q.resultOffset = offset + } + q.mode = mode +} diff --git a/dex/downloader/statesync.go b/dex/downloader/statesync.go new file mode 100644 index 000000000..49117abbb --- /dev/null +++ b/dex/downloader/statesync.go @@ -0,0 +1,484 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "fmt" + "hash" + "sync" + "time" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core/rawdb" + "github.com/dexon-foundation/dexon/core/state" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/trie" + "golang.org/x/crypto/sha3" +) + +// stateReq represents a batch of state fetch requests grouped together into +// a single data retrieval network packet. +type stateReq struct { + items []common.Hash // Hashes of the state items to download + tasks map[common.Hash]*stateTask // Download tasks to track previous attempts + timeout time.Duration // Maximum round trip time for this to complete + timer *time.Timer // Timer to fire when the RTT timeout expires + peer *peerConnection // Peer that we're requesting from + response [][]byte // Response data of the peer (nil for timeouts) + dropped bool // Flag whether the peer dropped off early +} + +// timedOut returns if this request timed out. +func (req *stateReq) timedOut() bool { + return req.response == nil +} + +// stateSyncStats is a collection of progress stats to report during a state trie +// sync to RPC requests as well as to display in user logs. +type stateSyncStats struct { + processed uint64 // Number of state entries processed + duplicate uint64 // Number of state entries downloaded twice + unexpected uint64 // Number of non-requested state entries received + pending uint64 // Number of still pending state entries +} + +// syncState starts downloading state with the given root hash. +func (d *Downloader) syncState(root common.Hash) *stateSync { + s := newStateSync(d, root) + select { + case d.stateSyncStart <- s: + case <-d.quitCh: + s.err = errCancelStateFetch + close(s.done) + } + return s +} + +// stateFetcher manages the active state sync and accepts requests +// on its behalf. +func (d *Downloader) stateFetcher() { + for { + select { + case s := <-d.stateSyncStart: + for next := s; next != nil; { + next = d.runStateSync(next) + } + case <-d.stateCh: + // Ignore state responses while no sync is running. + case <-d.quitCh: + return + } + } +} + +// runStateSync runs a state synchronisation until it completes or another root +// hash is requested to be switched over to. +func (d *Downloader) runStateSync(s *stateSync) *stateSync { + var ( + active = make(map[string]*stateReq) // Currently in-flight requests + finished []*stateReq // Completed or failed requests + timeout = make(chan *stateReq) // Timed out active requests + ) + defer func() { + // Cancel active request timers on exit. Also set peers to idle so they're + // available for the next sync. + for _, req := range active { + req.timer.Stop() + req.peer.SetNodeDataIdle(len(req.items)) + } + }() + // Run the state sync. + go s.run() + defer s.Cancel() + + // Listen for peer departure events to cancel assigned tasks + peerDrop := make(chan *peerConnection, 1024) + peerSub := s.d.peers.SubscribePeerDrops(peerDrop) + defer peerSub.Unsubscribe() + + for { + // Enable sending of the first buffered element if there is one. + var ( + deliverReq *stateReq + deliverReqCh chan *stateReq + ) + if len(finished) > 0 { + deliverReq = finished[0] + deliverReqCh = s.deliver + } + + select { + // The stateSync lifecycle: + case next := <-d.stateSyncStart: + return next + + case <-s.done: + return nil + + // Send the next finished request to the current sync: + case deliverReqCh <- deliverReq: + // Shift out the first request, but also set the emptied slot to nil for GC + copy(finished, finished[1:]) + finished[len(finished)-1] = nil + finished = finished[:len(finished)-1] + + // Handle incoming state packs: + case pack := <-d.stateCh: + // Discard any data not requested (or previously timed out) + req := active[pack.PeerId()] + if req == nil { + log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items()) + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.response = pack.(*statePack).states + + finished = append(finished, req) + delete(active, pack.PeerId()) + + // Handle dropped peer connections: + case p := <-peerDrop: + // Skip if no request is currently pending + req := active[p.id] + if req == nil { + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.dropped = true + + finished = append(finished, req) + delete(active, p.id) + + // Handle timed-out requests: + case req := <-timeout: + // If the peer is already requesting something else, ignore the stale timeout. + // This can happen when the timeout and the delivery happens simultaneously, + // causing both pathways to trigger. + if active[req.peer.id] != req { + continue + } + // Move the timed out data back into the download queue + finished = append(finished, req) + delete(active, req.peer.id) + + // Track outgoing state requests: + case req := <-d.trackStateReq: + // If an active request already exists for this peer, we have a problem. In + // theory the trie node schedule must never assign two requests to the same + // peer. In practice however, a peer might receive a request, disconnect and + // immediately reconnect before the previous times out. In this case the first + // request is never honored, alas we must not silently overwrite it, as that + // causes valid requests to go missing and sync to get stuck. + if old := active[req.peer.id]; old != nil { + log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) + + // Make sure the previous one doesn't get siletly lost + old.timer.Stop() + old.dropped = true + + finished = append(finished, old) + } + // Start a timer to notify the sync loop if the peer stalled. + req.timer = time.AfterFunc(req.timeout, func() { + select { + case timeout <- req: + case <-s.done: + // Prevent leaking of timer goroutines in the unlikely case where a + // timer is fired just before exiting runStateSync. + } + }) + active[req.peer.id] = req + } + } +} + +// stateSync schedules requests for downloading a particular state trie defined +// by a given state root. +type stateSync struct { + d *Downloader // Downloader instance to access and manage current peerset + + sched *trie.Sync // State trie sync scheduler defining the tasks + keccak hash.Hash // Keccak256 hasher to verify deliveries with + tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval + + numUncommitted int + bytesUncommitted int + + deliver chan *stateReq // Delivery channel multiplexing peer responses + cancel chan struct{} // Channel to signal a termination request + cancelOnce sync.Once // Ensures cancel only ever gets called once + done chan struct{} // Channel to signal termination completion + err error // Any error hit during sync (set before completion) +} + +// stateTask represents a single trie node download task, containing a set of +// peers already attempted retrieval from to detect stalled syncs and abort. +type stateTask struct { + attempts map[string]struct{} +} + +// newStateSync creates a new state trie download scheduler. This method does not +// yet start the sync. The user needs to call run to initiate. +func newStateSync(d *Downloader, root common.Hash) *stateSync { + return &stateSync{ + d: d, + sched: state.NewStateSync(root, d.stateDB), + keccak: sha3.NewLegacyKeccak256(), + tasks: make(map[common.Hash]*stateTask), + deliver: make(chan *stateReq), + cancel: make(chan struct{}), + done: make(chan struct{}), + } +} + +// run starts the task assignment and response processing loop, blocking until +// it finishes, and finally notifying any goroutines waiting for the loop to +// finish. +func (s *stateSync) run() { + s.err = s.loop() + close(s.done) +} + +// Wait blocks until the sync is done or canceled. +func (s *stateSync) Wait() error { + <-s.done + return s.err +} + +// Cancel cancels the sync and waits until it has shut down. +func (s *stateSync) Cancel() error { + s.cancelOnce.Do(func() { close(s.cancel) }) + return s.Wait() +} + +// loop is the main event loop of a state trie sync. It it responsible for the +// assignment of new tasks to peers (including sending it to them) as well as +// for the processing of inbound data. Note, that the loop does not directly +// receive data from peers, rather those are buffered up in the downloader and +// pushed here async. The reason is to decouple processing from data receipt +// and timeouts. +func (s *stateSync) loop() (err error) { + // Listen for new peer events to assign tasks to them + newPeer := make(chan *peerConnection, 1024) + peerSub := s.d.peers.SubscribeNewPeers(newPeer) + defer peerSub.Unsubscribe() + defer func() { + cerr := s.commit(true) + if err == nil { + err = cerr + } + }() + + // Keep assigning new tasks until the sync completes or aborts + for s.sched.Pending() > 0 { + if err = s.commit(false); err != nil { + return err + } + s.assignTasks() + // Tasks assigned, wait for something to happen + select { + case <-newPeer: + // New peer arrived, try to assign it download tasks + + case <-s.cancel: + return errCancelStateFetch + + case <-s.d.cancelCh: + return errCancelStateFetch + + case req := <-s.deliver: + // Response, disconnect or timeout triggered, drop the peer if stalling + log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut()) + if len(req.items) <= 2 && !req.dropped && req.timedOut() { + // 2 items are the minimum requested, if even that times out, we've no use of + // this peer at the moment. + log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) + s.d.dropPeer(req.peer.id) + } + // Process all the received blobs and check for stale delivery + delivered, err := s.process(req) + if err != nil { + log.Warn("Node data write error", "err", err) + return err + } + req.peer.SetNodeDataIdle(delivered) + } + } + return nil +} + +func (s *stateSync) commit(force bool) error { + if !force && s.bytesUncommitted < ethdb.IdealBatchSize { + return nil + } + start := time.Now() + b := s.d.stateDB.NewBatch() + if written, err := s.sched.Commit(b); written == 0 || err != nil { + return err + } + if err := b.Write(); err != nil { + return fmt.Errorf("DB write error: %v", err) + } + s.updateStats(s.numUncommitted, 0, 0, time.Since(start)) + s.numUncommitted = 0 + s.bytesUncommitted = 0 + return nil +} + +// assignTasks attempts to assign new tasks to all idle peers, either from the +// batch currently being retried, or fetching new data from the trie sync itself. +func (s *stateSync) assignTasks() { + // Iterate over all idle peers and try to assign them state fetches + peers, _ := s.d.peers.NodeDataIdlePeers() + for _, p := range peers { + // Assign a batch of fetches proportional to the estimated latency/bandwidth + cap := p.NodeDataCapacity(s.d.requestRTT()) + req := &stateReq{peer: p, timeout: s.d.requestTTL()} + s.fillTasks(cap, req) + + // If the peer was assigned tasks to fetch, send the network request + if len(req.items) > 0 { + req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items)) + select { + case s.d.trackStateReq <- req: + req.peer.FetchNodeData(req.items) + case <-s.cancel: + case <-s.d.cancelCh: + } + } + } +} + +// fillTasks fills the given request object with a maximum of n state download +// tasks to send to the remote peer. +func (s *stateSync) fillTasks(n int, req *stateReq) { + // Refill available tasks from the scheduler. + if len(s.tasks) < n { + new := s.sched.Missing(n - len(s.tasks)) + for _, hash := range new { + s.tasks[hash] = &stateTask{make(map[string]struct{})} + } + } + // Find tasks that haven't been tried with the request's peer. + req.items = make([]common.Hash, 0, n) + req.tasks = make(map[common.Hash]*stateTask, n) + for hash, t := range s.tasks { + // Stop when we've gathered enough requests + if len(req.items) == n { + break + } + // Skip any requests we've already tried from this peer + if _, ok := t.attempts[req.peer.id]; ok { + continue + } + // Assign the request to this peer + t.attempts[req.peer.id] = struct{}{} + req.items = append(req.items, hash) + req.tasks[hash] = t + delete(s.tasks, hash) + } +} + +// process iterates over a batch of delivered state data, injecting each item +// into a running state sync, re-queuing any items that were requested but not +// delivered. +// Returns whether the peer actually managed to deliver anything of value, +// and any error that occurred +func (s *stateSync) process(req *stateReq) (int, error) { + // Collect processing stats and update progress if valid data was received + duplicate, unexpected, successful := 0, 0, 0 + + defer func(start time.Time) { + if duplicate > 0 || unexpected > 0 { + s.updateStats(0, duplicate, unexpected, time.Since(start)) + } + }(time.Now()) + + // Iterate over all the delivered data and inject one-by-one into the trie + progress := false + for _, blob := range req.response { + prog, hash, err := s.processNodeData(blob) + switch err { + case nil: + s.numUncommitted++ + s.bytesUncommitted += len(blob) + progress = progress || prog + successful++ + case trie.ErrNotRequested: + unexpected++ + case trie.ErrAlreadyProcessed: + duplicate++ + default: + return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + } + if _, ok := req.tasks[hash]; ok { + delete(req.tasks, hash) + } + } + // Put unfulfilled tasks back into the retry queue + npeers := s.d.peers.Len() + for hash, task := range req.tasks { + // If the node did deliver something, missing items may be due to a protocol + // limit or a previous timeout + delayed delivery. Both cases should permit + // the node to retry the missing items (to avoid single-peer stalls). + if len(req.response) > 0 || req.timedOut() { + delete(task.attempts, req.peer.id) + } + // If we've requested the node too many times already, it may be a malicious + // sync where nobody has the right data. Abort. + if len(task.attempts) >= npeers { + return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + } + // Missing item, place into the retry queue. + s.tasks[hash] = task + } + return successful, nil +} + +// processNodeData tries to inject a trie node data blob delivered from a remote +// peer into the state trie, returning whether anything useful was written or any +// error occurred. +func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) { + res := trie.SyncResult{Data: blob} + s.keccak.Reset() + s.keccak.Write(blob) + s.keccak.Sum(res.Hash[:0]) + committed, _, err := s.sched.Process([]trie.SyncResult{res}) + return committed, res.Hash, err +} + +// updateStats bumps the various state sync progress counters and displays a log +// message for the user to see. +func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) { + s.d.syncStatsLock.Lock() + defer s.d.syncStatsLock.Unlock() + + s.d.syncStatsState.pending = uint64(s.sched.Pending()) + s.d.syncStatsState.processed += uint64(written) + s.d.syncStatsState.duplicate += uint64(duplicate) + s.d.syncStatsState.unexpected += uint64(unexpected) + + if written > 0 || duplicate > 0 || unexpected > 0 { + log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected) + } + if written > 0 { + rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed) + } +} diff --git a/dex/downloader/testchain_test.go b/dex/downloader/testchain_test.go new file mode 100644 index 000000000..e73bed513 --- /dev/null +++ b/dex/downloader/testchain_test.go @@ -0,0 +1,221 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "fmt" + "math/big" + "sync" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/consensus/ethash" + "github.com/dexon-foundation/dexon/core" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/params" +) + +// Test chain parameters. +var ( + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testAddress = crypto.PubkeyToAddress(testKey.PublicKey) + testDB = ethdb.NewMemDatabase() + testGenesis = core.GenesisBlockForTesting(testDB, testAddress, big.NewInt(1000000000)) +) + +// The common prefix of all test chains: +var testChainBase = newTestChain(blockCacheItems+200, testGenesis) + +// Different forks on top of the base chain: +var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain + +func init() { + var forkLen = int(MaxForkAncestry + 50) + var wg sync.WaitGroup + wg.Add(3) + go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() + go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }() + go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }() + wg.Wait() +} + +type testChain struct { + genesis *types.Block + chain []common.Hash + headerm map[common.Hash]*types.Header + blockm map[common.Hash]*types.Block + receiptm map[common.Hash][]*types.Receipt + tdm map[common.Hash]*big.Int +} + +// newTestChain creates a blockchain of the given length. +func newTestChain(length int, genesis *types.Block) *testChain { + tc := new(testChain).copy(length) + tc.genesis = genesis + tc.chain = append(tc.chain, genesis.Hash()) + tc.headerm[tc.genesis.Hash()] = tc.genesis.Header() + tc.tdm[tc.genesis.Hash()] = tc.genesis.Difficulty() + tc.blockm[tc.genesis.Hash()] = tc.genesis + tc.generate(length-1, 0, genesis, false) + return tc +} + +// makeFork creates a fork on top of the test chain. +func (tc *testChain) makeFork(length int, heavy bool, seed byte) *testChain { + fork := tc.copy(tc.len() + length) + fork.generate(length, seed, tc.headBlock(), heavy) + return fork +} + +// shorten creates a copy of the chain with the given length. It panics if the +// length is longer than the number of available blocks. +func (tc *testChain) shorten(length int) *testChain { + if length > tc.len() { + panic(fmt.Errorf("can't shorten test chain to %d blocks, it's only %d blocks long", length, tc.len())) + } + return tc.copy(length) +} + +func (tc *testChain) copy(newlen int) *testChain { + cpy := &testChain{ + genesis: tc.genesis, + headerm: make(map[common.Hash]*types.Header, newlen), + blockm: make(map[common.Hash]*types.Block, newlen), + receiptm: make(map[common.Hash][]*types.Receipt, newlen), + tdm: make(map[common.Hash]*big.Int, newlen), + } + for i := 0; i < len(tc.chain) && i < newlen; i++ { + hash := tc.chain[i] + cpy.chain = append(cpy.chain, tc.chain[i]) + cpy.tdm[hash] = tc.tdm[hash] + cpy.blockm[hash] = tc.blockm[hash] + cpy.headerm[hash] = tc.headerm[hash] + cpy.receiptm[hash] = tc.receiptm[hash] + } + return cpy +} + +// generate creates a chain of n blocks starting at and including parent. +// the returned hash chain is ordered head->parent. In addition, every 22th block +// contains a transaction and every 5th an uncle to allow testing correct block +// reassembly. +func (tc *testChain) generate(n int, seed byte, parent *types.Block, heavy bool) { + // start := time.Now() + // defer func() { fmt.Printf("test chain generated in %v\n", time.Since(start)) }() + + blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) { + block.SetCoinbase(common.Address{seed}) + // If a heavy chain is requested, delay blocks to raise difficulty + if heavy { + block.OffsetTime(-1) + } + // Include transactions to the miner to make blocks more interesting. + if parent == tc.genesis && i%22 == 0 { + signer := types.MakeSigner(params.TestChainConfig, block.Number()) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey) + if err != nil { + panic(err) + } + block.AddTx(tx) + } + // if the block number is a multiple of 5, add a bonus uncle to the block + if i > 0 && i%5 == 0 { + block.AddUncle(&types.Header{ + ParentHash: block.PrevBlock(i - 1).Hash(), + Number: big.NewInt(block.Number().Int64() - 1), + }) + } + }) + + // Convert the block-chain into a hash-chain and header/block maps + td := new(big.Int).Set(tc.td(parent.Hash())) + for i, b := range blocks { + td := td.Add(td, b.Difficulty()) + hash := b.Hash() + tc.chain = append(tc.chain, hash) + tc.blockm[hash] = b + tc.headerm[hash] = b.Header() + tc.receiptm[hash] = receipts[i] + tc.tdm[hash] = new(big.Int).Set(td) + } +} + +// len returns the total number of blocks in the chain. +func (tc *testChain) len() int { + return len(tc.chain) +} + +// headBlock returns the head of the chain. +func (tc *testChain) headBlock() *types.Block { + return tc.blockm[tc.chain[len(tc.chain)-1]] +} + +// td returns the total difficulty of the given block. +func (tc *testChain) td(hash common.Hash) *big.Int { + return tc.tdm[hash] +} + +// headersByHash returns headers in ascending order from the given hash. +func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int) []*types.Header { + num, _ := tc.hashToNumber(origin) + return tc.headersByNumber(num, amount, skip) +} + +// headersByNumber returns headers in ascending order from the given number. +func (tc *testChain) headersByNumber(origin uint64, amount int, skip int) []*types.Header { + result := make([]*types.Header, 0, amount) + for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 { + if header, ok := tc.headerm[tc.chain[int(num)]]; ok { + result = append(result, header) + } + } + return result +} + +// receipts returns the receipts of the given block hashes. +func (tc *testChain) receipts(hashes []common.Hash) [][]*types.Receipt { + results := make([][]*types.Receipt, 0, len(hashes)) + for _, hash := range hashes { + if receipt, ok := tc.receiptm[hash]; ok { + results = append(results, receipt) + } + } + return results +} + +// bodies returns the block bodies of the given block hashes. +func (tc *testChain) bodies(hashes []common.Hash) ([][]*types.Transaction, [][]*types.Header) { + transactions := make([][]*types.Transaction, 0, len(hashes)) + uncles := make([][]*types.Header, 0, len(hashes)) + for _, hash := range hashes { + if block, ok := tc.blockm[hash]; ok { + transactions = append(transactions, block.Transactions()) + uncles = append(uncles, block.Uncles()) + } + } + return transactions, uncles +} + +func (tc *testChain) hashToNumber(target common.Hash) (uint64, bool) { + for num, hash := range tc.chain { + if hash == target { + return uint64(num), true + } + } + return 0, false +} diff --git a/dex/downloader/types.go b/dex/downloader/types.go new file mode 100644 index 000000000..d320b7590 --- /dev/null +++ b/dex/downloader/types.go @@ -0,0 +1,79 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "fmt" + + "github.com/dexon-foundation/dexon/core/types" +) + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) + +// dataPack is a data message returned by a peer for some query. +type dataPack interface { + PeerId() string + Items() int + Stats() string +} + +// headerPack is a batch of block headers returned by a peer. +type headerPack struct { + peerID string + headers []*types.Header +} + +func (p *headerPack) PeerId() string { return p.peerID } +func (p *headerPack) Items() int { return len(p.headers) } +func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) } + +// bodyPack is a batch of block bodies returned by a peer. +type bodyPack struct { + peerID string + transactions [][]*types.Transaction + uncles [][]*types.Header +} + +func (p *bodyPack) PeerId() string { return p.peerID } +func (p *bodyPack) Items() int { + if len(p.transactions) <= len(p.uncles) { + return len(p.transactions) + } + return len(p.uncles) +} +func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } + +// receiptPack is a batch of receipts returned by a peer. +type receiptPack struct { + peerID string + receipts [][]*types.Receipt +} + +func (p *receiptPack) PeerId() string { return p.peerID } +func (p *receiptPack) Items() int { return len(p.receipts) } +func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } + +// statePack is a batch of states returned by a peer. +type statePack struct { + peerID string + states [][]byte +} + +func (p *statePack) PeerId() string { return p.peerID } +func (p *statePack) Items() int { return len(p.states) } +func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) } diff --git a/dex/fetcher/fetcher.go b/dex/fetcher/fetcher.go new file mode 100644 index 000000000..f6807b5a5 --- /dev/null +++ b/dex/fetcher/fetcher.go @@ -0,0 +1,736 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package fetcher contains the block announcement based synchronisation. +package fetcher + +import ( + "errors" + "math/rand" + "time" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/common/prque" + "github.com/dexon-foundation/dexon/consensus" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/log" +) + +const ( + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches + fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block + maxUncleDist = 7 // Maximum allowed backward distance from the chain head + maxQueueDist = 32 // Maximum allowed distance from the chain head to queue + hashLimit = 256 // Maximum number of unique blocks a peer may have announced + blockLimit = 64 // Maximum number of unique blocks a peer may have delivered +) + +var ( + errTerminated = errors.New("terminated") +) + +// blockRetrievalFn is a callback type for retrieving a block from the local chain. +type blockRetrievalFn func(common.Hash) *types.Block + +// headerRequesterFn is a callback type for sending a header retrieval request. +type headerRequesterFn func(common.Hash) error + +// bodyRequesterFn is a callback type for sending a body retrieval request. +type bodyRequesterFn func([]common.Hash) error + +// headerVerifierFn is a callback type to verify a block's header for fast propagation. +type headerVerifierFn func(header *types.Header) error + +// blockBroadcasterFn is a callback type for broadcasting a block to connected peers. +type blockBroadcasterFn func(block *types.Block, propagate bool) + +// chainHeightFn is a callback type to retrieve the current chain height. +type chainHeightFn func() uint64 + +// chainInsertFn is a callback type to insert a batch of blocks into the local chain. +type chainInsertFn func(types.Blocks) (int, error) + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) + +// announce is the hash notification of the availability of a new block in the +// network. +type announce struct { + hash common.Hash // Hash of the block being announced + number uint64 // Number of the block being announced (0 = unknown | old protocol) + header *types.Header // Header of the block partially reassembled (new protocol) + time time.Time // Timestamp of the announcement + + origin string // Identifier of the peer originating the notification + + fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block + fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block +} + +// headerFilterTask represents a batch of headers needing fetcher filtering. +type headerFilterTask struct { + peer string // The source peer of block headers + headers []*types.Header // Collection of headers to filter + time time.Time // Arrival time of the headers +} + +// bodyFilterTask represents a batch of block bodies (transactions and uncles) +// needing fetcher filtering. +type bodyFilterTask struct { + peer string // The source peer of block bodies + transactions [][]*types.Transaction // Collection of transactions per block bodies + uncles [][]*types.Header // Collection of uncles per block bodies + time time.Time // Arrival time of the blocks' contents +} + +// inject represents a schedules import operation. +type inject struct { + origin string + block *types.Block +} + +// Fetcher is responsible for accumulating block announcements from various peers +// and scheduling them for retrieval. +type Fetcher struct { + // Various event channels + notify chan *announce + inject chan *inject + + blockFilter chan chan []*types.Block + headerFilter chan chan *headerFilterTask + bodyFilter chan chan *bodyFilterTask + + done chan common.Hash + quit chan struct{} + + // Announce states + announces map[string]int // Per peer announce counts to prevent memory exhaustion + announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching + fetching map[common.Hash]*announce // Announced blocks, currently fetching + fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval + completing map[common.Hash]*announce // Blocks with headers, currently body-completing + + // Block cache + queue *prque.Prque // Queue containing the import operations (block number sorted) + queues map[string]int // Per peer block counts to prevent memory exhaustion + queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports) + + // Callbacks + getBlock blockRetrievalFn // Retrieves a block from the local chain + verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work + broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers + chainHeight chainHeightFn // Retrieves the current chain's height + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Drops a peer for misbehaving + + // Testing hooks + announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list + queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue + fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch + completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) + importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62) +} + +// New creates a block fetcher to retrieve blocks based on hash announcements. +func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher { + return &Fetcher{ + notify: make(chan *announce), + inject: make(chan *inject), + blockFilter: make(chan chan []*types.Block), + headerFilter: make(chan chan *headerFilterTask), + bodyFilter: make(chan chan *bodyFilterTask), + done: make(chan common.Hash), + quit: make(chan struct{}), + announces: make(map[string]int), + announced: make(map[common.Hash][]*announce), + fetching: make(map[common.Hash]*announce), + fetched: make(map[common.Hash][]*announce), + completing: make(map[common.Hash]*announce), + queue: prque.New(nil), + queues: make(map[string]int), + queued: make(map[common.Hash]*inject), + getBlock: getBlock, + verifyHeader: verifyHeader, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + insertChain: insertChain, + dropPeer: dropPeer, + } +} + +// Start boots up the announcement based synchroniser, accepting and processing +// hash notifications and block fetches until termination requested. +func (f *Fetcher) Start() { + go f.loop() +} + +// Stop terminates the announcement based synchroniser, canceling all pending +// operations. +func (f *Fetcher) Stop() { + close(f.quit) +} + +// Notify announces the fetcher of the potential availability of a new block in +// the network. +func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, + headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error { + block := &announce{ + hash: hash, + number: number, + time: time, + origin: peer, + fetchHeader: headerFetcher, + fetchBodies: bodyFetcher, + } + select { + case f.notify <- block: + return nil + case <-f.quit: + return errTerminated + } +} + +// Enqueue tries to fill gaps the fetcher's future import queue. +func (f *Fetcher) Enqueue(peer string, block *types.Block) error { + op := &inject{ + origin: peer, + block: block, + } + select { + case f.inject <- op: + return nil + case <-f.quit: + return errTerminated + } +} + +// FilterHeaders extracts all the headers that were explicitly requested by the fetcher, +// returning those that should be handled differently. +func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { + log.Trace("Filtering headers", "peer", peer, "headers", len(headers)) + + // Send the filter channel to the fetcher + filter := make(chan *headerFilterTask) + + select { + case f.headerFilter <- filter: + case <-f.quit: + return nil + } + // Request the filtering of the header list + select { + case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: + case <-f.quit: + return nil + } + // Retrieve the headers remaining after filtering + select { + case task := <-filter: + return task.headers + case <-f.quit: + return nil + } +} + +// FilterBodies extracts all the block bodies that were explicitly requested by +// the fetcher, returning those that should be handled differently. +func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { + log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles)) + + // Send the filter channel to the fetcher + filter := make(chan *bodyFilterTask) + + select { + case f.bodyFilter <- filter: + case <-f.quit: + return nil, nil + } + // Request the filtering of the body list + select { + case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}: + case <-f.quit: + return nil, nil + } + // Retrieve the bodies remaining after filtering + select { + case task := <-filter: + return task.transactions, task.uncles + case <-f.quit: + return nil, nil + } +} + +// Loop is the main fetcher loop, checking and processing various notification +// events. +func (f *Fetcher) loop() { + // Iterate the block fetching until a quit is requested + fetchTimer := time.NewTimer(0) + completeTimer := time.NewTimer(0) + + for { + // Clean up any expired block fetches + for hash, announce := range f.fetching { + if time.Since(announce.time) > fetchTimeout { + f.forgetHash(hash) + } + } + // Import any queued blocks that could potentially fit + height := f.chainHeight() + for !f.queue.Empty() { + op := f.queue.PopItem().(*inject) + hash := op.block.Hash() + if f.queueChangeHook != nil { + f.queueChangeHook(hash, false) + } + // If too high up the chain or phase, continue later + number := op.block.NumberU64() + if number > height+1 { + f.queue.Push(op, -int64(number)) + if f.queueChangeHook != nil { + f.queueChangeHook(hash, true) + } + break + } + // Otherwise if fresh and still unknown, try and import + if number+maxUncleDist < height || f.getBlock(hash) != nil { + f.forgetBlock(hash) + continue + } + f.insert(op.origin, op.block) + } + // Wait for an outside event to occur + select { + case <-f.quit: + // Fetcher terminating, abort all operations + return + + case notification := <-f.notify: + // A block was announced, make sure the peer isn't DOSing us + propAnnounceInMeter.Mark(1) + + count := f.announces[notification.origin] + 1 + if count > hashLimit { + log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit) + propAnnounceDOSMeter.Mark(1) + break + } + // If we have a valid block number, check that it's potentially useful + if notification.number > 0 { + if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { + log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) + propAnnounceDropMeter.Mark(1) + break + } + } + // All is well, schedule the announce if block's not yet downloading + if _, ok := f.fetching[notification.hash]; ok { + break + } + if _, ok := f.completing[notification.hash]; ok { + break + } + f.announces[notification.origin] = count + f.announced[notification.hash] = append(f.announced[notification.hash], notification) + if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 { + f.announceChangeHook(notification.hash, true) + } + if len(f.announced) == 1 { + f.rescheduleFetch(fetchTimer) + } + + case op := <-f.inject: + // A direct block insertion was requested, try and fill any pending gaps + propBroadcastInMeter.Mark(1) + f.enqueue(op.origin, op.block) + + case hash := <-f.done: + // A pending import finished, remove all traces of the notification + f.forgetHash(hash) + f.forgetBlock(hash) + + case <-fetchTimer.C: + // At least one block's timer ran out, check for needing retrieval + request := make(map[string][]common.Hash) + + for hash, announces := range f.announced { + if time.Since(announces[0].time) > arriveTimeout-gatherSlack { + // Pick a random peer to retrieve from, reset all others + announce := announces[rand.Intn(len(announces))] + f.forgetHash(hash) + + // If the block still didn't arrive, queue for fetching + if f.getBlock(hash) == nil { + request[announce.origin] = append(request[announce.origin], hash) + f.fetching[hash] = announce + } + } + } + // Send out all block header requests + for peer, hashes := range request { + log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes) + + // Create a closure of the fetch and schedule in on a new thread + fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes + go func() { + if f.fetchingHook != nil { + f.fetchingHook(hashes) + } + for _, hash := range hashes { + headerFetchMeter.Mark(1) + fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals + } + }() + } + // Schedule the next fetch if blocks are still pending + f.rescheduleFetch(fetchTimer) + + case <-completeTimer.C: + // At least one header's timer ran out, retrieve everything + request := make(map[string][]common.Hash) + + for hash, announces := range f.fetched { + // Pick a random peer to retrieve from, reset all others + announce := announces[rand.Intn(len(announces))] + f.forgetHash(hash) + + // If the block still didn't arrive, queue for completion + if f.getBlock(hash) == nil { + request[announce.origin] = append(request[announce.origin], hash) + f.completing[hash] = announce + } + } + // Send out all block body requests + for peer, hashes := range request { + log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes) + + // Create a closure of the fetch and schedule in on a new thread + if f.completingHook != nil { + f.completingHook(hashes) + } + bodyFetchMeter.Mark(int64(len(hashes))) + go f.completing[hashes[0]].fetchBodies(hashes) + } + // Schedule the next fetch if blocks are still pending + f.rescheduleComplete(completeTimer) + + case filter := <-f.headerFilter: + // Headers arrived from a remote peer. Extract those that were explicitly + // requested by the fetcher, and return everything else so it's delivered + // to other parts of the system. + var task *headerFilterTask + select { + case task = <-filter: + case <-f.quit: + return + } + headerFilterInMeter.Mark(int64(len(task.headers))) + + // Split the batch of headers into unknown ones (to return to the caller), + // known incomplete ones (requiring body retrievals) and completed blocks. + unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{} + for _, header := range task.headers { + hash := header.Hash() + + // Filter fetcher-requested headers from other synchronisation algorithms + if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { + // If the delivered header does not match the promised number, drop the announcer + if header.Number.Uint64() != announce.number { + log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) + f.dropPeer(announce.origin) + f.forgetHash(hash) + continue + } + // Only keep if not imported by other means + if f.getBlock(hash) == nil { + announce.header = header + announce.time = task.time + + // If the block is empty (header only), short circuit into the final import queue + if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { + log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) + + block := types.NewBlockWithHeader(header) + block.ReceivedAt = task.time + + complete = append(complete, block) + f.completing[hash] = announce + continue + } + // Otherwise add to the list of blocks needing completion + incomplete = append(incomplete, announce) + } else { + log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) + f.forgetHash(hash) + } + } else { + // Fetcher doesn't know about it, add to the return list + unknown = append(unknown, header) + } + } + headerFilterOutMeter.Mark(int64(len(unknown))) + select { + case filter <- &headerFilterTask{headers: unknown, time: task.time}: + case <-f.quit: + return + } + // Schedule the retrieved headers for body completion + for _, announce := range incomplete { + hash := announce.header.Hash() + if _, ok := f.completing[hash]; ok { + continue + } + f.fetched[hash] = append(f.fetched[hash], announce) + if len(f.fetched) == 1 { + f.rescheduleComplete(completeTimer) + } + } + // Schedule the header-only blocks for import + for _, block := range complete { + if announce := f.completing[block.Hash()]; announce != nil { + f.enqueue(announce.origin, block) + } + } + + case filter := <-f.bodyFilter: + // Block bodies arrived, extract any explicitly requested blocks, return the rest + var task *bodyFilterTask + select { + case task = <-filter: + case <-f.quit: + return + } + bodyFilterInMeter.Mark(int64(len(task.transactions))) + + blocks := []*types.Block{} + for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ { + // Match up a body to any possible completion request + matched := false + + for hash, announce := range f.completing { + if f.queued[hash] == nil { + txnHash := types.DeriveSha(types.Transactions(task.transactions[i])) + uncleHash := types.CalcUncleHash(task.uncles[i]) + + if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer { + // Mark the body matched, reassemble if still unknown + matched = true + + if f.getBlock(hash) == nil { + block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) + block.ReceivedAt = task.time + + blocks = append(blocks, block) + } else { + f.forgetHash(hash) + } + } + } + } + if matched { + task.transactions = append(task.transactions[:i], task.transactions[i+1:]...) + task.uncles = append(task.uncles[:i], task.uncles[i+1:]...) + i-- + continue + } + } + + bodyFilterOutMeter.Mark(int64(len(task.transactions))) + select { + case filter <- task: + case <-f.quit: + return + } + // Schedule the retrieved blocks for ordered import + for _, block := range blocks { + if announce := f.completing[block.Hash()]; announce != nil { + f.enqueue(announce.origin, block) + } + } + } + } +} + +// rescheduleFetch resets the specified fetch timer to the next announce timeout. +func (f *Fetcher) rescheduleFetch(fetch *time.Timer) { + // Short circuit if no blocks are announced + if len(f.announced) == 0 { + return + } + // Otherwise find the earliest expiring announcement + earliest := time.Now() + for _, announces := range f.announced { + if earliest.After(announces[0].time) { + earliest = announces[0].time + } + } + fetch.Reset(arriveTimeout - time.Since(earliest)) +} + +// rescheduleComplete resets the specified completion timer to the next fetch timeout. +func (f *Fetcher) rescheduleComplete(complete *time.Timer) { + // Short circuit if no headers are fetched + if len(f.fetched) == 0 { + return + } + // Otherwise find the earliest expiring announcement + earliest := time.Now() + for _, announces := range f.fetched { + if earliest.After(announces[0].time) { + earliest = announces[0].time + } + } + complete.Reset(gatherSlack - time.Since(earliest)) +} + +// enqueue schedules a new future import operation, if the block to be imported +// has not yet been seen. +func (f *Fetcher) enqueue(peer string, block *types.Block) { + hash := block.Hash() + + // Ensure the peer isn't DOSing us + count := f.queues[peer] + 1 + if count > blockLimit { + log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit) + propBroadcastDOSMeter.Mark(1) + f.forgetHash(hash) + return + } + // Discard any past or too distant blocks + if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { + log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist) + propBroadcastDropMeter.Mark(1) + f.forgetHash(hash) + return + } + // Schedule the block for future importing + if _, ok := f.queued[hash]; !ok { + op := &inject{ + origin: peer, + block: block, + } + f.queues[peer] = count + f.queued[hash] = op + f.queue.Push(op, -int64(block.NumberU64())) + if f.queueChangeHook != nil { + f.queueChangeHook(op.block.Hash(), true) + } + log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size()) + } +} + +// insert spawns a new goroutine to run a block insertion into the chain. If the +// block's number is at the same height as the current import phase, it updates +// the phase states accordingly. +func (f *Fetcher) insert(peer string, block *types.Block) { + hash := block.Hash() + + // Run the import on a new thread + log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) + go func() { + defer func() { f.done <- hash }() + + // If the parent's unknown, abort insertion + parent := f.getBlock(block.ParentHash()) + if parent == nil { + log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + return + } + // Quickly validate the header and propagate the block if it passes + switch err := f.verifyHeader(block.Header()); err { + case nil: + // All ok, quickly propagate to our peers + propBroadcastOutTimer.UpdateSince(block.ReceivedAt) + go f.broadcastBlock(block, true) + + case consensus.ErrFutureBlock: + // Weird future block, don't fail, but neither propagate + + default: + // Something went very wrong, drop the peer + log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) + f.dropPeer(peer) + return + } + // Run the actual import and log any issues + if _, err := f.insertChain(types.Blocks{block}); err != nil { + log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) + return + } + // If import succeeded, broadcast the block + propAnnounceOutTimer.UpdateSince(block.ReceivedAt) + go f.broadcastBlock(block, false) + + // Invoke the testing hook if needed + if f.importedHook != nil { + f.importedHook(block) + } + }() +} + +// forgetHash removes all traces of a block announcement from the fetcher's +// internal state. +func (f *Fetcher) forgetHash(hash common.Hash) { + // Remove all pending announces and decrement DOS counters + for _, announce := range f.announced[hash] { + f.announces[announce.origin]-- + if f.announces[announce.origin] == 0 { + delete(f.announces, announce.origin) + } + } + delete(f.announced, hash) + if f.announceChangeHook != nil { + f.announceChangeHook(hash, false) + } + // Remove any pending fetches and decrement the DOS counters + if announce := f.fetching[hash]; announce != nil { + f.announces[announce.origin]-- + if f.announces[announce.origin] == 0 { + delete(f.announces, announce.origin) + } + delete(f.fetching, hash) + } + + // Remove any pending completion requests and decrement the DOS counters + for _, announce := range f.fetched[hash] { + f.announces[announce.origin]-- + if f.announces[announce.origin] == 0 { + delete(f.announces, announce.origin) + } + } + delete(f.fetched, hash) + + // Remove any pending completions and decrement the DOS counters + if announce := f.completing[hash]; announce != nil { + f.announces[announce.origin]-- + if f.announces[announce.origin] == 0 { + delete(f.announces, announce.origin) + } + delete(f.completing, hash) + } +} + +// forgetBlock removes all traces of a queued block from the fetcher's internal +// state. +func (f *Fetcher) forgetBlock(hash common.Hash) { + if insert := f.queued[hash]; insert != nil { + f.queues[insert.origin]-- + if f.queues[insert.origin] == 0 { + delete(f.queues, insert.origin) + } + delete(f.queued, hash) + } +} diff --git a/dex/fetcher/fetcher_test.go b/dex/fetcher/fetcher_test.go new file mode 100644 index 000000000..24611a8a0 --- /dev/null +++ b/dex/fetcher/fetcher_test.go @@ -0,0 +1,790 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package fetcher + +import ( + "errors" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/consensus/ethash" + "github.com/dexon-foundation/dexon/core" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/params" +) + +var ( + testdb = ethdb.NewMemDatabase() + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testAddress = crypto.PubkeyToAddress(testKey.PublicKey) + genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000)) + unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil) +) + +// makeChain creates a chain of n blocks starting at and including parent. +// the returned hash chain is ordered head->parent. In addition, every 3rd block +// contains a transaction and every 5th an uncle to allow testing correct block +// reassembly. +func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) { + blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) { + block.SetCoinbase(common.Address{seed}) + + // If the block number is multiple of 3, send a bonus transaction to the miner + if parent == genesis && i%3 == 0 { + signer := types.MakeSigner(params.TestChainConfig, block.Number()) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey) + if err != nil { + panic(err) + } + block.AddTx(tx) + } + // If the block number is a multiple of 5, add a bonus uncle to the block + if i%5 == 0 { + block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))}) + } + }) + hashes := make([]common.Hash, n+1) + hashes[len(hashes)-1] = parent.Hash() + blockm := make(map[common.Hash]*types.Block, n+1) + blockm[parent.Hash()] = parent + for i, b := range blocks { + hashes[len(hashes)-i-2] = b.Hash() + blockm[b.Hash()] = b + } + return hashes, blockm +} + +// fetcherTester is a test simulator for mocking out local block chain. +type fetcherTester struct { + fetcher *Fetcher + + hashes []common.Hash // Hash chain belonging to the tester + blocks map[common.Hash]*types.Block // Blocks belonging to the tester + drops map[string]bool // Map of peers dropped by the fetcher + + lock sync.RWMutex +} + +// newTester creates a new fetcher test mocker. +func newTester() *fetcherTester { + tester := &fetcherTester{ + hashes: []common.Hash{genesis.Hash()}, + blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, + drops: make(map[string]bool), + } + tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer) + tester.fetcher.Start() + + return tester +} + +// getBlock retrieves a block from the tester's block chain. +func (f *fetcherTester) getBlock(hash common.Hash) *types.Block { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.blocks[hash] +} + +// verifyHeader is a nop placeholder for the block header verification. +func (f *fetcherTester) verifyHeader(header *types.Header) error { + return nil +} + +// broadcastBlock is a nop placeholder for the block broadcasting. +func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) { +} + +// chainHeight retrieves the current height (block number) of the chain. +func (f *fetcherTester) chainHeight() uint64 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() +} + +// insertChain injects a new blocks into the simulated chain. +func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) { + f.lock.Lock() + defer f.lock.Unlock() + + for i, block := range blocks { + // Make sure the parent in known + if _, ok := f.blocks[block.ParentHash()]; !ok { + return i, errors.New("unknown parent") + } + // Discard any new blocks if the same height already exists + if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() { + return i, nil + } + // Otherwise build our current chain + f.hashes = append(f.hashes, block.Hash()) + f.blocks[block.Hash()] = block + } + return 0, nil +} + +// dropPeer is an emulator for the peer removal, simply accumulating the various +// peers dropped by the fetcher. +func (f *fetcherTester) dropPeer(peer string) { + f.lock.Lock() + defer f.lock.Unlock() + + f.drops[peer] = true +} + +// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer. +func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn { + closure := make(map[common.Hash]*types.Block) + for hash, block := range blocks { + closure[hash] = block + } + // Create a function that return a header from the closure + return func(hash common.Hash) error { + // Gather the blocks to return + headers := make([]*types.Header, 0, 1) + if block, ok := closure[hash]; ok { + headers = append(headers, block.Header()) + } + // Return on a new thread + go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift)) + + return nil + } +} + +// makeBodyFetcher retrieves a block body fetcher associated with a simulated peer. +func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn { + closure := make(map[common.Hash]*types.Block) + for hash, block := range blocks { + closure[hash] = block + } + // Create a function that returns blocks from the closure + return func(hashes []common.Hash) error { + // Gather the block bodies to return + transactions := make([][]*types.Transaction, 0, len(hashes)) + uncles := make([][]*types.Header, 0, len(hashes)) + + for _, hash := range hashes { + if block, ok := closure[hash]; ok { + transactions = append(transactions, block.Transactions()) + uncles = append(uncles, block.Uncles()) + } + } + // Return on a new thread + go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift)) + + return nil + } +} + +// verifyFetchingEvent verifies that one single event arrive on a fetching channel. +func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) { + if arrive { + select { + case <-fetching: + case <-time.After(time.Second): + t.Fatalf("fetching timeout") + } + } else { + select { + case <-fetching: + t.Fatalf("fetching invoked") + case <-time.After(10 * time.Millisecond): + } + } +} + +// verifyCompletingEvent verifies that one single event arrive on an completing channel. +func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) { + if arrive { + select { + case <-completing: + case <-time.After(time.Second): + t.Fatalf("completing timeout") + } + } else { + select { + case <-completing: + t.Fatalf("completing invoked") + case <-time.After(10 * time.Millisecond): + } + } +} + +// verifyImportEvent verifies that one single event arrive on an import channel. +func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) { + if arrive { + select { + case <-imported: + case <-time.After(time.Second): + t.Fatalf("import timeout") + } + } else { + select { + case <-imported: + t.Fatalf("import invoked") + case <-time.After(10 * time.Millisecond): + } + } +} + +// verifyImportCount verifies that exactly count number of events arrive on an +// import hook channel. +func verifyImportCount(t *testing.T, imported chan *types.Block, count int) { + for i := 0; i < count; i++ { + select { + case <-imported: + case <-time.After(time.Second): + t.Fatalf("block %d: import timeout", i+1) + } + } + verifyImportDone(t, imported) +} + +// verifyImportDone verifies that no more events are arriving on an import channel. +func verifyImportDone(t *testing.T, imported chan *types.Block) { + select { + case <-imported: + t.Fatalf("extra block imported") + case <-time.After(50 * time.Millisecond): + } +} + +// Tests that a fetcher accepts block announcements and initiates retrievals for +// them, successfully importing into the local chain. +func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) } +func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) } +func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) } + +func testSequentialAnnouncements(t *testing.T, protocol int) { + // Create a chain of blocks to import + targetBlocks := 4 * hashLimit + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) + + // Iteratively announce blocks until all are imported + imported := make(chan *types.Block) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + for i := len(hashes) - 2; i >= 0; i-- { + tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + verifyImportEvent(t, imported, true) + } + verifyImportDone(t, imported) +} + +// Tests that if blocks are announced by multiple peers (or even the same buggy +// peer), they will only get downloaded at most once. +func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) } +func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) } +func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) } + +func testConcurrentAnnouncements(t *testing.T, protocol int) { + // Create a chain of blocks to import + targetBlocks := 4 * hashLimit + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + // Assemble a tester with a built in counter for the requests + tester := newTester() + firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack) + firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0) + secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack) + secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0) + + counter := uint32(0) + firstHeaderWrapper := func(hash common.Hash) error { + atomic.AddUint32(&counter, 1) + return firstHeaderFetcher(hash) + } + secondHeaderWrapper := func(hash common.Hash) error { + atomic.AddUint32(&counter, 1) + return secondHeaderFetcher(hash) + } + // Iteratively announce blocks until all are imported + imported := make(chan *types.Block) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + for i := len(hashes) - 2; i >= 0; i-- { + tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher) + tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher) + tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher) + verifyImportEvent(t, imported, true) + } + verifyImportDone(t, imported) + + // Make sure no blocks were retrieved twice + if int(counter) != targetBlocks { + t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks) + } +} + +// Tests that announcements arriving while a previous is being fetched still +// results in a valid import. +func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) } +func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) } +func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) } + +func testOverlappingAnnouncements(t *testing.T, protocol int) { + // Create a chain of blocks to import + targetBlocks := 4 * hashLimit + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) + + // Iteratively announce blocks, but overlap them continuously + overlap := 16 + imported := make(chan *types.Block, len(hashes)-1) + for i := 0; i < overlap; i++ { + imported <- nil + } + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + for i := len(hashes) - 2; i >= 0; i-- { + tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + select { + case <-imported: + case <-time.After(time.Second): + t.Fatalf("block %d: import timeout", len(hashes)-i) + } + } + // Wait for all the imports to complete and check count + verifyImportCount(t, imported, overlap) +} + +// Tests that announces already being retrieved will not be duplicated. +func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) } +func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) } +func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) } + +func testPendingDeduplication(t *testing.T, protocol int) { + // Create a hash and corresponding block + hashes, blocks := makeChain(1, 0, genesis) + + // Assemble a tester with a built in counter and delayed fetcher + tester := newTester() + headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0) + + delay := 50 * time.Millisecond + counter := uint32(0) + headerWrapper := func(hash common.Hash) error { + atomic.AddUint32(&counter, 1) + + // Simulate a long running fetch + go func() { + time.Sleep(delay) + headerFetcher(hash) + }() + return nil + } + // Announce the same block many times until it's fetched (wait for any pending ops) + for tester.getBlock(hashes[0]) == nil { + tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher) + time.Sleep(time.Millisecond) + } + time.Sleep(delay) + + // Check that all blocks were imported and none fetched twice + if imported := len(tester.blocks); imported != 2 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2) + } + if int(counter) != 1 { + t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1) + } +} + +// Tests that announcements retrieved in a random order are cached and eventually +// imported when all the gaps are filled in. +func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) } +func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) } +func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) } + +func testRandomArrivalImport(t *testing.T, protocol int) { + // Create a chain of blocks to import, and choose one to delay + targetBlocks := maxQueueDist + hashes, blocks := makeChain(targetBlocks, 0, genesis) + skip := targetBlocks / 2 + + tester := newTester() + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) + + // Iteratively announce blocks, skipping one entry + imported := make(chan *types.Block, len(hashes)-1) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + for i := len(hashes) - 1; i >= 0; i-- { + if i != skip { + tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + time.Sleep(time.Millisecond) + } + } + // Finally announce the skipped entry and check full import + tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + verifyImportCount(t, imported, len(hashes)-1) +} + +// Tests that direct block enqueues (due to block propagation vs. hash announce) +// are correctly schedule, filling and import queue gaps. +func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) } +func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) } +func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) } + +func testQueueGapFill(t *testing.T, protocol int) { + // Create a chain of blocks to import, and choose one to not announce at all + targetBlocks := maxQueueDist + hashes, blocks := makeChain(targetBlocks, 0, genesis) + skip := targetBlocks / 2 + + tester := newTester() + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) + + // Iteratively announce blocks, skipping one entry + imported := make(chan *types.Block, len(hashes)-1) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + for i := len(hashes) - 1; i >= 0; i-- { + if i != skip { + tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + time.Sleep(time.Millisecond) + } + } + // Fill the missing block directly as if propagated + tester.fetcher.Enqueue("valid", blocks[hashes[skip]]) + verifyImportCount(t, imported, len(hashes)-1) +} + +// Tests that blocks arriving from various sources (multiple propagations, hash +// announces, etc) do not get scheduled for import multiple times. +func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) } +func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) } +func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) } + +func testImportDeduplication(t *testing.T, protocol int) { + // Create two blocks to import (one for duplication, the other for stalling) + hashes, blocks := makeChain(2, 0, genesis) + + // Create the tester and wrap the importer with a counter + tester := newTester() + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) + + counter := uint32(0) + tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) { + atomic.AddUint32(&counter, uint32(len(blocks))) + return tester.insertChain(blocks) + } + // Instrument the fetching and imported events + fetching := make(chan []common.Hash) + imported := make(chan *types.Block, len(hashes)-1) + tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes } + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + // Announce the duplicating block, wait for retrieval, and also propagate directly + tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + <-fetching + + tester.fetcher.Enqueue("valid", blocks[hashes[0]]) + tester.fetcher.Enqueue("valid", blocks[hashes[0]]) + tester.fetcher.Enqueue("valid", blocks[hashes[0]]) + + // Fill the missing block directly as if propagated, and check import uniqueness + tester.fetcher.Enqueue("valid", blocks[hashes[1]]) + verifyImportCount(t, imported, 2) + + if counter != 2 { + t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2) + } +} + +// Tests that blocks with numbers much lower or higher than out current head get +// discarded to prevent wasting resources on useless blocks from faulty peers. +func TestDistantPropagationDiscarding(t *testing.T) { + // Create a long chain to import and define the discard boundaries + hashes, blocks := makeChain(3*maxQueueDist, 0, genesis) + head := hashes[len(hashes)/2] + + low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1 + + // Create a tester and simulate a head block being the middle of the above chain + tester := newTester() + + tester.lock.Lock() + tester.hashes = []common.Hash{head} + tester.blocks = map[common.Hash]*types.Block{head: blocks[head]} + tester.lock.Unlock() + + // Ensure that a block with a lower number than the threshold is discarded + tester.fetcher.Enqueue("lower", blocks[hashes[low]]) + time.Sleep(10 * time.Millisecond) + if !tester.fetcher.queue.Empty() { + t.Fatalf("fetcher queued stale block") + } + // Ensure that a block with a higher number than the threshold is discarded + tester.fetcher.Enqueue("higher", blocks[hashes[high]]) + time.Sleep(10 * time.Millisecond) + if !tester.fetcher.queue.Empty() { + t.Fatalf("fetcher queued future block") + } +} + +// Tests that announcements with numbers much lower or higher than out current +// head get discarded to prevent wasting resources on useless blocks from faulty +// peers. +func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) } +func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) } +func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) } + +func testDistantAnnouncementDiscarding(t *testing.T, protocol int) { + // Create a long chain to import and define the discard boundaries + hashes, blocks := makeChain(3*maxQueueDist, 0, genesis) + head := hashes[len(hashes)/2] + + low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1 + + // Create a tester and simulate a head block being the middle of the above chain + tester := newTester() + + tester.lock.Lock() + tester.hashes = []common.Hash{head} + tester.blocks = map[common.Hash]*types.Block{head: blocks[head]} + tester.lock.Unlock() + + headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0) + + fetching := make(chan struct{}, 2) + tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} } + + // Ensure that a block with a lower number than the threshold is discarded + tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + select { + case <-time.After(50 * time.Millisecond): + case <-fetching: + t.Fatalf("fetcher requested stale header") + } + // Ensure that a block with a higher number than the threshold is discarded + tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + select { + case <-time.After(50 * time.Millisecond): + case <-fetching: + t.Fatalf("fetcher requested future header") + } +} + +// Tests that peers announcing blocks with invalid numbers (i.e. not matching +// the headers provided afterwards) get dropped as malicious. +func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) } +func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) } +func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) } + +func testInvalidNumberAnnouncement(t *testing.T, protocol int) { + // Create a single block to import and check numbers against + hashes, blocks := makeChain(1, 0, genesis) + + tester := newTester() + badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack) + badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0) + + imported := make(chan *types.Block) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + // Announce a block with a bad number, check for immediate drop + tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher) + verifyImportEvent(t, imported, false) + + tester.lock.RLock() + dropped := tester.drops["bad"] + tester.lock.RUnlock() + + if !dropped { + t.Fatalf("peer with invalid numbered announcement not dropped") + } + + goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack) + goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0) + // Make sure a good announcement passes without a drop + tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher) + verifyImportEvent(t, imported, true) + + tester.lock.RLock() + dropped = tester.drops["good"] + tester.lock.RUnlock() + + if dropped { + t.Fatalf("peer with valid numbered announcement dropped") + } + verifyImportDone(t, imported) +} + +// Tests that if a block is empty (i.e. header only), no body request should be +// made, and instead the header should be assembled into a whole block in itself. +func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) } +func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) } +func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) } + +func testEmptyBlockShortCircuit(t *testing.T, protocol int) { + // Create a chain of blocks to import + hashes, blocks := makeChain(32, 0, genesis) + + tester := newTester() + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) + + // Add a monitoring hook for all internal events + fetching := make(chan []common.Hash) + tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes } + + completing := make(chan []common.Hash) + tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes } + + imported := make(chan *types.Block) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + + // Iteratively announce blocks until all are imported + for i := len(hashes) - 2; i >= 0; i-- { + tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + + // All announces should fetch the header + verifyFetchingEvent(t, fetching, true) + + // Only blocks with data contents should request bodies + verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0) + + // Irrelevant of the construct, import should succeed + verifyImportEvent(t, imported, true) + } + verifyImportDone(t, imported) +} + +// Tests that a peer is unable to use unbounded memory with sending infinite +// block announcements to a node, but that even in the face of such an attack, +// the fetcher remains operational. +func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) } +func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) } +func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) } + +func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { + // Create a tester with instrumented import hooks + tester := newTester() + + imported, announces := make(chan *types.Block), int32(0) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) { + if added { + atomic.AddInt32(&announces, 1) + } else { + atomic.AddInt32(&announces, -1) + } + } + // Create a valid chain and an infinite junk chain + targetBlocks := hashLimit + 2*maxQueueDist + hashes, blocks := makeChain(targetBlocks, 0, genesis) + validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) + + attack, _ := makeChain(targetBlocks, 0, unknownBlock) + attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack) + attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0) + + // Feed the tester a huge hashset from the attacker, and a limited from the valid peer + for i := 0; i < len(attack); i++ { + if i < maxQueueDist { + tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher) + } + tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher) + } + if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist { + t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist) + } + // Wait for fetches to complete + verifyImportCount(t, imported, maxQueueDist) + + // Feed the remaining valid hashes to ensure DOS protection state remains clean + for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- { + tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher) + verifyImportEvent(t, imported, true) + } + verifyImportDone(t, imported) +} + +// Tests that blocks sent to the fetcher (either through propagation or via hash +// announces and retrievals) don't pile up indefinitely, exhausting available +// system memory. +func TestBlockMemoryExhaustionAttack(t *testing.T) { + // Create a tester with instrumented import hooks + tester := newTester() + + imported, enqueued := make(chan *types.Block), int32(0) + tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) { + if added { + atomic.AddInt32(&enqueued, 1) + } else { + atomic.AddInt32(&enqueued, -1) + } + } + // Create a valid chain and a batch of dangling (but in range) blocks + targetBlocks := hashLimit + 2*maxQueueDist + hashes, blocks := makeChain(targetBlocks, 0, genesis) + attack := make(map[common.Hash]*types.Block) + for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ { + hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock) + for _, hash := range hashes[:maxQueueDist-2] { + attack[hash] = blocks[hash] + } + } + // Try to feed all the attacker blocks make sure only a limited batch is accepted + for _, block := range attack { + tester.fetcher.Enqueue("attacker", block) + } + time.Sleep(200 * time.Millisecond) + if queued := atomic.LoadInt32(&enqueued); queued != blockLimit { + t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit) + } + // Queue up a batch of valid blocks, and check that a new peer is allowed to do so + for i := 0; i < maxQueueDist-1; i++ { + tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]]) + } + time.Sleep(100 * time.Millisecond) + if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 { + t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1) + } + // Insert the missing piece (and sanity check the import) + tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]]) + verifyImportCount(t, imported, maxQueueDist) + + // Insert the remaining blocks in chunks to ensure clean DOS protection + for i := maxQueueDist; i < len(hashes)-1; i++ { + tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]]) + verifyImportEvent(t, imported, true) + } + verifyImportDone(t, imported) +} diff --git a/dex/fetcher/metrics.go b/dex/fetcher/metrics.go new file mode 100644 index 000000000..23b670549 --- /dev/null +++ b/dex/fetcher/metrics.go @@ -0,0 +1,43 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the metrics collected by the fetcher. + +package fetcher + +import ( + "github.com/dexon-foundation/dexon/metrics" +) + +var ( + propAnnounceInMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/in", nil) + propAnnounceOutTimer = metrics.NewRegisteredTimer("dex/fetcher/prop/announces/out", nil) + propAnnounceDropMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/drop", nil) + propAnnounceDOSMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/announces/dos", nil) + + propBroadcastInMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/in", nil) + propBroadcastOutTimer = metrics.NewRegisteredTimer("dex/fetcher/prop/broadcasts/out", nil) + propBroadcastDropMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/drop", nil) + propBroadcastDOSMeter = metrics.NewRegisteredMeter("dex/fetcher/prop/broadcasts/dos", nil) + + headerFetchMeter = metrics.NewRegisteredMeter("dex/fetcher/fetch/headers", nil) + bodyFetchMeter = metrics.NewRegisteredMeter("dex/fetcher/fetch/bodies", nil) + + headerFilterInMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/headers/in", nil) + headerFilterOutMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/headers/out", nil) + bodyFilterInMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/bodies/in", nil) + bodyFilterOutMeter = metrics.NewRegisteredMeter("dex/fetcher/filter/bodies/out", nil) +) |