diff options
Diffstat (limited to 'dex/downloader')
-rw-r--r-- | dex/downloader/downloader.go | 199 | ||||
-rw-r--r-- | dex/downloader/fakepeer.go | 17 | ||||
-rw-r--r-- | dex/downloader/governance.go | 170 | ||||
-rw-r--r-- | dex/downloader/metrics.go | 5 | ||||
-rw-r--r-- | dex/downloader/peer.go | 19 | ||||
-rw-r--r-- | dex/downloader/queue.go | 42 | ||||
-rw-r--r-- | dex/downloader/types.go | 11 |
7 files changed, 391 insertions, 72 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go index 0383a3709..69d95d801 100644 --- a/dex/downloader/downloader.go +++ b/dex/downloader/downloader.go @@ -1,4 +1,3 @@ -// Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -25,15 +24,21 @@ import ( "sync/atomic" "time" + dexCore "github.com/dexon-foundation/dexon-consensus/core" + ethereum "github.com/dexon-foundation/dexon" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/rawdb" + "github.com/dexon-foundation/dexon/core/state" "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/ethdb" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/metrics" "github.com/dexon-foundation/dexon/params" + "github.com/dexon-foundation/dexon/trie" ) var ( @@ -63,11 +68,10 @@ var ( reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs - fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected - fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it - fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download - fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync ) var ( @@ -103,6 +107,9 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database + gov *governance + verifierCache *dexCore.TSigVerifierCache + rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -125,12 +132,14 @@ type Downloader struct { committed int32 // Channels - headerCh chan dataPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + govStateCh chan dataPack + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + + headerProcCh chan []*types.HeaderWithGovState // [eth/62] Channel to feed the header processor new tasks // for stateFetcher stateSyncStart chan *stateSync @@ -161,14 +170,18 @@ type LightChain interface { // GetHeaderByHash retrieves a header from the local chain. GetHeaderByHash(common.Hash) *types.Header + GetHeaderByNumber(number uint64) *types.Header + // CurrentHeader retrieves the head header from the local chain. CurrentHeader() *types.Header + GetGovStateByNumber(number uint64) (*types.GovState, error) + // GetTd returns the total difficulty of a local block. GetTd(common.Hash, uint64) *big.Int - // InsertHeaderChain inserts a batch of headers into the local chain. - InsertHeaderChain([]*types.Header, int) (int, error) + // InsertHeaderChain2 inserts a batch of headers into the local chain. + InsertHeaderChain2([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error) // Rollback removes a few recently added elements from the local chain. Rollback([]common.Hash) @@ -193,8 +206,8 @@ type BlockChain interface { // FastSyncCommitHead directly commits the head block to a certain entity. FastSyncCommitHead(common.Hash) error - // InsertChain inserts a batch of blocks into the local chain. - InsertChain(types.Blocks) (int, error) + // InsertChain2 inserts a batch of blocks into the local chain. + InsertChain2(types.Blocks) (int, error) // InsertReceiptChain inserts a batch of receipts into the local chain. InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) @@ -218,11 +231,12 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC lightchain: lightchain, dropPeer: dropPeer, headerCh: make(chan dataPack, 1), + govStateCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1), bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), + headerProcCh: make(chan []*types.HeaderWithGovState, 1), quitCh: make(chan struct{}), stateCh: make(chan dataPack), stateSyncStart: make(chan *stateSync), @@ -457,6 +471,35 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.mode == FastSync && pivot != 0 { d.committed = 0 } + + if d.mode == FastSync || d.mode == LightSync { + // fetch gov state + govState, err := d.fetchGovState(p, latest.Hash(), latest.Root) + if err != nil { + return err + } + + originHeader := d.lightchain.GetHeaderByNumber(origin) + if originHeader == nil { + return fmt.Errorf("origin header not exists, number: %d", origin) + } + + // prepare state origin - 2 + d.gov = newGovernance(govState) + for i := uint64(0); i < 3; i++ { + if originHeader.Round >= i { + h := d.gov.GetRoundHeight(originHeader.Round - i) + s, err := d.lightchain.GetGovStateByNumber(h) + if err != nil { + return err + } + d.gov.StoreState(s) + } + } + + d.verifierCache = dexCore.NewTSigVerifierCache(d.gov, 5) + } + // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { @@ -551,7 +594,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { // Request the advertised remote head block and wait for the response head, _ := p.peer.Head() - go p.peer.RequestHeadersByHash(head, 1, 0, false) + go p.peer.RequestHeadersByHash(head, 1, 0, false, false) ttl := d.requestTTL() timeout := time.After(ttl) @@ -572,7 +615,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { p.log.Debug("Multiple headers for single request", "headers", len(headers)) return nil, errBadPeer } - head := headers[0] + head := headers[0].Header p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) return head, nil @@ -587,6 +630,69 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { } } +func (d *Downloader) fetchGovState(p *peerConnection, + hash common.Hash, root common.Hash) (*types.GovState, error) { + go p.peer.RequestGovStateByHash(hash) + + ttl := d.requestTTL() + timeout := time.After(ttl) + for { + select { + case <-d.cancelCh: + return nil, errCancelBlockFetch + case packet := <-d.govStateCh: + if packet.PeerId() != p.id { + log.Debug("Received gov state from incorrect peer", "peer", packet.PeerId()) + break + } + + // TODO(sonic): refactor this. + govState := packet.(*govStatePack).govState + + // reconstruct the gov state + db := ethdb.NewMemDatabase() + for _, value := range govState.Proof { + db.Put(crypto.Keccak256(value), value) + } + + // proof and state should split + // check the state object of governance contract + key := crypto.Keccak256(vm.GovernanceContractAddress.Bytes()) + _, _, err := trie.VerifyProof(root, key, db) + if err != nil { + return nil, err + } + + triedb := trie.NewDatabase(db) + t, err := trie.New(common.Hash{}, triedb) + for _, entry := range govState.Storage { + t.TryUpdate(entry[0], entry[1]) + } + err = triedb.Commit(t.Hash(), false) + if err != nil { + return nil, err + } + + statedb, err := state.New(root, state.NewDatabase(db)) + if err != nil { + return nil, err + } + + storageTrie := statedb.StorageTrie(vm.GovernanceContractAddress) + if storageTrie == nil { + return nil, fmt.Errorf("storage not match") + } + return govState, nil + case <-timeout: + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) + return nil, errTimeout + + case <-d.bodyCh: + case <-d.receiptCh: + } + } +} + // findAncestor tries to locate the common ancestor link of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N links should already get us a match. @@ -621,7 +727,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err if count > limit { count = limit } - go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) + go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -705,7 +811,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err ttl := d.requestTTL() timeout := time.After(ttl) - go p.peer.RequestHeadersByNumber(check, 1, 0, false) + go p.peer.RequestHeadersByNumber(check, 1, 0, false, false) // Wait until a reply arrives to this request for arrived := false; !arrived; { @@ -789,10 +895,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) if skeleton { p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false, true) } else { p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) + go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false, true) } } // Start pulling the header chain skeleton until all is done @@ -935,7 +1041,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) // // The method returns the entire filled skeleton and also the number of headers // already forwarded for processing. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.HeaderWithGovState) ([]*types.HeaderWithGovState, int, error) { log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) @@ -1235,9 +1341,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er case <-d.cancelCh: return errCancelHeaderProcessing - case headers := <-d.headerProcCh: + case headersWithGovState := <-d.headerProcCh: // Terminate header processing if we synced up - if len(headers) == 0 { + if len(headersWithGovState) == 0 { // Notify everyone that headers are fully processed for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { @@ -1283,7 +1389,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // Otherwise split the chunk of headers into batches and process them gotHeaders = true - for len(headers) > 0 { + for len(headersWithGovState) > 0 { // Terminate if something failed in between processing chunks select { case <-d.cancelCh: @@ -1292,29 +1398,35 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er } // Select the next chunk of headers to import limit := maxHeadersProcess - if limit > len(headers) { - limit = len(headers) + if limit > len(headersWithGovState) { + limit = len(headersWithGovState) } - chunk := headers[:limit] + chunk := headersWithGovState[:limit] // In case of header only syncing, validate the chunk immediately if d.mode == FastSync || d.mode == LightSync { + // TODO(sonic) update the gov state to make TSigVerify correct // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) + unknown := make([]*types.Header, 0, len(headersWithGovState)) for _, header := range chunk { if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { - unknown = append(unknown, header) + unknown = append(unknown, header.Header) } } - // If we're importing pure headers, verify based on their recentness - frequency := fsHeaderCheckFrequency - if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { - frequency = 1 + + for _, header := range chunk { + if header.GovState != nil { + log.Debug("Got gov state, store it", "round", header.Round, "number", header.Number.Uint64()) + d.gov.StoreState(header.GovState) + } } - if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { + + if n, err := d.lightchain.InsertHeaderChain2(chunk, d.verifierCache); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { - rollback = append(rollback, chunk[:n]...) + for _, h := range chunk[:n] { + rollback = append(rollback, h.Header) + } } log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) return errInvalidChain @@ -1324,6 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er if len(rollback) > fsHeaderSafetyNet { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } + } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { @@ -1342,7 +1455,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er return errBadPeer } } - headers = headers[limit:] + headersWithGovState = headersWithGovState[limit:] origin += uint64(limit) } @@ -1400,7 +1513,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { for i, result := range results { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } - if index, err := d.blockchain.InsertChain(blocks); err != nil { + if index, err := d.blockchain.InsertChain2(blocks); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1565,10 +1678,14 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { // DeliverHeaders injects a new batch of block headers received from a remote // node into the download schedule. -func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { +func (d *Downloader) DeliverHeaders(id string, headers []*types.HeaderWithGovState) (err error) { return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) } +func (d *Downloader) DeliverGovState(id string, govState *types.GovState) error { + return d.deliver(id, d.govStateCh, &govStatePack{id, govState}, govStateInMeter, govStateDropMeter) +} + // DeliverBodies injects a new batch of block bodies received from a remote node. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) diff --git a/dex/downloader/fakepeer.go b/dex/downloader/fakepeer.go index 3e29357ba..f0d596a4b 100644 --- a/dex/downloader/fakepeer.go +++ b/dex/downloader/fakepeer.go @@ -88,7 +88,14 @@ func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, } } } - p.dl.DeliverHeaders(p.id, headers) + + // TODO(sonic): fix this + var headersWithGovState []*types.HeaderWithGovState + for _, h := range headers { + headersWithGovState = append(headersWithGovState, + &types.HeaderWithGovState{Header: h}) + } + p.dl.DeliverHeaders(p.id, headersWithGovState) return nil } @@ -115,7 +122,13 @@ func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, r } headers = append(headers, origin) } - p.dl.DeliverHeaders(p.id, headers) + // TODO(sonic): fix this + var headersWithGovState []*types.HeaderWithGovState + for _, h := range headers { + headersWithGovState = append(headersWithGovState, + &types.HeaderWithGovState{Header: h}) + } + p.dl.DeliverHeaders(p.id, headersWithGovState) return nil } diff --git a/dex/downloader/governance.go b/dex/downloader/governance.go new file mode 100644 index 000000000..40233f1a8 --- /dev/null +++ b/dex/downloader/governance.go @@ -0,0 +1,170 @@ +package downloader + +import ( + "math/big" + "sync" + "time" + + dexCore "github.com/dexon-foundation/dexon-consensus/core" + coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" + dkgTypes "github.com/dexon-foundation/dexon-consensus/core/types/dkg" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core/state" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/rlp" + "github.com/dexon-foundation/dexon/trie" +) + +// This is a goverance for fast sync +type governance struct { + db ethdb.Database + headRoot common.Hash + headHeight uint64 + height2Root map[uint64]common.Hash + mu sync.Mutex +} + +func newGovernance(headState *types.GovState) *governance { + db := ethdb.NewMemDatabase() + g := &governance{ + db: db, + headRoot: headState.Root, + headHeight: headState.Number.Uint64(), + height2Root: make(map[uint64]common.Hash), + } + g.StoreState(headState) + + statedb, err := state.New(headState.Root, state.NewDatabase(g.db)) + if statedb == nil || err != nil { + log.Error("New governance fail", "statedb == nil", statedb == nil, "err", err) + } + return g +} + +func (g *governance) getHeadHelper() *vm.GovernanceStateHelper { + return g.getHelper(g.headRoot) +} + +func (g *governance) getHelperByRound(round uint64) *vm.GovernanceStateHelper { + height := g.GetRoundHeight(round) + root, exists := g.height2Root[height] + if !exists { + log.Debug("Gov get helper by round", "round", round, "exists", exists) + return nil + } + return g.getHelper(root) +} + +func (g *governance) getHelper(root common.Hash) *vm.GovernanceStateHelper { + statedb, err := state.New(root, state.NewDatabase(g.db)) + if statedb == nil || err != nil { + return nil + } + return &vm.GovernanceStateHelper{statedb} +} + +func (g *governance) GetRoundHeight(round uint64) uint64 { + var h uint64 + if helper := g.getHeadHelper(); helper != nil { + h = helper.RoundHeight(big.NewInt(int64(round))).Uint64() + } + return h +} + +func (g *governance) StoreState(s *types.GovState) { + g.mu.Lock() + defer g.mu.Unlock() + + // store the hight -> root mapping + g.height2Root[s.Number.Uint64()] = s.Root + + // store the account + for _, node := range s.Proof { + g.db.Put(crypto.Keccak256(node), node) + } + + // store the storage + triedb := trie.NewDatabase(g.db) + t, err := trie.New(common.Hash{}, triedb) + if err != nil { + panic(err) + } + for _, kv := range s.Storage { + t.TryUpdate(kv[0], kv[1]) + } + t.Commit(nil) + triedb.Commit(t.Hash(), false) + + if s.Number.Uint64() > g.headHeight { + log.Debug("Gov head root changed", "number", s.Number.Uint64()) + g.headRoot = s.Root + g.headHeight = s.Number.Uint64() + } +} + +// Return the genesis configuration if round == 0. +func (g *governance) Configuration(round uint64) *coreTypes.Config { + if round < dexCore.ConfigRoundShift { + round = 0 + } else { + round -= dexCore.ConfigRoundShift + } + helper := g.getHelperByRound(round) + if helper == nil { + log.Warn("Get config helper fail", "round - round shift", round) + return nil + } + c := helper.Configuration() + return &coreTypes.Config{ + NumChains: c.NumChains, + LambdaBA: time.Duration(c.LambdaBA) * time.Millisecond, + LambdaDKG: time.Duration(c.LambdaDKG) * time.Millisecond, + K: int(c.K), + PhiRatio: c.PhiRatio, + NotarySetSize: c.NotarySetSize, + DKGSetSize: c.DKGSetSize, + RoundInterval: time.Duration(c.RoundInterval) * time.Millisecond, + MinBlockInterval: time.Duration(c.MinBlockInterval) * time.Millisecond, + } +} + +// DKGComplaints gets all the DKGComplaints of round. +func (g *governance) DKGComplaints(round uint64) []*dkgTypes.Complaint { + helper := g.getHeadHelper() + var dkgComplaints []*dkgTypes.Complaint + for _, pk := range helper.DKGComplaints(big.NewInt(int64(round))) { + x := new(dkgTypes.Complaint) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgComplaints = append(dkgComplaints, x) + } + return dkgComplaints +} + +// DKGMasterPublicKeys gets all the DKGMasterPublicKey of round. +func (g *governance) DKGMasterPublicKeys(round uint64) []*dkgTypes.MasterPublicKey { + helper := g.getHeadHelper() + var dkgMasterPKs []*dkgTypes.MasterPublicKey + for _, pk := range helper.DKGMasterPublicKeys(big.NewInt(int64(round))) { + x := new(dkgTypes.MasterPublicKey) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgMasterPKs = append(dkgMasterPKs, x) + } + return dkgMasterPKs +} + +// IsDKGFinal checks if DKG is final. +func (g *governance) IsDKGFinal(round uint64) bool { + helper := g.getHeadHelper() + threshold := 2*uint64(g.Configuration(round).DKGSetSize)/3 + 1 + count := helper.DKGFinalizedsCount(big.NewInt(int64(round))).Uint64() + return count >= threshold +} diff --git a/dex/downloader/metrics.go b/dex/downloader/metrics.go index 0d6041712..395950759 100644 --- a/dex/downloader/metrics.go +++ b/dex/downloader/metrics.go @@ -28,6 +28,11 @@ var ( headerDropMeter = metrics.NewRegisteredMeter("dex/downloader/headers/drop", nil) headerTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/headers/timeout", nil) + govStateInMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/in", nil) + govStateReqTimer = metrics.NewRegisteredTimer("dex/downloader/govStates/req", nil) + govStateDropMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/drop", nil) + govStateTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/timeout", nil) + bodyInMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/in", nil) bodyReqTimer = metrics.NewRegisteredTimer("dex/downloader/bodies/req", nil) bodyDropMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/drop", nil) diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go index 1fd82fbe3..b6f2936b7 100644 --- a/dex/downloader/peer.go +++ b/dex/downloader/peer.go @@ -78,8 +78,9 @@ type peerConnection struct { // LightPeer encapsulates the methods required to synchronise with a remote light peer. type LightPeer interface { Head() (common.Hash, *big.Int) - RequestHeadersByHash(common.Hash, int, int, bool) error - RequestHeadersByNumber(uint64, int, int, bool) error + RequestHeadersByHash(common.Hash, int, int, bool, bool) error + RequestHeadersByNumber(uint64, int, int, bool, bool) error + RequestGovStateByHash(common.Hash) error } // Peer encapsulates the methods required to synchronise with a remote full peer. @@ -96,11 +97,15 @@ type lightPeerWrapper struct { } func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() } -func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error { - return w.peer.RequestHeadersByHash(h, amount, skip, reverse) +func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse, withGov bool) error { + return w.peer.RequestHeadersByHash(h, amount, skip, reverse, withGov) } -func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error { - return w.peer.RequestHeadersByNumber(i, amount, skip, reverse) +func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse, withGov bool) error { + return w.peer.RequestHeadersByNumber(i, amount, skip, reverse, withGov) +} +func (w *lightPeerWrapper) RequestGovStateByHash(common.Hash) error { + // TODO(sonic): support this + panic("RequestGovStateByHash not supported in light client mode sync") } func (w *lightPeerWrapper) RequestBodies([]common.Hash) error { panic("RequestBodies not supported in light client mode sync") @@ -156,7 +161,7 @@ func (p *peerConnection) FetchHeaders(from uint64, count int) error { p.headerStarted = time.Now() // Issue the header retrieval request (absolut upwards without gaps) - go p.peer.RequestHeadersByNumber(from, count, 0, false) + go p.peer.RequestHeadersByNumber(from, count, 0, false, true) return nil } diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go index 12c75e793..f3a36ec3c 100644 --- a/dex/downloader/queue.go +++ b/dex/downloader/queue.go @@ -68,15 +68,15 @@ type queue struct { mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching // Headers are "special", they download in batches, supported by a skeleton chain - headerHead common.Hash // [eth/62] Hash of the last queued header to verify order - headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for - headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable - headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations - headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers - headerProced int // [eth/62] Number of headers already processed from the results - headerOffset uint64 // [eth/62] Number of the first header in the result cache - headerContCh chan bool // [eth/62] Channel to notify when header download finishes + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + headerTaskPool map[uint64]*types.HeaderWithGovState // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers + headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for + headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable + headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations + headerResults []*types.HeaderWithGovState // [eth/62] Result cache accumulating the completed headers + headerProced int // [eth/62] Number of headers already processed from the results + headerOffset uint64 // [eth/62] Number of the first header in the result cache + headerContCh chan bool // [eth/62] Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers @@ -267,7 +267,7 @@ func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[comm // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill // up an already retrieved header skeleton. -func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { +func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.HeaderWithGovState) { q.lock.Lock() defer q.lock.Unlock() @@ -276,10 +276,10 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { panic("skeleton assembly already in progress") } // Schedule all the header retrieval tasks for the skeleton assembly - q.headerTaskPool = make(map[uint64]*types.Header) + q.headerTaskPool = make(map[uint64]*types.HeaderWithGovState) q.headerTaskQueue = prque.New(nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains - q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerResults = make([]*types.HeaderWithGovState, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 q.headerOffset = from q.headerContCh = make(chan bool, 1) @@ -294,7 +294,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { // RetrieveHeaders retrieves the header chain assemble based on the scheduled // skeleton. -func (q *queue) RetrieveHeaders() ([]*types.Header, int) { +func (q *queue) RetrieveHeaders() ([]*types.HeaderWithGovState, int) { q.lock.Lock() defer q.lock.Unlock() @@ -306,7 +306,7 @@ func (q *queue) RetrieveHeaders() ([]*types.Header, int) { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { +func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -333,14 +333,14 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { continue } // Queue the header for content retrieval - q.blockTaskPool[hash] = header - q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.blockTaskPool[hash] = header.Header + q.blockTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) if q.mode == FastSync { - q.receiptTaskPool[hash] = header - q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.receiptTaskPool[hash] = header.Header + q.receiptTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) } - inserts = append(inserts, header) + inserts = append(inserts, header.Header) q.headerHead = hash from++ } @@ -679,7 +679,7 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, // If the headers are accepted, the method makes an attempt to deliver the set // of ready headers to the processor to keep the pipeline full. However it will // not block to prevent stalling other pending deliveries. -func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { +func (q *queue) DeliverHeaders(id string, headers []*types.HeaderWithGovState, headerProcCh chan []*types.HeaderWithGovState) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -743,7 +743,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } if ready > 0 { // Headers are ready for delivery, gather them and push forward (non blocking) - process := make([]*types.Header, ready) + process := make([]*types.HeaderWithGovState, ready) copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) select { diff --git a/dex/downloader/types.go b/dex/downloader/types.go index d320b7590..a7143ccd9 100644 --- a/dex/downloader/types.go +++ b/dex/downloader/types.go @@ -35,13 +35,22 @@ type dataPack interface { // headerPack is a batch of block headers returned by a peer. type headerPack struct { peerID string - headers []*types.Header + headers []*types.HeaderWithGovState } func (p *headerPack) PeerId() string { return p.peerID } func (p *headerPack) Items() int { return len(p.headers) } func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) } +type govStatePack struct { + peerID string + govState *types.GovState +} + +func (p *govStatePack) PeerId() string { return p.peerID } +func (p *govStatePack) Items() int { return 1 } +func (p *govStatePack) Stats() string { return "1" } + // bodyPack is a batch of block bodies returned by a peer. type bodyPack struct { peerID string |