aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
Diffstat (limited to 'dex')
-rw-r--r--dex/downloader/downloader.go199
-rw-r--r--dex/downloader/fakepeer.go17
-rw-r--r--dex/downloader/governance.go170
-rw-r--r--dex/downloader/metrics.go5
-rw-r--r--dex/downloader/peer.go19
-rw-r--r--dex/downloader/queue.go42
-rw-r--r--dex/downloader/types.go11
-rw-r--r--dex/governance.go4
-rw-r--r--dex/handler.go85
-rw-r--r--dex/peer.go25
-rw-r--r--dex/protocol.go10
11 files changed, 499 insertions, 88 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go
index 0383a3709..69d95d801 100644
--- a/dex/downloader/downloader.go
+++ b/dex/downloader/downloader.go
@@ -1,4 +1,3 @@
-// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -25,15 +24,21 @@ import (
"sync/atomic"
"time"
+ dexCore "github.com/dexon-foundation/dexon-consensus/core"
+
ethereum "github.com/dexon-foundation/dexon"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/rawdb"
+ "github.com/dexon-foundation/dexon/core/state"
"github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/core/vm"
+ "github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/ethdb"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/metrics"
"github.com/dexon-foundation/dexon/params"
+ "github.com/dexon-foundation/dexon/trie"
)
var (
@@ -63,11 +68,10 @@ var (
reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
- fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
- fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
- fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
- fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
- fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
+ fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
+ fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
+ fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
+ fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
)
var (
@@ -103,6 +107,9 @@ type Downloader struct {
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
+ gov *governance
+ verifierCache *dexCore.TSigVerifierCache
+
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -125,12 +132,14 @@ type Downloader struct {
committed int32
// Channels
- headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ govStateCh chan dataPack
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+
+ headerProcCh chan []*types.HeaderWithGovState // [eth/62] Channel to feed the header processor new tasks
// for stateFetcher
stateSyncStart chan *stateSync
@@ -161,14 +170,18 @@ type LightChain interface {
// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header
+ GetHeaderByNumber(number uint64) *types.Header
+
// CurrentHeader retrieves the head header from the local chain.
CurrentHeader() *types.Header
+ GetGovStateByNumber(number uint64) (*types.GovState, error)
+
// GetTd returns the total difficulty of a local block.
GetTd(common.Hash, uint64) *big.Int
- // InsertHeaderChain inserts a batch of headers into the local chain.
- InsertHeaderChain([]*types.Header, int) (int, error)
+ // InsertHeaderChain2 inserts a batch of headers into the local chain.
+ InsertHeaderChain2([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error)
// Rollback removes a few recently added elements from the local chain.
Rollback([]common.Hash)
@@ -193,8 +206,8 @@ type BlockChain interface {
// FastSyncCommitHead directly commits the head block to a certain entity.
FastSyncCommitHead(common.Hash) error
- // InsertChain inserts a batch of blocks into the local chain.
- InsertChain(types.Blocks) (int, error)
+ // InsertChain2 inserts a batch of blocks into the local chain.
+ InsertChain2(types.Blocks) (int, error)
// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
@@ -218,11 +231,12 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC
lightchain: lightchain,
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
+ govStateCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
- headerProcCh: make(chan []*types.Header, 1),
+ headerProcCh: make(chan []*types.HeaderWithGovState, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
stateSyncStart: make(chan *stateSync),
@@ -457,6 +471,35 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if d.mode == FastSync && pivot != 0 {
d.committed = 0
}
+
+ if d.mode == FastSync || d.mode == LightSync {
+ // fetch gov state
+ govState, err := d.fetchGovState(p, latest.Hash(), latest.Root)
+ if err != nil {
+ return err
+ }
+
+ originHeader := d.lightchain.GetHeaderByNumber(origin)
+ if originHeader == nil {
+ return fmt.Errorf("origin header not exists, number: %d", origin)
+ }
+
+ // prepare state origin - 2
+ d.gov = newGovernance(govState)
+ for i := uint64(0); i < 3; i++ {
+ if originHeader.Round >= i {
+ h := d.gov.GetRoundHeight(originHeader.Round - i)
+ s, err := d.lightchain.GetGovStateByNumber(h)
+ if err != nil {
+ return err
+ }
+ d.gov.StoreState(s)
+ }
+ }
+
+ d.verifierCache = dexCore.NewTSigVerifierCache(d.gov, 5)
+ }
+
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
@@ -551,7 +594,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
// Request the advertised remote head block and wait for the response
head, _ := p.peer.Head()
- go p.peer.RequestHeadersByHash(head, 1, 0, false)
+ go p.peer.RequestHeadersByHash(head, 1, 0, false, false)
ttl := d.requestTTL()
timeout := time.After(ttl)
@@ -572,7 +615,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
return nil, errBadPeer
}
- head := headers[0]
+ head := headers[0].Header
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
return head, nil
@@ -587,6 +630,69 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
}
}
+func (d *Downloader) fetchGovState(p *peerConnection,
+ hash common.Hash, root common.Hash) (*types.GovState, error) {
+ go p.peer.RequestGovStateByHash(hash)
+
+ ttl := d.requestTTL()
+ timeout := time.After(ttl)
+ for {
+ select {
+ case <-d.cancelCh:
+ return nil, errCancelBlockFetch
+ case packet := <-d.govStateCh:
+ if packet.PeerId() != p.id {
+ log.Debug("Received gov state from incorrect peer", "peer", packet.PeerId())
+ break
+ }
+
+ // TODO(sonic): refactor this.
+ govState := packet.(*govStatePack).govState
+
+ // reconstruct the gov state
+ db := ethdb.NewMemDatabase()
+ for _, value := range govState.Proof {
+ db.Put(crypto.Keccak256(value), value)
+ }
+
+ // proof and state should split
+ // check the state object of governance contract
+ key := crypto.Keccak256(vm.GovernanceContractAddress.Bytes())
+ _, _, err := trie.VerifyProof(root, key, db)
+ if err != nil {
+ return nil, err
+ }
+
+ triedb := trie.NewDatabase(db)
+ t, err := trie.New(common.Hash{}, triedb)
+ for _, entry := range govState.Storage {
+ t.TryUpdate(entry[0], entry[1])
+ }
+ err = triedb.Commit(t.Hash(), false)
+ if err != nil {
+ return nil, err
+ }
+
+ statedb, err := state.New(root, state.NewDatabase(db))
+ if err != nil {
+ return nil, err
+ }
+
+ storageTrie := statedb.StorageTrie(vm.GovernanceContractAddress)
+ if storageTrie == nil {
+ return nil, fmt.Errorf("storage not match")
+ }
+ return govState, nil
+ case <-timeout:
+ p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
+ return nil, errTimeout
+
+ case <-d.bodyCh:
+ case <-d.receiptCh:
+ }
+ }
+}
+
// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
@@ -621,7 +727,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
if count > limit {
count = limit
}
- go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
+ go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
@@ -705,7 +811,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
ttl := d.requestTTL()
timeout := time.After(ttl)
- go p.peer.RequestHeadersByNumber(check, 1, 0, false)
+ go p.peer.RequestHeadersByNumber(check, 1, 0, false, false)
// Wait until a reply arrives to this request
for arrived := false; !arrived; {
@@ -789,10 +895,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
if skeleton {
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
- go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false, true)
} else {
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
- go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
+ go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false, true)
}
}
// Start pulling the header chain skeleton until all is done
@@ -935,7 +1041,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
-func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.HeaderWithGovState) ([]*types.HeaderWithGovState, int, error) {
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)
@@ -1235,9 +1341,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
case <-d.cancelCh:
return errCancelHeaderProcessing
- case headers := <-d.headerProcCh:
+ case headersWithGovState := <-d.headerProcCh:
// Terminate header processing if we synced up
- if len(headers) == 0 {
+ if len(headersWithGovState) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
@@ -1283,7 +1389,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
- for len(headers) > 0 {
+ for len(headersWithGovState) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
@@ -1292,29 +1398,35 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
// Select the next chunk of headers to import
limit := maxHeadersProcess
- if limit > len(headers) {
- limit = len(headers)
+ if limit > len(headersWithGovState) {
+ limit = len(headersWithGovState)
}
- chunk := headers[:limit]
+ chunk := headersWithGovState[:limit]
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
+ // TODO(sonic) update the gov state to make TSigVerify correct
// Collect the yet unknown headers to mark them as uncertain
- unknown := make([]*types.Header, 0, len(headers))
+ unknown := make([]*types.Header, 0, len(headersWithGovState))
for _, header := range chunk {
if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
- unknown = append(unknown, header)
+ unknown = append(unknown, header.Header)
}
}
- // If we're importing pure headers, verify based on their recentness
- frequency := fsHeaderCheckFrequency
- if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
- frequency = 1
+
+ for _, header := range chunk {
+ if header.GovState != nil {
+ log.Debug("Got gov state, store it", "round", header.Round, "number", header.Number.Uint64())
+ d.gov.StoreState(header.GovState)
+ }
}
- if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
+
+ if n, err := d.lightchain.InsertHeaderChain2(chunk, d.verifierCache); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
- rollback = append(rollback, chunk[:n]...)
+ for _, h := range chunk[:n] {
+ rollback = append(rollback, h.Header)
+ }
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
return errInvalidChain
@@ -1324,6 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
if len(rollback) > fsHeaderSafetyNet {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
+
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
@@ -1342,7 +1455,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
return errBadPeer
}
}
- headers = headers[limit:]
+ headersWithGovState = headersWithGovState[limit:]
origin += uint64(limit)
}
@@ -1400,7 +1513,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
}
- if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ if index, err := d.blockchain.InsertChain2(blocks); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
@@ -1565,10 +1678,14 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
-func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
+func (d *Downloader) DeliverHeaders(id string, headers []*types.HeaderWithGovState) (err error) {
return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
+func (d *Downloader) DeliverGovState(id string, govState *types.GovState) error {
+ return d.deliver(id, d.govStateCh, &govStatePack{id, govState}, govStateInMeter, govStateDropMeter)
+}
+
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
diff --git a/dex/downloader/fakepeer.go b/dex/downloader/fakepeer.go
index 3e29357ba..f0d596a4b 100644
--- a/dex/downloader/fakepeer.go
+++ b/dex/downloader/fakepeer.go
@@ -88,7 +88,14 @@ func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int,
}
}
}
- p.dl.DeliverHeaders(p.id, headers)
+
+ // TODO(sonic): fix this
+ var headersWithGovState []*types.HeaderWithGovState
+ for _, h := range headers {
+ headersWithGovState = append(headersWithGovState,
+ &types.HeaderWithGovState{Header: h})
+ }
+ p.dl.DeliverHeaders(p.id, headersWithGovState)
return nil
}
@@ -115,7 +122,13 @@ func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, r
}
headers = append(headers, origin)
}
- p.dl.DeliverHeaders(p.id, headers)
+ // TODO(sonic): fix this
+ var headersWithGovState []*types.HeaderWithGovState
+ for _, h := range headers {
+ headersWithGovState = append(headersWithGovState,
+ &types.HeaderWithGovState{Header: h})
+ }
+ p.dl.DeliverHeaders(p.id, headersWithGovState)
return nil
}
diff --git a/dex/downloader/governance.go b/dex/downloader/governance.go
new file mode 100644
index 000000000..40233f1a8
--- /dev/null
+++ b/dex/downloader/governance.go
@@ -0,0 +1,170 @@
+package downloader
+
+import (
+ "math/big"
+ "sync"
+ "time"
+
+ dexCore "github.com/dexon-foundation/dexon-consensus/core"
+ coreTypes "github.com/dexon-foundation/dexon-consensus/core/types"
+ dkgTypes "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core/state"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/core/vm"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/log"
+ "github.com/dexon-foundation/dexon/rlp"
+ "github.com/dexon-foundation/dexon/trie"
+)
+
+// This is a goverance for fast sync
+type governance struct {
+ db ethdb.Database
+ headRoot common.Hash
+ headHeight uint64
+ height2Root map[uint64]common.Hash
+ mu sync.Mutex
+}
+
+func newGovernance(headState *types.GovState) *governance {
+ db := ethdb.NewMemDatabase()
+ g := &governance{
+ db: db,
+ headRoot: headState.Root,
+ headHeight: headState.Number.Uint64(),
+ height2Root: make(map[uint64]common.Hash),
+ }
+ g.StoreState(headState)
+
+ statedb, err := state.New(headState.Root, state.NewDatabase(g.db))
+ if statedb == nil || err != nil {
+ log.Error("New governance fail", "statedb == nil", statedb == nil, "err", err)
+ }
+ return g
+}
+
+func (g *governance) getHeadHelper() *vm.GovernanceStateHelper {
+ return g.getHelper(g.headRoot)
+}
+
+func (g *governance) getHelperByRound(round uint64) *vm.GovernanceStateHelper {
+ height := g.GetRoundHeight(round)
+ root, exists := g.height2Root[height]
+ if !exists {
+ log.Debug("Gov get helper by round", "round", round, "exists", exists)
+ return nil
+ }
+ return g.getHelper(root)
+}
+
+func (g *governance) getHelper(root common.Hash) *vm.GovernanceStateHelper {
+ statedb, err := state.New(root, state.NewDatabase(g.db))
+ if statedb == nil || err != nil {
+ return nil
+ }
+ return &vm.GovernanceStateHelper{statedb}
+}
+
+func (g *governance) GetRoundHeight(round uint64) uint64 {
+ var h uint64
+ if helper := g.getHeadHelper(); helper != nil {
+ h = helper.RoundHeight(big.NewInt(int64(round))).Uint64()
+ }
+ return h
+}
+
+func (g *governance) StoreState(s *types.GovState) {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+
+ // store the hight -> root mapping
+ g.height2Root[s.Number.Uint64()] = s.Root
+
+ // store the account
+ for _, node := range s.Proof {
+ g.db.Put(crypto.Keccak256(node), node)
+ }
+
+ // store the storage
+ triedb := trie.NewDatabase(g.db)
+ t, err := trie.New(common.Hash{}, triedb)
+ if err != nil {
+ panic(err)
+ }
+ for _, kv := range s.Storage {
+ t.TryUpdate(kv[0], kv[1])
+ }
+ t.Commit(nil)
+ triedb.Commit(t.Hash(), false)
+
+ if s.Number.Uint64() > g.headHeight {
+ log.Debug("Gov head root changed", "number", s.Number.Uint64())
+ g.headRoot = s.Root
+ g.headHeight = s.Number.Uint64()
+ }
+}
+
+// Return the genesis configuration if round == 0.
+func (g *governance) Configuration(round uint64) *coreTypes.Config {
+ if round < dexCore.ConfigRoundShift {
+ round = 0
+ } else {
+ round -= dexCore.ConfigRoundShift
+ }
+ helper := g.getHelperByRound(round)
+ if helper == nil {
+ log.Warn("Get config helper fail", "round - round shift", round)
+ return nil
+ }
+ c := helper.Configuration()
+ return &coreTypes.Config{
+ NumChains: c.NumChains,
+ LambdaBA: time.Duration(c.LambdaBA) * time.Millisecond,
+ LambdaDKG: time.Duration(c.LambdaDKG) * time.Millisecond,
+ K: int(c.K),
+ PhiRatio: c.PhiRatio,
+ NotarySetSize: c.NotarySetSize,
+ DKGSetSize: c.DKGSetSize,
+ RoundInterval: time.Duration(c.RoundInterval) * time.Millisecond,
+ MinBlockInterval: time.Duration(c.MinBlockInterval) * time.Millisecond,
+ }
+}
+
+// DKGComplaints gets all the DKGComplaints of round.
+func (g *governance) DKGComplaints(round uint64) []*dkgTypes.Complaint {
+ helper := g.getHeadHelper()
+ var dkgComplaints []*dkgTypes.Complaint
+ for _, pk := range helper.DKGComplaints(big.NewInt(int64(round))) {
+ x := new(dkgTypes.Complaint)
+ if err := rlp.DecodeBytes(pk, x); err != nil {
+ panic(err)
+ }
+ dkgComplaints = append(dkgComplaints, x)
+ }
+ return dkgComplaints
+}
+
+// DKGMasterPublicKeys gets all the DKGMasterPublicKey of round.
+func (g *governance) DKGMasterPublicKeys(round uint64) []*dkgTypes.MasterPublicKey {
+ helper := g.getHeadHelper()
+ var dkgMasterPKs []*dkgTypes.MasterPublicKey
+ for _, pk := range helper.DKGMasterPublicKeys(big.NewInt(int64(round))) {
+ x := new(dkgTypes.MasterPublicKey)
+ if err := rlp.DecodeBytes(pk, x); err != nil {
+ panic(err)
+ }
+ dkgMasterPKs = append(dkgMasterPKs, x)
+ }
+ return dkgMasterPKs
+}
+
+// IsDKGFinal checks if DKG is final.
+func (g *governance) IsDKGFinal(round uint64) bool {
+ helper := g.getHeadHelper()
+ threshold := 2*uint64(g.Configuration(round).DKGSetSize)/3 + 1
+ count := helper.DKGFinalizedsCount(big.NewInt(int64(round))).Uint64()
+ return count >= threshold
+}
diff --git a/dex/downloader/metrics.go b/dex/downloader/metrics.go
index 0d6041712..395950759 100644
--- a/dex/downloader/metrics.go
+++ b/dex/downloader/metrics.go
@@ -28,6 +28,11 @@ var (
headerDropMeter = metrics.NewRegisteredMeter("dex/downloader/headers/drop", nil)
headerTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/headers/timeout", nil)
+ govStateInMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/in", nil)
+ govStateReqTimer = metrics.NewRegisteredTimer("dex/downloader/govStates/req", nil)
+ govStateDropMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/drop", nil)
+ govStateTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/timeout", nil)
+
bodyInMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/in", nil)
bodyReqTimer = metrics.NewRegisteredTimer("dex/downloader/bodies/req", nil)
bodyDropMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/drop", nil)
diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go
index 1fd82fbe3..b6f2936b7 100644
--- a/dex/downloader/peer.go
+++ b/dex/downloader/peer.go
@@ -78,8 +78,9 @@ type peerConnection struct {
// LightPeer encapsulates the methods required to synchronise with a remote light peer.
type LightPeer interface {
Head() (common.Hash, *big.Int)
- RequestHeadersByHash(common.Hash, int, int, bool) error
- RequestHeadersByNumber(uint64, int, int, bool) error
+ RequestHeadersByHash(common.Hash, int, int, bool, bool) error
+ RequestHeadersByNumber(uint64, int, int, bool, bool) error
+ RequestGovStateByHash(common.Hash) error
}
// Peer encapsulates the methods required to synchronise with a remote full peer.
@@ -96,11 +97,15 @@ type lightPeerWrapper struct {
}
func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }
-func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error {
- return w.peer.RequestHeadersByHash(h, amount, skip, reverse)
+func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse, withGov bool) error {
+ return w.peer.RequestHeadersByHash(h, amount, skip, reverse, withGov)
}
-func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error {
- return w.peer.RequestHeadersByNumber(i, amount, skip, reverse)
+func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse, withGov bool) error {
+ return w.peer.RequestHeadersByNumber(i, amount, skip, reverse, withGov)
+}
+func (w *lightPeerWrapper) RequestGovStateByHash(common.Hash) error {
+ // TODO(sonic): support this
+ panic("RequestGovStateByHash not supported in light client mode sync")
}
func (w *lightPeerWrapper) RequestBodies([]common.Hash) error {
panic("RequestBodies not supported in light client mode sync")
@@ -156,7 +161,7 @@ func (p *peerConnection) FetchHeaders(from uint64, count int) error {
p.headerStarted = time.Now()
// Issue the header retrieval request (absolut upwards without gaps)
- go p.peer.RequestHeadersByNumber(from, count, 0, false)
+ go p.peer.RequestHeadersByNumber(from, count, 0, false, true)
return nil
}
diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go
index 12c75e793..f3a36ec3c 100644
--- a/dex/downloader/queue.go
+++ b/dex/downloader/queue.go
@@ -68,15 +68,15 @@ type queue struct {
mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
// Headers are "special", they download in batches, supported by a skeleton chain
- headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
- headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
- headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
- headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
- headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
- headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
- headerProced int // [eth/62] Number of headers already processed from the results
- headerOffset uint64 // [eth/62] Number of the first header in the result cache
- headerContCh chan bool // [eth/62] Channel to notify when header download finishes
+ headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
+ headerTaskPool map[uint64]*types.HeaderWithGovState // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
+ headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
+ headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
+ headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
+ headerResults []*types.HeaderWithGovState // [eth/62] Result cache accumulating the completed headers
+ headerProced int // [eth/62] Number of headers already processed from the results
+ headerOffset uint64 // [eth/62] Number of the first header in the result cache
+ headerContCh chan bool // [eth/62] Channel to notify when header download finishes
// All data retrievals below are based on an already assembles header chain
blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
@@ -267,7 +267,7 @@ func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[comm
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
-func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
+func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.HeaderWithGovState) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -276,10 +276,10 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
panic("skeleton assembly already in progress")
}
// Schedule all the header retrieval tasks for the skeleton assembly
- q.headerTaskPool = make(map[uint64]*types.Header)
+ q.headerTaskPool = make(map[uint64]*types.HeaderWithGovState)
q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
- q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
+ q.headerResults = make([]*types.HeaderWithGovState, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
q.headerOffset = from
q.headerContCh = make(chan bool, 1)
@@ -294,7 +294,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
// RetrieveHeaders retrieves the header chain assemble based on the scheduled
// skeleton.
-func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
+func (q *queue) RetrieveHeaders() ([]*types.HeaderWithGovState, int) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -306,7 +306,7 @@ func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
-func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
+func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
@@ -333,14 +333,14 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
continue
}
// Queue the header for content retrieval
- q.blockTaskPool[hash] = header
- q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
+ q.blockTaskPool[hash] = header.Header
+ q.blockTaskQueue.Push(header.Header, -int64(header.Number.Uint64()))
if q.mode == FastSync {
- q.receiptTaskPool[hash] = header
- q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+ q.receiptTaskPool[hash] = header.Header
+ q.receiptTaskQueue.Push(header.Header, -int64(header.Number.Uint64()))
}
- inserts = append(inserts, header)
+ inserts = append(inserts, header.Header)
q.headerHead = hash
from++
}
@@ -679,7 +679,7 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
-func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
+func (q *queue) DeliverHeaders(id string, headers []*types.HeaderWithGovState, headerProcCh chan []*types.HeaderWithGovState) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -743,7 +743,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
}
if ready > 0 {
// Headers are ready for delivery, gather them and push forward (non blocking)
- process := make([]*types.Header, ready)
+ process := make([]*types.HeaderWithGovState, ready)
copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
select {
diff --git a/dex/downloader/types.go b/dex/downloader/types.go
index d320b7590..a7143ccd9 100644
--- a/dex/downloader/types.go
+++ b/dex/downloader/types.go
@@ -35,13 +35,22 @@ type dataPack interface {
// headerPack is a batch of block headers returned by a peer.
type headerPack struct {
peerID string
- headers []*types.Header
+ headers []*types.HeaderWithGovState
}
func (p *headerPack) PeerId() string { return p.peerID }
func (p *headerPack) Items() int { return len(p.headers) }
func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) }
+type govStatePack struct {
+ peerID string
+ govState *types.GovState
+}
+
+func (p *govStatePack) PeerId() string { return p.peerID }
+func (p *govStatePack) Items() int { return 1 }
+func (p *govStatePack) Stats() string { return "1" }
+
// bodyPack is a batch of block bodies returned by a peer.
type bodyPack struct {
peerID string
diff --git a/dex/governance.go b/dex/governance.go
index db33f7820..01f39b68c 100644
--- a/dex/governance.go
+++ b/dex/governance.go
@@ -46,7 +46,7 @@ func NewDexconGovernance(backend *DexAPIBackend, chainConfig *params.ChainConfig
return g
}
-func (d *DexconGovernance) getRoundHeight(ctx context.Context, round uint64) (uint64, error) {
+func (d *DexconGovernance) GetRoundHeight(ctx context.Context, round uint64) (uint64, error) {
state, _, err := d.b.StateAndHeaderByNumber(ctx, rpc.LatestBlockNumber)
if state == nil || err != nil {
return 0, err
@@ -72,7 +72,7 @@ func (d *DexconGovernance) getGovStateAtRound(round uint64) *vm.GovernanceStateH
round -= dexCore.ConfigRoundShift
}
ctx := context.Background()
- blockHeight, err := d.getRoundHeight(ctx, round)
+ blockHeight, err := d.GetRoundHeight(ctx, round)
if err != nil {
return nil
}
diff --git a/dex/handler.go b/dex/handler.go
index d66403fe6..0c7a3e919 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -34,6 +34,7 @@
package dex
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -220,7 +221,7 @@ func NewProtocolManager(
return 0, nil
}
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
- return manager.blockchain.InsertChain(blocks)
+ return manager.blockchain.InsertChain2(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
@@ -414,10 +415,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
first := true
maxNonCanonical := uint64(100)
+ round := map[uint64]uint64{}
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
- headers []*types.Header
+ headers []*types.HeaderWithGovState
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
@@ -439,7 +441,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if origin == nil {
break
}
- headers = append(headers, origin)
+ headers = append(headers, &types.HeaderWithGovState{Header: origin})
+ if round[origin.Round] == 0 {
+ round[origin.Round] = origin.Number.Uint64()
+ }
bytes += estHeaderRlpSize
// Advance to the next header of the query
@@ -489,20 +494,71 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
query.Origin.Number += query.Skip + 1
}
}
+
+ ctx := context.Background()
+ if query.WithGov && len(headers) > 0 {
+ last := headers[len(headers)-1]
+ currentBlock := pm.blockchain.CurrentBlock()
+
+ // Do not reply if we don't have current gov state
+ if currentBlock.NumberU64() < last.Number.Uint64() {
+ log.Debug("Current block < last request",
+ "current", currentBlock.NumberU64(), "last", last.Number.Uint64())
+ return p.SendBlockHeaders([]*types.HeaderWithGovState{})
+ }
+
+ snapshotHeight := map[uint64]struct{}{}
+ for r, height := range round {
+ log.Trace("#Include round", "round", r)
+ if r == 0 {
+ continue
+ }
+ h, err := pm.gov.GetRoundHeight(ctx, r)
+ if err != nil {
+ log.Warn("Get round height fail", "err", err)
+ return p.SendBlockHeaders([]*types.HeaderWithGovState{})
+ }
+ log.Trace("#Snapshot height", "height", h)
+ if h == 0 {
+ h = height
+ }
+ snapshotHeight[h] = struct{}{}
+ }
+
+ for _, header := range headers {
+ if _, exist := snapshotHeight[header.Number.Uint64()]; exist {
+ s, err := pm.blockchain.GetGovStateByHash(header.Hash())
+ if err != nil {
+ log.Warn("Get gov state by hash fail", "number", header.Number.Uint64(), "err", err)
+ return p.SendBlockHeaders([]*types.HeaderWithGovState{})
+ }
+ header.GovState = s
+ }
+ log.Trace("Send header", "round", header.Round, "number", header.Number.Uint64(), "gov state == nil", header.GovState == nil)
+ }
+ }
return p.SendBlockHeaders(headers)
case msg.Code == BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests
- var headers []*types.Header
+ var headers []*types.HeaderWithGovState
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
- headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
+ h := []*types.Header{headers[0].Header}
+ h = pm.fetcher.FilterHeaders(p.id, h, time.Now())
+ if len(h) == 0 {
+ headers = nil
+ }
+ }
+ for _, header := range headers {
+ log.Trace("Received header", "round", header.Round, "number", header.Number.Uint64(), "gov state == nil", header.GovState == nil)
}
if len(headers) > 0 || !filter {
+ // if the header that has gov state is filter out, the header's gov state is useless
err := pm.downloader.DeliverHeaders(p.id, headers)
if err != nil {
log.Debug("Failed to deliver headers", "err", err)
@@ -834,6 +890,25 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return err
}
}
+ case msg.Code == GetGovStateMsg:
+ var hash common.Hash
+ if err := msg.Decode(&hash); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ govState, err := pm.blockchain.GetGovStateByHash(hash)
+ if err != nil {
+ // TODO(sonic): handle this error
+ panic(err)
+ }
+ return p.SendGovState(govState)
+ case msg.Code == GovStateMsg:
+ var govState types.GovState
+ if err := msg.Decode(&govState); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ if err := pm.downloader.DeliverGovState(p.id, &govState); err != nil {
+ log.Debug("Failed to deliver govstates", "err", err)
+ }
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
diff --git a/dex/peer.go b/dex/peer.go
index 263dc5647..67aa50694 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -536,7 +536,7 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
}
// SendBlockHeaders sends a batch of block headers to the remote peer.
-func (p *peer) SendBlockHeaders(headers []*types.Header) error {
+func (p *peer) SendBlockHeaders(headers []*types.HeaderWithGovState) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
}
@@ -563,25 +563,34 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, receipts)
}
+func (p *peer) SendGovState(govState *types.GovState) error {
+ return p2p.Send(p.rw, GovStateMsg, govState)
+}
+
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
p.Log().Debug("Fetching single header", "hash", hash)
- return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false, WithGov: false})
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
-func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
- p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
- return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse, withGov bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse, "withgov", withGov)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov})
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
-func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
- p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
- return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse, withGov bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse, "withgov", withGov)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov})
+}
+
+func (p *peer) RequestGovStateByHash(hash common.Hash) error {
+ p.Log().Debug("Fetching one gov state", "hash", hash)
+ return p2p.Send(p.rw, GetGovStateMsg, hash)
}
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
diff --git a/dex/protocol.go b/dex/protocol.go
index 04928d2b2..0c271248e 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -34,6 +34,7 @@
package dex
import (
+ "context"
"crypto/ecdsa"
"fmt"
"io"
@@ -60,7 +61,7 @@ var ProtocolName = "dex"
var ProtocolVersions = []uint{dex64}
// ProtocolLengths are the number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{40}
+var ProtocolLengths = []uint64{42}
const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
@@ -93,6 +94,9 @@ const (
DKGPartialSignatureMsg = 0x25
PullBlocksMsg = 0x26
PullVotesMsg = 0x27
+
+ GetGovStateMsg = 0x28
+ GovStateMsg = 0x29
)
type errCode int
@@ -140,6 +144,8 @@ type txPool interface {
}
type governance interface {
+ GetRoundHeight(context.Context, uint64) (uint64, error)
+
GetNumChains(uint64) uint32
LenCRS() uint64
@@ -184,6 +190,8 @@ type getBlockHeadersData struct {
Amount uint64 // Maximum number of headers to retrieve
Skip uint64 // Blocks to skip between consecutive headers
Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis)
+
+ WithGov bool
}
// hashOrNumber is a combined field for specifying an origin block.