diff options
Diffstat (limited to 'dex/downloader/queue.go')
-rw-r--r-- | dex/downloader/queue.go | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go index 12c75e793..f3a36ec3c 100644 --- a/dex/downloader/queue.go +++ b/dex/downloader/queue.go @@ -68,15 +68,15 @@ type queue struct { mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching // Headers are "special", they download in batches, supported by a skeleton chain - headerHead common.Hash // [eth/62] Hash of the last queued header to verify order - headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for - headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable - headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations - headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers - headerProced int // [eth/62] Number of headers already processed from the results - headerOffset uint64 // [eth/62] Number of the first header in the result cache - headerContCh chan bool // [eth/62] Channel to notify when header download finishes + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + headerTaskPool map[uint64]*types.HeaderWithGovState // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers + headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for + headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable + headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations + headerResults []*types.HeaderWithGovState // [eth/62] Result cache accumulating the completed headers + headerProced int // [eth/62] Number of headers already processed from the results + headerOffset uint64 // [eth/62] Number of the first header in the result cache + headerContCh chan bool // [eth/62] Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers @@ -267,7 +267,7 @@ func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[comm // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill // up an already retrieved header skeleton. -func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { +func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.HeaderWithGovState) { q.lock.Lock() defer q.lock.Unlock() @@ -276,10 +276,10 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { panic("skeleton assembly already in progress") } // Schedule all the header retrieval tasks for the skeleton assembly - q.headerTaskPool = make(map[uint64]*types.Header) + q.headerTaskPool = make(map[uint64]*types.HeaderWithGovState) q.headerTaskQueue = prque.New(nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains - q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerResults = make([]*types.HeaderWithGovState, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 q.headerOffset = from q.headerContCh = make(chan bool, 1) @@ -294,7 +294,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { // RetrieveHeaders retrieves the header chain assemble based on the scheduled // skeleton. -func (q *queue) RetrieveHeaders() ([]*types.Header, int) { +func (q *queue) RetrieveHeaders() ([]*types.HeaderWithGovState, int) { q.lock.Lock() defer q.lock.Unlock() @@ -306,7 +306,7 @@ func (q *queue) RetrieveHeaders() ([]*types.Header, int) { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { +func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -333,14 +333,14 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { continue } // Queue the header for content retrieval - q.blockTaskPool[hash] = header - q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.blockTaskPool[hash] = header.Header + q.blockTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) if q.mode == FastSync { - q.receiptTaskPool[hash] = header - q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.receiptTaskPool[hash] = header.Header + q.receiptTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) } - inserts = append(inserts, header) + inserts = append(inserts, header.Header) q.headerHead = hash from++ } @@ -679,7 +679,7 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, // If the headers are accepted, the method makes an attempt to deliver the set // of ready headers to the processor to keep the pipeline full. However it will // not block to prevent stalling other pending deliveries. -func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { +func (q *queue) DeliverHeaders(id string, headers []*types.HeaderWithGovState, headerProcCh chan []*types.HeaderWithGovState) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -743,7 +743,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } if ready > 0 { // Headers are ready for delivery, gather them and push forward (non blocking) - process := make([]*types.Header, ready) + process := make([]*types.HeaderWithGovState, ready) copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) select { |