aboutsummaryrefslogtreecommitdiffstats
path: root/dex/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/downloader/downloader.go')
-rw-r--r--dex/downloader/downloader.go199
1 files changed, 158 insertions, 41 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go
index 0383a3709..69d95d801 100644
--- a/dex/downloader/downloader.go
+++ b/dex/downloader/downloader.go
@@ -1,4 +1,3 @@
-// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -25,15 +24,21 @@ import (
"sync/atomic"
"time"
+ dexCore "github.com/dexon-foundation/dexon-consensus/core"
+
ethereum "github.com/dexon-foundation/dexon"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/rawdb"
+ "github.com/dexon-foundation/dexon/core/state"
"github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/core/vm"
+ "github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/ethdb"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/metrics"
"github.com/dexon-foundation/dexon/params"
+ "github.com/dexon-foundation/dexon/trie"
)
var (
@@ -63,11 +68,10 @@ var (
reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
- fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
- fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
- fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
- fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
- fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
+ fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
+ fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
+ fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
+ fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
)
var (
@@ -103,6 +107,9 @@ type Downloader struct {
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
+ gov *governance
+ verifierCache *dexCore.TSigVerifierCache
+
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -125,12 +132,14 @@ type Downloader struct {
committed int32
// Channels
- headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ govStateCh chan dataPack
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+
+ headerProcCh chan []*types.HeaderWithGovState // [eth/62] Channel to feed the header processor new tasks
// for stateFetcher
stateSyncStart chan *stateSync
@@ -161,14 +170,18 @@ type LightChain interface {
// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header
+ GetHeaderByNumber(number uint64) *types.Header
+
// CurrentHeader retrieves the head header from the local chain.
CurrentHeader() *types.Header
+ GetGovStateByNumber(number uint64) (*types.GovState, error)
+
// GetTd returns the total difficulty of a local block.
GetTd(common.Hash, uint64) *big.Int
- // InsertHeaderChain inserts a batch of headers into the local chain.
- InsertHeaderChain([]*types.Header, int) (int, error)
+ // InsertHeaderChain2 inserts a batch of headers into the local chain.
+ InsertHeaderChain2([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error)
// Rollback removes a few recently added elements from the local chain.
Rollback([]common.Hash)
@@ -193,8 +206,8 @@ type BlockChain interface {
// FastSyncCommitHead directly commits the head block to a certain entity.
FastSyncCommitHead(common.Hash) error
- // InsertChain inserts a batch of blocks into the local chain.
- InsertChain(types.Blocks) (int, error)
+ // InsertChain2 inserts a batch of blocks into the local chain.
+ InsertChain2(types.Blocks) (int, error)
// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
@@ -218,11 +231,12 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC
lightchain: lightchain,
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
+ govStateCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
- headerProcCh: make(chan []*types.Header, 1),
+ headerProcCh: make(chan []*types.HeaderWithGovState, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
stateSyncStart: make(chan *stateSync),
@@ -457,6 +471,35 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if d.mode == FastSync && pivot != 0 {
d.committed = 0
}
+
+ if d.mode == FastSync || d.mode == LightSync {
+ // fetch gov state
+ govState, err := d.fetchGovState(p, latest.Hash(), latest.Root)
+ if err != nil {
+ return err
+ }
+
+ originHeader := d.lightchain.GetHeaderByNumber(origin)
+ if originHeader == nil {
+ return fmt.Errorf("origin header not exists, number: %d", origin)
+ }
+
+ // prepare state origin - 2
+ d.gov = newGovernance(govState)
+ for i := uint64(0); i < 3; i++ {
+ if originHeader.Round >= i {
+ h := d.gov.GetRoundHeight(originHeader.Round - i)
+ s, err := d.lightchain.GetGovStateByNumber(h)
+ if err != nil {
+ return err
+ }
+ d.gov.StoreState(s)
+ }
+ }
+
+ d.verifierCache = dexCore.NewTSigVerifierCache(d.gov, 5)
+ }
+
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
@@ -551,7 +594,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
// Request the advertised remote head block and wait for the response
head, _ := p.peer.Head()
- go p.peer.RequestHeadersByHash(head, 1, 0, false)
+ go p.peer.RequestHeadersByHash(head, 1, 0, false, false)
ttl := d.requestTTL()
timeout := time.After(ttl)
@@ -572,7 +615,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
return nil, errBadPeer
}
- head := headers[0]
+ head := headers[0].Header
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
return head, nil
@@ -587,6 +630,69 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
}
}
+func (d *Downloader) fetchGovState(p *peerConnection,
+ hash common.Hash, root common.Hash) (*types.GovState, error) {
+ go p.peer.RequestGovStateByHash(hash)
+
+ ttl := d.requestTTL()
+ timeout := time.After(ttl)
+ for {
+ select {
+ case <-d.cancelCh:
+ return nil, errCancelBlockFetch
+ case packet := <-d.govStateCh:
+ if packet.PeerId() != p.id {
+ log.Debug("Received gov state from incorrect peer", "peer", packet.PeerId())
+ break
+ }
+
+ // TODO(sonic): refactor this.
+ govState := packet.(*govStatePack).govState
+
+ // reconstruct the gov state
+ db := ethdb.NewMemDatabase()
+ for _, value := range govState.Proof {
+ db.Put(crypto.Keccak256(value), value)
+ }
+
+ // proof and state should split
+ // check the state object of governance contract
+ key := crypto.Keccak256(vm.GovernanceContractAddress.Bytes())
+ _, _, err := trie.VerifyProof(root, key, db)
+ if err != nil {
+ return nil, err
+ }
+
+ triedb := trie.NewDatabase(db)
+ t, err := trie.New(common.Hash{}, triedb)
+ for _, entry := range govState.Storage {
+ t.TryUpdate(entry[0], entry[1])
+ }
+ err = triedb.Commit(t.Hash(), false)
+ if err != nil {
+ return nil, err
+ }
+
+ statedb, err := state.New(root, state.NewDatabase(db))
+ if err != nil {
+ return nil, err
+ }
+
+ storageTrie := statedb.StorageTrie(vm.GovernanceContractAddress)
+ if storageTrie == nil {
+ return nil, fmt.Errorf("storage not match")
+ }
+ return govState, nil
+ case <-timeout:
+ p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
+ return nil, errTimeout
+
+ case <-d.bodyCh:
+ case <-d.receiptCh:
+ }
+ }
+}
+
// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
@@ -621,7 +727,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
if count > limit {
count = limit
}
- go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
+ go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
@@ -705,7 +811,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
ttl := d.requestTTL()
timeout := time.After(ttl)
- go p.peer.RequestHeadersByNumber(check, 1, 0, false)
+ go p.peer.RequestHeadersByNumber(check, 1, 0, false, false)
// Wait until a reply arrives to this request
for arrived := false; !arrived; {
@@ -789,10 +895,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
if skeleton {
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
- go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false, true)
} else {
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
- go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
+ go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false, true)
}
}
// Start pulling the header chain skeleton until all is done
@@ -935,7 +1041,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
-func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.HeaderWithGovState) ([]*types.HeaderWithGovState, int, error) {
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)
@@ -1235,9 +1341,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
case <-d.cancelCh:
return errCancelHeaderProcessing
- case headers := <-d.headerProcCh:
+ case headersWithGovState := <-d.headerProcCh:
// Terminate header processing if we synced up
- if len(headers) == 0 {
+ if len(headersWithGovState) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
@@ -1283,7 +1389,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
- for len(headers) > 0 {
+ for len(headersWithGovState) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
@@ -1292,29 +1398,35 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
// Select the next chunk of headers to import
limit := maxHeadersProcess
- if limit > len(headers) {
- limit = len(headers)
+ if limit > len(headersWithGovState) {
+ limit = len(headersWithGovState)
}
- chunk := headers[:limit]
+ chunk := headersWithGovState[:limit]
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
+ // TODO(sonic) update the gov state to make TSigVerify correct
// Collect the yet unknown headers to mark them as uncertain
- unknown := make([]*types.Header, 0, len(headers))
+ unknown := make([]*types.Header, 0, len(headersWithGovState))
for _, header := range chunk {
if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
- unknown = append(unknown, header)
+ unknown = append(unknown, header.Header)
}
}
- // If we're importing pure headers, verify based on their recentness
- frequency := fsHeaderCheckFrequency
- if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
- frequency = 1
+
+ for _, header := range chunk {
+ if header.GovState != nil {
+ log.Debug("Got gov state, store it", "round", header.Round, "number", header.Number.Uint64())
+ d.gov.StoreState(header.GovState)
+ }
}
- if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
+
+ if n, err := d.lightchain.InsertHeaderChain2(chunk, d.verifierCache); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
- rollback = append(rollback, chunk[:n]...)
+ for _, h := range chunk[:n] {
+ rollback = append(rollback, h.Header)
+ }
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
return errInvalidChain
@@ -1324,6 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
if len(rollback) > fsHeaderSafetyNet {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
+
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
@@ -1342,7 +1455,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
return errBadPeer
}
}
- headers = headers[limit:]
+ headersWithGovState = headersWithGovState[limit:]
origin += uint64(limit)
}
@@ -1400,7 +1513,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
}
- if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ if index, err := d.blockchain.InsertChain2(blocks); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
@@ -1565,10 +1678,14 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
-func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
+func (d *Downloader) DeliverHeaders(id string, headers []*types.HeaderWithGovState) (err error) {
return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
+func (d *Downloader) DeliverGovState(id string, govState *types.GovState) error {
+ return d.deliver(id, d.govStateCh, &govStatePack{id, govState}, govStateInMeter, govStateDropMeter)
+}
+
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)