From 6a33954731658667056466bf7573ed1c397f4750 Mon Sep 17 00:00:00 2001 From: Wenbiao Zheng Date: Mon, 3 Sep 2018 23:33:21 +0800 Subject: core, eth, trie: use common/prque (#17508) --- eth/downloader/queue.go | 34 +++++++++++++++++----------------- eth/fetcher/fetcher.go | 8 ++++---- 2 files changed, 21 insertions(+), 21 deletions(-) (limited to 'eth') diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 8529535ba..a0e8a6d48 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,10 +26,10 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) var ( @@ -105,11 +105,11 @@ func newQueue() *queue { headerPendPool: make(map[string]*fetchRequest), headerContCh: make(chan bool), blockTaskPool: make(map[common.Hash]*types.Header), - blockTaskQueue: prque.New(), + blockTaskQueue: prque.New(nil), blockPendPool: make(map[string]*fetchRequest), blockDonePool: make(map[common.Hash]struct{}), receiptTaskPool: make(map[common.Hash]*types.Header), - receiptTaskQueue: prque.New(), + receiptTaskQueue: prque.New(nil), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), resultCache: make([]*fetchResult, blockCacheItems), @@ -277,7 +277,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { } // Schedule all the header retrieval tasks for the skeleton assembly q.headerTaskPool = make(map[uint64]*types.Header) - q.headerTaskQueue = prque.New() + q.headerTaskQueue = prque.New(nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 @@ -288,7 +288,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { index := from + uint64(i*MaxHeaderFetch) q.headerTaskPool[index] = header - q.headerTaskQueue.Push(index, -float32(index)) + q.headerTaskQueue.Push(index, -int64(index)) } } @@ -334,11 +334,11 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { } // Queue the header for content retrieval q.blockTaskPool[hash] = header - q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) + q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) if q.mode == FastSync { q.receiptTaskPool[hash] = header - q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) + q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } inserts = append(inserts, header) q.headerHead = hash @@ -436,7 +436,7 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { } // Merge all the skipped batches back for _, from := range skip { - q.headerTaskQueue.Push(from, -float32(from)) + q.headerTaskQueue.Push(from, -int64(from)) } // Assemble and return the block download request if send == 0 { @@ -542,7 +542,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common } // Merge all the skipped headers back for _, header := range skip { - taskQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -int64(header.Number.Uint64())) } if progress { // Wake WaitResults, resultCache was modified @@ -585,10 +585,10 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m defer q.lock.Unlock() if request.From > 0 { - taskQueue.Push(request.From, -float32(request.From)) + taskQueue.Push(request.From, -int64(request.From)) } for _, header := range request.Headers { - taskQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -int64(header.Number.Uint64())) } delete(pendPool, request.Peer.id) } @@ -602,13 +602,13 @@ func (q *queue) Revoke(peerID string) { if request, ok := q.blockPendPool[peerID]; ok { for _, header := range request.Headers { - q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) + q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) } delete(q.blockPendPool, peerID) } if request, ok := q.receiptPendPool[peerID]; ok { for _, header := range request.Headers { - q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) + q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } delete(q.receiptPendPool, peerID) } @@ -657,10 +657,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, // Return any non satisfied requests to the pool if request.From > 0 { - taskQueue.Push(request.From, -float32(request.From)) + taskQueue.Push(request.From, -int64(request.From)) } for _, header := range request.Headers { - taskQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -int64(header.Number.Uint64())) } // Add the peer to the expiry report along the number of failed requests expiries[id] = len(request.Headers) @@ -731,7 +731,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } miss[request.From] = struct{}{} - q.headerTaskQueue.Push(request.From, -float32(request.From)) + q.headerTaskQueue.Push(request.From, -int64(request.From)) return 0, errors.New("delivery not accepted") } // Clean up a successful fetch and try to deliver any sub-results @@ -854,7 +854,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ // Return all failed or missing fetches to the queue for _, header := range request.Headers { if header != nil { - taskQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -int64(header.Number.Uint64())) } } // Wake up WaitResults diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 277f14b81..f0b5e8064 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -23,10 +23,10 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) const ( @@ -160,7 +160,7 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBloc fetching: make(map[common.Hash]*announce), fetched: make(map[common.Hash][]*announce), completing: make(map[common.Hash]*announce), - queue: prque.New(), + queue: prque.New(nil), queues: make(map[string]int), queued: make(map[common.Hash]*inject), getBlock: getBlock, @@ -299,7 +299,7 @@ func (f *Fetcher) loop() { // If too high up the chain or phase, continue later number := op.block.NumberU64() if number > height+1 { - f.queue.Push(op, -float32(number)) + f.queue.Push(op, -int64(number)) if f.queueChangeHook != nil { f.queueChangeHook(hash, true) } @@ -624,7 +624,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { } f.queues[peer] = count f.queued[hash] = op - f.queue.Push(op, -float32(block.NumberU64())) + f.queue.Push(op, -int64(block.NumberU64())) if f.queueChangeHook != nil { f.queueChangeHook(op.block.Hash(), true) } -- cgit v1.2.3