diff options
Diffstat (limited to 'trie')
-rw-r--r-- | trie/sync.go | 74 | ||||
-rw-r--r-- | trie/sync_test.go | 30 |
2 files changed, 86 insertions, 18 deletions
diff --git a/trie/sync.go b/trie/sync.go index 168501392..9e8449431 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -28,6 +28,10 @@ import ( // node it did not request. var ErrNotRequested = errors.New("not requested") +// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a +// node it already processed previously. +var ErrAlreadyProcessed = errors.New("already processed") + // request represents a scheduled or already in-flight state retrieval request. type request struct { hash common.Hash // Hash of the node data content to retrieve @@ -48,6 +52,21 @@ type SyncResult struct { Data []byte // Data content of the retrieved node } +// syncMemBatch is an in-memory buffer of successfully downloaded but not yet +// persisted data items. +type syncMemBatch struct { + batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items + order []common.Hash // Order of completion to prevent out-of-order data loss +} + +// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. +func newSyncMemBatch() *syncMemBatch { + return &syncMemBatch{ + batch: make(map[common.Hash][]byte), + order: make([]common.Hash, 0, 256), + } +} + // TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a // leaf node. It's used by state syncing to check if the leaf node requires some // further data syncing. @@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type TrieSync struct { - database DatabaseReader + database DatabaseReader // Persistent database to check for existing entries + membatch *syncMemBatch // Memory buffer to avoid frequest database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } @@ -66,6 +86,7 @@ type TrieSync struct { func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync { ts := &TrieSync{ database: database, + membatch: newSyncMemBatch(), requests: make(map[common.Hash]*request), queue: prque.New(), } @@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c if root == emptyRoot { return } + if _, ok := s.membatch.batch[root]; ok { + return + } key := root.Bytes() blob, _ := s.database.Get(key) if local, err := decodeNode(key, blob, 0); local != nil && err == nil { @@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) if hash == emptyState { return } + if _, ok := s.membatch.batch[hash]; ok { + return + } if blob, _ := s.database.Get(hash.Bytes()); blob != nil { return } @@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash { // Process injects a batch of retrieved trie nodes data, returning if something // was committed to the database and also the index of an entry if processing of // it failed. -func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) { +func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { committed := false for i, item := range results { @@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, if request == nil { return committed, i, ErrNotRequested } + if request.data != nil { + return committed, i, ErrAlreadyProcessed + } // If the item is a raw entry request, commit directly if request.raw { request.data = item.Data - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, i, err } if len(requests) == 0 && request.deps == 0 { - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, 0, nil } +// Commit flushes the data stored in the internal membatch out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) { + // Dump the membatch into a database dbw + for i, key := range s.membatch.order { + if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { + return i, err + } + } + written := len(s.membatch.order) + + // Drop the membatch data and return + s.membatch = newSyncMemBatch() + return written, nil +} + // Pending returns the number of state entries currently pending for download. func (s *TrieSync) Pending() int { return len(s.requests) @@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { // If the child references another node, resolve or schedule if node, ok := (child.node).(hashNode); ok { // Try to resolve the node from the local database + hash := common.BytesToHash(node) + if _, ok := s.membatch.batch[hash]; ok { + continue + } blob, _ := s.database.Get(node) if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil { continue } // Locally unknown node, schedule for retrieval requests = append(requests, &request{ - hash: common.BytesToHash(node), + hash: hash, parents: []*request{req}, depth: child.depth, callback: req.callback, @@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { return requests, nil } -// commit finalizes a retrieval request and stores it into the database. If any +// commit finalizes a retrieval request and stores it into the membatch. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) { - // Write the node content to disk - if err := dbw.Put(req.hash[:], req.data); err != nil { - return err - } +func (s *TrieSync) commit(req *request) (err error) { + // Write the node content to the membatch + s.membatch.batch[req.hash] = req.data + s.membatch.order = append(s.membatch.order, req.hash) + delete(s.requests, req.hash) // Check all parents for completion for _, parent := range req.parents { parent.deps-- if parent.deps == 0 { - if err := s.commit(parent, dbw); err != nil { + if err := s.commit(parent); err != nil { return err } } diff --git a/trie/sync_test.go b/trie/sync_test.go index d778555b9..ec16a25bd 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -122,9 +122,12 @@ func testIterativeTrieSync(t *testing.T, batch int) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(batch)...) } // Cross check that the two tries are in sync @@ -152,9 +155,12 @@ func TestIterativeDelayedTrieSync(t *testing.T) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[len(results):], sched.Missing(10000)...) } // Cross check that the two tries are in sync @@ -190,9 +196,12 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) { results = append(results, SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { queue[hash] = struct{}{} @@ -231,9 +240,12 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { delete(queue, result.Hash) } @@ -272,9 +284,12 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(0)...) } // Cross check that the two tries are in sync @@ -304,9 +319,12 @@ func TestIncompleteTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } // Process each of the trie nodes - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { added = append(added, result.Hash) } |