diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 188 |
1 files changed, 140 insertions, 48 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d28985b3e..f038e24e4 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -130,10 +130,9 @@ type Downloader struct { interrupt int32 // Atomic boolean to signal termination // Statistics - importStart time.Time // Instance when the last blocks were taken from the cache - importQueue []*Block // Previously taken blocks to check import progress - importDone int // Number of taken blocks already imported from the last batch - importLock sync.Mutex + syncStatsOrigin uint64 // Origin block number where syncing started at + syncStatsHeight uint64 // Highest block number known when syncing started + syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks hasBlock hashCheckFn // Checks if a block is present in the chain @@ -161,6 +160,7 @@ type Downloader struct { cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers // Testing hooks + syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } @@ -192,27 +192,14 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he } } -// Stats retrieves the current status of the downloader. -func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) { - // Fetch the download status - pending, cached = d.queue.Size() +// Boundaries retrieves the synchronisation boundaries, specifically the origin +// block where synchronisation started at (may have failed/suspended) and the +// latest known block which the synchonisation targets. +func (d *Downloader) Boundaries() (uint64, uint64) { + d.syncStatsLock.RLock() + defer d.syncStatsLock.RUnlock() - // Figure out the import progress - d.importLock.Lock() - defer d.importLock.Unlock() - - for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) { - d.importQueue = d.importQueue[1:] - d.importDone++ - } - importing = len(d.importQueue) - - // Make an estimate on the total sync - estimate = 0 - if d.importDone > 0 { - estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing) - } - return + return d.syncStatsOrigin, d.syncStatsHeight } // Synchronising returns whether the downloader is currently retrieving blocks. @@ -333,14 +320,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e switch { case p.version == eth61: - // Old eth/61, use forward, concurrent hash and block retrieval algorithm - number, err := d.findAncestor61(p) + // Look up the sync boundaries: the common ancestor and the target block + latest, err := d.fetchHeight61(p) + if err != nil { + return err + } + origin, err := d.findAncestor61(p) if err != nil { return err } + d.syncStatsLock.Lock() + if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { + d.syncStatsOrigin = origin + } + d.syncStatsHeight = latest + d.syncStatsLock.Unlock() + + // Initiate the sync using a concurrent hash and block retrieval algorithm + if d.syncInitHook != nil { + d.syncInitHook(origin, latest) + } errc := make(chan error, 2) - go func() { errc <- d.fetchHashes61(p, td, number+1) }() - go func() { errc <- d.fetchBlocks61(number + 1) }() + go func() { errc <- d.fetchHashes61(p, td, origin+1) }() + go func() { errc <- d.fetchBlocks61(origin + 1) }() // If any fetcher fails, cancel the other if err := <-errc; err != nil { @@ -351,14 +353,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return <-errc case p.version >= eth62: - // New eth/62, use forward, concurrent header and block body retrieval algorithm - number, err := d.findAncestor(p) + // Look up the sync boundaries: the common ancestor and the target block + latest, err := d.fetchHeight(p) if err != nil { return err } + origin, err := d.findAncestor(p) + if err != nil { + return err + } + d.syncStatsLock.Lock() + if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { + d.syncStatsOrigin = origin + } + d.syncStatsHeight = latest + d.syncStatsLock.Unlock() + + // Initiate the sync using a concurrent hash and block retrieval algorithm + if d.syncInitHook != nil { + d.syncInitHook(origin, latest) + } errc := make(chan error, 2) - go func() { errc <- d.fetchHeaders(p, td, number+1) }() - go func() { errc <- d.fetchBodies(number + 1) }() + go func() { errc <- d.fetchHeaders(p, td, origin+1) }() + go func() { errc <- d.fetchBodies(origin + 1) }() // If any fetcher fails, cancel the other if err := <-errc; err != nil { @@ -401,6 +418,50 @@ func (d *Downloader) Terminate() { d.cancel() } +// fetchHeight61 retrieves the head block of the remote peer to aid in estimating +// the total time a pending synchronisation would take. +func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { + glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p) + + // Request the advertised remote head block and wait for the response + go p.getBlocks([]common.Hash{p.head}) + + timeout := time.After(blockSoftTTL) + for { + select { + case <-d.cancelCh: + return 0, errCancelBlockFetch + + case <-d.headerCh: + // Out of bounds eth/62 block headers received, ignore them + + case <-d.bodyCh: + // Out of bounds eth/62 block bodies received, ignore them + + case <-d.hashCh: + // Out of bounds hashes received, ignore them + + case blockPack := <-d.blockCh: + // Discard anything not from the origin peer + if blockPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId) + break + } + // Make sure the peer actually gave something valid + blocks := blockPack.blocks + if len(blocks) != 1 { + glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks)) + return 0, errBadPeer + } + return blocks[0].NumberU64(), nil + + case <-timeout: + glog.V(logger.Debug).Infof("%v: head block timeout", p) + return 0, errTimeout + } + } +} + // findAncestor61 tries to locate the common ancestor block of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N blocks should already get us a match. @@ -776,6 +837,50 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } } +// fetchHeight retrieves the head header of the remote peer to aid in estimating +// the total time a pending synchronisation would take. +func (d *Downloader) fetchHeight(p *peer) (uint64, error) { + glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p) + + // Request the advertised remote head block and wait for the response + go p.getRelHeaders(p.head, 1, 0, false) + + timeout := time.After(headerTTL) + for { + select { + case <-d.cancelCh: + return 0, errCancelBlockFetch + + case headerPack := <-d.headerCh: + // Discard anything not from the origin peer + if headerPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + break + } + // Make sure the peer actually gave something valid + headers := headerPack.headers + if len(headers) != 1 { + glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers)) + return 0, errBadPeer + } + return headers[0].Number.Uint64(), nil + + case <-d.bodyCh: + // Out of bounds block bodies received, ignore them + + case <-d.hashCh: + // Out of bounds eth/61 hashes received, ignore them + + case <-d.blockCh: + // Out of bounds eth/61 blocks received, ignore them + + case <-timeout: + glog.V(logger.Debug).Infof("%v: head header timeout", p) + return 0, errTimeout + } + } +} + // findAncestor tries to locate the common ancestor block of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N blocks should already get us a match. @@ -973,7 +1078,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // Otherwise insert all the new headers, aborting in case of junk glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from) - inserts := d.queue.Insert(headerPack.headers) + inserts := d.queue.Insert(headerPack.headers, from) if len(inserts) != len(headerPack.headers) { glog.V(logger.Debug).Infof("%v: stale headers", p) return errBadPeer @@ -1203,16 +1308,10 @@ func (d *Downloader) process() { d.process() } }() - // Release the lock upon exit (note, before checking for reentry!), and set + // Release the lock upon exit (note, before checking for reentry!) // the import statistics to zero. - defer func() { - d.importLock.Lock() - d.importQueue = nil - d.importDone = 0 - d.importLock.Unlock() + defer atomic.StoreInt32(&d.processing, 0) - atomic.StoreInt32(&d.processing, 0) - }() // Repeat the processing as long as there are blocks to import for { // Fetch the next batch of blocks @@ -1223,13 +1322,6 @@ func (d *Downloader) process() { if d.chainInsertHook != nil { d.chainInsertHook(blocks) } - // Reset the import statistics - d.importLock.Lock() - d.importStart = time.Now() - d.importQueue = blocks - d.importDone = 0 - d.importLock.Unlock() - // Actually import the blocks glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) for len(blocks) != 0 { |