aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorWenbiao Zheng <delweng@gmail.com>2018-09-03 23:33:21 +0800
committerFelix Lange <fjl@users.noreply.github.com>2018-09-03 23:33:21 +0800
commit6a33954731658667056466bf7573ed1c397f4750 (patch)
treed61c7672f1b0f451587efce656b94db3e187c4cc /eth
parent6fc84946203929143c51010e0a10d9fa61fb9b92 (diff)
downloaddexon-6a33954731658667056466bf7573ed1c397f4750.tar
dexon-6a33954731658667056466bf7573ed1c397f4750.tar.gz
dexon-6a33954731658667056466bf7573ed1c397f4750.tar.bz2
dexon-6a33954731658667056466bf7573ed1c397f4750.tar.lz
dexon-6a33954731658667056466bf7573ed1c397f4750.tar.xz
dexon-6a33954731658667056466bf7573ed1c397f4750.tar.zst
dexon-6a33954731658667056466bf7573ed1c397f4750.zip
core, eth, trie: use common/prque (#17508)
Diffstat (limited to 'eth')
-rw-r--r--eth/downloader/queue.go34
-rw-r--r--eth/fetcher/fetcher.go8
2 files changed, 21 insertions, 21 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 8529535ba..a0e8a6d48 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -26,10 +26,10 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
- "gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@@ -105,11 +105,11 @@ func newQueue() *queue {
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
- blockTaskQueue: prque.New(),
+ blockTaskQueue: prque.New(nil),
blockPendPool: make(map[string]*fetchRequest),
blockDonePool: make(map[common.Hash]struct{}),
receiptTaskPool: make(map[common.Hash]*types.Header),
- receiptTaskQueue: prque.New(),
+ receiptTaskQueue: prque.New(nil),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
resultCache: make([]*fetchResult, blockCacheItems),
@@ -277,7 +277,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
}
// Schedule all the header retrieval tasks for the skeleton assembly
q.headerTaskPool = make(map[uint64]*types.Header)
- q.headerTaskQueue = prque.New()
+ q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
@@ -288,7 +288,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
index := from + uint64(i*MaxHeaderFetch)
q.headerTaskPool[index] = header
- q.headerTaskQueue.Push(index, -float32(index))
+ q.headerTaskQueue.Push(index, -int64(index))
}
}
@@ -334,11 +334,11 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
}
// Queue the header for content retrieval
q.blockTaskPool[hash] = header
- q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
if q.mode == FastSync {
q.receiptTaskPool[hash] = header
- q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
inserts = append(inserts, header)
q.headerHead = hash
@@ -436,7 +436,7 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
}
// Merge all the skipped batches back
for _, from := range skip {
- q.headerTaskQueue.Push(from, -float32(from))
+ q.headerTaskQueue.Push(from, -int64(from))
}
// Assemble and return the block download request
if send == 0 {
@@ -542,7 +542,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
}
// Merge all the skipped headers back
for _, header := range skip {
- taskQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
}
if progress {
// Wake WaitResults, resultCache was modified
@@ -585,10 +585,10 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
defer q.lock.Unlock()
if request.From > 0 {
- taskQueue.Push(request.From, -float32(request.From))
+ taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
- taskQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(pendPool, request.Peer.id)
}
@@ -602,13 +602,13 @@ func (q *queue) Revoke(peerID string) {
if request, ok := q.blockPendPool[peerID]; ok {
for _, header := range request.Headers {
- q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.blockPendPool, peerID)
}
if request, ok := q.receiptPendPool[peerID]; ok {
for _, header := range request.Headers {
- q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.receiptPendPool, peerID)
}
@@ -657,10 +657,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
// Return any non satisfied requests to the pool
if request.From > 0 {
- taskQueue.Push(request.From, -float32(request.From))
+ taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
- taskQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Add the peer to the expiry report along the number of failed requests
expiries[id] = len(request.Headers)
@@ -731,7 +731,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
}
miss[request.From] = struct{}{}
- q.headerTaskQueue.Push(request.From, -float32(request.From))
+ q.headerTaskQueue.Push(request.From, -int64(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch and try to deliver any sub-results
@@ -854,7 +854,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
if header != nil {
- taskQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
// Wake up WaitResults
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index 277f14b81..f0b5e8064 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -23,10 +23,10 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
- "gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
const (
@@ -160,7 +160,7 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBloc
fetching: make(map[common.Hash]*announce),
fetched: make(map[common.Hash][]*announce),
completing: make(map[common.Hash]*announce),
- queue: prque.New(),
+ queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*inject),
getBlock: getBlock,
@@ -299,7 +299,7 @@ func (f *Fetcher) loop() {
// If too high up the chain or phase, continue later
number := op.block.NumberU64()
if number > height+1 {
- f.queue.Push(op, -float32(number))
+ f.queue.Push(op, -int64(number))
if f.queueChangeHook != nil {
f.queueChangeHook(hash, true)
}
@@ -624,7 +624,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
}
f.queues[peer] = count
f.queued[hash] = op
- f.queue.Push(op, -float32(block.NumberU64()))
+ f.queue.Push(op, -int64(block.NumberU64()))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}