diff options
Diffstat (limited to 'dex/downloader/downloader.go')
-rw-r--r-- | dex/downloader/downloader.go | 199 |
1 files changed, 158 insertions, 41 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go index 0383a3709..69d95d801 100644 --- a/dex/downloader/downloader.go +++ b/dex/downloader/downloader.go @@ -1,4 +1,3 @@ -// Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -25,15 +24,21 @@ import ( "sync/atomic" "time" + dexCore "github.com/dexon-foundation/dexon-consensus/core" + ethereum "github.com/dexon-foundation/dexon" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/rawdb" + "github.com/dexon-foundation/dexon/core/state" "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/ethdb" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/metrics" "github.com/dexon-foundation/dexon/params" + "github.com/dexon-foundation/dexon/trie" ) var ( @@ -63,11 +68,10 @@ var ( reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs - fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected - fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it - fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download - fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync ) var ( @@ -103,6 +107,9 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database + gov *governance + verifierCache *dexCore.TSigVerifierCache + rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -125,12 +132,14 @@ type Downloader struct { committed int32 // Channels - headerCh chan dataPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + govStateCh chan dataPack + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + + headerProcCh chan []*types.HeaderWithGovState // [eth/62] Channel to feed the header processor new tasks // for stateFetcher stateSyncStart chan *stateSync @@ -161,14 +170,18 @@ type LightChain interface { // GetHeaderByHash retrieves a header from the local chain. GetHeaderByHash(common.Hash) *types.Header + GetHeaderByNumber(number uint64) *types.Header + // CurrentHeader retrieves the head header from the local chain. CurrentHeader() *types.Header + GetGovStateByNumber(number uint64) (*types.GovState, error) + // GetTd returns the total difficulty of a local block. GetTd(common.Hash, uint64) *big.Int - // InsertHeaderChain inserts a batch of headers into the local chain. - InsertHeaderChain([]*types.Header, int) (int, error) + // InsertHeaderChain2 inserts a batch of headers into the local chain. + InsertHeaderChain2([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error) // Rollback removes a few recently added elements from the local chain. Rollback([]common.Hash) @@ -193,8 +206,8 @@ type BlockChain interface { // FastSyncCommitHead directly commits the head block to a certain entity. FastSyncCommitHead(common.Hash) error - // InsertChain inserts a batch of blocks into the local chain. - InsertChain(types.Blocks) (int, error) + // InsertChain2 inserts a batch of blocks into the local chain. + InsertChain2(types.Blocks) (int, error) // InsertReceiptChain inserts a batch of receipts into the local chain. InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) @@ -218,11 +231,12 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC lightchain: lightchain, dropPeer: dropPeer, headerCh: make(chan dataPack, 1), + govStateCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1), bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), + headerProcCh: make(chan []*types.HeaderWithGovState, 1), quitCh: make(chan struct{}), stateCh: make(chan dataPack), stateSyncStart: make(chan *stateSync), @@ -457,6 +471,35 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.mode == FastSync && pivot != 0 { d.committed = 0 } + + if d.mode == FastSync || d.mode == LightSync { + // fetch gov state + govState, err := d.fetchGovState(p, latest.Hash(), latest.Root) + if err != nil { + return err + } + + originHeader := d.lightchain.GetHeaderByNumber(origin) + if originHeader == nil { + return fmt.Errorf("origin header not exists, number: %d", origin) + } + + // prepare state origin - 2 + d.gov = newGovernance(govState) + for i := uint64(0); i < 3; i++ { + if originHeader.Round >= i { + h := d.gov.GetRoundHeight(originHeader.Round - i) + s, err := d.lightchain.GetGovStateByNumber(h) + if err != nil { + return err + } + d.gov.StoreState(s) + } + } + + d.verifierCache = dexCore.NewTSigVerifierCache(d.gov, 5) + } + // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { @@ -551,7 +594,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { // Request the advertised remote head block and wait for the response head, _ := p.peer.Head() - go p.peer.RequestHeadersByHash(head, 1, 0, false) + go p.peer.RequestHeadersByHash(head, 1, 0, false, false) ttl := d.requestTTL() timeout := time.After(ttl) @@ -572,7 +615,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { p.log.Debug("Multiple headers for single request", "headers", len(headers)) return nil, errBadPeer } - head := headers[0] + head := headers[0].Header p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) return head, nil @@ -587,6 +630,69 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { } } +func (d *Downloader) fetchGovState(p *peerConnection, + hash common.Hash, root common.Hash) (*types.GovState, error) { + go p.peer.RequestGovStateByHash(hash) + + ttl := d.requestTTL() + timeout := time.After(ttl) + for { + select { + case <-d.cancelCh: + return nil, errCancelBlockFetch + case packet := <-d.govStateCh: + if packet.PeerId() != p.id { + log.Debug("Received gov state from incorrect peer", "peer", packet.PeerId()) + break + } + + // TODO(sonic): refactor this. + govState := packet.(*govStatePack).govState + + // reconstruct the gov state + db := ethdb.NewMemDatabase() + for _, value := range govState.Proof { + db.Put(crypto.Keccak256(value), value) + } + + // proof and state should split + // check the state object of governance contract + key := crypto.Keccak256(vm.GovernanceContractAddress.Bytes()) + _, _, err := trie.VerifyProof(root, key, db) + if err != nil { + return nil, err + } + + triedb := trie.NewDatabase(db) + t, err := trie.New(common.Hash{}, triedb) + for _, entry := range govState.Storage { + t.TryUpdate(entry[0], entry[1]) + } + err = triedb.Commit(t.Hash(), false) + if err != nil { + return nil, err + } + + statedb, err := state.New(root, state.NewDatabase(db)) + if err != nil { + return nil, err + } + + storageTrie := statedb.StorageTrie(vm.GovernanceContractAddress) + if storageTrie == nil { + return nil, fmt.Errorf("storage not match") + } + return govState, nil + case <-timeout: + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) + return nil, errTimeout + + case <-d.bodyCh: + case <-d.receiptCh: + } + } +} + // findAncestor tries to locate the common ancestor link of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N links should already get us a match. @@ -621,7 +727,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err if count > limit { count = limit } - go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) + go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -705,7 +811,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err ttl := d.requestTTL() timeout := time.After(ttl) - go p.peer.RequestHeadersByNumber(check, 1, 0, false) + go p.peer.RequestHeadersByNumber(check, 1, 0, false, false) // Wait until a reply arrives to this request for arrived := false; !arrived; { @@ -789,10 +895,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) if skeleton { p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false, true) } else { p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) + go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false, true) } } // Start pulling the header chain skeleton until all is done @@ -935,7 +1041,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) // // The method returns the entire filled skeleton and also the number of headers // already forwarded for processing. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.HeaderWithGovState) ([]*types.HeaderWithGovState, int, error) { log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) @@ -1235,9 +1341,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er case <-d.cancelCh: return errCancelHeaderProcessing - case headers := <-d.headerProcCh: + case headersWithGovState := <-d.headerProcCh: // Terminate header processing if we synced up - if len(headers) == 0 { + if len(headersWithGovState) == 0 { // Notify everyone that headers are fully processed for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { @@ -1283,7 +1389,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // Otherwise split the chunk of headers into batches and process them gotHeaders = true - for len(headers) > 0 { + for len(headersWithGovState) > 0 { // Terminate if something failed in between processing chunks select { case <-d.cancelCh: @@ -1292,29 +1398,35 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er } // Select the next chunk of headers to import limit := maxHeadersProcess - if limit > len(headers) { - limit = len(headers) + if limit > len(headersWithGovState) { + limit = len(headersWithGovState) } - chunk := headers[:limit] + chunk := headersWithGovState[:limit] // In case of header only syncing, validate the chunk immediately if d.mode == FastSync || d.mode == LightSync { + // TODO(sonic) update the gov state to make TSigVerify correct // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) + unknown := make([]*types.Header, 0, len(headersWithGovState)) for _, header := range chunk { if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { - unknown = append(unknown, header) + unknown = append(unknown, header.Header) } } - // If we're importing pure headers, verify based on their recentness - frequency := fsHeaderCheckFrequency - if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { - frequency = 1 + + for _, header := range chunk { + if header.GovState != nil { + log.Debug("Got gov state, store it", "round", header.Round, "number", header.Number.Uint64()) + d.gov.StoreState(header.GovState) + } } - if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { + + if n, err := d.lightchain.InsertHeaderChain2(chunk, d.verifierCache); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { - rollback = append(rollback, chunk[:n]...) + for _, h := range chunk[:n] { + rollback = append(rollback, h.Header) + } } log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) return errInvalidChain @@ -1324,6 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er if len(rollback) > fsHeaderSafetyNet { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } + } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { @@ -1342,7 +1455,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er return errBadPeer } } - headers = headers[limit:] + headersWithGovState = headersWithGovState[limit:] origin += uint64(limit) } @@ -1400,7 +1513,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { for i, result := range results { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } - if index, err := d.blockchain.InsertChain(blocks); err != nil { + if index, err := d.blockchain.InsertChain2(blocks); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1565,10 +1678,14 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { // DeliverHeaders injects a new batch of block headers received from a remote // node into the download schedule. -func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { +func (d *Downloader) DeliverHeaders(id string, headers []*types.HeaderWithGovState) (err error) { return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) } +func (d *Downloader) DeliverGovState(id string, govState *types.GovState) error { + return d.deliver(id, d.govStateCh, &govStatePack{id, govState}, govStateInMeter, govStateDropMeter) +} + // DeliverBodies injects a new batch of block bodies received from a remote node. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) |