aboutsummaryrefslogblamecommitdiffstats
path: root/trie/sync.go
blob: 88d6eb77993b78c2a7d66c2cd67cd312af84f266 (plain) (tree)


















                                                                                  
                


                                                
                                               


                                                          



                                                                                



                                                                                    

                                                                               


                                                                                        

                                                                                                    
                                                                                               

                                                                                       
                                                                                            








                                                                               


                                                                             
                                                                                      










                                                                                             
                                                                            
                                                                                 
                                                            
                  
                                                                                              
                                                                                            



                                                                                      


                                                                                      
                                   
                                            







                                                                                     
                                                                                                   
                                                              


                              


                                                

                                      
                                                                               

                      
                                                 
                        















                                                                                     



                                                                                 
                                                                             



                                                               


                                                
                                                      




                                                 
                            













                                                                                      
                                                                         
                                               






                                                                            


                                                                                
                                                                 

                          



                                                          
                                                            
                 


                                                                
                                                                      
                                
                                                
                                         
                                        

                                
                                                                      
                                                                   
                               
                                                
                 


                                                                           
                                                          
                               
                                                

                                                            
                                         
                                        






                                                
                                

 
                                                                            
                                                                         
                                                      












                                                                              
                                                                              
                              


                              


                                                                                
                                       











                                                                               
                                                                        

                                                                               
                          



                             
                                        
                        
                                    
                                        

                                                         
                       
                                         
                                                    
                                                                  
                                                                











                                                                      
                                                                     





                                                                                    
                                                            
                                                                          



                                                                
                                                              



                                                                       
                                               








                                                          
                                                                               

                                                                                
                                                 



                                                             





                                            
                                                                





                                          
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
    "errors"
    "fmt"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethdb"
    "gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

// ErrNotRequested is returned by the trie sync when it's requested to process a
// node it did not request.
var ErrNotRequested = errors.New("not requested")

// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a
// node it already processed previously.
var ErrAlreadyProcessed = errors.New("already processed")

// request represents a scheduled or already in-flight state retrieval request.
type request struct {
    hash common.Hash // Hash of the node data content to retrieve
    data []byte      // Data content of the node, cached until all subtrees complete
    raw  bool        // Whether this is a raw entry (code) or a trie node

    parents []*request // Parent state nodes referencing this entry (notify all upon completion)
    depth   int        // Depth level within the trie the node is located to prioritise DFS
    deps    int        // Number of dependencies before allowed to commit this node

    callback LeafCallback // Callback to invoke if a leaf node it reached on this branch
}

// SyncResult is a simple list to return missing nodes along with their request
// hashes.
type SyncResult struct {
    Hash common.Hash // Hash of the originally unknown trie node
    Data []byte      // Data content of the retrieved node
}

// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
    batch map[common.Hash][]byte // In-memory membatch of recently completed items
    order []common.Hash          // Order of completion to prevent out-of-order data loss
}

// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
    return &syncMemBatch{
        batch: make(map[common.Hash][]byte),
        order: make([]common.Hash, 0, 256),
    }
}

// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
type Sync struct {
    database DatabaseReader           // Persistent database to check for existing entries
    membatch *syncMemBatch            // Memory buffer to avoid frequent database writes
    requests map[common.Hash]*request // Pending requests pertaining to a key hash
    queue    *prque.Prque             // Priority queue with the pending requests
}

// NewSync creates a new trie data download scheduler.
func NewSync(root common.Hash, database DatabaseReader, callback LeafCallback) *Sync {
    ts := &Sync{
        database: database,
        membatch: newSyncMemBatch(),
        requests: make(map[common.Hash]*request),
        queue:    prque.New(),
    }
    ts.AddSubTrie(root, 0, common.Hash{}, callback)
    return ts
}

// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) {
    // Short circuit if the trie is empty or already known
    if root == emptyRoot {
        return
    }
    if _, ok := s.membatch.batch[root]; ok {
        return
    }
    key := root.Bytes()
    blob, _ := s.database.Get(key)
    if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
        return
    }
    // Assemble the new sub-trie sync request
    req := &request{
        hash:     root,
        depth:    depth,
        callback: callback,
    }
    // If this sub-trie has a designated parent, link them together
    if parent != (common.Hash{}) {
        ancestor := s.requests[parent]
        if ancestor == nil {
            panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
        }
        ancestor.deps++
        req.parents = append(req.parents, ancestor)
    }
    s.schedule(req)
}

// AddRawEntry schedules the direct retrieval of a state entry that should not be
// interpreted as a trie node, but rather accepted and stored into the database
// as is. This method's goal is to support misc state metadata retrievals (e.g.
// contract code).
func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) {
    // Short circuit if the entry is empty or already known
    if hash == emptyState {
        return
    }
    if _, ok := s.membatch.batch[hash]; ok {
        return
    }
    if ok, _ := s.database.Has(hash.Bytes()); ok {
        return
    }
    // Assemble the new sub-trie sync request
    req := &request{
        hash:  hash,
        raw:   true,
        depth: depth,
    }
    // If this sub-trie has a designated parent, link them together
    if parent != (common.Hash{}) {
        ancestor := s.requests[parent]
        if ancestor == nil {
            panic(fmt.Sprintf("raw-entry ancestor not found: %x", parent))
        }
        ancestor.deps++
        req.parents = append(req.parents, ancestor)
    }
    s.schedule(req)
}

// Missing retrieves the known missing nodes from the trie for retrieval.
func (s *Sync) Missing(max int) []common.Hash {
    requests := []common.Hash{}
    for !s.queue.Empty() && (max == 0 || len(requests) < max) {
        requests = append(requests, s.queue.PopItem().(common.Hash))
    }
    return requests
}

// Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of
// it failed.
func (s *Sync) Process(results []SyncResult) (bool, int, error) {
    committed := false

    for i, item := range results {
        // If the item was not requested, bail out
        request := s.requests[item.Hash]
        if request == nil {
            return committed, i, ErrNotRequested
        }
        if request.data != nil {
            return committed, i, ErrAlreadyProcessed
        }
        // If the item is a raw entry request, commit directly
        if request.raw {
            request.data = item.Data
            s.commit(request)
            committed = true
            continue
        }
        // Decode the node data content and update the request
        node, err := decodeNode(item.Hash[:], item.Data, 0)
        if err != nil {
            return committed, i, err
        }
        request.data = item.Data

        // Create and schedule a request for all the children nodes
        requests, err := s.children(request, node)
        if err != nil {
            return committed, i, err
        }
        if len(requests) == 0 && request.deps == 0 {
            s.commit(request)
            committed = true
            continue
        }
        request.deps += len(requests)
        for _, child := range requests {
            s.schedule(child)
        }
    }
    return committed, 0, nil
}

// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning the number of items written and any occurred error.
func (s *Sync) Commit(dbw ethdb.Putter) (int, error) {
    // Dump the membatch into a database dbw
    for i, key := range s.membatch.order {
        if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
            return i, err
        }
    }
    written := len(s.membatch.order)

    // Drop the membatch data and return
    s.membatch = newSyncMemBatch()
    return written, nil
}

// Pending returns the number of state entries currently pending for download.
func (s *Sync) Pending() int {
    return len(s.requests)
}

// schedule inserts a new state retrieval request into the fetch queue. If there
// is already a pending request for this node, the new request will be discarded
// and only a parent reference added to the old one.
func (s *Sync) schedule(req *request) {
    // If we're already requesting this node, add a new reference and stop
    if old, ok := s.requests[req.hash]; ok {
        old.parents = append(old.parents, req.parents...)
        return
    }
    // Schedule the request for future retrieval
    s.queue.Push(req.hash, float32(req.depth))
    s.requests[req.hash] = req
}

// children retrieves all the missing children of a state trie entry for future
// retrieval scheduling.
func (s *Sync) children(req *request, object node) ([]*request, error) {
    // Gather all the children of the node, irrelevant whether known or not
    type child struct {
        node  node
        depth int
    }
    children := []child{}

    switch node := (object).(type) {
    case *shortNode:
        children = []child{{
            node:  node.Val,
            depth: req.depth + len(node.Key),
        }}
    case *fullNode:
        for i := 0; i < 17; i++ {
            if node.Children[i] != nil {
                children = append(children, child{
                    node:  node.Children[i],
                    depth: req.depth + 1,
                })
            }
        }
    default:
        panic(fmt.Sprintf("unknown node: %+v", node))
    }
    // Iterate over the children, and request all unknown ones
    requests := make([]*request, 0, len(children))
    for _, child := range children {
        // Notify any external watcher of a new key/value node
        if req.callback != nil {
            if node, ok := (child.node).(valueNode); ok {
                if err := req.callback(node, req.hash); err != nil {
                    return nil, err
                }
            }
        }
        // If the child references another node, resolve or schedule
        if node, ok := (child.node).(hashNode); ok {
            // Try to resolve the node from the local database
            hash := common.BytesToHash(node)
            if _, ok := s.membatch.batch[hash]; ok {
                continue
            }
            if ok, _ := s.database.Has(node); ok {
                continue
            }
            // Locally unknown node, schedule for retrieval
            requests = append(requests, &request{
                hash:     hash,
                parents:  []*request{req},
                depth:    child.depth,
                callback: req.callback,
            })
        }
    }
    return requests, nil
}

// commit finalizes a retrieval request and stores it into the membatch. If any
// of the referencing parent requests complete due to this commit, they are also
// committed themselves.
func (s *Sync) commit(req *request) (err error) {
    // Write the node content to the membatch
    s.membatch.batch[req.hash] = req.data
    s.membatch.order = append(s.membatch.order, req.hash)

    delete(s.requests, req.hash)

    // Check all parents for completion
    for _, parent := range req.parents {
        parent.deps--
        if parent.deps == 0 {
            if err := s.commit(parent); err != nil {
                return err
            }
        }
    }
    return nil
}