diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-07 17:14:30 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-19 15:03:10 +0800 |
commit | b97e34a8e4d06b315cc495819ba6612f89dec54f (patch) | |
tree | 22ddf740ffe180b29b9b5a3a94684d7ac2a5ae19 /eth/downloader | |
parent | ab27bee25a845be90bd60e774ff68d2ea1501772 (diff) | |
download | go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.gz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.bz2 go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.lz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.xz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.zst go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.zip |
eth/downloader: concurrent receipt and state processing
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 64 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 4 | ||||
-rw-r--r-- | eth/downloader/queue.go | 160 | ||||
-rw-r--r-- | eth/downloader/types.go | 2 |
4 files changed, 158 insertions, 72 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 96177ae8a..e19b70dfd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -830,7 +830,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // If there's nothing more to fetch, wait or terminate if d.queue.PendingBlocks() == 0 { - if d.queue.InFlight() == 0 && finished { + if !d.queue.InFlightBlocks() && finished { glog.V(logger.Debug).Infof("Block fetching completed") return nil } @@ -864,7 +864,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !throttled && d.queue.InFlight() == 0 && len(idles) == total { + if !throttled && !d.queue.InFlightBlocks() && len(idles) == total { return errPeersUnavailable } } @@ -1124,7 +1124,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) if d.mode == FastSync || d.mode == LightSync { - if n, err := d.insertHeaders(headers, false); err != nil { + if n, err := d.insertHeaders(headers, headerCheckFrequency); err != nil { glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err) return errInvalidChain } @@ -1194,8 +1194,8 @@ func (d *Downloader) fetchBodies(from uint64) error { setIdle = func(p *peer) { p.SetBlocksIdle() } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, - d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook, - fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body") + d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, + d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body") glog.V(logger.Debug).Infof("Block body download terminated: %v", err) return err @@ -1218,8 +1218,8 @@ func (d *Downloader) fetchReceipts(from uint64) error { setIdle = func(p *peer) { p.SetReceiptsIdle() } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, - d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, - fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") + d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, + d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") glog.V(logger.Debug).Infof("Receipt download terminated: %v", err) return err @@ -1234,15 +1234,29 @@ func (d *Downloader) fetchNodeData() error { var ( deliver = func(packet dataPack) error { start := time.Now() - done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states) - - d.syncStatsLock.Lock() - totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found) - d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown - d.syncStatsLock.Unlock() + return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { + if err != nil { + // If the node data processing failed, the root hash is very wrong, abort + glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err) + d.cancel() + return + } + // Processing succeeded, notify state fetcher and processor of continuation + if d.queue.PendingNodeData() == 0 { + go d.process() + } else { + select { + case d.stateWakeCh <- true: + default: + } + } + // Log a message to the user and return + d.syncStatsLock.Lock() + defer d.syncStatsLock.Unlock() - glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start)) - return err + d.syncStatsStateDone += uint64(delivered) + glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) + }) } expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } throttle = func() bool { return false } @@ -1254,8 +1268,8 @@ func (d *Downloader) fetchNodeData() error { setIdle = func(p *peer) { p.SetNodeDataIdle() } ) err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire, - d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData, - capacity, d.peers.ReceiptIdlePeers, setIdle, "State") + d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, + d.queue.CancelNodeData, capacity, d.peers.ReceiptIdlePeers, setIdle, "State") glog.V(logger.Debug).Infof("Node state data download terminated: %v", err) return err @@ -1265,8 +1279,9 @@ func (d *Downloader) fetchNodeData() error { // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, - expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), - fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error { + expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), + fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, + idle func() ([]*peer, int), setIdle func(*peer), kind string) error { // Create a ticker to detect expired retreival tasks ticker := time.NewTicker(100 * time.Millisecond) @@ -1378,14 +1393,14 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } // If there's nothing more to fetch, wait or terminate if pending() == 0 { - if d.queue.InFlight() == 0 && finished { + if !inFlight() && finished { glog.V(logger.Debug).Infof("%s fetching completed", kind) return nil } break } // Send a download request to all idle peers, until throttled - progressed, throttled := false, false + progressed, throttled, running := false, false, inFlight() idles, total := idle() for _, peer := range idles { @@ -1423,10 +1438,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv glog.V(logger.Error).Infof("%v: %s fetch failed, rescheduling", peer, strings.ToLower(kind)) cancel(request) } + running = true } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !progressed && !throttled && d.queue.InFlight() == 0 && len(idles) == total { + if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { return errPeersUnavailable } } @@ -1514,12 +1530,12 @@ func (d *Downloader) process() { ) switch { case len(headers) > 0: - index, err = d.insertHeaders(headers, true) + index, err = d.insertHeaders(headers, headerCheckFrequency) case len(receipts) > 0: index, err = d.insertReceipts(blocks, receipts) if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot { - err = d.commitHeadBlock(blocks[len(blocks)-1].Hash()) + index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash()) } default: index, err = d.insertBlocks(blocks) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8944ae4b0..0e60371b3 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -268,7 +268,7 @@ func (dl *downloadTester) getTd(hash common.Hash) *big.Int { } // insertHeaders injects a new batch of headers into the simulated chain. -func (dl *downloadTester) insertHeaders(headers []*types.Header, verify bool) (int, error) { +func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -1262,7 +1262,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) { pending.Wait() // Simulate a successful sync above the fork - tester.downloader.syncStatsOrigin = tester.downloader.syncStatsHeight + tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight // Synchronise with the second fork and check boundary resets tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 942ed0d63..bb8d892cd 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -93,8 +94,10 @@ type queue struct { stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations - stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly - stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator + stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly + stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator + stateProcessors int32 // [eth/63] Number of currently running state processors + stateSchedLock sync.RWMutex // [eth/63] Lock serializing access to the state scheduler resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block-chain @@ -175,18 +178,40 @@ func (q *queue) PendingReceipts() int { // PendingNodeData retrieves the number of node data entries pending for retrieval. func (q *queue) PendingNodeData() int { + q.stateSchedLock.RLock() + defer q.stateSchedLock.RUnlock() + + if q.stateScheduler != nil { + return q.stateScheduler.Pending() + } + return 0 +} + +// InFlightBlocks retrieves whether there are block fetch requests currently in +// flight. +func (q *queue) InFlightBlocks() bool { q.lock.RLock() defer q.lock.RUnlock() - return q.stateTaskQueue.Size() + return len(q.blockPendPool) > 0 } -// InFlight retrieves the number of fetch requests currently in flight. -func (q *queue) InFlight() int { +// InFlightReceipts retrieves whether there are receipt fetch requests currently +// in flight. +func (q *queue) InFlightReceipts() bool { q.lock.RLock() defer q.lock.RUnlock() - return len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) + return len(q.receiptPendPool) > 0 +} + +// InFlightNodeData retrieves whether there are node data entry fetch requests +// currently in flight. +func (q *queue) InFlightNodeData() bool { + q.lock.RLock() + defer q.lock.RUnlock() + + return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 } // Idle returns if the queue is fully idle or has some data still inside. This @@ -199,6 +224,12 @@ func (q *queue) Idle() bool { pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) + q.stateSchedLock.RLock() + if q.stateScheduler != nil { + queued += q.stateScheduler.Pending() + } + q.stateSchedLock.RUnlock() + return (queued + pending + cached) == 0 } @@ -299,12 +330,9 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { } if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { // Pivoting point of the fast sync, retrieve the state tries + q.stateSchedLock.Lock() q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) - for _, hash := range q.stateScheduler.Missing(0) { - q.stateTaskPool[hash] = q.stateTaskIndex - q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) - q.stateTaskIndex++ - } + q.stateSchedLock.Unlock() } inserts = append(inserts, header) q.headerHead = hash @@ -325,8 +353,13 @@ func (q *queue) GetHeadResult() *fetchResult { if q.resultCache[0].Pending > 0 { return nil } - if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { - return nil + if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot { + if len(q.stateTaskPool) > 0 { + return nil + } + if q.PendingNodeData() > 0 { + return nil + } } return q.resultCache[0] } @@ -345,8 +378,13 @@ func (q *queue) TakeResults() []*fetchResult { break } // The fast sync pivot block may only be processed after state fetch completes - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { - break + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { + if len(q.stateTaskPool) > 0 { + break + } + if q.PendingNodeData() > 0 { + break + } } // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { @@ -373,26 +411,34 @@ func (q *queue) TakeResults() []*fetchResult { // ReserveBlocks reserves a set of block hashes for the given peer, skipping any // previously failed download. func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { - return q.reserveHashes(p, count, q.hashQueue, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool)) + return q.reserveHashes(p, count, q.hashQueue, nil, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool)) } // ReserveNodeData reserves a set of node data hashes for the given peer, skipping // any previously failed download. func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { - return q.reserveHashes(p, count, q.stateTaskQueue, q.statePendPool, 0) + // Create a task generator to fetch status-fetch tasks if all schedules ones are done + generator := func(max int) { + q.stateSchedLock.Lock() + defer q.stateSchedLock.Unlock() + + for _, hash := range q.stateScheduler.Missing(max) { + q.stateTaskPool[hash] = q.stateTaskIndex + q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) + q.stateTaskIndex++ + } + } + return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, count) } // reserveHashes reserves a set of hashes for the given peer, skipping previously // failed ones. -func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { +func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() - // Short circuit if the pool has been depleted, or if the peer's already - // downloading something (sanity check not to corrupt state) - if taskQueue.Empty() { - return nil - } + // Short circuit if the peer's already downloading something (sanity check not + // to corrupt state) if _, ok := pendPool[p.id]; ok { return nil } @@ -403,6 +449,13 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPo allowance -= len(request.Hashes) } } + // If there's a task generator, ask it to fill our task queue + if taskGen != nil && taskQueue.Size() < allowance { + taskGen(allowance - taskQueue.Size()) + } + if taskQueue.Empty() { + return nil + } // Retrieve a batch of hashes, skipping previously failed ones send := make(map[common.Hash]int) skip := make(map[common.Hash]int) @@ -809,14 +862,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } // DeliverNodeData injects a node state data retrieval response into the queue. -func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) { +func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the data was never requested request := q.statePendPool[id] if request == nil { - return 0, 0, errNoFetchesPending + return errNoFetchesPending } stateReqTimer.UpdateSince(request.Time) delete(q.statePendPool, id) @@ -829,7 +882,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) { } // Iterate over the downloaded data and verify each of them errs := make([]error, 0) - processed := 0 + process := []trie.SyncResult{} for _, blob := range data { // Skip any blocks that were not requested hash := common.BytesToHash(crypto.Sha3(blob)) @@ -837,41 +890,58 @@ func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) { errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) continue } - // Inject the next state trie item into the database - if err := q.stateScheduler.Process([]trie.SyncResult{{hash, blob}}); err != nil { - errs = []error{err} - break - } - processed++ + // Inject the next state trie item into the processing queue + process = append(process, trie.SyncResult{hash, blob}) delete(request.Hashes, hash) delete(q.stateTaskPool, hash) } + // Start the asynchronous node state data injection + atomic.AddInt32(&q.stateProcessors, 1) + go func() { + defer atomic.AddInt32(&q.stateProcessors, -1) + q.deliverNodeData(process, callback) + }() // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { q.stateTaskQueue.Push(hash, float32(index)) } - // Also enqueue any newly required state trie nodes - discovered := 0 - if len(q.stateTaskPool) < maxQueuedStates { - for _, hash := range q.stateScheduler.Missing(4 * MaxStateFetch) { - q.stateTaskPool[hash] = q.stateTaskIndex - q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) - q.stateTaskIndex++ - discovered++ - } - } // If none of the data items were good, it's a stale delivery switch { case len(errs) == 0: - return processed, discovered, nil + return nil case len(errs) == len(request.Hashes): - return processed, discovered, errStaleDelivery + return errStaleDelivery default: - return processed, discovered, fmt.Errorf("multiple failures: %v", errs) + return fmt.Errorf("multiple failures: %v", errs) + } +} + +// deliverNodeData is the asynchronous node data processor that injects a batch +// of sync results into the state scheduler. +func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { + // Process results one by one to permit task fetches in between + for i, result := range results { + q.stateSchedLock.Lock() + + if q.stateScheduler == nil { + // Syncing aborted since this async delivery started, bail out + q.stateSchedLock.Unlock() + callback(errNoFetchesPending, i) + return + } + if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil { + // Processing a state result failed, bail out + q.stateSchedLock.Unlock() + callback(err, i) + return + } + // Item processing succeeded, release the lock (temporarily) + q.stateSchedLock.Unlock() } + callback(nil, len(results)) } // Prepare configures the result cache to allow accepting and caching inbound diff --git a/eth/downloader/types.go b/eth/downloader/types.go index 221ef38f6..60d9a2b12 100644 --- a/eth/downloader/types.go +++ b/eth/downloader/types.go @@ -52,7 +52,7 @@ type headBlockCommitterFn func(common.Hash) error type tdRetrievalFn func(common.Hash) *big.Int // headerChainInsertFn is a callback type to insert a batch of headers into the local chain. -type headerChainInsertFn func([]*types.Header, bool) (int, error) +type headerChainInsertFn func([]*types.Header, int) (int, error) // blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. type blockChainInsertFn func(types.Blocks) (int, error) |