aboutsummaryrefslogtreecommitdiffstats
path: root/trie/sync.go
blob: 1e4f8d87c138aa9f227ab532618eeada10b73d90 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
    "errors"
    "fmt"

    "github.com/ethereum/go-ethereum/common"
    "gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

// ErrNotRequested is returned by the trie sync when it's requested to process a
// node it did not request.
var ErrNotRequested = errors.New("not requested")

// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a
// node it already processed previously.
var ErrAlreadyProcessed = errors.New("already processed")

// request represents a scheduled or already in-flight state retrieval request.
type request struct {
    hash common.Hash // Hash of the node data content to retrieve
    data []byte      // Data content of the node, cached until all subtrees complete
    raw  bool        // Whether this is a raw entry (code) or a trie node

    parents []*request // Parent state nodes referencing this entry (notify all upon completion)
    depth   int        // Depth level within the trie the node is located to prioritise DFS
    deps    int        // Number of dependencies before allowed to commit this node

    callback TrieSyncLeafCallback // Callback to invoke if a leaf node it reached on this branch
}

// SyncResult is a simple list to return missing nodes along with their request
// hashes.
type SyncResult struct {
    Hash common.Hash // Hash of the originally unknown trie node
    Data []byte      // Data content of the retrieved node
}

// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
    batch map[common.Hash][]byte // In-memory membatch of recently completed items
    order []common.Hash          // Order of completion to prevent out-of-order data loss
}

// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
    return &syncMemBatch{
        batch: make(map[common.Hash][]byte),
        order: make([]common.Hash, 0, 256),
    }
}

// TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a
// leaf node. It's used by state syncing to check if the leaf node requires some
// further data syncing.
type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error

// TrieSync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
type TrieSync struct {
    database DatabaseReader           // Persistent database to check for existing entries
    membatch *syncMemBatch            // Memory buffer to avoid frequest database writes
    requests map[common.Hash]*request // Pending requests pertaining to a key hash
    queue    *prque.Prque             // Priority queue with the pending requests
}

// NewTrieSync creates a new trie data download scheduler.
func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync {
    ts := &TrieSync{
        database: database,
        membatch: newSyncMemBatch(),
        requests: make(map[common.Hash]*request),
        queue:    prque.New(),
    }
    ts.AddSubTrie(root, 0, common.Hash{}, callback)
    return ts
}

// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback TrieSyncLeafCallback) {
    // Short circuit if the trie is empty or already known
    if root == emptyRoot {
        return
    }
    if _, ok := s.membatch.batch[root]; ok {
        return
    }
    key := root.Bytes()
    blob, _ := s.database.Get(key)
    if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
        return
    }
    // Assemble the new sub-trie sync request
    req := &request{
        hash:     root,
        depth:    depth,
        callback: callback,
    }
    // If this sub-trie has a designated parent, link them together
    if parent != (common.Hash{}) {
        ancestor := s.requests[parent]
        if ancestor == nil {
            panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
        }
        ancestor.deps++
        req.parents = append(req.parents, ancestor)
    }
    s.schedule(req)
}

// AddRawEntry schedules the direct retrieval of a state entry that should not be
// interpreted as a trie node, but rather accepted and stored into the database
// as is. This method's goal is to support misc state metadata retrievals (e.g.
// contract code).
func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) {
    // Short circuit if the entry is empty or already known
    if hash == emptyState {
        return
    }
    if _, ok := s.membatch.batch[hash]; ok {
        return
    }
    if blob, _ := s.database.Get(hash.Bytes()); blob != nil {
        return
    }
    // Assemble the new sub-trie sync request
    req := &request{
        hash:  hash,
        raw:   true,
        depth: depth,
    }
    // If this sub-trie has a designated parent, link them together
    if parent != (common.Hash{}) {
        ancestor := s.requests[parent]
        if ancestor == nil {
            panic(fmt.Sprintf("raw-entry ancestor not found: %x", parent))
        }
        ancestor.deps++
        req.parents = append(req.parents, ancestor)
    }
    s.schedule(req)
}

// Missing retrieves the known missing nodes from the trie for retrieval.
func (s *TrieSync) Missing(max int) []common.Hash {
    requests := []common.Hash{}
    for !s.queue.Empty() && (max == 0 || len(requests) < max) {
        requests = append(requests, s.queue.PopItem().(common.Hash))
    }
    return requests
}

// Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of
// it failed.
func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
    committed := false

    for i, item := range results {
        // If the item was not requested, bail out
        request := s.requests[item.Hash]
        if request == nil {
            return committed, i, ErrNotRequested
        }
        if request.data != nil {
            return committed, i, ErrAlreadyProcessed
        }
        // If the item is a raw entry request, commit directly
        if request.raw {
            request.data = item.Data
            s.commit(request)
            committed = true
            continue
        }
        // Decode the node data content and update the request
        node, err := decodeNode(item.Hash[:], item.Data, 0)
        if err != nil {
            return committed, i, err
        }
        request.data = item.Data

        // Create and schedule a request for all the children nodes
        requests, err := s.children(request, node)
        if err != nil {
            return committed, i, err
        }
        if len(requests) == 0 && request.deps == 0 {
            s.commit(request)
            committed = true
            continue
        }
        request.deps += len(requests)
        for _, child := range requests {
            s.schedule(child)
        }
    }
    return committed, 0, nil
}

// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning th enumber of items written and any occurred error.
func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) {
    // Dump the membatch into a database dbw
    for i, key := range s.membatch.order {
        if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
            return i, err
        }
    }
    written := len(s.membatch.order)

    // Drop the membatch data and return
    s.membatch = newSyncMemBatch()
    return written, nil
}

// Pending returns the number of state entries currently pending for download.
func (s *TrieSync) Pending() int {
    return len(s.requests)
}

// schedule inserts a new state retrieval request into the fetch queue. If there
// is already a pending request for this node, the new request will be discarded
// and only a parent reference added to the old one.
func (s *TrieSync) schedule(req *request) {
    // If we're already requesting this node, add a new reference and stop
    if old, ok := s.requests[req.hash]; ok {
        old.parents = append(old.parents, req.parents...)
        return
    }
    // Schedule the request for future retrieval
    s.queue.Push(req.hash, float32(req.depth))
    s.requests[req.hash] = req
}

// children retrieves all the missing children of a state trie entry for future
// retrieval scheduling.
func (s *TrieSync) children(req *request, object node) ([]*request, error) {
    // Gather all the children of the node, irrelevant whether known or not
    type child struct {
        node  node
        depth int
    }
    children := []child{}

    switch node := (object).(type) {
    case *shortNode:
        children = []child{{
            node:  node.Val,
            depth: req.depth + len(node.Key),
        }}
    case *fullNode:
        for i := 0; i < 17; i++ {
            if node.Children[i] != nil {
                children = append(children, child{
                    node:  node.Children[i],
                    depth: req.depth + 1,
                })
            }
        }
    default:
        panic(fmt.Sprintf("unknown node: %+v", node))
    }
    // Iterate over the children, and request all unknown ones
    requests := make([]*request, 0, len(children))
    for _, child := range children {
        // Notify any external watcher of a new key/value node
        if req.callback != nil {
            if node, ok := (child.node).(valueNode); ok {
                if err := req.callback(node, req.hash); err != nil {
                    return nil, err
                }
            }
        }
        // If the child references another node, resolve or schedule
        if node, ok := (child.node).(hashNode); ok {
            // Try to resolve the node from the local database
            hash := common.BytesToHash(node)
            if _, ok := s.membatch.batch[hash]; ok {
                continue
            }
            blob, _ := s.database.Get(node)
            if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil {
                continue
            }
            // Locally unknown node, schedule for retrieval
            requests = append(requests, &request{
                hash:     hash,
                parents:  []*request{req},
                depth:    child.depth,
                callback: req.callback,
            })
        }
    }
    return requests, nil
}

// commit finalizes a retrieval request and stores it into the membatch. If any
// of the referencing parent requests complete due to this commit, they are also
// committed themselves.
func (s *TrieSync) commit(req *request) (err error) {
    // Write the node content to the membatch
    s.membatch.batch[req.hash] = req.data
    s.membatch.order = append(s.membatch.order, req.hash)

    delete(s.requests, req.hash)

    // Check all parents for completion
    for _, parent := range req.parents {
        parent.deps--
        if parent.deps == 0 {
            if err := s.commit(parent); err != nil {
                return err
            }
        }
    }
    return nil
}