diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 209 |
1 files changed, 170 insertions, 39 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f6dbb4610..92124cfeb 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -54,14 +54,15 @@ var ( blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired - headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now) - headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out - bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request - bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired - receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request - receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired - stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request - stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired + rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests + rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests + rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value + ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion + ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts + + qosTuningPeers = 5 // Number of peers to tune based on (best peers) + qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence + qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) @@ -73,6 +74,7 @@ var ( fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync + fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing ) var ( @@ -103,14 +105,17 @@ var ( ) type Downloader struct { - mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) - noFast bool // Flag to disable fast syncing in case of a security error - mux *event.TypeMux // Event multiplexer to announce sync operation events + mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) + mux *event.TypeMux // Event multiplexer to announce sync operation events queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed - interrupt int32 // Atomic boolean to signal termination + fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) + fsPivotFails int // Number of fast sync failures in the critical section + + rttEstimate uint64 // Round trip time to target for download requests + rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) // Statistics syncStatsChainOrigin uint64 // Origin block number where syncing started at @@ -156,6 +161,9 @@ type Downloader struct { cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers + quitCh chan struct{} // Quit channel to signal termination + quitLock sync.RWMutex // Lock to prevent double closes + // Testing hooks syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch @@ -169,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader { - return &Downloader{ + dl := &Downloader{ mode: FullSync, mux: mux, queue: newQueue(stateDb), peers: newPeerSet(), + rttEstimate: uint64(rttMaxEstimate), + rttConfidence: uint64(1000000), hasHeader: hasHeader, hasBlockAndState: hasBlockAndState, getHeader: getHeader, @@ -200,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha receiptWakeCh: make(chan bool, 1), stateWakeCh: make(chan bool, 1), headerProcCh: make(chan []*types.Header, 1), + quitCh: make(chan struct{}), } + go dl.qosTuner() + return dl } // Progress retrieves the synchronisation boundaries, specifically the origin @@ -247,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, glog.V(logger.Error).Infoln("Register failed:", err) return err } + d.qosReduceConfidence() + return nil } @@ -314,6 +329,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode default: } } + for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { + for empty := false; !empty; { + select { + case <-ch: + default: + empty = true + } + } + } for empty := false; !empty; { select { case <-d.headerProcCh: @@ -330,7 +354,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // Set the requested sync mode, unless it's forbidden d.mode = mode - if d.mode == FastSync && d.noFast { + if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials { d.mode = FullSync } // Retrieve the origin peer and initiate the downloading process @@ -413,12 +437,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e pivot = height case FastSync: // Calculate the new fast/slow sync pivot point - pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) - if err != nil { - panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) - } - if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { - pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() + if d.fsPivotLock == nil { + pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) + if err != nil { + panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) + } + if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { + pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() + } + } else { + // Pivot point locked in, use this and do not pick a new one! + pivot = d.fsPivotLock.Number.Uint64() } // If the point is below the origin, move origin back to ensure state download if pivot < origin { @@ -498,7 +527,16 @@ func (d *Downloader) cancel() { // Terminate interrupts the downloader, canceling all pending operations. // The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { - atomic.StoreInt32(&d.interrupt, 1) + // Close the termination channel (make sure double close is allowed) + d.quitLock.Lock() + select { + case <-d.quitCh: + default: + close(d.quitCh) + } + d.quitLock.Unlock() + + // Cancel any pending download requests d.cancel() } @@ -915,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.ReserveBlocks(peer, peer.BlockCapacity()) + request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT)) if request == nil { continue } @@ -956,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { // Request the advertised remote head block and wait for the response go p.getRelHeaders(p.head, 1, 0, false) - timeout := time.After(headerTTL) + timeout := time.After(d.requestTTL()) for { select { case <-d.cancelCh: @@ -1024,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} - timeout := time.After(hashTTL) + timeout := time.After(d.requestTTL()) for finished := false; !finished; { select { @@ -1101,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2 - timeout := time.After(hashTTL) + timeout := time.After(d.requestTTL()) go p.getAbsHeaders(uint64(check), 1, 0, false) // Wait until a reply arrives to this request @@ -1182,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { getHeaders := func(from uint64) { request = time.Now() - timeout.Reset(headerTTL) + timeout.Reset(d.requestTTL()) if skeleton { glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from) @@ -1218,8 +1256,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - d.headerProcCh <- nil - return nil + select { + case d.headerProcCh <- nil: + return nil + case <-d.cancelCh: + return errCancelHeaderFetch + } } headers := packet.(*headerPack).headers @@ -1290,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( pack := packet.(*headerPack) return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh) } - expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) } + expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } throttle = func() bool { return false } reserve = func(p *peer, count int) (*fetchRequest, bool, error) { return d.queue.ReserveHeaders(p, count), false, nil } fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } - capacity = func(p *peer) int { return p.HeaderCapacity() } + capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } ) err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, @@ -1320,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error { pack := packet.(*bodyPack) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) } + expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } - capacity = func(p *peer) int { return p.BlockCapacity() } + capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, @@ -1344,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } - expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) } + expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } - capacity = func(p *peer) int { return p.ReceiptCapacity() } + capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, @@ -1396,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error { } }) } - expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } + expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) } throttle = func() bool { return false } reserve = func(p *peer, count int) (*fetchRequest, bool, error) { return d.queue.ReserveNodeData(p, count), false, nil } fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } - capacity = func(p *peer) int { return p.NodeDataCapacity() } + capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } ) err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, @@ -1611,9 +1653,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number()) - // If we're already past the pivot point, this could be an attack, disable fast sync + // If we're already past the pivot point, this could be an attack, thread carefully if rollback[len(rollback)-1].Number.Uint64() > pivot { - d.noFast = true + // If we didn't ever fail, lock in te pivot header (must! not! change!) + if d.fsPivotFails == 0 { + for _, header := range rollback { + if header.Number.Uint64() == pivot { + glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4]) + d.fsPivotLock = header + } + } + } + d.fsPivotFails++ } } }() @@ -1712,6 +1763,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } } + // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in + if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot { + if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() { + glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4]) + return errInvalidChain + } + } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { // If we've reached the allowed number of pending headers, stall a bit @@ -1762,8 +1820,10 @@ func (d *Downloader) processContent() error { } for len(results) != 0 { // Check for any termination requests - if atomic.LoadInt32(&d.interrupt) == 1 { + select { + case <-d.quitCh: return errCancelContentProcessing + default: } // Retrieve the a batch of results to import var ( @@ -1864,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i return errNoSyncActive } } + +// qosTuner is the quality of service tuning loop that occasionally gathers the +// peer latency statistics and updates the estimated request round trip time. +func (d *Downloader) qosTuner() { + for { + // Retrieve the current median RTT and integrate into the previoust target RTT + rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT())) + atomic.StoreUint64(&d.rttEstimate, uint64(rtt)) + + // A new RTT cycle passed, increase our confidence in the estimated RTT + conf := atomic.LoadUint64(&d.rttConfidence) + conf = conf + (1000000-conf)/2 + atomic.StoreUint64(&d.rttConfidence, conf) + + // Log the new QoS values and sleep until the next RTT + glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()) + select { + case <-d.quitCh: + return + case <-time.After(rtt): + } + } +} + +// qosReduceConfidence is meant to be called when a new peer joins the downloader's +// peer set, needing to reduce the confidence we have in out QoS estimates. +func (d *Downloader) qosReduceConfidence() { + // If we have a single peer, confidence is always 1 + peers := uint64(d.peers.Len()) + if peers == 1 { + atomic.StoreUint64(&d.rttConfidence, 1000000) + return + } + // If we have a ton of peers, don't drop confidence) + if peers >= uint64(qosConfidenceCap) { + return + } + // Otherwise drop the confidence factor + conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers + if float64(conf)/1000000 < rttMinConfidence { + conf = uint64(rttMinConfidence * 1000000) + } + atomic.StoreUint64(&d.rttConfidence, conf) + + rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate)) + glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()) +} + +// requestRTT returns the current target round trip time for a download request +// to complete in. +// +// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that +// the downloader tries to adapt queries to the RTT, so multiple RTT values can +// be adapted to, but smaller ones are preffered (stabler download stream). +func (d *Downloader) requestRTT() time.Duration { + return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10 +} + +// requestTTL returns the current timeout allowance for a single download request +// to finish under. +func (d *Downloader) requestTTL() time.Duration { + var ( + rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate)) + conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0 + ) + ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf) + if ttl > ttlLimit { + ttl = ttlLimit + } + return ttl +} |