diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-07 17:14:30 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-19 15:03:10 +0800 |
commit | b97e34a8e4d06b315cc495819ba6612f89dec54f (patch) | |
tree | 22ddf740ffe180b29b9b5a3a94684d7ac2a5ae19 /trie/sync.go | |
parent | ab27bee25a845be90bd60e774ff68d2ea1501772 (diff) | |
download | go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.gz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.bz2 go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.lz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.xz go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.tar.zst go-tangerine-b97e34a8e4d06b315cc495819ba6612f89dec54f.zip |
eth/downloader: concurrent receipt and state processing
Diffstat (limited to 'trie/sync.go')
-rw-r--r-- | trie/sync.go | 68 |
1 files changed, 60 insertions, 8 deletions
diff --git a/trie/sync.go b/trie/sync.go index 65cfd6ed8..bb112fb62 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -50,15 +51,15 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error // TrieSync is the main state trie synchronisation scheduler, which provides yet // unknown trie hashes to retrieve, accepts node data associated with said hashes -// and reconstructs the trie steb by step until all is done. +// and reconstructs the trie step by step until all is done. type TrieSync struct { - database Database // State database for storing all the assembled node data + database ethdb.Database // State database for storing all the assembled node data requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } // NewTrieSync creates a new trie data download scheduler. -func NewTrieSync(root common.Hash, database Database, callback TrieSyncLeafCallback) *TrieSync { +func NewTrieSync(root common.Hash, database ethdb.Database, callback TrieSyncLeafCallback) *TrieSync { ts := &TrieSync{ database: database, requests: make(map[common.Hash]*request), @@ -70,10 +71,14 @@ func NewTrieSync(root common.Hash, database Database, callback TrieSyncLeafCallb // AddSubTrie registers a new trie to the sync code, rooted at the designated parent. func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback TrieSyncLeafCallback) { - // Short circuit if the trie is empty + // Short circuit if the trie is empty or already known if root == emptyRoot { return } + blob, _ := s.database.Get(root.Bytes()) + if local, err := decodeNode(blob); local != nil && err == nil { + return + } // Assemble the new sub-trie sync request node := node(hashNode(root.Bytes())) req := &request{ @@ -94,6 +99,35 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c s.schedule(req) } +// AddRawEntry schedules the direct retrieval of a state entry that should not be +// interpreted as a trie node, but rather accepted and stored into the database +// as is. This method's goal is to support misc state metadata retrievals (e.g. +// contract code). +func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) { + // Short circuit if the entry is empty or already known + if hash == emptyState { + return + } + if blob, _ := s.database.Get(hash.Bytes()); blob != nil { + return + } + // Assemble the new sub-trie sync request + req := &request{ + hash: hash, + depth: depth, + } + // If this sub-trie has a designated parent, link them together + if parent != (common.Hash{}) { + ancestor := s.requests[parent] + if ancestor == nil { + panic(fmt.Sprintf("raw-entry ancestor not found: %x", parent)) + } + ancestor.deps++ + req.parents = append(req.parents, ancestor) + } + s.schedule(req) +} + // Missing retrieves the known missing nodes from the trie for retrieval. func (s *TrieSync) Missing(max int) []common.Hash { requests := []common.Hash{} @@ -111,6 +145,12 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { if request == nil { return i, fmt.Errorf("not requested: %x", item.Hash) } + // If the item is a raw entry request, commit directly + if request.object == nil { + request.data = item.Data + s.commit(request, nil) + continue + } // Decode the node data content and update the request node, err := decodeNode(item.Data) if err != nil { @@ -125,7 +165,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { return i, err } if len(requests) == 0 && request.deps == 0 { - s.commit(request) + s.commit(request, nil) continue } request.deps += len(requests) @@ -136,6 +176,11 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { return 0, nil } +// Pending returns the number of state entries currently pending for download. +func (s *TrieSync) Pending() int { + return len(s.requests) +} + // schedule inserts a new state retrieval request into the fetch queue. If there // is already a pending request for this node, the new request will be discarded // and only a parent reference added to the old one. @@ -213,9 +258,16 @@ func (s *TrieSync) children(req *request) ([]*request, error) { // commit finalizes a retrieval request and stores it into the database. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request) error { +func (s *TrieSync) commit(req *request, batch ethdb.Batch) (err error) { + // Create a new batch if none was specified + if batch == nil { + batch = s.database.NewBatch() + defer func() { + err = batch.Write() + }() + } // Write the node content to disk - if err := s.database.Put(req.hash[:], req.data); err != nil { + if err := batch.Put(req.hash[:], req.data); err != nil { return err } delete(s.requests, req.hash) @@ -224,7 +276,7 @@ func (s *TrieSync) commit(req *request) error { for _, parent := range req.parents { parent.deps-- if parent.deps == 0 { - if err := s.commit(parent); err != nil { + if err := s.commit(parent, batch); err != nil { return err } } |