diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-07 17:14:30 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-19 15:03:10 +0800 |
commit | b97e34a8e4d06b315cc495819ba6612f89dec54f (patch) | |
tree | 22ddf740ffe180b29b9b5a3a94684d7ac2a5ae19 /core | |
parent | ab27bee25a845be90bd60e774ff68d2ea1501772 (diff) | |
download | go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.gz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.bz2 go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.lz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.xz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.zst go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.zip |
eth/downloader: concurrent receipt and state processing
Diffstat (limited to 'core')
-rw-r--r-- | core/block_processor.go | 11 | ||||
-rw-r--r-- | core/blockchain.go | 266 | ||||
-rw-r--r-- | core/blockchain_test.go | 29 | ||||
-rw-r--r-- | core/chain_makers.go | 2 | ||||
-rw-r--r-- | core/error.go | 2 | ||||
-rw-r--r-- | core/state/sync.go | 93 | ||||
-rw-r--r-- | core/state/sync_test.go | 16 | ||||
-rw-r--r-- | core/types/common.go | 2 |
8 files changed, 254 insertions, 167 deletions
diff --git a/core/block_processor.go b/core/block_processor.go index ba6350805..60f0258c4 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -383,6 +383,15 @@ func (sm *BlockProcessor) ValidateHeader(header *types.Header, checkPow, uncle b } } +// ValidateHeaderWithParent verifies the validity of a header, relying on the database and +// POW behind the block processor. +func (sm *BlockProcessor) ValidateHeaderWithParent(header, parent *types.Header, checkPow, uncle bool) error { + if sm.bc.HasHeader(header.Hash()) { + return nil + } + return ValidateHeader(sm.Pow, header, parent, checkPow, uncle) +} + // See YP section 4.3.4. "Block Header Validity" // Validates a header. Returns an error if the header is invalid. func ValidateHeader(pow pow.PoW, header *types.Header, parent *types.Header, checkPow, uncle bool) error { @@ -425,7 +434,7 @@ func ValidateHeader(pow pow.PoW, header *types.Header, parent *types.Header, che if checkPow { // Verify the nonce of the header. Return an error if it's not valid if !pow.Verify(types.NewBlockWithHeader(header)) { - return ValidationError("Header's nonce is invalid (= %x)", header.Nonce) + return &BlockNonceErr{Hash: header.Hash(), Number: header.Number, Nonce: header.Nonce.Uint64()} } } return nil diff --git a/core/blockchain.go b/core/blockchain.go index 6c8a24751..3e7dfa9ee 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -22,6 +22,8 @@ import ( "fmt" "io" "math/big" + "math/rand" + "runtime" "sync" "sync/atomic" "time" @@ -671,7 +673,7 @@ func (self *BlockChain) writeHeader(header *types.Header) error { // should be done or not. The reason behind the optional check is because some // of the header retrieval mechanisms already need to verfy nonces, as well as // because nonces can be verified sparsely, not needing to check each. -func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (int, error) { +func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) { self.wg.Add(1) defer self.wg.Done() @@ -683,16 +685,85 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (i stats := struct{ processed, ignored int }{} start := time.Now() - // Start the parallel nonce verifier, with a fake nonce if not requested - verifier := self.pow - if !verify { - verifier = FakePow{} + // Generate the list of headers that should be POW verified + verify := make([]bool, len(chain)) + for i := 0; i < len(verify)/checkFreq; i++ { + index := i*checkFreq + rand.Intn(checkFreq) + if index >= len(verify) { + index = len(verify) - 1 + } + verify[index] = true } - nonceAbort, nonceResults := verifyNoncesFromHeaders(verifier, chain) - defer close(nonceAbort) + verify[len(verify)-1] = true // Last should always be verified to avoid junk + + // Create the header verification task queue and worker functions + tasks := make(chan int, len(chain)) + for i := 0; i < len(chain); i++ { + tasks <- i + } + close(tasks) - // Iterate over the headers, inserting any new ones - complete := make([]bool, len(chain)) + errs, failed := make([]error, len(tasks)), int32(0) + process := func(worker int) { + for index := range tasks { + header, hash := chain[index], chain[index].Hash() + + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&self.procInterrupt) == 1 { + return + } + if atomic.LoadInt32(&failed) > 0 { + return + } + // Short circuit if the header is bad or already known + if BadHashes[hash] { + errs[index] = BadHashError(hash) + atomic.AddInt32(&failed, 1) + return + } + if self.HasHeader(hash) { + continue + } + // Verify that the header honors the chain parameters + checkPow := verify[index] + + var err error + if index == 0 { + err = self.processor.ValidateHeader(header, checkPow, false) + } else { + err = self.processor.ValidateHeaderWithParent(header, chain[index-1], checkPow, false) + } + if err != nil { + errs[index] = err + atomic.AddInt32(&failed, 1) + return + } + } + } + // Start as many worker threads as goroutines allowed + pending := new(sync.WaitGroup) + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + pending.Add(1) + go func(id int) { + defer pending.Done() + process(id) + }(i) + } + pending.Wait() + + // If anything failed, report + if atomic.LoadInt32(&self.procInterrupt) == 1 { + glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") + return 0, nil + } + if failed > 0 { + for i, err := range errs { + if err != nil { + return i, err + } + } + } + // All headers passed verification, import them into the database for i, header := range chain { // Short circuit insertion if shutting down if atomic.LoadInt32(&self.procInterrupt) == 1 { @@ -701,24 +772,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (i } hash := header.Hash() - // Accumulate verification results until the next header is verified - for !complete[i] { - if res := <-nonceResults; res.valid { - complete[res.index] = true - } else { - header := chain[res.index] - return res.index, &BlockNonceErr{ - Hash: header.Hash(), - Number: new(big.Int).Set(header.Number), - Nonce: header.Nonce.Uint64(), - } - } - } - if BadHashes[hash] { - glog.V(logger.Error).Infof("bad header %d [%x…], known bad hash", header.Number, hash) - return i, BadHashError(hash) - } - // Write the header to the chain and get the status + // If the header's already known, skip it, otherwise store if self.HasHeader(hash) { stats.ignored++ continue @@ -743,76 +797,116 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain defer self.wg.Done() // Collect some import statistics to report on - stats := struct{ processed, ignored int }{} + stats := struct{ processed, ignored int32 }{} start := time.Now() - // Iterate over the blocks and receipts, inserting any new ones + // Create the block importing task queue and worker functions + tasks := make(chan int, len(blockChain)) for i := 0; i < len(blockChain) && i < len(receiptChain); i++ { - block, receipts := blockChain[i], receiptChain[i] + tasks <- i + } + close(tasks) - // Short circuit insertion if shutting down - if atomic.LoadInt32(&self.procInterrupt) == 1 { - glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") - break - } - // Short circuit if the owner header is unknown - if !self.HasHeader(block.Hash()) { - glog.V(logger.Debug).Infof("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) - return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) - } - // Skip if the entire data is already known - if self.HasBlock(block.Hash()) { - stats.ignored++ - continue - } - // Compute all the non-consensus fields of the receipts - transactions, logIndex := block.Transactions(), uint(0) - for j := 0; j < len(receipts); j++ { - // The transaction hash can be retrieved from the transaction itself - receipts[j].TxHash = transactions[j].Hash() - - // The contract address can be derived from the transaction itself - if MessageCreatesContract(transactions[j]) { - from, _ := transactions[j].From() - receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce()) + errs, failed := make([]error, len(tasks)), int32(0) + process := func(worker int) { + for index := range tasks { + block, receipts := blockChain[index], receiptChain[index] + + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&self.procInterrupt) == 1 { + return } - // The used gas can be calculated based on previous receipts - if j == 0 { - receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed) - } else { - receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed) + if atomic.LoadInt32(&failed) > 0 { + return } - // The derived log fields can simply be set from the block and transaction - for k := 0; k < len(receipts[j].Logs); k++ { - receipts[j].Logs[k].BlockNumber = block.NumberU64() - receipts[j].Logs[k].BlockHash = block.Hash() - receipts[j].Logs[k].TxHash = receipts[j].TxHash - receipts[j].Logs[k].TxIndex = uint(j) - receipts[j].Logs[k].Index = logIndex - logIndex++ + // Short circuit if the owner header is unknown + if !self.HasHeader(block.Hash()) { + errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) + atomic.AddInt32(&failed, 1) + return } + // Skip if the entire data is already known + if self.HasBlock(block.Hash()) { + atomic.AddInt32(&stats.ignored, 1) + continue + } + // Compute all the non-consensus fields of the receipts + transactions, logIndex := block.Transactions(), uint(0) + for j := 0; j < len(receipts); j++ { + // The transaction hash can be retrieved from the transaction itself + receipts[j].TxHash = transactions[j].Hash() + + // The contract address can be derived from the transaction itself + if MessageCreatesContract(transactions[j]) { + from, _ := transactions[j].From() + receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce()) + } + // The used gas can be calculated based on previous receipts + if j == 0 { + receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed) + } else { + receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed) + } + // The derived log fields can simply be set from the block and transaction + for k := 0; k < len(receipts[j].Logs); k++ { + receipts[j].Logs[k].BlockNumber = block.NumberU64() + receipts[j].Logs[k].BlockHash = block.Hash() + receipts[j].Logs[k].TxHash = receipts[j].TxHash + receipts[j].Logs[k].TxIndex = uint(j) + receipts[j].Logs[k].Index = logIndex + logIndex++ + } + } + // Write all the data out into the database + if err := WriteBody(self.chainDb, block.Hash(), &types.Body{block.Transactions(), block.Uncles()}); err != nil { + errs[index] = fmt.Errorf("failed to write block body: %v", err) + atomic.AddInt32(&failed, 1) + glog.Fatal(errs[index]) + return + } + if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil { + errs[index] = fmt.Errorf("failed to write block receipts: %v", err) + atomic.AddInt32(&failed, 1) + glog.Fatal(errs[index]) + return + } + atomic.AddInt32(&stats.processed, 1) } - // Write all the data out into the database - if err := WriteBody(self.chainDb, block.Hash(), &types.Body{block.Transactions(), block.Uncles()}); err != nil { - glog.Fatalf("failed to write block body: %v", err) - return i, err - } - if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil { - glog.Fatalf("failed to write block receipts: %v", err) - return i, err - } - // Update the head fast sync block if better - self.mu.Lock() - if self.GetTd(self.currentFastBlock.Hash()).Cmp(self.GetTd(block.Hash())) < 0 { - if err := WriteHeadFastBlockHash(self.chainDb, block.Hash()); err != nil { - glog.Fatalf("failed to update head fast block hash: %v", err) + } + // Start as many worker threads as goroutines allowed + pending := new(sync.WaitGroup) + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + pending.Add(1) + go func(id int) { + defer pending.Done() + process(id) + }(i) + } + pending.Wait() + + // If anything failed, report + if atomic.LoadInt32(&self.procInterrupt) == 1 { + glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") + return 0, nil + } + if failed > 0 { + for i, err := range errs { + if err != nil { + return i, err } - self.currentFastBlock = block } - self.mu.Unlock() - - stats.processed++ } + // Update the head fast sync block if better + self.mu.Lock() + head := blockChain[len(errs)-1] + if self.GetTd(self.currentFastBlock.Hash()).Cmp(self.GetTd(head.Hash())) < 0 { + if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil { + glog.Fatalf("failed to update head fast block hash: %v", err) + } + self.currentFastBlock = head + } + self.mu.Unlock() + // Report some public statistics so the user has a clue what's going on first, last := blockChain[0], blockChain[len(blockChain)-1] glog.V(logger.Info).Infof("imported %d receipt(s) (%d ignored) in %v. #%d [%x… / %x…]", stats.processed, stats.ignored, diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 93c2128bc..a614aaa2f 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -94,7 +94,7 @@ func testFork(t *testing.T, processor *BlockProcessor, i, n int, full bool, comp } } else { headerChainB = makeHeaderChain(processor2.bc.CurrentHeader(), n, db, forkSeed) - if _, err := processor2.bc.InsertHeaderChain(headerChainB, true); err != nil { + if _, err := processor2.bc.InsertHeaderChain(headerChainB, 1); err != nil { t.Fatalf("failed to insert forking chain: %v", err) } } @@ -415,7 +415,9 @@ func TestChainMultipleInsertions(t *testing.T) { type bproc struct{} -func (bproc) Process(*types.Block) (vm.Logs, types.Receipts, error) { return nil, nil, nil } +func (bproc) Process(*types.Block) (vm.Logs, types.Receipts, error) { return nil, nil, nil } +func (bproc) ValidateHeader(*types.Header, bool, bool) error { return nil } +func (bproc) ValidateHeaderWithParent(*types.Header, *types.Header, bool, bool) error { return nil } func makeHeaderChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Header { blocks := makeBlockChainWithDiff(genesis, d, seed) @@ -492,8 +494,8 @@ func testReorg(t *testing.T, first, second []int, td int64, full bool) { bc.InsertChain(makeBlockChainWithDiff(genesis, first, 11)) bc.InsertChain(makeBlockChainWithDiff(genesis, second, 22)) } else { - bc.InsertHeaderChain(makeHeaderChainWithDiff(genesis, first, 11), false) - bc.InsertHeaderChain(makeHeaderChainWithDiff(genesis, second, 22), false) + bc.InsertHeaderChain(makeHeaderChainWithDiff(genesis, first, 11), 1) + bc.InsertHeaderChain(makeHeaderChainWithDiff(genesis, second, 22), 1) } // Check that the chain is valid number and link wise if full { @@ -543,7 +545,7 @@ func testBadHashes(t *testing.T, full bool) { } else { headers := makeHeaderChainWithDiff(genesis, []int{1, 2, 4}, 10) BadHashes[headers[2].Hash()] = true - _, err = bc.InsertHeaderChain(headers, true) + _, err = bc.InsertHeaderChain(headers, 1) } if !IsBadHashError(err) { t.Errorf("error mismatch: want: BadHashError, have: %v", err) @@ -575,7 +577,7 @@ func testReorgBadHashes(t *testing.T, full bool) { BadHashes[blocks[3].Header().Hash()] = true defer func() { delete(BadHashes, blocks[3].Header().Hash()) }() } else { - if _, err := bc.InsertHeaderChain(headers, true); err != nil { + if _, err := bc.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to import headers: %v", err) } if bc.CurrentHeader().Hash() != headers[3].Hash() { @@ -631,6 +633,8 @@ func testInsertNonceError(t *testing.T, full bool) { failHash = blocks[failAt].Hash() processor.bc.pow = failPow{failNum} + processor.Pow = failPow{failNum} + failRes, err = processor.bc.InsertChain(blocks) } else { headers := makeHeaderChain(processor.bc.CurrentHeader(), i, db, 0) @@ -640,7 +644,9 @@ func testInsertNonceError(t *testing.T, full bool) { failHash = headers[failAt].Hash() processor.bc.pow = failPow{failNum} - failRes, err = processor.bc.InsertHeaderChain(headers, true) + processor.Pow = failPow{failNum} + + failRes, err = processor.bc.InsertHeaderChain(headers, 1) } // Check that the returned error indicates the nonce failure. if failRes != failAt { @@ -714,12 +720,13 @@ func TestFastVsFullChains(t *testing.T) { fastDb, _ := ethdb.NewMemDatabase() WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds}) fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux)) + fast.SetProcessor(NewBlockProcessor(fastDb, FakePow{}, fast, new(event.TypeMux))) headers := make([]*types.Header, len(blocks)) for i, block := range blocks { headers[i] = block.Header() } - if n, err := fast.InsertHeaderChain(headers, true); err != nil { + if n, err := fast.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil { @@ -796,12 +803,13 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { fastDb, _ := ethdb.NewMemDatabase() WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds}) fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux)) + fast.SetProcessor(NewBlockProcessor(fastDb, FakePow{}, fast, new(event.TypeMux))) headers := make([]*types.Header, len(blocks)) for i, block := range blocks { headers[i] = block.Header() } - if n, err := fast.InsertHeaderChain(headers, true); err != nil { + if n, err := fast.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil { @@ -813,8 +821,9 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { lightDb, _ := ethdb.NewMemDatabase() WriteGenesisBlockForTesting(lightDb, GenesisAccount{address, funds}) light, _ := NewBlockChain(lightDb, FakePow{}, new(event.TypeMux)) + light.SetProcessor(NewBlockProcessor(lightDb, FakePow{}, light, new(event.TypeMux))) - if n, err := light.InsertHeaderChain(headers, true); err != nil { + if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } assert(t, "light", light, height, 0, 0) diff --git a/core/chain_makers.go b/core/chain_makers.go index 31b2627af..1d41b4091 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -239,7 +239,7 @@ func newCanonical(n int, full bool) (ethdb.Database, *BlockProcessor, error) { } // Header-only chain requested headers := makeHeaderChain(genesis.Header(), n, db, canonicalSeed) - _, err := blockchain.InsertHeaderChain(headers, true) + _, err := blockchain.InsertHeaderChain(headers, 1) return db, processor, err } diff --git a/core/error.go b/core/error.go index 5e32124a7..af42cd19c 100644 --- a/core/error.go +++ b/core/error.go @@ -111,7 +111,7 @@ type BlockNonceErr struct { } func (err *BlockNonceErr) Error() string { - return fmt.Sprintf("block %d (%v) nonce is invalid (got %d)", err.Number, err.Hash, err.Nonce) + return fmt.Sprintf("nonce for #%d [%x…] is invalid (got %d)", err.Number, err.Hash, err.Nonce) } // IsBlockNonceErr returns true for invalid block nonce errors. diff --git a/core/state/sync.go b/core/state/sync.go index e9bebe8ee..5a388886c 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -21,78 +21,51 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) -type StateSync struct { - db ethdb.Database - sync *trie.TrieSync - codeReqs map[common.Hash]struct{} // requested but not yet written to database - codeReqList []common.Hash // requested since last Missing -} +// StateSync is the main state synchronisation scheduler, which provides yet the +// unknown state hashes to retrieve, accepts node data associated with said hashes +// and reconstructs the state database step by step until all is done. +type StateSync trie.TrieSync -var sha3_nil = common.BytesToHash(sha3.NewKeccak256().Sum(nil)) +// NewStateSync create a new state trie download scheduler. +func NewStateSync(root common.Hash, database ethdb.Database) *StateSync { + // Pre-declare the result syncer t + var syncer *trie.TrieSync -func NewStateSync(root common.Hash, db ethdb.Database) *StateSync { - ss := &StateSync{ - db: db, - codeReqs: make(map[common.Hash]struct{}), - } - ss.codeReqs[sha3_nil] = struct{}{} // never request the nil hash - ss.sync = trie.NewTrieSync(root, db, ss.leafFound) - return ss -} + callback := func(leaf []byte, parent common.Hash) error { + var obj struct { + Nonce uint64 + Balance *big.Int + Root common.Hash + CodeHash []byte + } + if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { + return err + } + syncer.AddSubTrie(obj.Root, 64, parent, nil) + syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent) -func (self *StateSync) leafFound(leaf []byte, parent common.Hash) error { - var obj struct { - Nonce uint64 - Balance *big.Int - Root common.Hash - CodeHash []byte - } - if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { - return err + return nil } - self.sync.AddSubTrie(obj.Root, 64, parent, nil) + syncer = trie.NewTrieSync(root, database, callback) + return (*StateSync)(syncer) +} - codehash := common.BytesToHash(obj.CodeHash) - if _, ok := self.codeReqs[codehash]; !ok { - code, _ := self.db.Get(obj.CodeHash) - if code == nil { - self.codeReqs[codehash] = struct{}{} - self.codeReqList = append(self.codeReqList, codehash) - } - } - return nil +// Missing retrieves the known missing nodes from the state trie for retrieval. +func (s *StateSync) Missing(max int) []common.Hash { + return (*trie.TrieSync)(s).Missing(max) } -func (self *StateSync) Missing(max int) []common.Hash { - cr := len(self.codeReqList) - gh := 0 - if max != 0 { - if cr > max { - cr = max - } - gh = max - cr - } - list := append(self.sync.Missing(gh), self.codeReqList[:cr]...) - self.codeReqList = self.codeReqList[cr:] - return list +// Process injects a batch of retrieved trie nodes data. +func (s *StateSync) Process(list []trie.SyncResult) (int, error) { + return (*trie.TrieSync)(s).Process(list) } -func (self *StateSync) Process(list []trie.SyncResult) error { - for i := 0; i < len(list); i++ { - if _, ok := self.codeReqs[list[i].Hash]; ok { // code data, not a node - self.db.Put(list[i].Hash[:], list[i].Data) - delete(self.codeReqs, list[i].Hash) - list[i] = list[len(list)-1] - list = list[:len(list)-1] - i-- - } - } - _, err := self.sync.Process(list) - return err +// Pending returns the number of state entries currently pending for download. +func (s *StateSync) Pending() int { + return (*trie.TrieSync)(s).Pending() } diff --git a/core/state/sync_test.go b/core/state/sync_test.go index f6afe8bd8..f0376d484 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -115,8 +115,8 @@ func testIterativeStateSync(t *testing.T, batch int) { } results[i] = trie.SyncResult{hash, data} } - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[:0], sched.Missing(batch)...) } @@ -145,8 +145,8 @@ func TestIterativeDelayedStateSync(t *testing.T) { } results[i] = trie.SyncResult{hash, data} } - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[len(results):], sched.Missing(0)...) } @@ -183,8 +183,8 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { results = append(results, trie.SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { @@ -226,8 +226,8 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } for _, hash := range sched.Missing(0) { queue[hash] = struct{}{} diff --git a/core/types/common.go b/core/types/common.go index 29019a1b4..fe682f98a 100644 --- a/core/types/common.go +++ b/core/types/common.go @@ -20,4 +20,6 @@ import "github.com/ethereum/go-ethereum/core/vm" type BlockProcessor interface { Process(*Block) (vm.Logs, Receipts, error) + ValidateHeader(*Header, bool, bool) error + ValidateHeaderWithParent(*Header, *Header, bool, bool) error } |