aboutsummaryrefslogtreecommitdiffstats
path: root/trie
diff options
context:
space:
mode:
Diffstat (limited to 'trie')
-rw-r--r--trie/sync.go74
-rw-r--r--trie/sync_test.go30
2 files changed, 86 insertions, 18 deletions
diff --git a/trie/sync.go b/trie/sync.go
index 168501392..9e8449431 100644
--- a/trie/sync.go
+++ b/trie/sync.go
@@ -28,6 +28,10 @@ import (
// node it did not request.
var ErrNotRequested = errors.New("not requested")
+// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a
+// node it already processed previously.
+var ErrAlreadyProcessed = errors.New("already processed")
+
// request represents a scheduled or already in-flight state retrieval request.
type request struct {
hash common.Hash // Hash of the node data content to retrieve
@@ -48,6 +52,21 @@ type SyncResult struct {
Data []byte // Data content of the retrieved node
}
+// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
+// persisted data items.
+type syncMemBatch struct {
+ batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items
+ order []common.Hash // Order of completion to prevent out-of-order data loss
+}
+
+// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
+func newSyncMemBatch() *syncMemBatch {
+ return &syncMemBatch{
+ batch: make(map[common.Hash][]byte),
+ order: make([]common.Hash, 0, 256),
+ }
+}
+
// TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a
// leaf node. It's used by state syncing to check if the leaf node requires some
// further data syncing.
@@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
type TrieSync struct {
- database DatabaseReader
+ database DatabaseReader // Persistent database to check for existing entries
+ membatch *syncMemBatch // Memory buffer to avoid frequest database writes
requests map[common.Hash]*request // Pending requests pertaining to a key hash
queue *prque.Prque // Priority queue with the pending requests
}
@@ -66,6 +86,7 @@ type TrieSync struct {
func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync {
ts := &TrieSync{
database: database,
+ membatch: newSyncMemBatch(),
requests: make(map[common.Hash]*request),
queue: prque.New(),
}
@@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c
if root == emptyRoot {
return
}
+ if _, ok := s.membatch.batch[root]; ok {
+ return
+ }
key := root.Bytes()
blob, _ := s.database.Get(key)
if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
@@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash)
if hash == emptyState {
return
}
+ if _, ok := s.membatch.batch[hash]; ok {
+ return
+ }
if blob, _ := s.database.Get(hash.Bytes()); blob != nil {
return
}
@@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash {
// Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of
// it failed.
-func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) {
+func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
committed := false
for i, item := range results {
@@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
if request == nil {
return committed, i, ErrNotRequested
}
+ if request.data != nil {
+ return committed, i, ErrAlreadyProcessed
+ }
// If the item is a raw entry request, commit directly
if request.raw {
request.data = item.Data
- s.commit(request, dbw)
+ s.commit(request)
committed = true
continue
}
@@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
return committed, i, err
}
if len(requests) == 0 && request.deps == 0 {
- s.commit(request, dbw)
+ s.commit(request)
committed = true
continue
}
@@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
return committed, 0, nil
}
+// Commit flushes the data stored in the internal membatch out to persistent
+// storage, returning th enumber of items written and any occurred error.
+func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) {
+ // Dump the membatch into a database dbw
+ for i, key := range s.membatch.order {
+ if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
+ return i, err
+ }
+ }
+ written := len(s.membatch.order)
+
+ // Drop the membatch data and return
+ s.membatch = newSyncMemBatch()
+ return written, nil
+}
+
// Pending returns the number of state entries currently pending for download.
func (s *TrieSync) Pending() int {
return len(s.requests)
@@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) {
// If the child references another node, resolve or schedule
if node, ok := (child.node).(hashNode); ok {
// Try to resolve the node from the local database
+ hash := common.BytesToHash(node)
+ if _, ok := s.membatch.batch[hash]; ok {
+ continue
+ }
blob, _ := s.database.Get(node)
if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil {
continue
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &request{
- hash: common.BytesToHash(node),
+ hash: hash,
parents: []*request{req},
depth: child.depth,
callback: req.callback,
@@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) {
return requests, nil
}
-// commit finalizes a retrieval request and stores it into the database. If any
+// commit finalizes a retrieval request and stores it into the membatch. If any
// of the referencing parent requests complete due to this commit, they are also
// committed themselves.
-func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) {
- // Write the node content to disk
- if err := dbw.Put(req.hash[:], req.data); err != nil {
- return err
- }
+func (s *TrieSync) commit(req *request) (err error) {
+ // Write the node content to the membatch
+ s.membatch.batch[req.hash] = req.data
+ s.membatch.order = append(s.membatch.order, req.hash)
+
delete(s.requests, req.hash)
// Check all parents for completion
for _, parent := range req.parents {
parent.deps--
if parent.deps == 0 {
- if err := s.commit(parent, dbw); err != nil {
+ if err := s.commit(parent); err != nil {
return err
}
}
diff --git a/trie/sync_test.go b/trie/sync_test.go
index d778555b9..ec16a25bd 100644
--- a/trie/sync_test.go
+++ b/trie/sync_test.go
@@ -122,9 +122,12 @@ func testIterativeTrieSync(t *testing.T, batch int) {
}
results[i] = SyncResult{hash, data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two tries are in sync
@@ -152,9 +155,12 @@ func TestIterativeDelayedTrieSync(t *testing.T) {
}
results[i] = SyncResult{hash, data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[len(results):], sched.Missing(10000)...)
}
// Cross check that the two tries are in sync
@@ -190,9 +196,12 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) {
results = append(results, SyncResult{hash, data})
}
// Feed the retrieved results back and queue new tasks
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
@@ -231,9 +240,12 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) {
}
}
// Feed the retrieved results back and queue new tasks
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
for _, result := range results {
delete(queue, result.Hash)
}
@@ -272,9 +284,12 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[:0], sched.Missing(0)...)
}
// Cross check that the two tries are in sync
@@ -304,9 +319,12 @@ func TestIncompleteTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
// Process each of the trie nodes
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
for _, result := range results {
added = append(added, result.Hash)
}