From b97e34a8e4d06b315cc495819ba6612f89dec54f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 7 Oct 2015 12:14:30 +0300 Subject: eth/downloader: concurrent receipt and state processing --- core/state/sync.go | 93 ++++++++++++++++++------------------------------- core/state/sync_test.go | 16 ++++----- 2 files changed, 41 insertions(+), 68 deletions(-) (limited to 'core/state') diff --git a/core/state/sync.go b/core/state/sync.go index e9bebe8ee..5a388886c 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -21,78 +21,51 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) -type StateSync struct { - db ethdb.Database - sync *trie.TrieSync - codeReqs map[common.Hash]struct{} // requested but not yet written to database - codeReqList []common.Hash // requested since last Missing -} +// StateSync is the main state synchronisation scheduler, which provides yet the +// unknown state hashes to retrieve, accepts node data associated with said hashes +// and reconstructs the state database step by step until all is done. +type StateSync trie.TrieSync -var sha3_nil = common.BytesToHash(sha3.NewKeccak256().Sum(nil)) +// NewStateSync create a new state trie download scheduler. +func NewStateSync(root common.Hash, database ethdb.Database) *StateSync { + // Pre-declare the result syncer t + var syncer *trie.TrieSync -func NewStateSync(root common.Hash, db ethdb.Database) *StateSync { - ss := &StateSync{ - db: db, - codeReqs: make(map[common.Hash]struct{}), - } - ss.codeReqs[sha3_nil] = struct{}{} // never request the nil hash - ss.sync = trie.NewTrieSync(root, db, ss.leafFound) - return ss -} + callback := func(leaf []byte, parent common.Hash) error { + var obj struct { + Nonce uint64 + Balance *big.Int + Root common.Hash + CodeHash []byte + } + if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { + return err + } + syncer.AddSubTrie(obj.Root, 64, parent, nil) + syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent) -func (self *StateSync) leafFound(leaf []byte, parent common.Hash) error { - var obj struct { - Nonce uint64 - Balance *big.Int - Root common.Hash - CodeHash []byte - } - if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { - return err + return nil } - self.sync.AddSubTrie(obj.Root, 64, parent, nil) + syncer = trie.NewTrieSync(root, database, callback) + return (*StateSync)(syncer) +} - codehash := common.BytesToHash(obj.CodeHash) - if _, ok := self.codeReqs[codehash]; !ok { - code, _ := self.db.Get(obj.CodeHash) - if code == nil { - self.codeReqs[codehash] = struct{}{} - self.codeReqList = append(self.codeReqList, codehash) - } - } - return nil +// Missing retrieves the known missing nodes from the state trie for retrieval. +func (s *StateSync) Missing(max int) []common.Hash { + return (*trie.TrieSync)(s).Missing(max) } -func (self *StateSync) Missing(max int) []common.Hash { - cr := len(self.codeReqList) - gh := 0 - if max != 0 { - if cr > max { - cr = max - } - gh = max - cr - } - list := append(self.sync.Missing(gh), self.codeReqList[:cr]...) - self.codeReqList = self.codeReqList[cr:] - return list +// Process injects a batch of retrieved trie nodes data. +func (s *StateSync) Process(list []trie.SyncResult) (int, error) { + return (*trie.TrieSync)(s).Process(list) } -func (self *StateSync) Process(list []trie.SyncResult) error { - for i := 0; i < len(list); i++ { - if _, ok := self.codeReqs[list[i].Hash]; ok { // code data, not a node - self.db.Put(list[i].Hash[:], list[i].Data) - delete(self.codeReqs, list[i].Hash) - list[i] = list[len(list)-1] - list = list[:len(list)-1] - i-- - } - } - _, err := self.sync.Process(list) - return err +// Pending returns the number of state entries currently pending for download. +func (s *StateSync) Pending() int { + return (*trie.TrieSync)(s).Pending() } diff --git a/core/state/sync_test.go b/core/state/sync_test.go index f6afe8bd8..f0376d484 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -115,8 +115,8 @@ func testIterativeStateSync(t *testing.T, batch int) { } results[i] = trie.SyncResult{hash, data} } - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[:0], sched.Missing(batch)...) } @@ -145,8 +145,8 @@ func TestIterativeDelayedStateSync(t *testing.T) { } results[i] = trie.SyncResult{hash, data} } - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[len(results):], sched.Missing(0)...) } @@ -183,8 +183,8 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { results = append(results, trie.SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { @@ -226,8 +226,8 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if err := sched.Process(results); err != nil { - t.Fatalf("failed to process results: %v", err) + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) } for _, hash := range sched.Missing(0) { queue[hash] = struct{}{} -- cgit v1.2.3