aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorJeffrey Wilcke <geffobscura@gmail.com>2015-08-17 20:01:41 +0800
committerJeffrey Wilcke <geffobscura@gmail.com>2015-09-22 02:33:28 +0800
commiteaa4473dbd4ad404b85f8f0f63b0418a782351b4 (patch)
tree27eabb671346c279969caafe28d25a44aef0f9a0 /core
parent12c0afe4fe9f284dd10a80af7744102dac8bf06b (diff)
downloaddexon-eaa4473dbd4ad404b85f8f0f63b0418a782351b4.tar
dexon-eaa4473dbd4ad404b85f8f0f63b0418a782351b4.tar.gz
dexon-eaa4473dbd4ad404b85f8f0f63b0418a782351b4.tar.bz2
dexon-eaa4473dbd4ad404b85f8f0f63b0418a782351b4.tar.lz
dexon-eaa4473dbd4ad404b85f8f0f63b0418a782351b4.tar.xz
dexon-eaa4473dbd4ad404b85f8f0f63b0418a782351b4.tar.zst
dexon-eaa4473dbd4ad404b85f8f0f63b0418a782351b4.zip
core, core/types: readd transactions after chain re-org
Added a `Difference` method to `types.Transactions` which sets the receiver to the difference of a to b (NOTE: not a **and** b). Transaction pool subscribes to RemovedTransactionEvent adding back to those potential missing from the chain. When a chain re-org occurs remove any transactions that were removed from the canonical chain during the re-org as well as the receipts that were generated in the process. Closes #1746
Diffstat (limited to 'core')
-rw-r--r--core/bench_test.go2
-rw-r--r--core/chain_makers_test.go2
-rw-r--r--core/chain_manager.go60
-rw-r--r--core/chain_manager_test.go124
-rw-r--r--core/events.go3
-rw-r--r--core/genesis.go22
-rw-r--r--core/transaction_pool.go12
-rw-r--r--core/transaction_pool_test.go12
-rw-r--r--core/transaction_util.go21
-rw-r--r--core/types/transaction.go24
10 files changed, 228 insertions, 54 deletions
diff --git a/core/bench_test.go b/core/bench_test.go
index d05b7d30b..def4f0d2a 100644
--- a/core/bench_test.go
+++ b/core/bench_test.go
@@ -162,7 +162,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
// Generate a chain of b.N blocks using the supplied block
// generator function.
- genesis := WriteGenesisBlockForTesting(db, benchRootAddr, benchRootFunds)
+ genesis := WriteGenesisBlockForTesting(db, GenesisAccount{benchRootAddr, benchRootFunds})
chain := GenerateChain(genesis, db, b.N, gen)
// Time the insertion of the new chain.
diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go
index 1c868624d..ac18e5e0b 100644
--- a/core/chain_makers_test.go
+++ b/core/chain_makers_test.go
@@ -42,7 +42,7 @@ func ExampleGenerateChain() {
)
// Ensure that key1 has some funds in the genesis block.
- genesis := WriteGenesisBlockForTesting(db, addr1, big.NewInt(1000000))
+ genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr1, big.NewInt(1000000)})
// This call generates a chain of 5 blocks. The function runs for
// each block and adds different features to gen based on the
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 62fd548ed..42f70af33 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -569,18 +569,17 @@ func (self *ChainManager) WriteBlock(block *types.Block) (status writeStatus, er
// chain fork
if block.ParentHash() != cblock.Hash() {
// during split we merge two different chains and create the new canonical chain
- err := self.merge(cblock, block)
+ err := self.reorg(cblock, block)
if err != nil {
return NonStatTy, err
}
- status = SplitStatTy
}
+ status = CanonStatTy
+
self.mu.Lock()
self.setTotalDifficulty(td)
self.insert(block)
self.mu.Unlock()
-
- status = CanonStatTy
} else {
status = SideStatTy
}
@@ -681,9 +680,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
+ if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil {
+ glog.V(logger.Warn).Infoln("error writing block receipts:", err)
+ }
txcount += len(block.Transactions())
-
// write the block to the chain and get the status
status, err := self.WriteBlock(block)
if err != nil {
@@ -711,10 +712,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
queue[i] = ChainSplitEvent{block, logs}
queueEvent.splitCount++
}
- if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil {
- glog.V(logger.Warn).Infoln("error writing block receipts:", err)
- }
-
stats.processed++
}
@@ -729,20 +726,26 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return 0, nil
}
-// diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
-// to be part of the new canonical chain.
-func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, error) {
+// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
+// to be part of the new canonical chain and accumulates potential missing transactions and post an
+// event about them
+func (self *ChainManager) reorg(oldBlock, newBlock *types.Block) error {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
var (
newChain types.Blocks
commonBlock *types.Block
oldStart = oldBlock
newStart = newBlock
+ deletedTxs types.Transactions
)
// first reduce whoever is higher bound
if oldBlock.NumberU64() > newBlock.NumberU64() {
// reduce old chain
for oldBlock = oldBlock; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) {
+ deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
}
} else {
// reduce new chain and append new chain blocks for inserting later on
@@ -751,10 +754,10 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
}
}
if oldBlock == nil {
- return nil, fmt.Errorf("Invalid old chain")
+ return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
- return nil, fmt.Errorf("Invalid new chain")
+ return fmt.Errorf("Invalid new chain")
}
numSplit := newBlock.Number()
@@ -764,13 +767,14 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
break
}
newChain = append(newChain, newBlock)
+ deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash())
if oldBlock == nil {
- return nil, fmt.Errorf("Invalid old chain")
+ return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
- return nil, fmt.Errorf("Invalid new chain")
+ return fmt.Errorf("Invalid new chain")
}
}
@@ -779,18 +783,8 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
}
- return newChain, nil
-}
-
-// merge merges two different chain to the new canonical chain
-func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error {
- newChain, err := self.diff(oldBlock, newBlock)
- if err != nil {
- return fmt.Errorf("chain reorg failed: %v", err)
- }
-
+ var addedTxs types.Transactions
// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
- self.mu.Lock()
for _, block := range newChain {
// insert the block in the canonical way, re-writing history
self.insert(block)
@@ -798,8 +792,18 @@ func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error {
PutTransactions(self.chainDb, block, block.Transactions())
PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash()))
+ addedTxs = append(addedTxs, block.Transactions()...)
+ }
+
+ // calculate the difference between deleted and added transactions
+ diff := types.TxDifference(deletedTxs, addedTxs)
+ // When transactions get deleted from the database that means the
+ // receipts that were created in the fork must also be deleted
+ for _, tx := range diff {
+ DeleteReceipt(self.chainDb, tx.Hash())
+ DeleteTransaction(self.chainDb, tx.Hash())
}
- self.mu.Unlock()
+ self.eventMux.Post(RemovedTransactionEvent{diff})
return nil
}
diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go
index 31a8769be..0c77fc138 100644
--- a/core/chain_manager_test.go
+++ b/core/chain_manager_test.go
@@ -30,8 +30,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
"github.com/hashicorp/golang-lru"
@@ -483,19 +485,115 @@ func TestInsertNonceError(t *testing.T) {
}
}
-/*
-func TestGenesisMismatch(t *testing.T) {
- db, _ := ethdb.NewMemDatabase()
- var mux event.TypeMux
- genesis := GenesisBlock(0, db)
- _, err := NewChainManager(genesis, db, db, db, thePow(), &mux)
- if err != nil {
- t.Error(err)
+// Tests that chain reorganizations handle transaction removals and reinsertions.
+func TestChainTxReorgs(t *testing.T) {
+ params.MinGasLimit = big.NewInt(125000) // Minimum the gas limit may ever be.
+ params.GenesisGasLimit = big.NewInt(3141592) // Gas limit of the Genesis block.
+
+ var (
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
+ key3, _ = crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
+ addr1 = crypto.PubkeyToAddress(key1.PublicKey)
+ addr2 = crypto.PubkeyToAddress(key2.PublicKey)
+ addr3 = crypto.PubkeyToAddress(key3.PublicKey)
+ db, _ = ethdb.NewMemDatabase()
+ )
+ genesis := WriteGenesisBlockForTesting(db,
+ GenesisAccount{addr1, big.NewInt(1000000)},
+ GenesisAccount{addr2, big.NewInt(1000000)},
+ GenesisAccount{addr3, big.NewInt(1000000)},
+ )
+ // Create two transactions shared between the chains:
+ // - postponed: transaction included at a later block in the forked chain
+ // - swapped: transaction included at the same block number in the forked chain
+ postponed, _ := types.NewTransaction(0, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1)
+ swapped, _ := types.NewTransaction(1, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1)
+
+ // Create two transactions that will be dropped by the forked chain:
+ // - pastDrop: transaction dropped retroactively from a past block
+ // - freshDrop: transaction dropped exactly at the block where the reorg is detected
+ var pastDrop, freshDrop *types.Transaction
+
+ // Create three transactions that will be added in the forked chain:
+ // - pastAdd: transaction added before the reorganiztion is detected
+ // - freshAdd: transaction added at the exact block the reorg is detected
+ // - futureAdd: transaction added after the reorg has already finished
+ var pastAdd, freshAdd, futureAdd *types.Transaction
+
+ chain := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) {
+ switch i {
+ case 0:
+ pastDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2)
+
+ gen.AddTx(pastDrop) // This transaction will be dropped in the fork from below the split point
+ gen.AddTx(postponed) // This transaction will be postponed till block #3 in the fork
+
+ case 2:
+ freshDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2)
+
+ gen.AddTx(freshDrop) // This transaction will be dropped in the fork from exactly at the split point
+ gen.AddTx(swapped) // This transaction will be swapped out at the exact height
+
+ gen.OffsetTime(9) // Lower the block difficulty to simulate a weaker chain
+ }
+ })
+ // Import the chain. This runs all block validation rules.
+ evmux := &event.TypeMux{}
+ chainman, _ := NewChainManager(db, FakePow{}, evmux)
+ chainman.SetProcessor(NewBlockProcessor(db, FakePow{}, chainman, evmux))
+ if i, err := chainman.InsertChain(chain); err != nil {
+ t.Fatalf("failed to insert original chain[%d]: %v", i, err)
+ }
+
+ // overwrite the old chain
+ chain = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
+ switch i {
+ case 0:
+ pastAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)
+ gen.AddTx(pastAdd) // This transaction needs to be injected during reorg
+
+ case 2:
+ gen.AddTx(postponed) // This transaction was postponed from block #1 in the original chain
+ gen.AddTx(swapped) // This transaction was swapped from the exact current spot in the original chain
+
+ freshAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)
+ gen.AddTx(freshAdd) // This transaction will be added exactly at reorg time
+
+ case 3:
+ futureAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)
+ gen.AddTx(futureAdd) // This transaction will be added after a full reorg
+ }
+ })
+ if _, err := chainman.InsertChain(chain); err != nil {
+ t.Fatalf("failed to insert forked chain: %v", err)
}
- genesis = GenesisBlock(1, db)
- _, err = NewChainManager(genesis, db, db, db, thePow(), &mux)
- if err == nil {
- t.Error("expected genesis mismatch error")
+
+ // removed tx
+ for i, tx := range (types.Transactions{pastDrop, freshDrop}) {
+ if GetTransaction(db, tx.Hash()) != nil {
+ t.Errorf("drop %d: tx found while shouldn't have been", i)
+ }
+ if GetReceipt(db, tx.Hash()) != nil {
+ t.Errorf("drop %d: receipt found while shouldn't have been", i)
+ }
+ }
+ // added tx
+ for i, tx := range (types.Transactions{pastAdd, freshAdd, futureAdd}) {
+ if GetTransaction(db, tx.Hash()) == nil {
+ t.Errorf("add %d: expected tx to be found", i)
+ }
+ if GetReceipt(db, tx.Hash()) == nil {
+ t.Errorf("add %d: expected receipt to be found", i)
+ }
+ }
+ // shared tx
+ for i, tx := range (types.Transactions{postponed, swapped}) {
+ if GetTransaction(db, tx.Hash()) == nil {
+ t.Errorf("share %d: expected tx to be found", i)
+ }
+ if GetReceipt(db, tx.Hash()) == nil {
+ t.Errorf("share %d: expected receipt to be found", i)
+ }
}
}
-*/
diff --git a/core/events.go b/core/events.go
index a487fc51d..e142b6dba 100644
--- a/core/events.go
+++ b/core/events.go
@@ -36,6 +36,9 @@ type NewBlockEvent struct{ Block *types.Block }
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
+// RemovedTransactionEvent is posted when a reorg happens
+type RemovedTransactionEvent struct{ Txs types.Transactions }
+
// ChainSplit is posted when a new head is detected
type ChainSplitEvent struct {
Block *types.Block
diff --git a/core/genesis.go b/core/genesis.go
index 727e2c75f..b2346da65 100644
--- a/core/genesis.go
+++ b/core/genesis.go
@@ -125,15 +125,27 @@ func GenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big
return block
}
-func WriteGenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big.Int) *types.Block {
+type GenesisAccount struct {
+ Address common.Address
+ Balance *big.Int
+}
+
+func WriteGenesisBlockForTesting(db ethdb.Database, accounts ...GenesisAccount) *types.Block {
+ accountJson := "{"
+ for i, account := range accounts {
+ if i != 0 {
+ accountJson += ","
+ }
+ accountJson += fmt.Sprintf(`"0x%x":{"balance":"0x%x"}`, account.Address, account.Balance.Bytes())
+ }
+ accountJson += "}"
+
testGenesis := fmt.Sprintf(`{
"nonce":"0x%x",
"gasLimit":"0x%x",
"difficulty":"0x%x",
- "alloc": {
- "0x%x":{"balance":"0x%x"}
- }
-}`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), addr, balance.Bytes())
+ "alloc": %s
+}`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), accountJson)
block, _ := WriteGenesisBlock(db, strings.NewReader(testGenesis))
return block
}
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 513600be3..11d0cb490 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -81,7 +81,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
gasLimit: gasLimitFn,
minGasPrice: new(big.Int),
pendingState: state.ManageState(currentStateFn()),
- events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}),
+ events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
}
go pool.eventLoop()
@@ -93,16 +93,18 @@ func (pool *TxPool) eventLoop() {
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
for ev := range pool.events.Chan() {
- pool.mu.Lock()
-
switch ev := ev.(type) {
case ChainHeadEvent:
+ pool.mu.Lock()
pool.resetState()
+ pool.mu.Unlock()
case GasPriceChanged:
+ pool.mu.Lock()
pool.minGasPrice = ev.Price
+ pool.mu.Unlock()
+ case RemovedTransactionEvent:
+ pool.AddTransactions(ev.Txs)
}
-
- pool.mu.Unlock()
}
}
diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go
index d9267cc43..37cd20c96 100644
--- a/core/transaction_pool_test.go
+++ b/core/transaction_pool_test.go
@@ -238,3 +238,15 @@ func TestNonceRecovery(t *testing.T) {
t.Errorf("expected nonce to be %d, got %d", n+1, fn)
}
}
+
+func TestRemovedTxEvent(t *testing.T) {
+ pool, key := setupTxPool()
+ tx := transaction(0, big.NewInt(1000000), key)
+ from, _ := tx.From()
+ pool.currentState().AddBalance(from, big.NewInt(1000000000000))
+ pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
+ pool.eventMux.Post(ChainHeadEvent{nil})
+ if len(pool.pending) != 1 {
+ t.Error("expected 1 pending tx, got", len(pool.pending))
+ }
+}
diff --git a/core/transaction_util.go b/core/transaction_util.go
index 69c6bc36f..ebe095abb 100644
--- a/core/transaction_util.go
+++ b/core/transaction_util.go
@@ -77,6 +77,22 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio
}
}
+func DeleteTransaction(db ethdb.Database, txHash common.Hash) {
+ db.Delete(txHash[:])
+}
+
+func GetTransaction(db ethdb.Database, txhash common.Hash) *types.Transaction {
+ data, _ := db.Get(txhash[:])
+ if len(data) != 0 {
+ var tx types.Transaction
+ if err := rlp.DecodeBytes(data, &tx); err != nil {
+ return nil
+ }
+ return &tx
+ }
+ return nil
+}
+
// PutReceipts stores the receipts in the current database
func PutReceipts(db ethdb.Database, receipts types.Receipts) error {
batch := new(leveldb.Batch)
@@ -107,6 +123,11 @@ func PutReceipts(db ethdb.Database, receipts types.Receipts) error {
return nil
}
+// Delete a receipts from the database
+func DeleteReceipt(db ethdb.Database, txHash common.Hash) {
+ db.Delete(append(receiptsPre, txHash[:]...))
+}
+
// GetReceipt returns a receipt by hash
func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt {
data, _ := db.Get(append(receiptsPre, txHash[:]...))
diff --git a/core/types/transaction.go b/core/types/transaction.go
index 8260d7423..7a6c5e088 100644
--- a/core/types/transaction.go
+++ b/core/types/transaction.go
@@ -272,14 +272,36 @@ func (tx *Transaction) String() string {
// Transaction slice type for basic sorting.
type Transactions []*Transaction
-func (s Transactions) Len() int { return len(s) }
+// Len returns the length of s
+func (s Transactions) Len() int { return len(s) }
+
+// Swap swaps the i'th and the j'th element in s
func (s Transactions) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+// GetRlp implements Rlpable and returns the i'th element of s in rlp
func (s Transactions) GetRlp(i int) []byte {
enc, _ := rlp.EncodeToBytes(s[i])
return enc
}
+// Returns a new set t which is the difference between a to b
+func TxDifference(a, b Transactions) (keep Transactions) {
+ keep = make(Transactions, 0, len(a))
+
+ remove := make(map[common.Hash]struct{})
+ for _, tx := range b {
+ remove[tx.Hash()] = struct{}{}
+ }
+
+ for _, tx := range a {
+ if _, ok := remove[tx.Hash()]; !ok {
+ keep = append(keep, tx)
+ }
+ }
+
+ return keep
+}
+
type TxByNonce struct{ Transactions }
func (s TxByNonce) Less(i, j int) bool {