diff options
author | Sonic <sonic@dexon.org> | 2018-11-20 14:13:53 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:27:18 +0800 |
commit | c5206f748e1a4a57707cffc8f867386f0d84f7be (patch) | |
tree | 7e840c9a4b686718f4bcb31b9f64d6330bbbd470 /dex/downloader | |
parent | 91d3c571ef5e5628917c5a27d16bc0d8d7083224 (diff) | |
download | go-tangerine-c5206f748e1a4a57707cffc8f867386f0d84f7be.tar go-tangerine-c5206f748e1a4a57707cffc8f867386f0d84f7be.tar.gz go-tangerine-c5206f748e1a4a57707cffc8f867386f0d84f7be.tar.bz2 go-tangerine-c5206f748e1a4a57707cffc8f867386f0d84f7be.tar.lz go-tangerine-c5206f748e1a4a57707cffc8f867386f0d84f7be.tar.xz go-tangerine-c5206f748e1a4a57707cffc8f867386f0d84f7be.tar.zst go-tangerine-c5206f748e1a4a57707cffc8f867386f0d84f7be.zip |
dex: implement downloader for dex
We need governance state to verify block's signature (randomness),
but in ethereum fast sync mode, eth downloader only downloads the whole
state of pivot block, so we don't have governance state to verify the
downloaded block that is before pivot block if we don't processing
transaction.
To avoid running transactions, dex downloader also downloads the
governance state (merkle proof and storage) at snapshot height of each round,
so that we can verify blocks in fast sync mode.
Diffstat (limited to 'dex/downloader')
-rw-r--r-- | dex/downloader/downloader.go | 199 | ||||
-rw-r--r-- | dex/downloader/fakepeer.go | 17 | ||||
-rw-r--r-- | dex/downloader/governance.go | 170 | ||||
-rw-r--r-- | dex/downloader/metrics.go | 5 | ||||
-rw-r--r-- | dex/downloader/peer.go | 19 | ||||
-rw-r--r-- | dex/downloader/queue.go | 42 | ||||
-rw-r--r-- | dex/downloader/types.go | 11 |
7 files changed, 391 insertions, 72 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go index 0383a3709..69d95d801 100644 --- a/dex/downloader/downloader.go +++ b/dex/downloader/downloader.go @@ -1,4 +1,3 @@ -// Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -25,15 +24,21 @@ import ( "sync/atomic" "time" + dexCore "github.com/dexon-foundation/dexon-consensus/core" + ethereum "github.com/dexon-foundation/dexon" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/rawdb" + "github.com/dexon-foundation/dexon/core/state" "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/ethdb" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/metrics" "github.com/dexon-foundation/dexon/params" + "github.com/dexon-foundation/dexon/trie" ) var ( @@ -63,11 +68,10 @@ var ( reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs - fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected - fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it - fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download - fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync ) var ( @@ -103,6 +107,9 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database + gov *governance + verifierCache *dexCore.TSigVerifierCache + rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -125,12 +132,14 @@ type Downloader struct { committed int32 // Channels - headerCh chan dataPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + govStateCh chan dataPack + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + + headerProcCh chan []*types.HeaderWithGovState // [eth/62] Channel to feed the header processor new tasks // for stateFetcher stateSyncStart chan *stateSync @@ -161,14 +170,18 @@ type LightChain interface { // GetHeaderByHash retrieves a header from the local chain. GetHeaderByHash(common.Hash) *types.Header + GetHeaderByNumber(number uint64) *types.Header + // CurrentHeader retrieves the head header from the local chain. CurrentHeader() *types.Header + GetGovStateByNumber(number uint64) (*types.GovState, error) + // GetTd returns the total difficulty of a local block. GetTd(common.Hash, uint64) *big.Int - // InsertHeaderChain inserts a batch of headers into the local chain. - InsertHeaderChain([]*types.Header, int) (int, error) + // InsertHeaderChain2 inserts a batch of headers into the local chain. + InsertHeaderChain2([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error) // Rollback removes a few recently added elements from the local chain. Rollback([]common.Hash) @@ -193,8 +206,8 @@ type BlockChain interface { // FastSyncCommitHead directly commits the head block to a certain entity. FastSyncCommitHead(common.Hash) error - // InsertChain inserts a batch of blocks into the local chain. - InsertChain(types.Blocks) (int, error) + // InsertChain2 inserts a batch of blocks into the local chain. + InsertChain2(types.Blocks) (int, error) // InsertReceiptChain inserts a batch of receipts into the local chain. InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) @@ -218,11 +231,12 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC lightchain: lightchain, dropPeer: dropPeer, headerCh: make(chan dataPack, 1), + govStateCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1), bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), + headerProcCh: make(chan []*types.HeaderWithGovState, 1), quitCh: make(chan struct{}), stateCh: make(chan dataPack), stateSyncStart: make(chan *stateSync), @@ -457,6 +471,35 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.mode == FastSync && pivot != 0 { d.committed = 0 } + + if d.mode == FastSync || d.mode == LightSync { + // fetch gov state + govState, err := d.fetchGovState(p, latest.Hash(), latest.Root) + if err != nil { + return err + } + + originHeader := d.lightchain.GetHeaderByNumber(origin) + if originHeader == nil { + return fmt.Errorf("origin header not exists, number: %d", origin) + } + + // prepare state origin - 2 + d.gov = newGovernance(govState) + for i := uint64(0); i < 3; i++ { + if originHeader.Round >= i { + h := d.gov.GetRoundHeight(originHeader.Round - i) + s, err := d.lightchain.GetGovStateByNumber(h) + if err != nil { + return err + } + d.gov.StoreState(s) + } + } + + d.verifierCache = dexCore.NewTSigVerifierCache(d.gov, 5) + } + // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { @@ -551,7 +594,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { // Request the advertised remote head block and wait for the response head, _ := p.peer.Head() - go p.peer.RequestHeadersByHash(head, 1, 0, false) + go p.peer.RequestHeadersByHash(head, 1, 0, false, false) ttl := d.requestTTL() timeout := time.After(ttl) @@ -572,7 +615,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { p.log.Debug("Multiple headers for single request", "headers", len(headers)) return nil, errBadPeer } - head := headers[0] + head := headers[0].Header p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) return head, nil @@ -587,6 +630,69 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { } } +func (d *Downloader) fetchGovState(p *peerConnection, + hash common.Hash, root common.Hash) (*types.GovState, error) { + go p.peer.RequestGovStateByHash(hash) + + ttl := d.requestTTL() + timeout := time.After(ttl) + for { + select { + case <-d.cancelCh: + return nil, errCancelBlockFetch + case packet := <-d.govStateCh: + if packet.PeerId() != p.id { + log.Debug("Received gov state from incorrect peer", "peer", packet.PeerId()) + break + } + + // TODO(sonic): refactor this. + govState := packet.(*govStatePack).govState + + // reconstruct the gov state + db := ethdb.NewMemDatabase() + for _, value := range govState.Proof { + db.Put(crypto.Keccak256(value), value) + } + + // proof and state should split + // check the state object of governance contract + key := crypto.Keccak256(vm.GovernanceContractAddress.Bytes()) + _, _, err := trie.VerifyProof(root, key, db) + if err != nil { + return nil, err + } + + triedb := trie.NewDatabase(db) + t, err := trie.New(common.Hash{}, triedb) + for _, entry := range govState.Storage { + t.TryUpdate(entry[0], entry[1]) + } + err = triedb.Commit(t.Hash(), false) + if err != nil { + return nil, err + } + + statedb, err := state.New(root, state.NewDatabase(db)) + if err != nil { + return nil, err + } + + storageTrie := statedb.StorageTrie(vm.GovernanceContractAddress) + if storageTrie == nil { + return nil, fmt.Errorf("storage not match") + } + return govState, nil + case <-timeout: + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) + return nil, errTimeout + + case <-d.bodyCh: + case <-d.receiptCh: + } + } +} + // findAncestor tries to locate the common ancestor link of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N links should already get us a match. @@ -621,7 +727,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err if count > limit { count = limit } - go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) + go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -705,7 +811,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err ttl := d.requestTTL() timeout := time.After(ttl) - go p.peer.RequestHeadersByNumber(check, 1, 0, false) + go p.peer.RequestHeadersByNumber(check, 1, 0, false, false) // Wait until a reply arrives to this request for arrived := false; !arrived; { @@ -789,10 +895,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) if skeleton { p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false, true) } else { p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) + go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false, true) } } // Start pulling the header chain skeleton until all is done @@ -935,7 +1041,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) // // The method returns the entire filled skeleton and also the number of headers // already forwarded for processing. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.HeaderWithGovState) ([]*types.HeaderWithGovState, int, error) { log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) @@ -1235,9 +1341,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er case <-d.cancelCh: return errCancelHeaderProcessing - case headers := <-d.headerProcCh: + case headersWithGovState := <-d.headerProcCh: // Terminate header processing if we synced up - if len(headers) == 0 { + if len(headersWithGovState) == 0 { // Notify everyone that headers are fully processed for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { @@ -1283,7 +1389,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // Otherwise split the chunk of headers into batches and process them gotHeaders = true - for len(headers) > 0 { + for len(headersWithGovState) > 0 { // Terminate if something failed in between processing chunks select { case <-d.cancelCh: @@ -1292,29 +1398,35 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er } // Select the next chunk of headers to import limit := maxHeadersProcess - if limit > len(headers) { - limit = len(headers) + if limit > len(headersWithGovState) { + limit = len(headersWithGovState) } - chunk := headers[:limit] + chunk := headersWithGovState[:limit] // In case of header only syncing, validate the chunk immediately if d.mode == FastSync || d.mode == LightSync { + // TODO(sonic) update the gov state to make TSigVerify correct // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) + unknown := make([]*types.Header, 0, len(headersWithGovState)) for _, header := range chunk { if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { - unknown = append(unknown, header) + unknown = append(unknown, header.Header) } } - // If we're importing pure headers, verify based on their recentness - frequency := fsHeaderCheckFrequency - if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { - frequency = 1 + + for _, header := range chunk { + if header.GovState != nil { + log.Debug("Got gov state, store it", "round", header.Round, "number", header.Number.Uint64()) + d.gov.StoreState(header.GovState) + } } - if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { + + if n, err := d.lightchain.InsertHeaderChain2(chunk, d.verifierCache); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { - rollback = append(rollback, chunk[:n]...) + for _, h := range chunk[:n] { + rollback = append(rollback, h.Header) + } } log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) return errInvalidChain @@ -1324,6 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er if len(rollback) > fsHeaderSafetyNet { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } + } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { @@ -1342,7 +1455,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er return errBadPeer } } - headers = headers[limit:] + headersWithGovState = headersWithGovState[limit:] origin += uint64(limit) } @@ -1400,7 +1513,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { for i, result := range results { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } - if index, err := d.blockchain.InsertChain(blocks); err != nil { + if index, err := d.blockchain.InsertChain2(blocks); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1565,10 +1678,14 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { // DeliverHeaders injects a new batch of block headers received from a remote // node into the download schedule. -func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { +func (d *Downloader) DeliverHeaders(id string, headers []*types.HeaderWithGovState) (err error) { return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) } +func (d *Downloader) DeliverGovState(id string, govState *types.GovState) error { + return d.deliver(id, d.govStateCh, &govStatePack{id, govState}, govStateInMeter, govStateDropMeter) +} + // DeliverBodies injects a new batch of block bodies received from a remote node. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) diff --git a/dex/downloader/fakepeer.go b/dex/downloader/fakepeer.go index 3e29357ba..f0d596a4b 100644 --- a/dex/downloader/fakepeer.go +++ b/dex/downloader/fakepeer.go @@ -88,7 +88,14 @@ func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, } } } - p.dl.DeliverHeaders(p.id, headers) + + // TODO(sonic): fix this + var headersWithGovState []*types.HeaderWithGovState + for _, h := range headers { + headersWithGovState = append(headersWithGovState, + &types.HeaderWithGovState{Header: h}) + } + p.dl.DeliverHeaders(p.id, headersWithGovState) return nil } @@ -115,7 +122,13 @@ func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, r } headers = append(headers, origin) } - p.dl.DeliverHeaders(p.id, headers) + // TODO(sonic): fix this + var headersWithGovState []*types.HeaderWithGovState + for _, h := range headers { + headersWithGovState = append(headersWithGovState, + &types.HeaderWithGovState{Header: h}) + } + p.dl.DeliverHeaders(p.id, headersWithGovState) return nil } diff --git a/dex/downloader/governance.go b/dex/downloader/governance.go new file mode 100644 index 000000000..40233f1a8 --- /dev/null +++ b/dex/downloader/governance.go @@ -0,0 +1,170 @@ +package downloader + +import ( + "math/big" + "sync" + "time" + + dexCore "github.com/dexon-foundation/dexon-consensus/core" + coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" + dkgTypes "github.com/dexon-foundation/dexon-consensus/core/types/dkg" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core/state" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/rlp" + "github.com/dexon-foundation/dexon/trie" +) + +// This is a goverance for fast sync +type governance struct { + db ethdb.Database + headRoot common.Hash + headHeight uint64 + height2Root map[uint64]common.Hash + mu sync.Mutex +} + +func newGovernance(headState *types.GovState) *governance { + db := ethdb.NewMemDatabase() + g := &governance{ + db: db, + headRoot: headState.Root, + headHeight: headState.Number.Uint64(), + height2Root: make(map[uint64]common.Hash), + } + g.StoreState(headState) + + statedb, err := state.New(headState.Root, state.NewDatabase(g.db)) + if statedb == nil || err != nil { + log.Error("New governance fail", "statedb == nil", statedb == nil, "err", err) + } + return g +} + +func (g *governance) getHeadHelper() *vm.GovernanceStateHelper { + return g.getHelper(g.headRoot) +} + +func (g *governance) getHelperByRound(round uint64) *vm.GovernanceStateHelper { + height := g.GetRoundHeight(round) + root, exists := g.height2Root[height] + if !exists { + log.Debug("Gov get helper by round", "round", round, "exists", exists) + return nil + } + return g.getHelper(root) +} + +func (g *governance) getHelper(root common.Hash) *vm.GovernanceStateHelper { + statedb, err := state.New(root, state.NewDatabase(g.db)) + if statedb == nil || err != nil { + return nil + } + return &vm.GovernanceStateHelper{statedb} +} + +func (g *governance) GetRoundHeight(round uint64) uint64 { + var h uint64 + if helper := g.getHeadHelper(); helper != nil { + h = helper.RoundHeight(big.NewInt(int64(round))).Uint64() + } + return h +} + +func (g *governance) StoreState(s *types.GovState) { + g.mu.Lock() + defer g.mu.Unlock() + + // store the hight -> root mapping + g.height2Root[s.Number.Uint64()] = s.Root + + // store the account + for _, node := range s.Proof { + g.db.Put(crypto.Keccak256(node), node) + } + + // store the storage + triedb := trie.NewDatabase(g.db) + t, err := trie.New(common.Hash{}, triedb) + if err != nil { + panic(err) + } + for _, kv := range s.Storage { + t.TryUpdate(kv[0], kv[1]) + } + t.Commit(nil) + triedb.Commit(t.Hash(), false) + + if s.Number.Uint64() > g.headHeight { + log.Debug("Gov head root changed", "number", s.Number.Uint64()) + g.headRoot = s.Root + g.headHeight = s.Number.Uint64() + } +} + +// Return the genesis configuration if round == 0. +func (g *governance) Configuration(round uint64) *coreTypes.Config { + if round < dexCore.ConfigRoundShift { + round = 0 + } else { + round -= dexCore.ConfigRoundShift + } + helper := g.getHelperByRound(round) + if helper == nil { + log.Warn("Get config helper fail", "round - round shift", round) + return nil + } + c := helper.Configuration() + return &coreTypes.Config{ + NumChains: c.NumChains, + LambdaBA: time.Duration(c.LambdaBA) * time.Millisecond, + LambdaDKG: time.Duration(c.LambdaDKG) * time.Millisecond, + K: int(c.K), + PhiRatio: c.PhiRatio, + NotarySetSize: c.NotarySetSize, + DKGSetSize: c.DKGSetSize, + RoundInterval: time.Duration(c.RoundInterval) * time.Millisecond, + MinBlockInterval: time.Duration(c.MinBlockInterval) * time.Millisecond, + } +} + +// DKGComplaints gets all the DKGComplaints of round. +func (g *governance) DKGComplaints(round uint64) []*dkgTypes.Complaint { + helper := g.getHeadHelper() + var dkgComplaints []*dkgTypes.Complaint + for _, pk := range helper.DKGComplaints(big.NewInt(int64(round))) { + x := new(dkgTypes.Complaint) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgComplaints = append(dkgComplaints, x) + } + return dkgComplaints +} + +// DKGMasterPublicKeys gets all the DKGMasterPublicKey of round. +func (g *governance) DKGMasterPublicKeys(round uint64) []*dkgTypes.MasterPublicKey { + helper := g.getHeadHelper() + var dkgMasterPKs []*dkgTypes.MasterPublicKey + for _, pk := range helper.DKGMasterPublicKeys(big.NewInt(int64(round))) { + x := new(dkgTypes.MasterPublicKey) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgMasterPKs = append(dkgMasterPKs, x) + } + return dkgMasterPKs +} + +// IsDKGFinal checks if DKG is final. +func (g *governance) IsDKGFinal(round uint64) bool { + helper := g.getHeadHelper() + threshold := 2*uint64(g.Configuration(round).DKGSetSize)/3 + 1 + count := helper.DKGFinalizedsCount(big.NewInt(int64(round))).Uint64() + return count >= threshold +} diff --git a/dex/downloader/metrics.go b/dex/downloader/metrics.go index 0d6041712..395950759 100644 --- a/dex/downloader/metrics.go +++ b/dex/downloader/metrics.go @@ -28,6 +28,11 @@ var ( headerDropMeter = metrics.NewRegisteredMeter("dex/downloader/headers/drop", nil) headerTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/headers/timeout", nil) + govStateInMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/in", nil) + govStateReqTimer = metrics.NewRegisteredTimer("dex/downloader/govStates/req", nil) + govStateDropMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/drop", nil) + govStateTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/timeout", nil) + bodyInMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/in", nil) bodyReqTimer = metrics.NewRegisteredTimer("dex/downloader/bodies/req", nil) bodyDropMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/drop", nil) diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go index 1fd82fbe3..b6f2936b7 100644 --- a/dex/downloader/peer.go +++ b/dex/downloader/peer.go @@ -78,8 +78,9 @@ type peerConnection struct { // LightPeer encapsulates the methods required to synchronise with a remote light peer. type LightPeer interface { Head() (common.Hash, *big.Int) - RequestHeadersByHash(common.Hash, int, int, bool) error - RequestHeadersByNumber(uint64, int, int, bool) error + RequestHeadersByHash(common.Hash, int, int, bool, bool) error + RequestHeadersByNumber(uint64, int, int, bool, bool) error + RequestGovStateByHash(common.Hash) error } // Peer encapsulates the methods required to synchronise with a remote full peer. @@ -96,11 +97,15 @@ type lightPeerWrapper struct { } func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() } -func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error { - return w.peer.RequestHeadersByHash(h, amount, skip, reverse) +func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse, withGov bool) error { + return w.peer.RequestHeadersByHash(h, amount, skip, reverse, withGov) } -func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error { - return w.peer.RequestHeadersByNumber(i, amount, skip, reverse) +func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse, withGov bool) error { + return w.peer.RequestHeadersByNumber(i, amount, skip, reverse, withGov) +} +func (w *lightPeerWrapper) RequestGovStateByHash(common.Hash) error { + // TODO(sonic): support this + panic("RequestGovStateByHash not supported in light client mode sync") } func (w *lightPeerWrapper) RequestBodies([]common.Hash) error { panic("RequestBodies not supported in light client mode sync") @@ -156,7 +161,7 @@ func (p *peerConnection) FetchHeaders(from uint64, count int) error { p.headerStarted = time.Now() // Issue the header retrieval request (absolut upwards without gaps) - go p.peer.RequestHeadersByNumber(from, count, 0, false) + go p.peer.RequestHeadersByNumber(from, count, 0, false, true) return nil } diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go index 12c75e793..f3a36ec3c 100644 --- a/dex/downloader/queue.go +++ b/dex/downloader/queue.go @@ -68,15 +68,15 @@ type queue struct { mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching // Headers are "special", they download in batches, supported by a skeleton chain - headerHead common.Hash // [eth/62] Hash of the last queued header to verify order - headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for - headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable - headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations - headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers - headerProced int // [eth/62] Number of headers already processed from the results - headerOffset uint64 // [eth/62] Number of the first header in the result cache - headerContCh chan bool // [eth/62] Channel to notify when header download finishes + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + headerTaskPool map[uint64]*types.HeaderWithGovState // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers + headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for + headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable + headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations + headerResults []*types.HeaderWithGovState // [eth/62] Result cache accumulating the completed headers + headerProced int // [eth/62] Number of headers already processed from the results + headerOffset uint64 // [eth/62] Number of the first header in the result cache + headerContCh chan bool // [eth/62] Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers @@ -267,7 +267,7 @@ func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[comm // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill // up an already retrieved header skeleton. -func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { +func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.HeaderWithGovState) { q.lock.Lock() defer q.lock.Unlock() @@ -276,10 +276,10 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { panic("skeleton assembly already in progress") } // Schedule all the header retrieval tasks for the skeleton assembly - q.headerTaskPool = make(map[uint64]*types.Header) + q.headerTaskPool = make(map[uint64]*types.HeaderWithGovState) q.headerTaskQueue = prque.New(nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains - q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerResults = make([]*types.HeaderWithGovState, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 q.headerOffset = from q.headerContCh = make(chan bool, 1) @@ -294,7 +294,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { // RetrieveHeaders retrieves the header chain assemble based on the scheduled // skeleton. -func (q *queue) RetrieveHeaders() ([]*types.Header, int) { +func (q *queue) RetrieveHeaders() ([]*types.HeaderWithGovState, int) { q.lock.Lock() defer q.lock.Unlock() @@ -306,7 +306,7 @@ func (q *queue) RetrieveHeaders() ([]*types.Header, int) { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { +func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -333,14 +333,14 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { continue } // Queue the header for content retrieval - q.blockTaskPool[hash] = header - q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.blockTaskPool[hash] = header.Header + q.blockTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) if q.mode == FastSync { - q.receiptTaskPool[hash] = header - q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.receiptTaskPool[hash] = header.Header + q.receiptTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) } - inserts = append(inserts, header) + inserts = append(inserts, header.Header) q.headerHead = hash from++ } @@ -679,7 +679,7 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, // If the headers are accepted, the method makes an attempt to deliver the set // of ready headers to the processor to keep the pipeline full. However it will // not block to prevent stalling other pending deliveries. -func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { +func (q *queue) DeliverHeaders(id string, headers []*types.HeaderWithGovState, headerProcCh chan []*types.HeaderWithGovState) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -743,7 +743,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } if ready > 0 { // Headers are ready for delivery, gather them and push forward (non blocking) - process := make([]*types.Header, ready) + process := make([]*types.HeaderWithGovState, ready) copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) select { diff --git a/dex/downloader/types.go b/dex/downloader/types.go index d320b7590..a7143ccd9 100644 --- a/dex/downloader/types.go +++ b/dex/downloader/types.go @@ -35,13 +35,22 @@ type dataPack interface { // headerPack is a batch of block headers returned by a peer. type headerPack struct { peerID string - headers []*types.Header + headers []*types.HeaderWithGovState } func (p *headerPack) PeerId() string { return p.peerID } func (p *headerPack) Items() int { return len(p.headers) } func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) } +type govStatePack struct { + peerID string + govState *types.GovState +} + +func (p *govStatePack) PeerId() string { return p.peerID } +func (p *govStatePack) Items() int { return 1 } +func (p *govStatePack) Stats() string { return "1" } + // bodyPack is a batch of block bodies returned by a peer. type bodyPack struct { peerID string |