diff options
-rw-r--r-- | Makefile | 27 | ||||
-rw-r--r-- | cmd/geth/js_test.go | 2 | ||||
-rw-r--r-- | cmd/geth/main.go | 4 | ||||
-rw-r--r-- | common/natspec/natspec_e2e_test.go | 2 | ||||
-rw-r--r-- | core/bench_test.go | 2 | ||||
-rw-r--r-- | core/chain_makers_test.go | 2 | ||||
-rw-r--r-- | core/chain_manager.go | 112 | ||||
-rw-r--r-- | core/chain_manager_test.go | 141 | ||||
-rw-r--r-- | core/chain_pow.go | 87 | ||||
-rw-r--r-- | core/chain_pow_test.go | 233 | ||||
-rw-r--r-- | core/events.go | 3 | ||||
-rw-r--r-- | core/genesis.go | 22 | ||||
-rw-r--r-- | core/transaction_pool.go | 12 | ||||
-rw-r--r-- | core/transaction_pool_test.go | 12 | ||||
-rw-r--r-- | core/transaction_util.go | 21 | ||||
-rw-r--r-- | core/types/transaction.go | 24 | ||||
-rw-r--r-- | eth/helper_test.go | 2 |
17 files changed, 587 insertions, 121 deletions
@@ -10,6 +10,30 @@ geth: @echo "Done building." @echo "Run \"$(GOBIN)/geth\" to launch geth." +geth-cross: geth-linux geth-darwin geth-windows geth-android + @echo "Full cross compilation done:" + @ls -l $(GOBIN)/geth-* + +geth-linux: xgo + build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=linux/* -v ./cmd/geth + @echo "Linux cross compilation done:" + @ls -l $(GOBIN)/geth-linux-* + +geth-darwin: xgo + build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=darwin/* -v ./cmd/geth + @echo "Darwin cross compilation done:" + @ls -l $(GOBIN)/geth-darwin-* + +geth-windows: xgo + build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=windows/* -v ./cmd/geth + @echo "Windows cross compilation done:" + @ls -l $(GOBIN)/geth-windows-* + +geth-android: xgo + build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=android-16/*,android-21/* -v ./cmd/geth + @echo "Android cross compilation done:" + @ls -l $(GOBIN)/geth-android-* + evm: build/env.sh $(GOROOT)/bin/go install -v $(shell build/ldflags.sh) ./cmd/evm @echo "Done building." @@ -28,5 +52,8 @@ test: all travis-test-with-coverage: all build/env.sh build/test-global-coverage.sh +xgo: + build/env.sh go get github.com/karalabe/xgo + clean: rm -fr build/_workspace/pkg/ Godeps/_workspace/pkg $(GOBIN)/* diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index 2fd5a531d..1f5b28e3a 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -92,7 +92,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *eth db, _ := ethdb.NewMemDatabase() - core.WriteGenesisBlockForTesting(db, common.HexToAddress(testAddress), common.String2Big(testBalance)) + core.WriteGenesisBlockForTesting(db, core.GenesisAccount{common.HexToAddress(testAddress), common.String2Big(testBalance)}) ks := crypto.NewKeyStorePlain(filepath.Join(tmp, "keystore")) am := accounts.NewManager(ks) conf := ð.Config{ diff --git a/cmd/geth/main.go b/cmd/geth/main.go index b54d85c22..daffda30c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -48,9 +48,9 @@ import ( const ( ClientIdentifier = "Geth" - Version = "1.1.0" + Version = "1.2.0" VersionMajor = 1 - VersionMinor = 1 + VersionMinor = 2 VersionPatch = 0 ) diff --git a/common/natspec/natspec_e2e_test.go b/common/natspec/natspec_e2e_test.go index ea28b457e..57f81f652 100644 --- a/common/natspec/natspec_e2e_test.go +++ b/common/natspec/natspec_e2e_test.go @@ -134,7 +134,7 @@ func testEth(t *testing.T) (ethereum *eth.Ethereum, err error) { db, _ := ethdb.NewMemDatabase() // set up mock genesis with balance on the testAddress - core.WriteGenesisBlockForTesting(db, common.HexToAddress(testAddress), common.String2Big(testBalance)) + core.WriteGenesisBlockForTesting(db, core.GenesisAccount{common.HexToAddress(testAddress), common.String2Big(testBalance)}) // only use minimalistic stack with no networking ethereum, err = eth.New(ð.Config{ diff --git a/core/bench_test.go b/core/bench_test.go index d05b7d30b..def4f0d2a 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -162,7 +162,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { // Generate a chain of b.N blocks using the supplied block // generator function. - genesis := WriteGenesisBlockForTesting(db, benchRootAddr, benchRootFunds) + genesis := WriteGenesisBlockForTesting(db, GenesisAccount{benchRootAddr, benchRootFunds}) chain := GenerateChain(genesis, db, b.N, gen) // Time the insertion of the new chain. diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index 1c868624d..ac18e5e0b 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -42,7 +42,7 @@ func ExampleGenerateChain() { ) // Ensure that key1 has some funds in the genesis block. - genesis := WriteGenesisBlockForTesting(db, addr1, big.NewInt(1000000)) + genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr1, big.NewInt(1000000)}) // This call generates a chain of 5 blocks. The function runs for // each block and adds different features to gen based on the diff --git a/core/chain_manager.go b/core/chain_manager.go index 1218b1a6e..42f70af33 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "math/big" - "runtime" "sync" "sync/atomic" "time" @@ -570,18 +569,17 @@ func (self *ChainManager) WriteBlock(block *types.Block) (status writeStatus, er // chain fork if block.ParentHash() != cblock.Hash() { // during split we merge two different chains and create the new canonical chain - err := self.merge(cblock, block) + err := self.reorg(cblock, block) if err != nil { return NonStatTy, err } - status = SplitStatTy } + status = CanonStatTy + self.mu.Lock() self.setTotalDifficulty(td) self.insert(block) self.mu.Unlock() - - status = CanonStatTy } else { status = SideStatTy } @@ -616,14 +614,12 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { stats struct{ queued, processed, ignored int } tstart = time.Now() - nonceDone = make(chan nonceResult, len(chain)) - nonceQuit = make(chan struct{}) nonceChecked = make([]bool, len(chain)) ) // Start the parallel nonce verifier. - go verifyNonces(self.pow, chain, nonceQuit, nonceDone) - defer close(nonceQuit) + nonceAbort, nonceResults := verifyNoncesFromBlocks(self.pow, chain) + defer close(nonceAbort) txcount := 0 for i, block := range chain { @@ -636,11 +632,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // Wait for block i's nonce to be verified before processing // its state transition. for !nonceChecked[i] { - r := <-nonceDone - nonceChecked[r.i] = true + r := <-nonceResults + nonceChecked[r.index] = true if !r.valid { - block := chain[r.i] - return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()} + block := chain[r.index] + return r.index, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()} } } @@ -684,9 +680,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } + if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil { + glog.V(logger.Warn).Infoln("error writing block receipts:", err) + } txcount += len(block.Transactions()) - // write the block to the chain and get the status status, err := self.WriteBlock(block) if err != nil { @@ -714,10 +712,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { queue[i] = ChainSplitEvent{block, logs} queueEvent.splitCount++ } - if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil { - glog.V(logger.Warn).Infoln("error writing block receipts:", err) - } - stats.processed++ } @@ -732,20 +726,26 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return 0, nil } -// diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them -// to be part of the new canonical chain. -func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, error) { +// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them +// to be part of the new canonical chain and accumulates potential missing transactions and post an +// event about them +func (self *ChainManager) reorg(oldBlock, newBlock *types.Block) error { + self.mu.Lock() + defer self.mu.Unlock() + var ( newChain types.Blocks commonBlock *types.Block oldStart = oldBlock newStart = newBlock + deletedTxs types.Transactions ) // first reduce whoever is higher bound if oldBlock.NumberU64() > newBlock.NumberU64() { // reduce old chain for oldBlock = oldBlock; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) { + deletedTxs = append(deletedTxs, oldBlock.Transactions()...) } } else { // reduce new chain and append new chain blocks for inserting later on @@ -754,10 +754,10 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e } } if oldBlock == nil { - return nil, fmt.Errorf("Invalid old chain") + return fmt.Errorf("Invalid old chain") } if newBlock == nil { - return nil, fmt.Errorf("Invalid new chain") + return fmt.Errorf("Invalid new chain") } numSplit := newBlock.Number() @@ -767,13 +767,14 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e break } newChain = append(newChain, newBlock) + deletedTxs = append(deletedTxs, oldBlock.Transactions()...) oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash()) if oldBlock == nil { - return nil, fmt.Errorf("Invalid old chain") + return fmt.Errorf("Invalid old chain") } if newBlock == nil { - return nil, fmt.Errorf("Invalid new chain") + return fmt.Errorf("Invalid new chain") } } @@ -782,18 +783,8 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4]) } - return newChain, nil -} - -// merge merges two different chain to the new canonical chain -func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error { - newChain, err := self.diff(oldBlock, newBlock) - if err != nil { - return fmt.Errorf("chain reorg failed: %v", err) - } - + var addedTxs types.Transactions // insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly - self.mu.Lock() for _, block := range newChain { // insert the block in the canonical way, re-writing history self.insert(block) @@ -801,8 +792,18 @@ func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error { PutTransactions(self.chainDb, block, block.Transactions()) PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash())) + addedTxs = append(addedTxs, block.Transactions()...) + } + + // calculate the difference between deleted and added transactions + diff := types.TxDifference(deletedTxs, addedTxs) + // When transactions get deleted from the database that means the + // receipts that were created in the fork must also be deleted + for _, tx := range diff { + DeleteReceipt(self.chainDb, tx.Hash()) + DeleteTransaction(self.chainDb, tx.Hash()) } - self.mu.Unlock() + self.eventMux.Post(RemovedTransactionEvent{diff}) return nil } @@ -843,40 +844,3 @@ func blockErr(block *types.Block, err error) { glog.V(logger.Error).Infoln(err) glog.V(logger.Debug).Infoln(verifyNonces) } - -type nonceResult struct { - i int - valid bool -} - -// block verifies nonces of the given blocks in parallel and returns -// an error if one of the blocks nonce verifications failed. -func verifyNonces(pow pow.PoW, blocks []*types.Block, quit <-chan struct{}, done chan<- nonceResult) { - // Spawn a few workers. They listen for blocks on the in channel - // and send results on done. The workers will exit in the - // background when in is closed. - var ( - in = make(chan int) - nworkers = runtime.GOMAXPROCS(0) - ) - defer close(in) - if len(blocks) < nworkers { - nworkers = len(blocks) - } - for i := 0; i < nworkers; i++ { - go func() { - for i := range in { - done <- nonceResult{i: i, valid: pow.Verify(blocks[i])} - } - }() - } - // Feed block indices to the workers. - for i := range blocks { - select { - case in <- i: - continue - case <-quit: - return - } - } -} diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index 67ca41f00..0c77fc138 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -30,8 +30,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" "github.com/hashicorp/golang-lru" @@ -456,7 +458,7 @@ func TestInsertNonceError(t *testing.T) { fail := rand.Int() % len(blocks) failblock := blocks[fail] - bc.pow = failpow{failblock.NumberU64()} + bc.pow = failPow{failblock.NumberU64()} n, err := bc.InsertChain(blocks) // Check that the returned error indicates the nonce failure. @@ -483,34 +485,115 @@ func TestInsertNonceError(t *testing.T) { } } -/* -func TestGenesisMismatch(t *testing.T) { - db, _ := ethdb.NewMemDatabase() - var mux event.TypeMux - genesis := GenesisBlock(0, db) - _, err := NewChainManager(genesis, db, db, db, thePow(), &mux) - if err != nil { - t.Error(err) - } - genesis = GenesisBlock(1, db) - _, err = NewChainManager(genesis, db, db, db, thePow(), &mux) - if err == nil { - t.Error("expected genesis mismatch error") +// Tests that chain reorganizations handle transaction removals and reinsertions. +func TestChainTxReorgs(t *testing.T) { + params.MinGasLimit = big.NewInt(125000) // Minimum the gas limit may ever be. + params.GenesisGasLimit = big.NewInt(3141592) // Gas limit of the Genesis block. + + var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + key3, _ = crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = crypto.PubkeyToAddress(key2.PublicKey) + addr3 = crypto.PubkeyToAddress(key3.PublicKey) + db, _ = ethdb.NewMemDatabase() + ) + genesis := WriteGenesisBlockForTesting(db, + GenesisAccount{addr1, big.NewInt(1000000)}, + GenesisAccount{addr2, big.NewInt(1000000)}, + GenesisAccount{addr3, big.NewInt(1000000)}, + ) + // Create two transactions shared between the chains: + // - postponed: transaction included at a later block in the forked chain + // - swapped: transaction included at the same block number in the forked chain + postponed, _ := types.NewTransaction(0, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1) + swapped, _ := types.NewTransaction(1, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1) + + // Create two transactions that will be dropped by the forked chain: + // - pastDrop: transaction dropped retroactively from a past block + // - freshDrop: transaction dropped exactly at the block where the reorg is detected + var pastDrop, freshDrop *types.Transaction + + // Create three transactions that will be added in the forked chain: + // - pastAdd: transaction added before the reorganiztion is detected + // - freshAdd: transaction added at the exact block the reorg is detected + // - futureAdd: transaction added after the reorg has already finished + var pastAdd, freshAdd, futureAdd *types.Transaction + + chain := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) { + switch i { + case 0: + pastDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2) + + gen.AddTx(pastDrop) // This transaction will be dropped in the fork from below the split point + gen.AddTx(postponed) // This transaction will be postponed till block #3 in the fork + + case 2: + freshDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2) + + gen.AddTx(freshDrop) // This transaction will be dropped in the fork from exactly at the split point + gen.AddTx(swapped) // This transaction will be swapped out at the exact height + + gen.OffsetTime(9) // Lower the block difficulty to simulate a weaker chain + } + }) + // Import the chain. This runs all block validation rules. + evmux := &event.TypeMux{} + chainman, _ := NewChainManager(db, FakePow{}, evmux) + chainman.SetProcessor(NewBlockProcessor(db, FakePow{}, chainman, evmux)) + if i, err := chainman.InsertChain(chain); err != nil { + t.Fatalf("failed to insert original chain[%d]: %v", i, err) + } + + // overwrite the old chain + chain = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) { + switch i { + case 0: + pastAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3) + gen.AddTx(pastAdd) // This transaction needs to be injected during reorg + + case 2: + gen.AddTx(postponed) // This transaction was postponed from block #1 in the original chain + gen.AddTx(swapped) // This transaction was swapped from the exact current spot in the original chain + + freshAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3) + gen.AddTx(freshAdd) // This transaction will be added exactly at reorg time + + case 3: + futureAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3) + gen.AddTx(futureAdd) // This transaction will be added after a full reorg + } + }) + if _, err := chainman.InsertChain(chain); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) } -} -*/ - -// failpow returns false from Verify for a certain block number. -type failpow struct{ num uint64 } -func (pow failpow) Search(pow.Block, <-chan struct{}) (nonce uint64, mixHash []byte) { - return 0, nil -} -func (pow failpow) Verify(b pow.Block) bool { - return b.NumberU64() != pow.num -} -func (pow failpow) GetHashrate() int64 { - return 0 -} -func (pow failpow) Turbo(bool) { + // removed tx + for i, tx := range (types.Transactions{pastDrop, freshDrop}) { + if GetTransaction(db, tx.Hash()) != nil { + t.Errorf("drop %d: tx found while shouldn't have been", i) + } + if GetReceipt(db, tx.Hash()) != nil { + t.Errorf("drop %d: receipt found while shouldn't have been", i) + } + } + // added tx + for i, tx := range (types.Transactions{pastAdd, freshAdd, futureAdd}) { + if GetTransaction(db, tx.Hash()) == nil { + t.Errorf("add %d: expected tx to be found", i) + } + if GetReceipt(db, tx.Hash()) == nil { + t.Errorf("add %d: expected receipt to be found", i) + } + } + // shared tx + for i, tx := range (types.Transactions{postponed, swapped}) { + if GetTransaction(db, tx.Hash()) == nil { + t.Errorf("share %d: expected tx to be found", i) + } + if GetReceipt(db, tx.Hash()) == nil { + t.Errorf("share %d: expected receipt to be found", i) + } + } } diff --git a/core/chain_pow.go b/core/chain_pow.go new file mode 100644 index 000000000..c3b5788c1 --- /dev/null +++ b/core/chain_pow.go @@ -0,0 +1,87 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package core + +import ( + "runtime" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/pow" +) + +// nonceCheckResult contains the result of a nonce verification. +type nonceCheckResult struct { + index int // Index of the item verified from an input array + valid bool // Result of the nonce verification +} + +// verifyNoncesFromHeaders starts a concurrent header nonce verification, +// returning a quit channel to abort the operations and a results channel +// to retrieve the async verifications. +func verifyNoncesFromHeaders(checker pow.PoW, headers []*types.Header) (chan<- struct{}, <-chan nonceCheckResult) { + items := make([]pow.Block, len(headers)) + for i, header := range headers { + items[i] = types.NewBlockWithHeader(header) + } + return verifyNonces(checker, items) +} + +// verifyNoncesFromBlocks starts a concurrent block nonce verification, +// returning a quit channel to abort the operations and a results channel +// to retrieve the async verifications. +func verifyNoncesFromBlocks(checker pow.PoW, blocks []*types.Block) (chan<- struct{}, <-chan nonceCheckResult) { + items := make([]pow.Block, len(blocks)) + for i, block := range blocks { + items[i] = block + } + return verifyNonces(checker, items) +} + +// verifyNonces starts a concurrent nonce verification, returning a quit channel +// to abort the operations and a results channel to retrieve the async checks. +func verifyNonces(checker pow.PoW, items []pow.Block) (chan<- struct{}, <-chan nonceCheckResult) { + // Spawn as many workers as allowed threads + workers := runtime.GOMAXPROCS(0) + if len(items) < workers { + workers = len(items) + } + // Create a task channel and spawn the verifiers + tasks := make(chan int, workers) + results := make(chan nonceCheckResult, len(items)) // Buffered to make sure all workers stop + for i := 0; i < workers; i++ { + go func() { + for index := range tasks { + results <- nonceCheckResult{index: index, valid: checker.Verify(items[index])} + } + }() + } + // Feed item indices to the workers until done or aborted + abort := make(chan struct{}) + go func() { + defer close(tasks) + + for i := range items { + select { + case tasks <- i: + continue + case <-abort: + return + } + } + }() + return abort, results +} diff --git a/core/chain_pow_test.go b/core/chain_pow_test.go new file mode 100644 index 000000000..80c6a1cc0 --- /dev/null +++ b/core/chain_pow_test.go @@ -0,0 +1,233 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package core + +import ( + "math/big" + "runtime" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/pow" +) + +// failPow is a non-validating proof of work implementation, that returns true +// from Verify for all but one block. +type failPow struct { + failing uint64 +} + +func (pow failPow) Search(pow.Block, <-chan struct{}) (uint64, []byte) { + return 0, nil +} +func (pow failPow) Verify(block pow.Block) bool { return block.NumberU64() != pow.failing } +func (pow failPow) GetHashrate() int64 { return 0 } +func (pow failPow) Turbo(bool) {} + +// delayedPow is a non-validating proof of work implementation, that returns true +// from Verify for all blocks, but delays them the configured amount of time. +type delayedPow struct { + delay time.Duration +} + +func (pow delayedPow) Search(pow.Block, <-chan struct{}) (uint64, []byte) { + return 0, nil +} +func (pow delayedPow) Verify(block pow.Block) bool { time.Sleep(pow.delay); return true } +func (pow delayedPow) GetHashrate() int64 { return 0 } +func (pow delayedPow) Turbo(bool) {} + +// Tests that simple POW verification works, for both good and bad blocks. +func TestPowVerification(t *testing.T) { + // Create a simple chain to verify + var ( + testdb, _ = ethdb.NewMemDatabase() + genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int)) + blocks = GenerateChain(genesis, testdb, 8, nil) + ) + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + // Run the POW checker for blocks one-by-one, checking for both valid and invalid nonces + for i := 0; i < len(blocks); i++ { + for j, full := range []bool{true, false} { + for k, valid := range []bool{true, false} { + var results <-chan nonceCheckResult + + switch { + case full && valid: + _, results = verifyNoncesFromBlocks(FakePow{}, []*types.Block{blocks[i]}) + case full && !valid: + _, results = verifyNoncesFromBlocks(failPow{blocks[i].NumberU64()}, []*types.Block{blocks[i]}) + case !full && valid: + _, results = verifyNoncesFromHeaders(FakePow{}, []*types.Header{headers[i]}) + case !full && !valid: + _, results = verifyNoncesFromHeaders(failPow{headers[i].Number.Uint64()}, []*types.Header{headers[i]}) + } + // Wait for the verification result + select { + case result := <-results: + if result.index != 0 { + t.Errorf("test %d.%d.%d: invalid index: have %d, want 0", i, j, k, result.index) + } + if result.valid != valid { + t.Errorf("test %d.%d.%d: validity mismatch: have %v, want %v", i, j, k, result.valid, valid) + } + case <-time.After(time.Second): + t.Fatalf("test %d.%d.%d: verification timeout", i, j, k) + } + // Make sure no more data is returned + select { + case result := <-results: + t.Fatalf("test %d.%d.%d: unexpected result returned: %v", i, j, k, result) + case <-time.After(25 * time.Millisecond): + } + } + } + } +} + +// Tests that concurrent POW verification works, for both good and bad blocks. +func TestPowConcurrentVerification2(t *testing.T) { testPowConcurrentVerification(t, 2) } +func TestPowConcurrentVerification8(t *testing.T) { testPowConcurrentVerification(t, 8) } +func TestPowConcurrentVerification32(t *testing.T) { testPowConcurrentVerification(t, 32) } + +func testPowConcurrentVerification(t *testing.T, threads int) { + // Create a simple chain to verify + var ( + testdb, _ = ethdb.NewMemDatabase() + genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int)) + blocks = GenerateChain(genesis, testdb, 8, nil) + ) + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + // Set the number of threads to verify on + old := runtime.GOMAXPROCS(threads) + defer runtime.GOMAXPROCS(old) + + // Run the POW checker for the entire block chain at once both for a valid and + // also an invalid chain (enough if one is invalid, last but one (arbitrary)). + for i, full := range []bool{true, false} { + for j, valid := range []bool{true, false} { + var results <-chan nonceCheckResult + + switch { + case full && valid: + _, results = verifyNoncesFromBlocks(FakePow{}, blocks) + case full && !valid: + _, results = verifyNoncesFromBlocks(failPow{uint64(len(blocks) - 1)}, blocks) + case !full && valid: + _, results = verifyNoncesFromHeaders(FakePow{}, headers) + case !full && !valid: + _, results = verifyNoncesFromHeaders(failPow{uint64(len(headers) - 1)}, headers) + } + // Wait for all the verification results + checks := make(map[int]bool) + for k := 0; k < len(blocks); k++ { + select { + case result := <-results: + if _, ok := checks[result.index]; ok { + t.Fatalf("test %d.%d.%d: duplicate results for %d", i, j, k, result.index) + } + if result.index < 0 || result.index >= len(blocks) { + t.Fatalf("test %d.%d.%d: result %d out of bounds [%d, %d]", i, j, k, result.index, 0, len(blocks)-1) + } + checks[result.index] = result.valid + + case <-time.After(time.Second): + t.Fatalf("test %d.%d.%d: verification timeout", i, j, k) + } + } + // Check nonce check validity + for k := 0; k < len(blocks); k++ { + want := valid || (k != len(blocks)-2) // We chose the last but one nonce in the chain to fail + if checks[k] != want { + t.Errorf("test %d.%d.%d: validity mismatch: have %v, want %v", i, j, k, checks[k], want) + } + } + // Make sure no more data is returned + select { + case result := <-results: + t.Fatalf("test %d.%d: unexpected result returned: %v", i, j, result) + case <-time.After(25 * time.Millisecond): + } + } + } +} + +// Tests that aborting a POW validation indeed prevents further checks from being +// run, as well as checks that no left-over goroutines are leaked. +func TestPowConcurrentAbortion2(t *testing.T) { testPowConcurrentAbortion(t, 2) } +func TestPowConcurrentAbortion8(t *testing.T) { testPowConcurrentAbortion(t, 8) } +func TestPowConcurrentAbortion32(t *testing.T) { testPowConcurrentAbortion(t, 32) } + +func testPowConcurrentAbortion(t *testing.T, threads int) { + // Create a simple chain to verify + var ( + testdb, _ = ethdb.NewMemDatabase() + genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int)) + blocks = GenerateChain(genesis, testdb, 1024, nil) + ) + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + // Set the number of threads to verify on + old := runtime.GOMAXPROCS(threads) + defer runtime.GOMAXPROCS(old) + + // Run the POW checker for the entire block chain at once + for i, full := range []bool{true, false} { + var abort chan<- struct{} + var results <-chan nonceCheckResult + + // Start the verifications and immediately abort + if full { + abort, results = verifyNoncesFromBlocks(delayedPow{time.Millisecond}, blocks) + } else { + abort, results = verifyNoncesFromHeaders(delayedPow{time.Millisecond}, headers) + } + close(abort) + + // Deplete the results channel + verified := make(map[int]struct{}) + for depleted := false; !depleted; { + select { + case result := <-results: + verified[result.index] = struct{}{} + case <-time.After(50 * time.Millisecond): + depleted = true + } + } + // Check that abortion was honored by not processing too many POWs + if len(verified) > 2*threads { + t.Errorf("test %d: verification count too large: have %d, want below %d", i, len(verified), 2*threads) + } + // Check that there are no gaps in the results + for j := 0; j < len(verified); j++ { + if _, ok := verified[j]; !ok { + t.Errorf("test %d.%d: gap found in verification results", i, j) + } + } + } +} diff --git a/core/events.go b/core/events.go index a487fc51d..e142b6dba 100644 --- a/core/events.go +++ b/core/events.go @@ -36,6 +36,9 @@ type NewBlockEvent struct{ Block *types.Block } // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } +// RemovedTransactionEvent is posted when a reorg happens +type RemovedTransactionEvent struct{ Txs types.Transactions } + // ChainSplit is posted when a new head is detected type ChainSplitEvent struct { Block *types.Block diff --git a/core/genesis.go b/core/genesis.go index 727e2c75f..b2346da65 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -125,15 +125,27 @@ func GenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big return block } -func WriteGenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big.Int) *types.Block { +type GenesisAccount struct { + Address common.Address + Balance *big.Int +} + +func WriteGenesisBlockForTesting(db ethdb.Database, accounts ...GenesisAccount) *types.Block { + accountJson := "{" + for i, account := range accounts { + if i != 0 { + accountJson += "," + } + accountJson += fmt.Sprintf(`"0x%x":{"balance":"0x%x"}`, account.Address, account.Balance.Bytes()) + } + accountJson += "}" + testGenesis := fmt.Sprintf(`{ "nonce":"0x%x", "gasLimit":"0x%x", "difficulty":"0x%x", - "alloc": { - "0x%x":{"balance":"0x%x"} - } -}`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), addr, balance.Bytes()) + "alloc": %s +}`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), accountJson) block, _ := WriteGenesisBlock(db, strings.NewReader(testGenesis)) return block } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 513600be3..11d0cb490 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -81,7 +81,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( gasLimit: gasLimitFn, minGasPrice: new(big.Int), pendingState: state.ManageState(currentStateFn()), - events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}), + events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}), } go pool.eventLoop() @@ -93,16 +93,18 @@ func (pool *TxPool) eventLoop() { // we need to know the new state. The new state will help us determine // the nonces in the managed state for ev := range pool.events.Chan() { - pool.mu.Lock() - switch ev := ev.(type) { case ChainHeadEvent: + pool.mu.Lock() pool.resetState() + pool.mu.Unlock() case GasPriceChanged: + pool.mu.Lock() pool.minGasPrice = ev.Price + pool.mu.Unlock() + case RemovedTransactionEvent: + pool.AddTransactions(ev.Txs) } - - pool.mu.Unlock() } } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index d9267cc43..37cd20c96 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -238,3 +238,15 @@ func TestNonceRecovery(t *testing.T) { t.Errorf("expected nonce to be %d, got %d", n+1, fn) } } + +func TestRemovedTxEvent(t *testing.T) { + pool, key := setupTxPool() + tx := transaction(0, big.NewInt(1000000), key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1000000000000)) + pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) + pool.eventMux.Post(ChainHeadEvent{nil}) + if len(pool.pending) != 1 { + t.Error("expected 1 pending tx, got", len(pool.pending)) + } +} diff --git a/core/transaction_util.go b/core/transaction_util.go index 69c6bc36f..ebe095abb 100644 --- a/core/transaction_util.go +++ b/core/transaction_util.go @@ -77,6 +77,22 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio } } +func DeleteTransaction(db ethdb.Database, txHash common.Hash) { + db.Delete(txHash[:]) +} + +func GetTransaction(db ethdb.Database, txhash common.Hash) *types.Transaction { + data, _ := db.Get(txhash[:]) + if len(data) != 0 { + var tx types.Transaction + if err := rlp.DecodeBytes(data, &tx); err != nil { + return nil + } + return &tx + } + return nil +} + // PutReceipts stores the receipts in the current database func PutReceipts(db ethdb.Database, receipts types.Receipts) error { batch := new(leveldb.Batch) @@ -107,6 +123,11 @@ func PutReceipts(db ethdb.Database, receipts types.Receipts) error { return nil } +// Delete a receipts from the database +func DeleteReceipt(db ethdb.Database, txHash common.Hash) { + db.Delete(append(receiptsPre, txHash[:]...)) +} + // GetReceipt returns a receipt by hash func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt { data, _ := db.Get(append(receiptsPre, txHash[:]...)) diff --git a/core/types/transaction.go b/core/types/transaction.go index 8260d7423..7a6c5e088 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -272,14 +272,36 @@ func (tx *Transaction) String() string { // Transaction slice type for basic sorting. type Transactions []*Transaction -func (s Transactions) Len() int { return len(s) } +// Len returns the length of s +func (s Transactions) Len() int { return len(s) } + +// Swap swaps the i'th and the j'th element in s func (s Transactions) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +// GetRlp implements Rlpable and returns the i'th element of s in rlp func (s Transactions) GetRlp(i int) []byte { enc, _ := rlp.EncodeToBytes(s[i]) return enc } +// Returns a new set t which is the difference between a to b +func TxDifference(a, b Transactions) (keep Transactions) { + keep = make(Transactions, 0, len(a)) + + remove := make(map[common.Hash]struct{}) + for _, tx := range b { + remove[tx.Hash()] = struct{}{} + } + + for _, tx := range a { + if _, ok := remove[tx.Hash()]; !ok { + keep = append(keep, tx) + } + } + + return keep +} + type TxByNonce struct{ Transactions } func (s TxByNonce) Less(i, j int) bool { diff --git a/eth/helper_test.go b/eth/helper_test.go index 3a799e6f6..034751f7f 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -33,7 +33,7 @@ func newTestProtocolManager(blocks int, generator func(int, *core.BlockGen), new evmux = new(event.TypeMux) pow = new(core.FakePow) db, _ = ethdb.NewMemDatabase() - genesis = core.WriteGenesisBlockForTesting(db, testBankAddress, testBankFunds) + genesis = core.WriteGenesisBlockForTesting(db, core.GenesisAccount{testBankAddress, testBankFunds}) chainman, _ = core.NewChainManager(db, pow, evmux) blockproc = core.NewBlockProcessor(db, pow, chainman, evmux) ) |