Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 560
1 file changed, 305 insertions, 255 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 839969f03..cca4fe7a9 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,7 +34,6 @@ import (
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/params"
-	"github.com/ethereum/go-ethereum/trie"
 	"github.com/rcrowley/go-metrics"
 )
@@ -99,8 +98,9 @@ type Downloader struct {
 	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
 	mux  *event.TypeMux // Event multiplexer to announce sync operation events
-	queue *queue   // Scheduler for selecting the hashes to download
-	peers *peerSet // Set of active peers from which download can proceed
+	queue   *queue   // Scheduler for selecting the hashes to download
+	peers   *peerSet // Set of active peers from which download can proceed
+	stateDB ethdb.Database
 	fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
 	fsPivotFails uint32        // Number of subsequent fast sync failures in the critical section
@@ -109,26 +109,16 @@ type Downloader struct {
 	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
 	// Statistics
-	syncStatsChainOrigin uint64 // Origin block number where syncing started at
-	syncStatsChainHeight uint64 // Highest block number known when syncing started
-	syncStatsStateDone   uint64 // Number of state trie entries already pulled
+	syncStatsChainOrigin uint64 // Origin block number where syncing started at
+	syncStatsChainHeight uint64 // Highest block number known when syncing started
+	syncStatsState       stateSyncStats
 	syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
+	lightchain LightChain
+	blockchain BlockChain
+
 	// Callbacks
-	hasHeader        headerCheckFn            // Checks if a header is present in the chain
-	hasBlockAndState blockAndStateCheckFn     // Checks if a block and associated state is present in the chain
-	getHeader        headerRetrievalFn        // Retrieves a header from the chain
-	getBlock         blockRetrievalFn         // Retrieves a block from the chain
-	headHeader       headHeaderRetrievalFn    // Retrieves the head header from the chain
-	headBlock        headBlockRetrievalFn     // Retrieves the head block from the chain
-	headFastBlock    headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
-	commitHeadBlock  headBlockCommitterFn     // Commits a manually assembled block as the chain head
-	getTd            tdRetrievalFn            // Retrieves the TD of a block from the chain
-	insertHeaders    headerChainInsertFn      // Injects a batch of headers into the chain
-	insertBlocks     blockChainInsertFn       // Injects a batch of blocks into the chain
-	insertReceipts   receiptChainInsertFn     // Injects a batch of blocks and their receipts into the chain
-	rollback         chainRollbackFn          // Removes a batch of recently added chain links
-	dropPeer         peerDropFn               // Drops a peer for misbehaving
+	dropPeer peerDropFn // Drops a peer for misbehaving
 	// Status
 	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -136,16 +126,18 @@ type Downloader struct {
 	notified int32
 	// Channels
-	newPeerCh chan *peer
 	headerCh  chan dataPack // [eth/62] Channel receiving inbound block headers
 	bodyCh    chan dataPack // [eth/62] Channel receiving inbound block bodies
 	receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
-	stateCh   chan dataPack // [eth/63] Channel receiving inbound node state data
 	bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
 	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
-	stateWakeCh   chan bool            // [eth/63] Channel to signal the state fetcher of new tasks
 	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+	// for stateFetcher
+	stateSyncStart chan *stateSync
+	trackStateReq  chan *stateReq
+	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data
+
 	// Cancellation and termination
 	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
 	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
@@ -161,45 +153,83 @@ type Downloader struct {
 	chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
 }
+// LightChain encapsulates functions required to synchronise a light chain.
+type LightChain interface {
+	// HasHeader verifies a header's presence in the local chain.
+	HasHeader(h common.Hash, number uint64) bool
+
+	// GetHeaderByHash retrieves a header from the local chain.
+	GetHeaderByHash(common.Hash) *types.Header
+
+	// CurrentHeader retrieves the head header from the local chain.
+	CurrentHeader() *types.Header
+
+	// GetTdByHash returns the total difficulty of a local block.
+	GetTdByHash(common.Hash) *big.Int
+
+	// InsertHeaderChain inserts a batch of headers into the local chain.
+	InsertHeaderChain([]*types.Header, int) (int, error)
+
+	// Rollback removes a few recently added elements from the local chain.
+	Rollback([]common.Hash)
+}
+
+// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
+type BlockChain interface {
+	LightChain
+
+	// HasBlockAndState verifies block and associated states' presence in the local chain.
+	HasBlockAndState(common.Hash) bool
+
+	// GetBlockByHash retrieves a block from the local chain.
+	GetBlockByHash(common.Hash) *types.Block
+
+	// CurrentBlock retrieves the head block from the local chain.
+	CurrentBlock() *types.Block
+
+	// CurrentFastBlock retrieves the head fast block from the local chain.
+	CurrentFastBlock() *types.Block
+
+	// FastSyncCommitHead directly commits the head block to a certain entity.
+	FastSyncCommitHead(common.Hash) error
+
+	// InsertChain inserts a batch of blocks into the local chain.
+	InsertChain(types.Blocks) (int, error)
+
+	// InsertReceiptChain inserts a batch of receipts into the local chain.
+	InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
+}
+
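Editor's note: nothing in this file ties the two interfaces to concrete types; the implementations live elsewhere in the tree. A minimal sketch of a cheap compile-time guard, assuming core.BlockChain and light.LightChain are the intended implementations (an assumption, they are not part of this diff):

// Hypothetical compile-time interface checks, placed in a consuming package.
var (
	_ downloader.BlockChain = (*core.BlockChain)(nil)  // full/fast sync chain
	_ downloader.LightChain = (*light.LightChain)(nil) // header-only chain
)
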
 // New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
-	getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
-	headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
-	insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
+	if lightchain == nil {
+		lightchain = chain
+	}
 	dl := &Downloader{
-		mode:             mode,
-		mux:              mux,
-		queue:            newQueue(stateDb),
-		peers:            newPeerSet(),
-		rttEstimate:      uint64(rttMaxEstimate),
-		rttConfidence:    uint64(1000000),
-		hasHeader:        hasHeader,
-		hasBlockAndState: hasBlockAndState,
-		getHeader:        getHeader,
-		getBlock:         getBlock,
-		headHeader:       headHeader,
-		headBlock:        headBlock,
-		headFastBlock:    headFastBlock,
-		commitHeadBlock:  commitHeadBlock,
-		getTd:            getTd,
-		insertHeaders:    insertHeaders,
-		insertBlocks:     insertBlocks,
-		insertReceipts:   insertReceipts,
-		rollback:         rollback,
-		dropPeer:         dropPeer,
-		newPeerCh:        make(chan *peer, 1),
-		headerCh:         make(chan dataPack, 1),
-		bodyCh:           make(chan dataPack, 1),
-		receiptCh:        make(chan dataPack, 1),
-		stateCh:          make(chan dataPack, 1),
-		bodyWakeCh:       make(chan bool, 1),
-		receiptWakeCh:    make(chan bool, 1),
-		stateWakeCh:      make(chan bool, 1),
-		headerProcCh:     make(chan []*types.Header, 1),
-		quitCh:           make(chan struct{}),
+		mode:           mode,
+		stateDB:        stateDb,
+		mux:            mux,
+		queue:          newQueue(),
+		peers:          newPeerSet(),
+		rttEstimate:    uint64(rttMaxEstimate),
+		rttConfidence:  uint64(1000000),
+		blockchain:     chain,
+		lightchain:     lightchain,
+		dropPeer:       dropPeer,
+		headerCh:       make(chan dataPack, 1),
+		bodyCh:         make(chan dataPack, 1),
+		receiptCh:      make(chan dataPack, 1),
+		bodyWakeCh:     make(chan bool, 1),
+		receiptWakeCh:  make(chan bool, 1),
+		headerProcCh:   make(chan []*types.Header, 1),
+		quitCh:         make(chan struct{}),
+		stateCh:        make(chan dataPack),
+		stateSyncStart: make(chan *stateSync),
+		trackStateReq:  make(chan *stateReq),
 	}
 	go dl.qosTuner()
+	go dl.stateFetcher()
 	return dl
 }
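Editor's note: call sites shrink accordingly — instead of threading fifteen callbacks through, the caller hands over the chain object itself. A sketch of the new wiring; chainDb, eventMux, blockchain, lightchain and removePeer stand in for whatever the caller already has (those names are assumptions, not part of this diff):

// Full or fast sync: pass the BlockChain and leave the LightChain nil;
// New falls back to using the BlockChain for the light-chain duties.
d := downloader.New(downloader.FastSync, chainDb, eventMux, blockchain, nil, removePeer)

// Light sync: no BlockChain is available, so only the LightChain is supplied.
ld := downloader.New(downloader.LightSync, chainDb, eventMux, nil, lightchain, removePeer)
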
@@ -211,9 +241,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
 // of processed and the total number of known states are also returned. Otherwise
 // these are zero.
 func (d *Downloader) Progress() ethereum.SyncProgress {
-	// Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
-	pendingStates := uint64(d.queue.PendingNodeData())
-
-	// Lock the current stats and return the progress
 	d.syncStatsLock.RLock()
 	defer d.syncStatsLock.RUnlock()
@@ -221,18 +248,18 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
 	current := uint64(0)
 	switch d.mode {
 	case FullSync:
-		current = d.headBlock().NumberU64()
+		current = d.blockchain.CurrentBlock().NumberU64()
 	case FastSync:
-		current = d.headFastBlock().NumberU64()
+		current = d.blockchain.CurrentFastBlock().NumberU64()
 	case LightSync:
-		current = d.headHeader().Number.Uint64()
+		current = d.lightchain.CurrentHeader().Number.Uint64()
 	}
 	return ethereum.SyncProgress{
 		StartingBlock: d.syncStatsChainOrigin,
 		CurrentBlock:  current,
 		HighestBlock:  d.syncStatsChainHeight,
-		PulledStates:  d.syncStatsStateDone,
-		KnownStates:   d.syncStatsStateDone + pendingStates,
+		PulledStates:  d.syncStatsState.processed,
+		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
 	}
 }
@@ -243,13 +270,11 @@ func (d *Downloader) Synchronising() bool {
 // RegisterPeer injects a new download peer into the set of block sources to be
 // used for fetching hashes and blocks from.
-func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
-	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
-	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
+func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
 	logger := log.New("peer", id)
 	logger.Trace("Registering sync peer")
-	if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil {
+	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
 		logger.Error("Failed to register sync peer", "err", err)
 		return err
 	}
@@ -258,6 +283,11 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
 	return nil
 }
+// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
+func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
+	return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
+}
+
 // UnregisterPeer removes a peer from the known list, preventing any action from
 // the specified peer. An effort is also made to return any pending fetches into
 // the queue.
@@ -324,13 +354,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	d.queue.Reset()
 	d.peers.Reset()
-	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 		select {
 		case <-ch:
 		default:
 		}
 	}
-	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
 		for empty := false; !empty; {
 			select {
 			case <-ch:
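Editor's note: RegisterLightPeer relies on lightPeerWrapper, which lives in the peer code rather than in this file. A plausible sketch of its shape, assuming the full Peer interface adds body, receipt and node-data requests on top of LightPeer — the method names below are inferred from calls visible in this diff and are otherwise assumptions:

// lightPeerWrapper (hypothetical sketch) adapts a LightPeer to the full Peer
// interface: header calls are forwarded, everything else is rejected.
type lightPeerWrapper struct {
	peer LightPeer
}

func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }
func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error {
	return w.peer.RequestHeadersByHash(h, amount, skip, reverse)
}
func (w *lightPeerWrapper) RequestHeadersByNumber(n uint64, amount int, skip int, reverse bool) error {
	return w.peer.RequestHeadersByNumber(n, amount, skip, reverse)
}
func (w *lightPeerWrapper) RequestBodies([]common.Hash) error {
	return errors.New("body retrieval not supported in light client mode sync")
}
func (w *lightPeerWrapper) RequestReceipts([]common.Hash) error {
	return errors.New("receipt retrieval not supported in light client mode sync")
}
func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error {
	return errors.New("node data retrieval not supported in light client mode sync")
}
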
@@ -369,7 +399,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 // syncWithPeer starts a block synchronization based on the hash chain from the
 // specified peer and head hash.
-func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
+func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
 	d.mux.Post(StartEvent{})
 	defer func() {
 		// reset on error
@@ -439,30 +469,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 	if d.syncInitHook != nil {
 		d.syncInitHook(origin, height)
 	}
-	return d.spawnSync(origin+1,
-		func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
-		func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
-		func() error { return d.fetchBodies(origin + 1) },      // Bodies are retrieved during normal and fast sync
-		func() error { return d.fetchReceipts(origin + 1) },    // Receipts are retrieved during fast sync
-		func() error { return d.fetchNodeData() },              // Node state data is retrieved during fast sync
-	)
+
+	fetchers := []func() error{
+		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
+		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+		func() error { return d.processHeaders(origin+1, td) },
+	}
+	if d.mode == FastSync {
+		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+	} else if d.mode == FullSync {
+		fetchers = append(fetchers, d.processFullSyncContent)
+	}
+	err = d.spawnSync(fetchers)
+	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
+		// If sync failed in the critical section, bump the fail counter.
+		atomic.AddUint32(&d.fsPivotFails, 1)
+	}
+	return err
 }
 // spawnSync runs d.process and all given fetcher functions to completion in
 // separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
+func (d *Downloader) spawnSync(fetchers []func() error) error {
 	var wg sync.WaitGroup
-	errc := make(chan error, len(fetchers)+1)
-	wg.Add(len(fetchers) + 1)
-	go func() { defer wg.Done(); errc <- d.processContent() }()
+	errc := make(chan error, len(fetchers))
+	wg.Add(len(fetchers))
 	for _, fn := range fetchers {
 		fn := fn
 		go func() { defer wg.Done(); errc <- fn() }()
 	}
 	// Wait for the first error, then terminate the others.
 	var err error
-	for i := 0; i < len(fetchers)+1; i++ {
-		if i == len(fetchers) {
+	for i := 0; i < len(fetchers); i++ {
+		if i == len(fetchers)-1 {
 			// Close the queue when all fetchers have exited.
 			// This will cause the block processor to end when
 			// it has processed the queue.
@@ -475,11 +515,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
 	d.queue.Close()
 	d.Cancel()
 	wg.Wait()
-
-	// If sync failed in the critical section, bump the fail counter
-	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
-		atomic.AddUint32(&d.fsPivotFails, 1)
-	}
 	return err
 }
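Editor's note: spawnSync is a plain "fan out, first error wins" helper. The same pattern in isolation, stripped of the downloader's queue handling (a standalone sketch, not code from this change; needs only the sync package):

// firstError runs every task in its own goroutine, waits for all of them,
// and reports the first non-nil error it received.
func firstError(tasks []func() error) error {
	var wg sync.WaitGroup
	errc := make(chan error, len(tasks))
	wg.Add(len(tasks))
	for _, fn := range tasks {
		fn := fn // capture the loop variable for the goroutine
		go func() { defer wg.Done(); errc <- fn() }()
	}
	var first error
	for i := 0; i < len(tasks); i++ {
		if err := <-errc; err != nil && first == nil {
			first = err
		}
	}
	wg.Wait()
	return first
}
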
@@ -517,12 +552,12 @@ func (d *Downloader) Terminate() {
 // fetchHeight retrieves the head header of the remote peer to aid in estimating
 // the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
+func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
 	p.log.Debug("Retrieving remote chain height")
 	// Request the advertised remote head block and wait for the response
-	head, _ := p.currentHead()
-	go p.getRelHeaders(head, 1, 0, false)
+	head, _ := p.peer.Head()
+	go p.peer.RequestHeadersByHash(head, 1, 0, false)
 	ttl := d.requestTTL()
 	timeout := time.After(ttl)
@@ -552,7 +587,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 			return nil, errTimeout
 		case <-d.bodyCh:
-		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
 		}
@@ -564,15 +598,15 @@
 // on the correct chain, checking the top N links should already get us a match.
 // In the rare scenario when we ended up on a long reorganisation (i.e. none of
 // the head links match), we do a binary search to find the common ancestor.
-func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
+func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
 	// Figure out the valid ancestor range to prevent rewrite attacks
-	floor, ceil := int64(-1), d.headHeader().Number.Uint64()
+	floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
 	p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
 	if d.mode == FullSync {
-		ceil = d.headBlock().NumberU64()
+		ceil = d.blockchain.CurrentBlock().NumberU64()
 	} else if d.mode == FastSync {
-		ceil = d.headFastBlock().NumberU64()
+		ceil = d.blockchain.CurrentFastBlock().NumberU64()
 	}
 	if ceil >= MaxForkAncestry {
 		floor = int64(ceil - MaxForkAncestry)
@@ -592,7 +626,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 	if count > limit {
 		count = limit
 	}
-	go p.getAbsHeaders(uint64(from), count, 15, false)
+	go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
 	// Wait for the remote response to the head fetch
 	number, hash := uint64(0), common.Hash{}
@@ -632,7 +666,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 				continue
 			}
 			// Otherwise check if we already know the header or not
-			if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
+			if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
 				number, hash = headers[i].Number.Uint64(), headers[i].Hash()
 				// If every header is known, even future ones, the peer straight out lied about its head
@@ -649,7 +683,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 			return 0, errTimeout
 		case <-d.bodyCh:
-		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
 		}
@@ -675,7 +708,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 		ttl := d.requestTTL()
 		timeout := time.After(ttl)
-		go p.getAbsHeaders(uint64(check), 1, 0, false)
+		go p.peer.RequestHeadersByNumber(check, 1, 0, false)
 		// Wait until a reply arrives to this request
 		for arrived := false; !arrived; {
@@ -698,11 +731,11 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 			arrived = true
 			// Modify the search interval based on the response
-			if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
+			if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
 				end = check
 				break
 			}
-			header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
+			header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists
 			if header.Number.Uint64() != check {
 				p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
 				return 0, errBadPeer
@@ -714,7 +747,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 			return 0, errTimeout
 		case <-d.bodyCh:
-		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
 		}
@@ -737,7 +769,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 // other peers are only accepted if they map cleanly to the skeleton. If no one
 // can fill in the skeleton - not even the origin peer - it's assumed invalid and
 // the origin is dropped.
-func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
 	p.log.Debug("Directing header downloads", "origin", from)
 	defer p.log.Debug("Header download terminated")
@@ -757,10 +789,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
 		if skeleton {
 			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
-			go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
 		} else {
 			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
-			go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
 		}
 	}
 	// Start pulling the header chain skeleton until all is done
@@ -827,7 +859,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
 			d.dropPeer(p.id)
 			// Finish the sync gracefully instead of dumping the gathered data though
-			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 				select {
 				case ch <- false:
 				case <-d.cancelCh:
@@ -862,12 +894,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
 		}
 		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
 		throttle = func() bool { return false }
-		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
+		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
 			return d.queue.ReserveHeaders(p, count), false, nil
 		}
-		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
-		capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
-		setIdle  = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
+		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
+		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
 		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
@@ -891,9 +923,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
 			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
 		}
 		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
-		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
-		capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
-		setIdle  = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
+		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
+		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
+		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
 		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -915,9 +947,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
 		}
 		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
-		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
-		capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
-		setIdle  = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
+		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
+		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
+		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
 		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -927,68 +959,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 	return err
 }
-// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
-// available peers, reserving a chunk of nodes for each, waiting for delivery and
-// also periodically checking for timeouts.
-func (d *Downloader) fetchNodeData() error {
-	log.Debug("Downloading node state data")
-
-	var (
-		deliver = func(packet dataPack) (int, error) {
-			start := time.Now()
-			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
-				// If the peer returned old-requested data, forgive
-				if err == trie.ErrNotRequested {
-					log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
-					return
-				}
-				if err != nil {
-					// If the node data processing failed, the root hash is very wrong, abort
-					log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
-					d.Cancel()
-					return
-				}
-				// Processing succeeded, notify state fetcher of continuation
-				pending := d.queue.PendingNodeData()
-				if pending > 0 {
-					select {
-					case d.stateWakeCh <- true:
-					default:
-					}
-				}
-				d.syncStatsLock.Lock()
-				d.syncStatsStateDone += uint64(delivered)
-				syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
-				d.syncStatsLock.Unlock()
-
-				// If real database progress was made, reset any fast-sync pivot failure
-				if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
-					log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
-					atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
-				}
-				// Log a message to the user and return
-				if delivered > 0 {
-					log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
-				}
-			})
-		}
-		expire   = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
-		throttle = func() bool { return false }
-		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
-			return d.queue.ReserveNodeData(p, count), false, nil
-		}
-		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
-		capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
-		setIdle  = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
-	)
-	err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
-		d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
-		d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
-
-	log.Debug("Node state data download terminated", "err", err)
-	return err
-}
-
 // fetchParts iteratively downloads scheduled block parts, taking any available
 // peers, reserving a chunk of fetch requests for each, waiting for delivery and
 // also periodically checking for timeouts.
@@ -1015,9 +985,9 @@ func (d *Downloader) fetchNodeData() error {
 // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
 // - kind: textual label of the type being downloaded to display in log messages
 func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
-	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
-	fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
-	idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
+	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
+	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
+	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
 	// Create a ticker to detect expired retrieval tasks
 	ticker := time.NewTicker(100 * time.Millisecond)
@@ -1123,6 +1093,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 				throttled = true
 				break
 			}
+			// Short circuit if there is no more available task.
+			if pending() == 0 {
+				break
+			}
 			// Reserve a chunk of fetches for a peer. A nil can mean either that
 			// no more headers are available, or that the peer is known not to
 			// have them.
@@ -1182,29 +1156,25 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 			for i, header := range rollback {
 				hashes[i] = header.Hash()
 			}
-			lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0
-			if d.headFastBlock != nil {
-				lastFastBlock = d.headFastBlock().Number()
-			}
-			if d.headBlock != nil {
-				lastBlock = d.headBlock().Number()
+			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
+			if d.mode != LightSync {
+				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
+				lastBlock = d.blockchain.CurrentBlock().Number()
 			}
-			d.rollback(hashes)
+			d.lightchain.Rollback(hashes)
 			curFastBlock, curBlock := common.Big0, common.Big0
-			if d.headFastBlock != nil {
-				curFastBlock = d.headFastBlock().Number()
-			}
-			if d.headBlock != nil {
-				curBlock = d.headBlock().Number()
+			if d.mode != LightSync {
+				curFastBlock = d.blockchain.CurrentFastBlock().Number()
+				curBlock = d.blockchain.CurrentBlock().Number()
 			}
 			log.Warn("Rolled back headers", "count", len(hashes),
-				"header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number),
+				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
 				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
 				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
 			// If we're already past the pivot point, this could be an attack, tread carefully
 			if rollback[len(rollback)-1].Number.Uint64() > pivot {
-				// If we didn't ever fail, lock in te pivot header (must! not! change!)
+				// If we didn't ever fail, lock in the pivot header (must! not! change!)
 				if atomic.LoadUint32(&d.fsPivotFails) == 0 {
 					for _, header := range rollback {
 						if header.Number.Uint64() == pivot {
@@ -1229,7 +1199,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 			// Terminate header processing if we synced up
 			if len(headers) == 0 {
 				// Notify everyone that headers are fully processed
-				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 					select {
 					case ch <- false:
 					case <-d.cancelCh:
@@ -1248,7 +1218,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 				// L: Request new headers up from 11 (R's TD was higher, it must have something)
 				// R: Nothing to give
 				if d.mode != LightSync {
-					if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
+					if !gotHeaders && td.Cmp(d.blockchain.GetTdByHash(d.blockchain.CurrentBlock().Hash())) > 0 {
 						return errStallingPeer
 					}
 				}
@@ -1260,7 +1230,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 				// queued for processing when the header download completes. However, as long as the
 				// peer gave us something useful, we're already happy/progressed (above check).
 				if d.mode == FastSync || d.mode == LightSync {
-					if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
+					if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 {
 						return errStallingPeer
 					}
 				}
@@ -1290,7 +1260,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 				// Collect the yet unknown headers to mark them as uncertain
 				unknown := make([]*types.Header, 0, len(headers))
 				for _, header := range chunk {
-					if !d.hasHeader(header.Hash()) {
+					if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
 						unknown = append(unknown, header)
 					}
 				}
@@ -1299,7 +1269,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
 						frequency = 1
 					}
-					if n, err := d.insertHeaders(chunk, frequency); err != nil {
+					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
 						// If some headers were inserted, add them too to the rollback list
 						if n > 0 {
 							rollback = append(rollback, chunk[:n]...)
 						}
@@ -1341,7 +1311,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 			origin += uint64(limit)
 		}
 		// Signal the content downloaders of the availability of new tasks
-		for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+		for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 			select {
 			case ch <- true:
 			default:
@@ -1351,71 +1321,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 		}
 	}
-// processContent takes fetch results from the queue and tries to import them
-// into the chain. The type of import operation will depend on the result contents.
-func (d *Downloader) processContent() error {
-	pivot := d.queue.FastSyncPivot()
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
 	for {
 		results := d.queue.WaitResults()
 		if len(results) == 0 {
-			return nil // queue empty
+			return nil
 		}
 		if d.chainInsertHook != nil {
 			d.chainInsertHook(results)
 		}
-		// Actually import the blocks
-		first, last := results[0].Header, results[len(results)-1].Header
+		if err := d.importBlockResults(results); err != nil {
+			return err
+		}
+	}
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+	for len(results) != 0 {
+		// Check for any termination requests. This makes clean shutdown faster.
+		select {
+		case <-d.quitCh:
+			return errCancelContentProcessing
+		default:
+		}
+		// Retrieve a batch of results to import
+		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+		first, last := results[0].Header, results[items-1].Header
 		log.Debug("Inserting downloaded chain", "items", len(results),
 			"firstnum", first.Number, "firsthash", first.Hash(),
 			"lastnum", last.Number, "lasthash", last.Hash(),
 		)
-		for len(results) != 0 {
-			// Check for any termination requests
-			select {
-			case <-d.quitCh:
-				return errCancelContentProcessing
-			default:
-			}
-			// Retrieve the a batch of results to import
-			var (
-				blocks   = make([]*types.Block, 0, maxResultsProcess)
-				receipts = make([]types.Receipts, 0, maxResultsProcess)
-			)
-			items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
-			for _, result := range results[:items] {
-				switch {
-				case d.mode == FullSync:
-					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
-				case d.mode == FastSync:
-					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
-					if result.Header.Number.Uint64() <= pivot {
-						receipts = append(receipts, result.Receipts)
-					}
-				}
-			}
-			// Try to process the results, aborting if there's an error
-			var (
-				err   error
-				index int
-			)
-			switch {
-			case len(receipts) > 0:
-				index, err = d.insertReceipts(blocks, receipts)
-				if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
-					log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash())
-					index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
-				}
-			default:
-				index, err = d.insertBlocks(blocks)
+		blocks := make([]*types.Block, items)
+		for i, result := range results[:items] {
+			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+		}
+		if index, err := d.blockchain.InsertChain(blocks); err != nil {
+			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+			return errInvalidChain
+		}
+		// Shift the results to the next batch
+		results = results[items:]
+	}
+	return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+	// Start syncing state of the reported head block.
+	// This should get us most of the state of the pivot block.
+	stateSync := d.syncState(latest.Root)
+	defer stateSync.Cancel()
+	go func() {
+		if err := stateSync.Wait(); err != nil {
+			d.queue.Close() // wake up WaitResults
+		}
+	}()
+
+	pivot := d.queue.FastSyncPivot()
+	for {
+		results := d.queue.WaitResults()
+		if len(results) == 0 {
+			return stateSync.Cancel()
+		}
+		if d.chainInsertHook != nil {
+			d.chainInsertHook(results)
+		}
+		P, beforeP, afterP := splitAroundPivot(pivot, results)
+		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+			return err
+		}
+		if P != nil {
+			stateSync.Cancel()
+			if err := d.commitPivotBlock(P); err != nil {
+				return err
+			}
 		}
-		if err != nil {
-			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
-			return errInvalidChain
+		}
+		if err := d.importBlockResults(afterP); err != nil {
+			return err
+		}
+	}
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+	for _, result := range results {
+		num := result.Header.Number.Uint64()
+		switch {
+		case num < pivot:
+			before = append(before, result)
+		case num == pivot:
+			p = result
+		default:
+			after = append(after, result)
+		}
+	}
+	return p, before, after
+}
+
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+	for len(results) != 0 {
+		// Check for any termination requests.
+		select {
+		case <-d.quitCh:
+			return errCancelContentProcessing
+		case <-stateSync.done:
+			if err := stateSync.Wait(); err != nil {
+				return err
 			}
-			// Shift the results to the next batch
-			results = results[items:]
+		default:
+		}
+		// Retrieve a batch of results to import
+		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+		first, last := results[0].Header, results[items-1].Header
+		log.Debug("Inserting fast-sync blocks", "items", len(results),
+			"firstnum", first.Number, "firsthash", first.Hash(),
+			"lastnumn", last.Number, "lasthash", last.Hash(),
+		)
+		blocks := make([]*types.Block, items)
+		receipts := make([]types.Receipts, items)
+		for i, result := range results[:items] {
+			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+			receipts[i] = result.Receipts
+		}
+		if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+			return errInvalidChain
 		}
+		// Shift the results to the next batch
+		results = results[items:]
+	}
+	return nil
+}
+
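Editor's note: splitAroundPivot is a pure function, so its behaviour is easy to pin down in isolation. A test-style sketch using the package's own types (the block numbers and pivot below are invented for illustration; imports of testing, math/big and core/types assumed):

func TestSplitAroundPivot(t *testing.T) {
	// Build results for blocks 3..7 and split around pivot 5.
	var results []*fetchResult
	for n := int64(3); n <= 7; n++ {
		results = append(results, &fetchResult{Header: &types.Header{Number: big.NewInt(n)}})
	}
	p, before, after := splitAroundPivot(5, results)
	if p == nil || p.Header.Number.Uint64() != 5 {
		t.Fatalf("pivot block not isolated: %v", p)
	}
	if len(before) != 2 || len(after) != 2 {
		t.Fatalf("unexpected split: before=%d after=%d", len(before), len(after))
	}
}
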
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+	b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+	// Sync the pivot block state. This should complete reasonably quickly because
+	// we've already synced up to the reported head block state earlier.
+	if err := d.syncState(b.Root()).Wait(); err != nil {
+		return err
+	}
+	log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
+	if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+		return err
 	}
+	return d.blockchain.FastSyncCommitHead(b.Hash())
 }
 // DeliverHeaders injects a new batch of block headers received from a remote
@@ -1468,7 +1518,7 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
 func (d *Downloader) qosTuner() {
 	for {
 		// Retrieve the current median RTT and integrate into the previous target RTT
-		rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+		rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
 		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
 		// A new RTT cycle passed, increase our confidence in the estimated RTT
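Editor's note: the qosTuner line above is an exponential moving average of the peers' median round-trip time; qosTuningImpact is the weight given to the new sample. Written out on its own (the helper name is invented, only the time package is needed):

// ewmaRTT blends the previous estimate with the latest median sample:
//   rtt' = (1-impact)*rtt + impact*median, with 0 < impact < 1.
func ewmaRTT(prev, median time.Duration, impact float64) time.Duration {
	return time.Duration((1-impact)*float64(prev) + impact*float64(median))
}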