diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/blockchain.go | 25 | ||||
-rw-r--r-- | core/state/sync.go | 98 | ||||
-rw-r--r-- | core/state/sync_test.go | 238 |
3 files changed, 357 insertions, 4 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index b68e7d3ae..6c8a24751 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" "github.com/hashicorp/golang-lru" ) @@ -246,6 +247,26 @@ func (bc *BlockChain) SetHead(head uint64) { bc.loadLastState() } +// FastSyncCommitHead sets the current head block to the one defined by the hash +// irrelevant what the chain contents were prior. +func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error { + // Make sure that both the block as well at it's state trie exists + block := self.GetBlock(hash) + if block == nil { + return fmt.Errorf("non existent block [%x…]", hash[:4]) + } + if _, err := trie.NewSecure(block.Root(), self.chainDb); err != nil { + return err + } + // If all checks out, manually set the head block + self.mu.Lock() + self.currentBlock = block + self.mu.Unlock() + + glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4]) + return nil +} + func (self *BlockChain) GasLimit() *big.Int { self.mu.RLock() defer self.mu.RUnlock() @@ -721,10 +742,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain self.wg.Add(1) defer self.wg.Done() - // Make sure only one thread manipulates the chain at once - self.chainmu.Lock() - defer self.chainmu.Unlock() - // Collect some import statistics to report on stats := struct{ processed, ignored int }{} start := time.Now() diff --git a/core/state/sync.go b/core/state/sync.go new file mode 100644 index 000000000..e9bebe8ee --- /dev/null +++ b/core/state/sync.go @@ -0,0 +1,98 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package state + +import ( + "bytes" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +type StateSync struct { + db ethdb.Database + sync *trie.TrieSync + codeReqs map[common.Hash]struct{} // requested but not yet written to database + codeReqList []common.Hash // requested since last Missing +} + +var sha3_nil = common.BytesToHash(sha3.NewKeccak256().Sum(nil)) + +func NewStateSync(root common.Hash, db ethdb.Database) *StateSync { + ss := &StateSync{ + db: db, + codeReqs: make(map[common.Hash]struct{}), + } + ss.codeReqs[sha3_nil] = struct{}{} // never request the nil hash + ss.sync = trie.NewTrieSync(root, db, ss.leafFound) + return ss +} + +func (self *StateSync) leafFound(leaf []byte, parent common.Hash) error { + var obj struct { + Nonce uint64 + Balance *big.Int + Root common.Hash + CodeHash []byte + } + if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { + return err + } + self.sync.AddSubTrie(obj.Root, 64, parent, nil) + + codehash := common.BytesToHash(obj.CodeHash) + if _, ok := self.codeReqs[codehash]; !ok { + code, _ := self.db.Get(obj.CodeHash) + if code == nil { + self.codeReqs[codehash] = struct{}{} + self.codeReqList = append(self.codeReqList, codehash) + } + } + return nil +} + +func (self *StateSync) Missing(max int) []common.Hash { + cr := len(self.codeReqList) + gh := 0 + if max != 0 { + if cr > max { + cr = max + } + gh = max - cr + } + list := append(self.sync.Missing(gh), self.codeReqList[:cr]...) + self.codeReqList = self.codeReqList[cr:] + return list +} + +func (self *StateSync) Process(list []trie.SyncResult) error { + for i := 0; i < len(list); i++ { + if _, ok := self.codeReqs[list[i].Hash]; ok { // code data, not a node + self.db.Put(list[i].Hash[:], list[i].Data) + delete(self.codeReqs, list[i].Hash) + list[i] = list[len(list)-1] + list = list[:len(list)-1] + i-- + } + } + _, err := self.sync.Process(list) + return err +} diff --git a/core/state/sync_test.go b/core/state/sync_test.go new file mode 100644 index 000000000..f6afe8bd8 --- /dev/null +++ b/core/state/sync_test.go @@ -0,0 +1,238 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package state + +import ( + "bytes" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/trie" +) + +// testAccount is the data associated with an account used by the state tests. +type testAccount struct { + address common.Address + balance *big.Int + nonce uint64 + code []byte +} + +// makeTestState create a sample test state to test node-wise reconstruction. +func makeTestState() (ethdb.Database, common.Hash, []*testAccount) { + // Create an empty state + db, _ := ethdb.NewMemDatabase() + state := New(common.Hash{}, db) + + // Fill it with some arbitrary data + accounts := []*testAccount{} + for i := byte(0); i < 255; i++ { + obj := state.GetOrNewStateObject(common.BytesToAddress([]byte{i})) + acc := &testAccount{address: common.BytesToAddress([]byte{i})} + + obj.AddBalance(big.NewInt(int64(11 * i))) + acc.balance = big.NewInt(int64(11 * i)) + + obj.SetNonce(uint64(42 * i)) + acc.nonce = uint64(42 * i) + + if i%3 == 0 { + obj.SetCode([]byte{i, i, i, i, i}) + acc.code = []byte{i, i, i, i, i} + } + state.UpdateStateObject(obj) + accounts = append(accounts, acc) + } + root, _ := state.Commit() + + // Return the generated state + return db, root, accounts +} + +// checkStateAccounts cross references a reconstructed state with an expected +// account array. +func checkStateAccounts(t *testing.T, db ethdb.Database, root common.Hash, accounts []*testAccount) { + state := New(root, db) + for i, acc := range accounts { + + if balance := state.GetBalance(acc.address); balance.Cmp(acc.balance) != 0 { + t.Errorf("account %d: balance mismatch: have %v, want %v", i, balance, acc.balance) + } + if nonce := state.GetNonce(acc.address); nonce != acc.nonce { + t.Errorf("account %d: nonce mismatch: have %v, want %v", i, nonce, acc.nonce) + } + if code := state.GetCode(acc.address); bytes.Compare(code, acc.code) != 0 { + t.Errorf("account %d: code mismatch: have %x, want %x", i, code, acc.code) + } + } +} + +// Tests that an empty state is not scheduled for syncing. +func TestEmptyStateSync(t *testing.T) { + empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + db, _ := ethdb.NewMemDatabase() + if req := NewStateSync(empty, db).Missing(1); len(req) != 0 { + t.Errorf("content requested for empty state: %v", req) + } +} + +// Tests that given a root hash, a state can sync iteratively on a single thread, +// requesting retrieval tasks and returning all of them in one go. +func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) } +func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) } + +func testIterativeStateSync(t *testing.T, batch int) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := append([]common.Hash{}, sched.Missing(batch)...) + for len(queue) > 0 { + results := make([]trie.SyncResult, len(queue)) + for i, hash := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results[i] = trie.SyncResult{hash, data} + } + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + queue = append(queue[:0], sched.Missing(batch)...) + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} + +// Tests that the trie scheduler can correctly reconstruct the state even if only +// partial results are returned, and the others sent only later. +func TestIterativeDelayedStateSync(t *testing.T) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := append([]common.Hash{}, sched.Missing(0)...) + for len(queue) > 0 { + // Sync only half of the scheduled nodes + results := make([]trie.SyncResult, len(queue)/2+1) + for i, hash := range queue[:len(results)] { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results[i] = trie.SyncResult{hash, data} + } + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + queue = append(queue[len(results):], sched.Missing(0)...) + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} + +// Tests that given a root hash, a trie can sync iteratively on a single thread, +// requesting retrieval tasks and returning all of them in one go, however in a +// random order. +func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) } +func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) } + +func testIterativeRandomStateSync(t *testing.T, batch int) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(batch) { + queue[hash] = struct{}{} + } + for len(queue) > 0 { + // Fetch all the queued nodes in a random order + results := make([]trie.SyncResult, 0, len(queue)) + for hash, _ := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results = append(results, trie.SyncResult{hash, data}) + } + // Feed the retrieved results back and queue new tasks + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + queue = make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(batch) { + queue[hash] = struct{}{} + } + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} + +// Tests that the trie scheduler can correctly reconstruct the state even if only +// partial results are returned (Even those randomly), others sent only later. +func TestIterativeRandomDelayedStateSync(t *testing.T) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(0) { + queue[hash] = struct{}{} + } + for len(queue) > 0 { + // Sync only half of the scheduled nodes, even those in random order + results := make([]trie.SyncResult, 0, len(queue)/2+1) + for hash, _ := range queue { + delete(queue, hash) + + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results = append(results, trie.SyncResult{hash, data}) + + if len(results) >= cap(results) { + break + } + } + // Feed the retrieved results back and queue new tasks + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + for _, hash := range sched.Missing(0) { + queue[hash] = struct{}{} + } + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} |