From b97e34a8e4d06b315cc495819ba6612f89dec54f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 7 Oct 2015 12:14:30 +0300 Subject: eth/downloader: concurrent receipt and state processing --- trie/sync.go | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++++------- trie/trie.go | 5 +++++ 2 files changed, 65 insertions(+), 8 deletions(-) (limited to 'trie') diff --git a/trie/sync.go b/trie/sync.go index 65cfd6ed8..bb112fb62 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -50,15 +51,15 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error // TrieSync is the main state trie synchronisation scheduler, which provides yet // unknown trie hashes to retrieve, accepts node data associated with said hashes -// and reconstructs the trie steb by step until all is done. +// and reconstructs the trie step by step until all is done. type TrieSync struct { - database Database // State database for storing all the assembled node data + database ethdb.Database // State database for storing all the assembled node data requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } // NewTrieSync creates a new trie data download scheduler. -func NewTrieSync(root common.Hash, database Database, callback TrieSyncLeafCallback) *TrieSync { +func NewTrieSync(root common.Hash, database ethdb.Database, callback TrieSyncLeafCallback) *TrieSync { ts := &TrieSync{ database: database, requests: make(map[common.Hash]*request), @@ -70,10 +71,14 @@ func NewTrieSync(root common.Hash, database Database, callback TrieSyncLeafCallb // AddSubTrie registers a new trie to the sync code, rooted at the designated parent. func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback TrieSyncLeafCallback) { - // Short circuit if the trie is empty + // Short circuit if the trie is empty or already known if root == emptyRoot { return } + blob, _ := s.database.Get(root.Bytes()) + if local, err := decodeNode(blob); local != nil && err == nil { + return + } // Assemble the new sub-trie sync request node := node(hashNode(root.Bytes())) req := &request{ @@ -94,6 +99,35 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c s.schedule(req) } +// AddRawEntry schedules the direct retrieval of a state entry that should not be +// interpreted as a trie node, but rather accepted and stored into the database +// as is. This method's goal is to support misc state metadata retrievals (e.g. +// contract code). +func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) { + // Short circuit if the entry is empty or already known + if hash == emptyState { + return + } + if blob, _ := s.database.Get(hash.Bytes()); blob != nil { + return + } + // Assemble the new sub-trie sync request + req := &request{ + hash: hash, + depth: depth, + } + // If this sub-trie has a designated parent, link them together + if parent != (common.Hash{}) { + ancestor := s.requests[parent] + if ancestor == nil { + panic(fmt.Sprintf("raw-entry ancestor not found: %x", parent)) + } + ancestor.deps++ + req.parents = append(req.parents, ancestor) + } + s.schedule(req) +} + // Missing retrieves the known missing nodes from the trie for retrieval. func (s *TrieSync) Missing(max int) []common.Hash { requests := []common.Hash{} @@ -111,6 +145,12 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { if request == nil { return i, fmt.Errorf("not requested: %x", item.Hash) } + // If the item is a raw entry request, commit directly + if request.object == nil { + request.data = item.Data + s.commit(request, nil) + continue + } // Decode the node data content and update the request node, err := decodeNode(item.Data) if err != nil { @@ -125,7 +165,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { return i, err } if len(requests) == 0 && request.deps == 0 { - s.commit(request) + s.commit(request, nil) continue } request.deps += len(requests) @@ -136,6 +176,11 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { return 0, nil } +// Pending returns the number of state entries currently pending for download. +func (s *TrieSync) Pending() int { + return len(s.requests) +} + // schedule inserts a new state retrieval request into the fetch queue. If there // is already a pending request for this node, the new request will be discarded // and only a parent reference added to the old one. @@ -213,9 +258,16 @@ func (s *TrieSync) children(req *request) ([]*request, error) { // commit finalizes a retrieval request and stores it into the database. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request) error { +func (s *TrieSync) commit(req *request, batch ethdb.Batch) (err error) { + // Create a new batch if none was specified + if batch == nil { + batch = s.database.NewBatch() + defer func() { + err = batch.Write() + }() + } // Write the node content to disk - if err := s.database.Put(req.hash[:], req.data); err != nil { + if err := batch.Put(req.hash[:], req.data); err != nil { return err } delete(s.requests, req.hash) @@ -224,7 +276,7 @@ func (s *TrieSync) commit(req *request) error { for _, parent := range req.parents { parent.deps-- if parent.deps == 0 { - if err := s.commit(parent); err != nil { + if err := s.commit(parent, batch); err != nil { return err } } diff --git a/trie/trie.go b/trie/trie.go index aa8d39fe2..a3a383fb5 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -24,6 +24,7 @@ import ( "hash" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -35,8 +36,12 @@ const defaultCacheCapacity = 800 var ( // The global cache stores decoded trie nodes by hash as they get loaded. globalCache = newARC(defaultCacheCapacity) + // This is the known root hash of an empty trie. emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + + // This is the known hash of an empty state trie entry. + emptyState = crypto.Sha3Hash(nil) ) var ErrMissingRoot = errors.New("missing root node") -- cgit v1.2.3