Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--  eth/downloader/downloader.go  209
1 file changed, 170 insertions(+), 39 deletions(-)
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index f6dbb4610..92124cfeb 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -54,14 +54,15 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
- headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
- headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
- bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
- bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
- receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
- receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
- stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
- stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
+ rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
+ rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
+ rttMinConfidence = 0.1 // Worst confidence factor in our estimated RTT value
+ ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
+ ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
+
+ qosTuningPeers = 5 // Number of peers to tune based on (best peers)
+ qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
+ qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -73,6 +74,7 @@ var (
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
+ fsCriticalTrials = 10 // Number of times to retry in the critical section before bailing
)
var (
@@ -103,14 +105,17 @@ var (
)
type Downloader struct {
- mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
- noFast bool // Flag to disable fast syncing in case of a security error
- mux *event.TypeMux // Event multiplexer to announce sync operation events
+ mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
+ mux *event.TypeMux // Event multiplexer to announce sync operation events
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
- interrupt int32 // Atomic boolean to signal termination
+ fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
+ fsPivotFails int // Number of fast sync failures in the critical section
+
+ rttEstimate uint64 // Round trip time to target for download requests
+ rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
@@ -156,6 +161,9 @@ type Downloader struct {
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
+ quitCh chan struct{} // Quit channel to signal termination
+ quitLock sync.RWMutex // Lock to prevent double closes
+
// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@@ -169,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
- return &Downloader{
+ dl := &Downloader{
mode: FullSync,
mux: mux,
queue: newQueue(stateDb),
peers: newPeerSet(),
+ rttEstimate: uint64(rttMaxEstimate),
+ rttConfidence: uint64(1000000),
hasHeader: hasHeader,
hasBlockAndState: hasBlockAndState,
getHeader: getHeader,
@@ -200,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
+ quitCh: make(chan struct{}),
}
+ go dl.qosTuner()
+ return dl
}
// Progress retrieves the synchronisation boundaries, specifically the origin
@@ -247,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
+ d.qosReduceConfidence()
+
return nil
}
@@ -314,6 +329,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
default:
}
}
+ for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+ for empty := false; !empty; {
+ select {
+ case <-ch:
+ default:
+ empty = true
+ }
+ }
+ }
for empty := false; !empty; {
select {
case <-d.headerProcCh:
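The loop added above drains any data packets left over from a previous sync cycle, so a fresh synchronisation never consumes stale deliveries. Below is a minimal, standalone sketch of the same non-blocking drain idiom; the names are illustrative and not part of the downloader.

package main

import "fmt"

// drain empties a buffered channel without ever blocking: it keeps receiving
// until a receive would block, then stops, discarding whatever was queued.
func drain(ch chan int) {
	for empty := false; !empty; {
		select {
		case v := <-ch:
			fmt.Println("discarding stale item:", v)
		default:
			empty = true
		}
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	drain(ch)
	fmt.Println("items left:", len(ch)) // 0
}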
@@ -330,7 +354,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// Set the requested sync mode, unless it's forbidden
d.mode = mode
- if d.mode == FastSync && d.noFast {
+ if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
d.mode = FullSync
}
// Retrieve the origin peer and initiate the downloading process
@@ -413,12 +437,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
pivot = height
case FastSync:
// Calculate the new fast/slow sync pivot point
- pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
- if err != nil {
- panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
- }
- if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
- pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
+ if d.fsPivotLock == nil {
+ pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
+ if err != nil {
+ panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
+ }
+ if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
+ pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
+ }
+ } else {
+ // Pivot point locked in, use this and do not pick a new one!
+ pivot = d.fsPivotLock.Number.Uint64()
}
// If the point is below the origin, move origin back to ensure state download
if pivot < origin {
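With the defaults above (fsMinFullBlocks = 1024, fsPivotInterval = 512), an unlocked pivot ends up between 1024 and 1535 blocks behind the advertised head; once fsPivotLock is set, that exact header number is reused across retries. A hedged sketch of the unlocked calculation, using math/rand for brevity where the real code draws from crypto/rand:

package main

import (
	"fmt"
	"math/rand"
)

const (
	fsPivotInterval = 512  // window out of which the random pivot offset is drawn
	fsMinFullBlocks = 1024 // number of head blocks always retrieved fully
)

// pickPivot mirrors the unlocked branch: back off from the head by the full
// block count plus a random offset; heads too close to genesis keep pivot 0.
func pickPivot(height uint64) uint64 {
	offset := uint64(rand.Intn(fsPivotInterval))
	if height > fsMinFullBlocks+offset {
		return height - fsMinFullBlocks - offset
	}
	return 0
}

func main() {
	fmt.Println("pivot for head 100000:", pickPivot(100000)) // somewhere in [98465, 98976]
}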
@@ -498,7 +527,16 @@ func (d *Downloader) cancel() {
// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
- atomic.StoreInt32(&d.interrupt, 1)
+ // Close the termination channel (ensure a double close is tolerated)
+ d.quitLock.Lock()
+ select {
+ case <-d.quitCh:
+ default:
+ close(d.quitCh)
+ }
+ d.quitLock.Unlock()
+
+ // Cancel any pending download requests
d.cancel()
}
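The select-before-close guard makes Terminate idempotent: receiving from an already-closed channel succeeds immediately, so a second call takes the first case instead of panicking on a double close. A standalone sketch of the idiom with illustrative names (the real code uses an RWMutex for the same purpose):

package main

import (
	"fmt"
	"sync"
)

type service struct {
	quitCh   chan struct{}
	quitLock sync.Mutex
}

// stop closes quitCh exactly once. Receiving from an already-closed channel
// succeeds immediately, so repeated calls fall into the first select case
// and never double-close.
func (s *service) stop() {
	s.quitLock.Lock()
	defer s.quitLock.Unlock()
	select {
	case <-s.quitCh:
		// already closed, nothing to do
	default:
		close(s.quitCh)
	}
}

func main() {
	s := &service{quitCh: make(chan struct{})}
	s.stop()
	s.stop() // safe: no panic on the second call
	fmt.Println("terminated")
}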
@@ -915,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
- request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
+ request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
if request == nil {
continue
}
@@ -956,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// Request the advertised remote head block and wait for the response
go p.getRelHeaders(p.head, 1, 0, false)
- timeout := time.After(headerTTL)
+ timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
@@ -1024,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
- timeout := time.After(hashTTL)
+ timeout := time.After(d.requestTTL())
for finished := false; !finished; {
select {
@@ -1101,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
- timeout := time.After(hashTTL)
+ timeout := time.After(d.requestTTL())
go p.getAbsHeaders(uint64(check), 1, 0, false)
// Wait until a reply arrives to this request
@@ -1182,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
getHeaders := func(from uint64) {
request = time.Now()
- timeout.Reset(headerTTL)
+ timeout.Reset(d.requestTTL())
if skeleton {
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
@@ -1218,8 +1256,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
- d.headerProcCh <- nil
- return nil
+ select {
+ case d.headerProcCh <- nil:
+ return nil
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
}
headers := packet.(*headerPack).headers
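Wrapping the nil hand-off in a select means the header fetcher no longer blocks forever if the processor has stopped reading; a cancelled sync now unblocks the send and the fetcher exits with an error instead. A minimal sketch of that cancellable-send pattern, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

var errCancelled = errors.New("fetch cancelled")

// notify tries to hand result to the processor, but gives up cleanly if the
// cancel channel fires first instead of blocking forever on a dead reader.
func notify(procCh chan []string, cancelCh chan struct{}, result []string) error {
	select {
	case procCh <- result:
		return nil
	case <-cancelCh:
		return errCancelled
	}
}

func main() {
	procCh := make(chan []string, 1)
	cancelCh := make(chan struct{})
	fmt.Println(notify(procCh, cancelCh, nil)) // <nil>: the buffered send succeeds

	close(cancelCh)
	fmt.Println(notify(procCh, cancelCh, nil)) // fetch cancelled: buffer full, cancel closed
}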
@@ -1290,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
}
- expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
+ expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
- capacity = func(p *peer) int { return p.HeaderCapacity() }
+ capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
@@ -1320,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
+ expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
- capacity = func(p *peer) int { return p.BlockCapacity() }
+ capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
@@ -1344,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
- expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
- capacity = func(p *peer) int { return p.ReceiptCapacity() }
+ capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
@@ -1396,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error {
}
})
}
- expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
+ expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
- capacity = func(p *peer) int { return p.NodeDataCapacity() }
+ capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
@@ -1611,9 +1653,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
- // If we're already past the pivot point, this could be an attack, disable fast sync
+ // If we're already past the pivot point, this could be an attack, tread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
- d.noFast = true
+ // If we didn't ever fail, lock in the pivot header (must! not! change!)
+ if d.fsPivotFails == 0 {
+ for _, header := range rollback {
+ if header.Number.Uint64() == pivot {
+ glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
+ d.fsPivotLock = header
+ }
+ }
+ }
+ d.fsPivotFails++
}
}
}()
@@ -1712,6 +1763,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
}
+ // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
+ if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
+ if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
+ glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
+ return errInvalidChain
+ }
+ }
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
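The pivot check above relies on headers arriving in a contiguous, ascending chunk, so the locked-in header sits exactly pivot - chunk[0].Number positions into the slice. A tiny hedged sketch of that index arithmetic with plain numbers standing in for headers:

package main

import "fmt"

// pivotIndex returns the position of the pivot inside a contiguous header
// chunk starting at first, assuming first <= pivot <= first+len(chunk)-1.
func pivotIndex(first, pivot uint64) int {
	return int(pivot - first)
}

func main() {
	// A chunk covering headers #990..#1009 with the pivot locked at #1000:
	first, pivot := uint64(990), uint64(1000)
	fmt.Println("pivot sits at chunk index", pivotIndex(first, pivot)) // 10
}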
@@ -1762,8 +1820,10 @@ func (d *Downloader) processContent() error {
}
for len(results) != 0 {
// Check for any termination requests
- if atomic.LoadInt32(&d.interrupt) == 1 {
+ select {
+ case <-d.quitCh:
return errCancelContentProcessing
+ default:
}
// Retrieve a batch of results to import
var (
@@ -1864,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
return errNoSyncActive
}
}
+
+// qosTuner is the quality of service tuning loop that occasionally gathers the
+// peer latency statistics and updates the estimated request round trip time.
+func (d *Downloader) qosTuner() {
+ for {
+ // Retrieve the current median RTT and integrate into the previous target RTT
+ rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+ atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
+
+ // A new RTT cycle passed, increase our confidence in the estimated RTT
+ conf := atomic.LoadUint64(&d.rttConfidence)
+ conf = conf + (1000000-conf)/2
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ // Log the new QoS values and sleep until the next RTT
+ glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
+ select {
+ case <-d.quitCh:
+ return
+ case <-time.After(rtt):
+ }
+ }
+}
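The update in qosTuner is an exponential moving average: every cycle the estimate moves qosTuningImpact (25%) of the way toward the current median RTT of the best peers, while the confidence halves its distance to full certainty. A hedged, self-contained sketch of one tuning step, with the atomics and the peer set left out:

package main

import (
	"fmt"
	"time"
)

const qosTuningImpact = 0.25 // weight of the newest median RTT sample

// tune folds a fresh median RTT measurement into the running estimate and
// bumps the confidence halfway toward full certainty (1.0).
func tune(estimate time.Duration, confidence float64, median time.Duration) (time.Duration, float64) {
	rtt := time.Duration((1-qosTuningImpact)*float64(estimate) + qosTuningImpact*float64(median))
	conf := confidence + (1-confidence)/2
	return rtt, conf
}

func main() {
	rtt, conf := 20*time.Second, 0.5 // pessimistic initial estimate, middling confidence
	for i := 0; i < 3; i++ {
		rtt, conf = tune(rtt, conf, 2*time.Second) // peers keep reporting ~2s medians
		fmt.Printf("cycle %d: rtt %v, conf %.3f\n", i+1, rtt, conf)
	}
}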
+
+// qosReduceConfidence is meant to be called when a new peer joins the downloader's
+ // peer set, needing to reduce the confidence we have in our QoS estimates.
+func (d *Downloader) qosReduceConfidence() {
+ // If we have a single peer, confidence is always 1
+ peers := uint64(d.peers.Len())
+ if peers == 1 {
+ atomic.StoreUint64(&d.rttConfidence, 1000000)
+ return
+ }
+ // If we have a ton of peers, don't drop confidence
+ if peers >= uint64(qosConfidenceCap) {
+ return
+ }
+ // Otherwise drop the confidence factor
+ conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
+ if float64(conf)/1000000 < rttMinConfidence {
+ conf = uint64(rttMinConfidence * 1000000)
+ }
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
+}
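Each newly registered peer therefore scales the confidence by (peers-1)/peers, floored at rttMinConfidence, which widens request TTLs until a few tuning cycles rebuild trust in the estimate. A sketch of that rule in plain floating point, whereas the real code stores confidence as integer millionths for atomic access:

package main

import "fmt"

const (
	rttMinConfidence = 0.1 // never let confidence collapse entirely
	qosConfidenceCap = 10  // with this many peers, one newcomer changes little
)

// reduceConfidence mirrors the rule above: a lone peer means full confidence,
// a well-connected node keeps its confidence, and anything in between gets
// scaled down by (peers-1)/peers and floored.
func reduceConfidence(conf float64, peers int) float64 {
	if peers == 1 {
		return 1.0
	}
	if peers >= qosConfidenceCap {
		return conf
	}
	conf = conf * float64(peers-1) / float64(peers)
	if conf < rttMinConfidence {
		conf = rttMinConfidence
	}
	return conf
}

func main() {
	conf := 1.0
	for peers := 2; peers <= 5; peers++ {
		conf = reduceConfidence(conf, peers)
		fmt.Printf("%d peers: confidence %.3f\n", peers, conf)
	}
}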
+
+// requestRTT returns the current target round trip time for a download request
+// to complete in.
+//
+// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
+// the downloader tries to adapt queries to the RTT, so multiple RTT values can
+ // be adapted to, but smaller ones are preferred (stabler download stream).
+func (d *Downloader) requestRTT() time.Duration {
+ return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
+}
+
+// requestTTL returns the current timeout allowance for a single download request
+// to finish under.
+func (d *Downloader) requestTTL() time.Duration {
+ var (
+ rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
+ )
+ ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
+ if ttl > ttlLimit {
+ ttl = ttlLimit
+ }
+ return ttl
+}
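Putting the two helpers together: the timeout for a single request is the RTT estimate scaled by ttlScaling, inflated by low confidence and capped at ttlLimit, while the RTT target handed to the capacity calculations is 90% of the estimate. A small hedged sketch reproducing that arithmetic:

package main

import (
	"fmt"
	"time"
)

const (
	ttlScaling = 3           // RTT -> TTL multiplier
	ttlLimit   = time.Minute // hard ceiling on any single request timeout
)

// requestTTL scales the estimated RTT by ttlScaling, inflates it by the
// inverse of the confidence, and clamps the result to ttlLimit.
func requestTTL(rtt time.Duration, conf float64) time.Duration {
	ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return ttl
}

func main() {
	// Fresh node: 20s worst-case estimate at 10% confidence is capped at one minute.
	fmt.Println(requestTTL(20*time.Second, 0.1))
	// Tuned node: 2s estimate at full confidence gives a 6s timeout and a 1.8s RTT target.
	fmt.Println(requestTTL(2*time.Second, 1.0), 2*time.Second*9/10)
}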