From a2d5a60418e70ce56112381dffdd121cc678a1b6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 18 Aug 2015 14:14:45 +0200 Subject: core, core/state: batch-based state sync --- core/block_processor.go | 20 +++++------ core/chain_makers.go | 14 +++++--- core/chain_manager.go | 8 ++--- core/genesis.go | 16 +++++---- core/state/state_object.go | 8 ++--- core/state/state_test.go | 3 +- core/state/statedb.go | 86 +++++++++++++++++++++++----------------------- miner/worker.go | 5 ++- tests/block_test_util.go | 12 +++---- tests/state_test_util.go | 8 ++--- 10 files changed, 90 insertions(+), 90 deletions(-) diff --git a/core/block_processor.go b/core/block_processor.go index 238b2db95..40590bdc5 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -100,10 +100,8 @@ func (self *BlockProcessor) ApplyTransaction(gp GasPool, statedb *state.StateDB, } // Update the state with pending changes - statedb.SyncIntermediate() - usedGas.Add(usedGas, gas) - receipt := types.NewReceipt(statedb.Root().Bytes(), usedGas) + receipt := types.NewReceipt(statedb.IntermediateRoot().Bytes(), usedGas) receipt.TxHash = tx.Hash() receipt.GasUsed = new(big.Int).Set(gas) if MessageCreatesContract(tx) { @@ -265,16 +263,16 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st // Accumulate static rewards; block reward, uncle's and uncle inclusion. AccumulateRewards(state, header, uncles) - // Commit state objects/accounts to a temporary trie (does not save) - // used to calculate the state root. - state.SyncObjects() - if header.Root != state.Root() { - err = fmt.Errorf("invalid merkle root. received=%x got=%x", header.Root, state.Root()) - return + // Commit state objects/accounts to a database batch and calculate + // the state root. The database is not modified if the root + // doesn't match. + root, batch := state.CommitBatch() + if header.Root != root { + return nil, nil, fmt.Errorf("invalid merkle root: header=%x computed=%x", header.Root, root) } - // Sync the current block's state to the database - state.Sync() + // Execute the database writes. + batch.Write() return state.Logs(), receipts, nil } diff --git a/core/chain_makers.go b/core/chain_makers.go index 70233438d..3af9b0b89 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -17,6 +17,7 @@ package core import ( + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -94,9 +95,9 @@ func (b *BlockGen) AddTx(tx *types.Transaction) { if err != nil { panic(err) } - b.statedb.SyncIntermediate() + root := b.statedb.IntermediateRoot() b.header.GasUsed.Add(b.header.GasUsed, gas) - receipt := types.NewReceipt(b.statedb.Root().Bytes(), b.header.GasUsed) + receipt := types.NewReceipt(root.Bytes(), b.header.GasUsed) logs := b.statedb.GetLogs(tx.Hash()) receipt.SetLogs(logs) receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) @@ -163,8 +164,11 @@ func GenerateChain(parent *types.Block, db ethdb.Database, n int, gen func(int, gen(i, b) } AccumulateRewards(statedb, h, b.uncles) - statedb.SyncIntermediate() - h.Root = statedb.Root() + root, err := statedb.Commit() + if err != nil { + panic(fmt.Sprintf("state write error: %v", err)) + } + h.Root = root return types.NewBlock(h, b.txs, b.uncles, b.receipts) } for i := 0; i < n; i++ { @@ -184,7 +188,7 @@ func makeHeader(parent *types.Block, state *state.StateDB) *types.Header { time = new(big.Int).Add(parent.Time(), big.NewInt(10)) // block time is fixed at 10 seconds } return &types.Header{ - Root: state.Root(), + Root: state.IntermediateRoot(), ParentHash: parent.Hash(), Coinbase: parent.Coinbase(), Difficulty: CalcDifficulty(time.Uint64(), new(big.Int).Sub(time, big.NewInt(10)).Uint64(), parent.Number(), parent.Difficulty()), diff --git a/core/chain_manager.go b/core/chain_manager.go index 42f70af33..0fb472308 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -839,8 +839,8 @@ out: } func blockErr(block *types.Block, err error) { - h := block.Header() - glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes()) - glog.V(logger.Error).Infoln(err) - glog.V(logger.Debug).Infoln(verifyNonces) + if glog.V(logger.Error) { + glog.Errorf("Bad block #%v (%s)\n", block.Number(), block.Hash().Hex()) + glog.Errorf(" %v", err) + } } diff --git a/core/genesis.go b/core/genesis.go index b2346da65..bf97da2e2 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -69,7 +69,7 @@ func WriteGenesisBlock(chainDb ethdb.Database, reader io.Reader) (*types.Block, statedb.SetState(address, common.HexToHash(key), common.HexToHash(value)) } } - statedb.SyncObjects() + root, stateBatch := statedb.CommitBatch() difficulty := common.String2Big(genesis.Difficulty) block := types.NewBlock(&types.Header{ @@ -81,7 +81,7 @@ func WriteGenesisBlock(chainDb ethdb.Database, reader io.Reader) (*types.Block, Difficulty: difficulty, MixDigest: common.HexToHash(genesis.Mixhash), Coinbase: common.HexToAddress(genesis.Coinbase), - Root: statedb.Root(), + Root: root, }, nil, nil, nil) if block := GetBlock(chainDb, block.Hash()); block != nil { @@ -92,8 +92,10 @@ func WriteGenesisBlock(chainDb ethdb.Database, reader io.Reader) (*types.Block, } return block, nil } - statedb.Sync() + if err := stateBatch.Write(); err != nil { + return nil, fmt.Errorf("cannot write state: %v", err) + } if err := WriteTd(chainDb, block.Hash(), difficulty); err != nil { return nil, err } @@ -115,12 +117,14 @@ func GenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big statedb := state.New(common.Hash{}, db) obj := statedb.GetOrNewStateObject(addr) obj.SetBalance(balance) - statedb.SyncObjects() - statedb.Sync() + root, err := statedb.Commit() + if err != nil { + panic(fmt.Sprintf("cannot write state: %v", err)) + } block := types.NewBlock(&types.Header{ Difficulty: params.GenesisDifficulty, GasLimit: params.GenesisGasLimit, - Root: statedb.Root(), + Root: root, }, nil, nil, nil) return block } diff --git a/core/state/state_object.go b/core/state/state_object.go index 251a31093..40af9ed9c 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -218,6 +218,7 @@ func (c *StateObject) ReturnGas(gas, price *big.Int) {} func (self *StateObject) SetGasLimit(gasLimit *big.Int) { self.gasPool = new(big.Int).Set(gasLimit) + self.dirty = true if glog.V(logger.Core) { glog.Infof("%x: gas (+ %v)", self.Address(), self.gasPool) @@ -228,19 +229,14 @@ func (self *StateObject) SubGas(gas, price *big.Int) error { if self.gasPool.Cmp(gas) < 0 { return GasLimitError(self.gasPool, gas) } - self.gasPool.Sub(self.gasPool, gas) - - rGas := new(big.Int).Set(gas) - rGas.Mul(rGas, price) - self.dirty = true - return nil } func (self *StateObject) AddGas(gas, price *big.Int) { self.gasPool.Add(self.gasPool, gas) + self.dirty = true } func (self *StateObject) Copy() *StateObject { diff --git a/core/state/state_test.go b/core/state/state_test.go index 60836738e..b5a7f4081 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -89,8 +89,7 @@ func TestNull(t *testing.T) { //value := common.FromHex("0x823140710bf13990e4500136726d8b55") var value common.Hash state.SetState(address, common.Hash{}, value) - state.SyncIntermediate() - state.Sync() + state.Commit() value = state.GetState(address, common.Hash{}) if !common.EmptyHash(value) { t.Errorf("expected empty hash. got %x", value) diff --git a/core/state/statedb.go b/core/state/statedb.go index c2bc99564..4233c763b 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -35,7 +35,6 @@ import ( type StateDB struct { db ethdb.Database trie *trie.SecureTrie - root common.Hash stateObjects map[string]*StateObject @@ -56,7 +55,6 @@ func New(root common.Hash, db ethdb.Database) *StateDB { glog.Errorf("can't create state trie with root %x: %v", root[:], err) } return &StateDB{ - root: root, db: db, trie: tr, stateObjects: make(map[string]*StateObject), @@ -204,7 +202,6 @@ func (self *StateDB) UpdateStateObject(stateObject *StateObject) { if len(stateObject.CodeHash()) > 0 { self.db.Put(stateObject.CodeHash(), stateObject.code) } - addr := stateObject.Address() self.trie.Update(addr[:], stateObject.RlpEncode()) } @@ -215,6 +212,7 @@ func (self *StateDB) DeleteStateObject(stateObject *StateObject) { addr := stateObject.Address() self.trie.Delete(addr[:]) + //delete(self.stateObjects, addr.Str()) } // Retrieve a state object given my the address. Nil if not found @@ -311,65 +309,67 @@ func (self *StateDB) Set(state *StateDB) { self.logSize = state.logSize } -func (s *StateDB) Root() common.Hash { - return s.trie.Hash() -} - -// Syncs the trie and all siblings -func (s *StateDB) Sync() { - // Sync all nested states +// IntermediateRoot computes the current root hash of the state trie. +// It is called in between transactions to get the root hash that +// goes into transaction receipts. +func (s *StateDB) IntermediateRoot() common.Hash { + s.refund = new(big.Int) for _, stateObject := range s.stateObjects { - stateObject.trie.Commit() - } - - s.trie.Commit() - - s.Empty() -} - -func (self *StateDB) Empty() { - self.stateObjects = make(map[string]*StateObject) - self.refund = new(big.Int) -} - -func (self *StateDB) Refunds() *big.Int { - return self.refund -} - -// SyncIntermediate updates the intermediate state and all mid steps -func (self *StateDB) SyncIntermediate() { - self.refund = new(big.Int) - - for _, stateObject := range self.stateObjects { if stateObject.dirty { if stateObject.remove { - self.DeleteStateObject(stateObject) + s.DeleteStateObject(stateObject) } else { stateObject.Update() - - self.UpdateStateObject(stateObject) + s.UpdateStateObject(stateObject) } stateObject.dirty = false } } + return s.trie.Hash() } -// SyncObjects syncs the changed objects to the trie -func (self *StateDB) SyncObjects() { - self.trie, _ = trie.NewSecure(self.root, self.db) +// Commit commits all state changes to the database. +func (s *StateDB) Commit() (root common.Hash, err error) { + return s.commit(s.db) +} + +// CommitBatch commits all state changes to a write batch but does not +// execute the batch. It is used to validate state changes against +// the root hash stored in a block. +func (s *StateDB) CommitBatch() (root common.Hash, batch ethdb.Batch) { + batch = s.db.NewBatch() + root, _ = s.commit(batch) + return root, batch +} - self.refund = new(big.Int) +func (s *StateDB) commit(db trie.DatabaseWriter) (common.Hash, error) { + s.refund = new(big.Int) - for _, stateObject := range self.stateObjects { + for _, stateObject := range s.stateObjects { if stateObject.remove { - self.DeleteStateObject(stateObject) + // If the object has been removed, don't bother syncing it + // and just mark it for deletion in the trie. + s.DeleteStateObject(stateObject) } else { + // Write any storage changes in the state object to its trie. stateObject.Update() - - self.UpdateStateObject(stateObject) + // Commit the trie of the object to the batch. + // This updates the trie root internally, so + // getting the root hash of the storage trie + // through UpdateStateObject is fast. + if _, err := stateObject.trie.CommitTo(db); err != nil { + return common.Hash{}, err + } + // Update the object in the account trie. + s.UpdateStateObject(stateObject) } stateObject.dirty = false } + return s.trie.CommitTo(db) +} + +func (self *StateDB) Refunds() *big.Int { + return self.refund } // Debug stuff diff --git a/miner/worker.go b/miner/worker.go index 22d0b9b6e..098f42a72 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -266,7 +266,6 @@ func (self *worker) wait() { block := result.Block work := result.Work - work.state.Sync() if self.fullValidation { if _, err := self.chain.InsertChain(types.Blocks{block}); err != nil { glog.V(logger.Error).Infoln("mining err", err) @@ -274,6 +273,7 @@ func (self *worker) wait() { } go self.mux.Post(core.NewMinedBlockEvent{block}) } else { + work.state.Commit() parent := self.chain.GetBlock(block.ParentHash()) if parent == nil { glog.V(logger.Error).Infoln("Invalid block found during mining") @@ -528,8 +528,7 @@ func (self *worker) commitNewWork() { if atomic.LoadInt32(&self.mining) == 1 { // commit state root after all state transitions. core.AccumulateRewards(work.state, header, uncles) - work.state.SyncObjects() - header.Root = work.state.Root() + header.Root = work.state.IntermediateRoot() } // create the new block whose nonce will be mined. diff --git a/tests/block_test_util.go b/tests/block_test_util.go index 3ca00bae8..33577cf55 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -253,13 +253,13 @@ func (t *BlockTest) InsertPreState(ethereum *eth.Ethereum) (*state.StateDB, erro statedb.SetState(common.HexToAddress(addrString), common.HexToHash(k), common.HexToHash(v)) } } - // sync objects to trie - statedb.SyncObjects() - // sync trie to disk - statedb.Sync() - if !bytes.Equal(t.Genesis.Root().Bytes(), statedb.Root().Bytes()) { - return nil, fmt.Errorf("computed state root does not match genesis block %x %x", t.Genesis.Root().Bytes()[:4], statedb.Root().Bytes()[:4]) + root, err := statedb.Commit() + if err != nil { + return nil, fmt.Errorf("error writing state: %v", err) + } + if t.Genesis.Root() != root { + return nil, fmt.Errorf("computed state root does not match genesis block: genesis=%x computed=%x", t.Genesis.Root().Bytes()[:4], root.Bytes()[:4]) } return statedb, nil } diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 086822461..669e90a1e 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -200,9 +200,9 @@ func runStateTest(test VmTest) error { } } - statedb.Sync() - if common.HexToHash(test.PostStateRoot) != statedb.Root() { - return fmt.Errorf("Post state root error. Expected %s, got %x", test.PostStateRoot, statedb.Root()) + root, _ := statedb.Commit() + if common.HexToHash(test.PostStateRoot) != root { + return fmt.Errorf("Post state root error. Expected %s, got %x", test.PostStateRoot, root) } // check logs @@ -246,7 +246,7 @@ func RunState(statedb *state.StateDB, env, tx map[string]string) ([]byte, state. if core.IsNonceErr(err) || core.IsInvalidTxErr(err) || state.IsGasLimitErr(err) { statedb.Set(snapshot) } - statedb.SyncObjects() + statedb.Commit() return ret, vmenv.state.Logs(), vmenv.Gas, err } -- cgit v1.2.3