aboutsummaryrefslogtreecommitdiffstats
path: root/dex/downloader/downloader.go
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2018-11-20 14:13:53 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:53 +0800
commitafc01df41cafd3a8b56db9f32e23da3bf0e6b7ef (patch)
tree28013bb14a19611adecda49d1dfbeb0a34dad66a /dex/downloader/downloader.go
parent2113837c006aad6af75c09d37514591fd6863dbc (diff)
downloaddexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar
dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar.gz
dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar.bz2
dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar.lz
dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar.xz
dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar.zst
dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.zip
dex: implement downloader for dex
We need governance state to verify block's signature (randomness), but in ethereum fast sync mode, eth downloader only downloads the whole state of pivot block, so we don't have governance state to verify the downloaded block that is before pivot block if we don't processing transaction. To avoid running transactions, dex downloader also downloads the governance state (merkle proof and storage) at snapshot height of each round, so that we can verify blocks in fast sync mode.
Diffstat (limited to 'dex/downloader/downloader.go')
-rw-r--r--dex/downloader/downloader.go199
1 files changed, 158 insertions, 41 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go
index 0383a3709..69d95d801 100644
--- a/dex/downloader/downloader.go
+++ b/dex/downloader/downloader.go
@@ -1,4 +1,3 @@
-// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -25,15 +24,21 @@ import (
"sync/atomic"
"time"
+ dexCore "github.com/dexon-foundation/dexon-consensus/core"
+
ethereum "github.com/dexon-foundation/dexon"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/rawdb"
+ "github.com/dexon-foundation/dexon/core/state"
"github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/core/vm"
+ "github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/ethdb"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/metrics"
"github.com/dexon-foundation/dexon/params"
+ "github.com/dexon-foundation/dexon/trie"
)
var (
@@ -63,11 +68,10 @@ var (
reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
- fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
- fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
- fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
- fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
- fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
+ fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
+ fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
+ fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
+ fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
)
var (
@@ -103,6 +107,9 @@ type Downloader struct {
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
+ gov *governance
+ verifierCache *dexCore.TSigVerifierCache
+
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -125,12 +132,14 @@ type Downloader struct {
committed int32
// Channels
- headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ govStateCh chan dataPack
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+
+ headerProcCh chan []*types.HeaderWithGovState // [eth/62] Channel to feed the header processor new tasks
// for stateFetcher
stateSyncStart chan *stateSync
@@ -161,14 +170,18 @@ type LightChain interface {
// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header
+ GetHeaderByNumber(number uint64) *types.Header
+
// CurrentHeader retrieves the head header from the local chain.
CurrentHeader() *types.Header
+ GetGovStateByNumber(number uint64) (*types.GovState, error)
+
// GetTd returns the total difficulty of a local block.
GetTd(common.Hash, uint64) *big.Int
- // InsertHeaderChain inserts a batch of headers into the local chain.
- InsertHeaderChain([]*types.Header, int) (int, error)
+ // InsertHeaderChain2 inserts a batch of headers into the local chain.
+ InsertHeaderChain2([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error)
// Rollback removes a few recently added elements from the local chain.
Rollback([]common.Hash)
@@ -193,8 +206,8 @@ type BlockChain interface {
// FastSyncCommitHead directly commits the head block to a certain entity.
FastSyncCommitHead(common.Hash) error
- // InsertChain inserts a batch of blocks into the local chain.
- InsertChain(types.Blocks) (int, error)
+ // InsertChain2 inserts a batch of blocks into the local chain.
+ InsertChain2(types.Blocks) (int, error)
// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
@@ -218,11 +231,12 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC
lightchain: lightchain,
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
+ govStateCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
- headerProcCh: make(chan []*types.Header, 1),
+ headerProcCh: make(chan []*types.HeaderWithGovState, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
stateSyncStart: make(chan *stateSync),
@@ -457,6 +471,35 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if d.mode == FastSync && pivot != 0 {
d.committed = 0
}
+
+ if d.mode == FastSync || d.mode == LightSync {
+ // fetch gov state
+ govState, err := d.fetchGovState(p, latest.Hash(), latest.Root)
+ if err != nil {
+ return err
+ }
+
+ originHeader := d.lightchain.GetHeaderByNumber(origin)
+ if originHeader == nil {
+ return fmt.Errorf("origin header not exists, number: %d", origin)
+ }
+
+ // prepare state origin - 2
+ d.gov = newGovernance(govState)
+ for i := uint64(0); i < 3; i++ {
+ if originHeader.Round >= i {
+ h := d.gov.GetRoundHeight(originHeader.Round - i)
+ s, err := d.lightchain.GetGovStateByNumber(h)
+ if err != nil {
+ return err
+ }
+ d.gov.StoreState(s)
+ }
+ }
+
+ d.verifierCache = dexCore.NewTSigVerifierCache(d.gov, 5)
+ }
+
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
@@ -551,7 +594,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
// Request the advertised remote head block and wait for the response
head, _ := p.peer.Head()
- go p.peer.RequestHeadersByHash(head, 1, 0, false)
+ go p.peer.RequestHeadersByHash(head, 1, 0, false, false)
ttl := d.requestTTL()
timeout := time.After(ttl)
@@ -572,7 +615,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
return nil, errBadPeer
}
- head := headers[0]
+ head := headers[0].Header
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
return head, nil
@@ -587,6 +630,69 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
}
}
+func (d *Downloader) fetchGovState(p *peerConnection,
+ hash common.Hash, root common.Hash) (*types.GovState, error) {
+ go p.peer.RequestGovStateByHash(hash)
+
+ ttl := d.requestTTL()
+ timeout := time.After(ttl)
+ for {
+ select {
+ case <-d.cancelCh:
+ return nil, errCancelBlockFetch
+ case packet := <-d.govStateCh:
+ if packet.PeerId() != p.id {
+ log.Debug("Received gov state from incorrect peer", "peer", packet.PeerId())
+ break
+ }
+
+ // TODO(sonic): refactor this.
+ govState := packet.(*govStatePack).govState
+
+ // reconstruct the gov state
+ db := ethdb.NewMemDatabase()
+ for _, value := range govState.Proof {
+ db.Put(crypto.Keccak256(value), value)
+ }
+
+ // proof and state should split
+ // check the state object of governance contract
+ key := crypto.Keccak256(vm.GovernanceContractAddress.Bytes())
+ _, _, err := trie.VerifyProof(root, key, db)
+ if err != nil {
+ return nil, err
+ }
+
+ triedb := trie.NewDatabase(db)
+ t, err := trie.New(common.Hash{}, triedb)
+ for _, entry := range govState.Storage {
+ t.TryUpdate(entry[0], entry[1])
+ }
+ err = triedb.Commit(t.Hash(), false)
+ if err != nil {
+ return nil, err
+ }
+
+ statedb, err := state.New(root, state.NewDatabase(db))
+ if err != nil {
+ return nil, err
+ }
+
+ storageTrie := statedb.StorageTrie(vm.GovernanceContractAddress)
+ if storageTrie == nil {
+ return nil, fmt.Errorf("storage not match")
+ }
+ return govState, nil
+ case <-timeout:
+ p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
+ return nil, errTimeout
+
+ case <-d.bodyCh:
+ case <-d.receiptCh:
+ }
+ }
+}
+
// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
@@ -621,7 +727,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
if count > limit {
count = limit
}
- go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
+ go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
@@ -705,7 +811,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
ttl := d.requestTTL()
timeout := time.After(ttl)
- go p.peer.RequestHeadersByNumber(check, 1, 0, false)
+ go p.peer.RequestHeadersByNumber(check, 1, 0, false, false)
// Wait until a reply arrives to this request
for arrived := false; !arrived; {
@@ -789,10 +895,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
if skeleton {
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
- go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false, true)
} else {
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
- go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
+ go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false, true)
}
}
// Start pulling the header chain skeleton until all is done
@@ -935,7 +1041,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
-func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.HeaderWithGovState) ([]*types.HeaderWithGovState, int, error) {
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)
@@ -1235,9 +1341,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
case <-d.cancelCh:
return errCancelHeaderProcessing
- case headers := <-d.headerProcCh:
+ case headersWithGovState := <-d.headerProcCh:
// Terminate header processing if we synced up
- if len(headers) == 0 {
+ if len(headersWithGovState) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
@@ -1283,7 +1389,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
- for len(headers) > 0 {
+ for len(headersWithGovState) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
@@ -1292,29 +1398,35 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
// Select the next chunk of headers to import
limit := maxHeadersProcess
- if limit > len(headers) {
- limit = len(headers)
+ if limit > len(headersWithGovState) {
+ limit = len(headersWithGovState)
}
- chunk := headers[:limit]
+ chunk := headersWithGovState[:limit]
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
+ // TODO(sonic) update the gov state to make TSigVerify correct
// Collect the yet unknown headers to mark them as uncertain
- unknown := make([]*types.Header, 0, len(headers))
+ unknown := make([]*types.Header, 0, len(headersWithGovState))
for _, header := range chunk {
if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
- unknown = append(unknown, header)
+ unknown = append(unknown, header.Header)
}
}
- // If we're importing pure headers, verify based on their recentness
- frequency := fsHeaderCheckFrequency
- if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
- frequency = 1
+
+ for _, header := range chunk {
+ if header.GovState != nil {
+ log.Debug("Got gov state, store it", "round", header.Round, "number", header.Number.Uint64())
+ d.gov.StoreState(header.GovState)
+ }
}
- if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
+
+ if n, err := d.lightchain.InsertHeaderChain2(chunk, d.verifierCache); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
- rollback = append(rollback, chunk[:n]...)
+ for _, h := range chunk[:n] {
+ rollback = append(rollback, h.Header)
+ }
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
return errInvalidChain
@@ -1324,6 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
if len(rollback) > fsHeaderSafetyNet {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
+
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
@@ -1342,7 +1455,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
return errBadPeer
}
}
- headers = headers[limit:]
+ headersWithGovState = headersWithGovState[limit:]
origin += uint64(limit)
}
@@ -1400,7 +1513,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
}
- if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ if index, err := d.blockchain.InsertChain2(blocks); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
@@ -1565,10 +1678,14 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
-func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
+func (d *Downloader) DeliverHeaders(id string, headers []*types.HeaderWithGovState) (err error) {
return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
+func (d *Downloader) DeliverGovState(id string, govState *types.GovState) error {
+ return d.deliver(id, d.govStateCh, &govStatePack{id, govState}, govStateInMeter, govStateDropMeter)
+}
+
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)