path: root/dex/downloader/queue.go
author    Sonic <sonic@dexon.org>  2018-11-20 14:13:53 +0800
committer Wei-Ning Huang <w@dexon.org>  2019-04-09 21:32:53 +0800
commit    afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef (patch)
tree      28013bb14a19611adecda49d1dfbeb0a34dad66a /dex/downloader/queue.go
parent    2113837c006aad6af75c09d37514591fd6863dbc (diff)
dex: implement downloader for dex
We need the governance state to verify a block's signature (randomness), but in Ethereum's fast sync mode the eth downloader only fetches the complete state of the pivot block, so we have no governance state to verify downloaded blocks before the pivot unless we process their transactions. To avoid running transactions, the dex downloader also downloads the governance state (Merkle proof and storage) at the snapshot height of each round, so that blocks can be verified in fast sync mode.
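
The diff below replaces *types.Header with *types.HeaderWithGovState along the header path. For context, a minimal sketch of what that wrapper might look like: the embedded *Header is implied by the header.Header accesses in the diff, while the GovState payload fields and the import path are assumptions based on the commit message's mention of a Merkle proof and storage.

package types

import (
	"math/big"

	"github.com/dexon-foundation/dexon/common" // assumed module path
)

// GovState carries the governance contract state needed to verify
// blocks without executing transactions (sketch; fields assumed).
type GovState struct {
	BlockHash common.Hash
	Number    *big.Int
	Proof     [][]byte    // Merkle proof of the governance account
	Storage   [][2][]byte // governance contract storage entries
}

// HeaderWithGovState wraps a header with optional governance state,
// delivered only at the snapshot height of each round.
type HeaderWithGovState struct {
	*Header
	GovState *GovState `rlp:"nil"` // nil for ordinary headers
}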
Diffstat (limited to 'dex/downloader/queue.go')
-rw-r--r--  dex/downloader/queue.go  42
1 file changed, 21 insertions(+), 21 deletions(-)
diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go
index 12c75e793..f3a36ec3c 100644
--- a/dex/downloader/queue.go
+++ b/dex/downloader/queue.go
@@ -68,15 +68,15 @@ type queue struct {
mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
// Headers are "special", they download in batches, supported by a skeleton chain
- headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
- headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
- headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
- headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
- headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
- headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
- headerProced int // [eth/62] Number of headers already processed from the results
- headerOffset uint64 // [eth/62] Number of the first header in the result cache
- headerContCh chan bool // [eth/62] Channel to notify when header download finishes
+ headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
+ headerTaskPool map[uint64]*types.HeaderWithGovState // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
+ headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
+ headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
+ headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
+ headerResults []*types.HeaderWithGovState // [eth/62] Result cache accumulating the completed headers
+ headerProced int // [eth/62] Number of headers already processed from the results
+ headerOffset uint64 // [eth/62] Number of the first header in the result cache
+ headerContCh chan bool // [eth/62] Channel to notify when header download finishes
// All data retrievals below are based on an already assembled header chain
blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
@@ -267,7 +267,7 @@ func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[comm
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
-func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
+func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.HeaderWithGovState) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -276,10 +276,10 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
panic("skeleton assembly already in progress")
}
// Schedule all the header retrieval tasks for the skeleton assembly
- q.headerTaskPool = make(map[uint64]*types.Header)
+ q.headerTaskPool = make(map[uint64]*types.HeaderWithGovState)
q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
- q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
+ q.headerResults = make([]*types.HeaderWithGovState, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
q.headerOffset = from
q.headerContCh = make(chan bool, 1)
@@ -294,7 +294,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
// RetrieveHeaders retrieves the header chain assembled based on the scheduled
// skeleton.
-func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
+func (q *queue) RetrieveHeaders() ([]*types.HeaderWithGovState, int) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -306,7 +306,7 @@ func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
-func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
+func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
@@ -333,14 +333,14 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
continue
}
// Queue the header for content retrieval
- q.blockTaskPool[hash] = header
- q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
+ q.blockTaskPool[hash] = header.Header
+ q.blockTaskQueue.Push(header.Header, -int64(header.Number.Uint64()))
if q.mode == FastSync {
- q.receiptTaskPool[hash] = header
- q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+ q.receiptTaskPool[hash] = header.Header
+ q.receiptTaskQueue.Push(header.Header, -int64(header.Number.Uint64()))
}
- inserts = append(inserts, header)
+ inserts = append(inserts, header.Header)
q.headerHead = hash
from++
}
@@ -679,7 +679,7 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
-func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
+func (q *queue) DeliverHeaders(id string, headers []*types.HeaderWithGovState, headerProcCh chan []*types.HeaderWithGovState) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -743,7 +743,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
}
if ready > 0 {
// Headers are ready for delivery, gather them and push forward (non blocking)
- process := make([]*types.Header, ready)
+ process := make([]*types.HeaderWithGovState, ready)
copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
select {
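
Note how the Schedule hunk unwraps the embedded header (header.Header) when filling the block and receipt task pools, while expressions such as header.Number.Uint64() keep compiling against the wrapper thanks to Go's field promotion. A self-contained illustration with stand-in types (not the real ones):

package main

import (
	"fmt"
	"math/big"
)

// Stand-ins for the real types, for illustration only.
type Header struct{ Number *big.Int }

type HeaderWithGovState struct {
	*Header         // embedded: its fields are promoted to the wrapper
	GovState []byte // placeholder for the governance state payload
}

func main() {
	h := &HeaderWithGovState{Header: &Header{Number: big.NewInt(42)}}
	// Promotion makes h.Number shorthand for h.Header.Number, so the
	// queue code can push either the wrapper or the bare *Header and
	// still call Number.Uint64() unchanged.
	fmt.Println(h.Number.Uint64(), h.Header.Number.Uint64()) // 42 42
}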