From 5b0ee8ec304663898073b7a4c659e1def23716df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 13 Oct 2015 12:04:25 +0300 Subject: core, eth, trie: fix data races and merge/review issues --- core/block_processor.go | 14 +++++----- core/blockchain.go | 71 +++++++++++++++++++++++++++++-------------------- core/blockchain_test.go | 2 +- core/chain_util.go | 2 +- core/chain_util_test.go | 28 +++++++++---------- core/state/sync.go | 3 +-- core/state/sync_test.go | 4 +-- core/types/receipt.go | 10 +++---- core/vm/log.go | 2 +- 9 files changed, 75 insertions(+), 61 deletions(-) (limited to 'core') diff --git a/core/block_processor.go b/core/block_processor.go index 60f0258c4..5172636dd 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -195,14 +195,16 @@ func (sm *BlockProcessor) Process(block *types.Block) (logs vm.Logs, receipts ty defer sm.mutex.Unlock() if sm.bc.HasBlock(block.Hash()) { - return nil, nil, &KnownBlockError{block.Number(), block.Hash()} + if _, err := state.New(block.Root(), sm.chainDb); err == nil { + return nil, nil, &KnownBlockError{block.Number(), block.Hash()} + } } - - if !sm.bc.HasBlock(block.ParentHash()) { - return nil, nil, ParentError(block.ParentHash()) + if parent := sm.bc.GetBlock(block.ParentHash()); parent != nil { + if _, err := state.New(parent.Root(), sm.chainDb); err == nil { + return sm.processWithParent(block, parent) + } } - parent := sm.bc.GetBlock(block.ParentHash()) - return sm.processWithParent(block, parent) + return nil, nil, ParentError(block.ParentHash()) } func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs vm.Logs, receipts types.Receipts, err error) { diff --git a/core/blockchain.go b/core/blockchain.go index 490552ea0..f14ff363c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -18,11 +18,13 @@ package core import ( + crand "crypto/rand" "errors" "fmt" "io" + "math" "math/big" - "math/rand" + mrand "math/rand" "runtime" "sync" "sync/atomic" @@ -89,7 +91,8 @@ type BlockChain struct { procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup - pow pow.PoW + pow pow.PoW + rand *mrand.Rand } func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) { @@ -112,6 +115,12 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl futureBlocks: futureBlocks, pow: pow, } + // Seed a fast but crypto originating random generator + seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64)) + if err != nil { + return nil, err + } + bc.rand = mrand.New(mrand.NewSource(seed.Int64())) bc.genesisBlock = bc.GetBlockByNumber(0) if bc.genesisBlock == nil { @@ -178,21 +187,21 @@ func (self *BlockChain) loadLastState() error { fastTd := self.GetTd(self.currentFastBlock.Hash()) glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash().Bytes()[:4], headerTd) - glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd) glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd) + glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd) return nil } -// SetHead rewind the local chain to a new head entity. In the case of headers, -// everything above the new head will be deleted and the new one set. 
In the case -// of blocks though, the head may be further rewound if block bodies are missing -// (non-archive nodes after a fast sync). +// SetHead rewinds the local chain to a new head. In the case of headers, everything +// above the new head will be deleted and the new one set. In the case of blocks +// though, the head may be further rewound if block bodies are missing (non-archive +// nodes after a fast sync). func (bc *BlockChain) SetHead(head uint64) { bc.mu.Lock() defer bc.mu.Unlock() - // Figure out the highest known canonical assignment + // Figure out the highest known canonical headers and/or blocks height := uint64(0) if bc.currentHeader != nil { if hh := bc.currentHeader.Number.Uint64(); hh > height { @@ -266,7 +275,7 @@ func (bc *BlockChain) SetHead(head uint64) { // FastSyncCommitHead sets the current head block to the one defined by the hash // irrelevant what the chain contents were prior. func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error { - // Make sure that both the block as well at it's state trie exists + // Make sure that both the block as well at its state trie exists block := self.GetBlock(hash) if block == nil { return fmt.Errorf("non existent block [%x…]", hash[:4]) @@ -298,7 +307,7 @@ func (self *BlockChain) LastBlockHash() common.Hash { } // CurrentHeader retrieves the current head header of the canonical chain. The -// header is retrieved from the chain manager's internal cache. +// header is retrieved from the blockchain's internal cache. func (self *BlockChain) CurrentHeader() *types.Header { self.mu.RLock() defer self.mu.RUnlock() @@ -307,7 +316,7 @@ func (self *BlockChain) CurrentHeader() *types.Header { } // CurrentBlock retrieves the current head block of the canonical chain. The -// block is retrieved from the chain manager's internal cache. +// block is retrieved from the blockchain's internal cache. func (self *BlockChain) CurrentBlock() *types.Block { self.mu.RLock() defer self.mu.RUnlock() @@ -316,7 +325,7 @@ func (self *BlockChain) CurrentBlock() *types.Block { } // CurrentFastBlock retrieves the current fast-sync head block of the canonical -// chain. The block is retrieved from the chain manager's internal cache. +// chain. The block is retrieved from the blockchain's internal cache. func (self *BlockChain) CurrentFastBlock() *types.Block { self.mu.RLock() defer self.mu.RUnlock() @@ -353,7 +362,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) { bc.mu.Lock() defer bc.mu.Unlock() - // Prepare the genesis block and reinitialize the chain + // Prepare the genesis block and reinitialise the chain if err := WriteTd(bc.chainDb, genesis.Hash(), genesis.Difficulty()); err != nil { glog.Fatalf("failed to write genesis block TD: %v", err) } @@ -403,7 +412,7 @@ func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { // insert injects a new head block into the current block chain. This method // assumes that the block is indeed a true head. It will also reset the head // header and the head fast sync block to this very same block to prevent them -// from diverging on a different header chain. +// from pointing to a possibly old canonical chain (i.e. side chain by now). // // Note, this function assumes that the `mu` mutex is held! func (bc *BlockChain) insert(block *types.Block) { @@ -625,10 +634,10 @@ const ( // writeHeader writes a header into the local chain, given that its parent is // already known. 
If the total difficulty of the newly inserted header becomes -// greater than the old known TD, the canonical chain is re-routed. +// greater than the current known TD, the canonical chain is re-routed. // // Note: This method is not concurrent-safe with inserting blocks simultaneously -// into the chain, as side effects caused by reorganizations cannot be emulated +// into the chain, as side effects caused by reorganisations cannot be emulated // without the real blocks. Hence, writing headers directly should only be done // in two scenarios: pure-header mode of operation (light clients), or properly // separated header/block phases (non-archive clients). @@ -678,10 +687,9 @@ func (self *BlockChain) writeHeader(header *types.Header) error { return nil } -// InsertHeaderChain will attempt to insert the given header chain in to the -// local chain, possibly creating a fork. If an error is returned, it will -// return the index number of the failing header as well an error describing -// what went wrong. +// InsertHeaderChain attempts to insert the given header chain in to the local +// chain, possibly creating a reorg. If an error is returned, it will return the +// index number of the failing header as well an error describing what went wrong. // // The verify parameter can be used to fine tune whether nonce verification // should be done or not. The reason behind the optional check is because some @@ -702,7 +710,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) // Generate the list of headers that should be POW verified verify := make([]bool, len(chain)) for i := 0; i < len(verify)/checkFreq; i++ { - index := i*checkFreq + rand.Intn(checkFreq) + index := i*checkFreq + self.rand.Intn(checkFreq) if index >= len(verify) { index = len(verify) - 1 } @@ -766,10 +774,6 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) pending.Wait() // If anything failed, report - if atomic.LoadInt32(&self.procInterrupt) == 1 { - glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") - return 0, nil - } if failed > 0 { for i, err := range errs { if err != nil { @@ -807,6 +811,9 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. 
func (self *BlockChain) Rollback(chain []common.Hash) { + self.mu.Lock() + defer self.mu.Unlock() + for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] @@ -905,6 +912,12 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain glog.Fatal(errs[index]) return } + if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil { + errs[index] = fmt.Errorf("failed to write log blooms: %v", err) + atomic.AddInt32(&failed, 1) + glog.Fatal(errs[index]) + return + } atomic.AddInt32(&stats.processed, 1) } } @@ -920,10 +933,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain pending.Wait() // If anything failed, report - if atomic.LoadInt32(&self.procInterrupt) == 1 { - glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") - return 0, nil - } if failed > 0 { for i, err := range errs { if err != nil { @@ -931,6 +940,10 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain } } } + if atomic.LoadInt32(&self.procInterrupt) == 1 { + glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") + return 0, nil + } // Update the head fast sync block if better self.mu.Lock() head := blockChain[len(errs)-1] diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 01667c21e..8ddc5032b 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -452,7 +452,7 @@ func makeBlockChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.B func chm(genesis *types.Block, db ethdb.Database) *BlockChain { var eventMux event.TypeMux - bc := &BlockChain{chainDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}} + bc := &BlockChain{chainDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}, rand: rand.New(rand.NewSource(0))} bc.headerCache, _ = lru.New(100) bc.bodyCache, _ = lru.New(100) bc.bodyRLPCache, _ = lru.New(100) diff --git a/core/chain_util.go b/core/chain_util.go index 907e6668c..ddff381a1 100644 --- a/core/chain_util.go +++ b/core/chain_util.go @@ -394,7 +394,7 @@ func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) bloomDat, _ := db.Get(key) bloom := types.BytesToBloom(bloomDat) for _, receipt := range receipts { - for _, log := range receipt.Logs() { + for _, log := range receipt.Logs { bloom.Add(log.Address.Big()) } } diff --git a/core/chain_util_test.go b/core/chain_util_test.go index bc5aa9776..0bbcbbe53 100644 --- a/core/chain_util_test.go +++ b/core/chain_util_test.go @@ -345,15 +345,15 @@ func TestMipmapBloom(t *testing.T) { db, _ := ethdb.NewMemDatabase() receipt1 := new(types.Receipt) - receipt1.SetLogs(vm.Logs{ + receipt1.Logs = vm.Logs{ &vm.Log{Address: common.BytesToAddress([]byte("test"))}, &vm.Log{Address: common.BytesToAddress([]byte("address"))}, - }) + } receipt2 := new(types.Receipt) - receipt2.SetLogs(vm.Logs{ + receipt2.Logs = vm.Logs{ &vm.Log{Address: common.BytesToAddress([]byte("test"))}, &vm.Log{Address: common.BytesToAddress([]byte("address1"))}, - }) + } WriteMipmapBloom(db, 1, types.Receipts{receipt1}) WriteMipmapBloom(db, 2, types.Receipts{receipt2}) @@ -368,15 +368,15 @@ func TestMipmapBloom(t *testing.T) { // reset db, _ = ethdb.NewMemDatabase() receipt := new(types.Receipt) - receipt.SetLogs(vm.Logs{ + receipt.Logs = vm.Logs{ &vm.Log{Address: common.BytesToAddress([]byte("test"))}, - }) + } WriteMipmapBloom(db, 999, types.Receipts{receipt1}) receipt = new(types.Receipt) - receipt.SetLogs(vm.Logs{ + receipt.Logs = vm.Logs{ &vm.Log{Address: 
common.BytesToAddress([]byte("test 1"))}, - }) + } WriteMipmapBloom(db, 1000, types.Receipts{receipt}) bloom := GetMipmapBloom(db, 1000, 1000) @@ -403,22 +403,22 @@ func TestMipmapChain(t *testing.T) { defer db.Close() genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr, big.NewInt(1000000)}) - chain := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) { + chain, receipts := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) { var receipts types.Receipts switch i { case 1: receipt := types.NewReceipt(nil, new(big.Int)) - receipt.SetLogs(vm.Logs{ + receipt.Logs = vm.Logs{ &vm.Log{ Address: addr, Topics: []common.Hash{hash1}, }, - }) + } gen.AddUncheckedReceipt(receipt) receipts = types.Receipts{receipt} case 1000: receipt := types.NewReceipt(nil, new(big.Int)) - receipt.SetLogs(vm.Logs{&vm.Log{Address: addr2}}) + receipt.Logs = vm.Logs{&vm.Log{Address: addr2}} gen.AddUncheckedReceipt(receipt) receipts = types.Receipts{receipt} @@ -431,7 +431,7 @@ func TestMipmapChain(t *testing.T) { } WriteMipmapBloom(db, uint64(i+1), receipts) }) - for _, block := range chain { + for i, block := range chain { WriteBlock(db, block) if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { t.Fatalf("failed to insert block number: %v", err) @@ -439,7 +439,7 @@ func TestMipmapChain(t *testing.T) { if err := WriteHeadBlockHash(db, block.Hash()); err != nil { t.Fatalf("failed to insert block number: %v", err) } - if err := PutBlockReceipts(db, block, block.Receipts()); err != nil { + if err := PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil { t.Fatal("error writing block receipts:", err) } } diff --git a/core/state/sync.go b/core/state/sync.go index 5a388886c..ef2b4b84c 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -26,14 +26,13 @@ import ( "github.com/ethereum/go-ethereum/trie" ) -// StateSync is the main state synchronisation scheduler, which provides yet the +// StateSync is the main state synchronisation scheduler, which provides yet the // unknown state hashes to retrieve, accepts node data associated with said hashes // and reconstructs the state database step by step until all is done. type StateSync trie.TrieSync // NewStateSync create a new state trie download scheduler. func NewStateSync(root common.Hash, database ethdb.Database) *StateSync { - // Pre-declare the result syncer t var syncer *trie.TrieSync callback := func(leaf []byte, parent common.Hash) error { diff --git a/core/state/sync_test.go b/core/state/sync_test.go index f0376d484..0dab372ba 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -38,7 +38,7 @@ type testAccount struct { func makeTestState() (ethdb.Database, common.Hash, []*testAccount) { // Create an empty state db, _ := ethdb.NewMemDatabase() - state := New(common.Hash{}, db) + state, _ := New(common.Hash{}, db) // Fill it with some arbitrary data accounts := []*testAccount{} @@ -68,7 +68,7 @@ func makeTestState() (ethdb.Database, common.Hash, []*testAccount) { // checkStateAccounts cross references a reconstructed state with an expected // account array. 
func checkStateAccounts(t *testing.T, db ethdb.Database, root common.Hash, accounts []*testAccount) { - state := New(root, db) + state, _ := New(root, db) for i, acc := range accounts { if balance := state.GetBalance(acc.address); balance.Cmp(acc.balance) != 0 { diff --git a/core/types/receipt.go b/core/types/receipt.go index aea5b3e91..e7d5203a3 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -67,7 +67,7 @@ func (r *Receipt) DecodeRLP(s *rlp.Stream) error { return nil } -// RlpEncode implements common.RlpEncode required for SHA derivation. +// RlpEncode implements common.RlpEncode required for SHA3 derivation. func (r *Receipt) RlpEncode() []byte { bytes, err := rlp.EncodeToBytes(r) if err != nil { @@ -82,7 +82,7 @@ func (r *Receipt) String() string { } // ReceiptForStorage is a wrapper around a Receipt that flattens and parses the -// entire content of a receipt, opposed to only the consensus fields originally. +// entire content of a receipt, as opposed to only the consensus fields originally. type ReceiptForStorage Receipt // EncodeRLP implements rlp.Encoder, and flattens all content fields of a receipt @@ -95,8 +95,8 @@ func (r *ReceiptForStorage) EncodeRLP(w io.Writer) error { return rlp.Encode(w, []interface{}{r.PostState, r.CumulativeGasUsed, r.Bloom, r.TxHash, r.ContractAddress, logs, r.GasUsed}) } -// DecodeRLP implements rlp.Decoder, and loads the consensus fields of a receipt -// from an RLP stream. +// DecodeRLP implements rlp.Decoder, and loads both consensus and implementation +// fields of a receipt from an RLP stream. func (r *ReceiptForStorage) DecodeRLP(s *rlp.Stream) error { var receipt struct { PostState []byte @@ -125,7 +125,7 @@ func (r *ReceiptForStorage) DecodeRLP(s *rlp.Stream) error { // Receipts is a wrapper around a Receipt array to implement types.DerivableList. type Receipts []*Receipt -// RlpEncode implements common.RlpEncode required for SHA derivation. +// RlpEncode implements common.RlpEncode required for SHA3 derivation. func (r Receipts) RlpEncode() []byte { bytes, err := rlp.EncodeToBytes(r) if err != nil { diff --git a/core/vm/log.go b/core/vm/log.go index 526221e43..191e3a253 100644 --- a/core/vm/log.go +++ b/core/vm/log.go @@ -66,6 +66,6 @@ func (l *Log) String() string { type Logs []*Log // LogForStorage is a wrapper around a Log that flattens and parses the entire -// content of a log, opposed to only the consensus fields originally (by hiding +// content of a log, as opposed to only the consensus fields originally (by hiding // the rlp interface methods). type LogForStorage Log -- cgit v1.2.3
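
The randomness changes above are the subtle part of this patch: InsertHeaderChain only spot-checks proof-of-work, picking one header at random from every window of checkFreq headers, so BlockChain now carries its own generator that is seeded from crypto/rand (hard to predict) but driven by math/rand (cheap per call) instead of the shared global rand. The standalone Go sketch below is illustrative only and not part of the patch; the helper names newSeededRand and sampleIndices are invented for the example, while the seeding code and the index arithmetic mirror the hunks above.

package main

import (
	crand "crypto/rand"
	"fmt"
	"math"
	"math/big"
	mrand "math/rand"
)

// newSeededRand builds a fast math/rand generator whose seed comes from
// crypto/rand, mirroring the pattern the patch adds to NewBlockChain.
func newSeededRand() (*mrand.Rand, error) {
	seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
	if err != nil {
		return nil, err
	}
	return mrand.New(mrand.NewSource(seed.Int64())), nil
}

// sampleIndices reproduces the sampling loop from InsertHeaderChain: within
// every window of checkFreq headers exactly one randomly chosen position is
// selected for full nonce verification, clamped to the end of the batch.
func sampleIndices(rnd *mrand.Rand, n, checkFreq int) []int {
	picks := []int{}
	for i := 0; i < n/checkFreq; i++ {
		index := i*checkFreq + rnd.Intn(checkFreq)
		if index >= n {
			index = n - 1
		}
		picks = append(picks, index)
	}
	return picks
}

func main() {
	rnd, err := newSeededRand()
	if err != nil {
		panic(err)
	}
	// For a batch of 100 headers checked every 16 headers, six random
	// positions get their PoW verified; the remaining headers skip the
	// expensive check.
	fmt.Println(sampleIndices(rnd, 100, 16))
}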