aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-08-08 16:59:34 +0800
committerPéter Szilágyi <peterke@gmail.com>2017-08-08 17:22:01 +0800
commit1c45f2f42e6578258e74c790d063b8639ec58bc6 (patch)
treec991999fc5f15ec38e87f56aa89bd9a2f98aacde
parent971079822ee6c12d9348ec78f212fe91c9b83edf (diff)
downloaddexon-1c45f2f42e6578258e74c790d063b8639ec58bc6.tar
dexon-1c45f2f42e6578258e74c790d063b8639ec58bc6.tar.gz
dexon-1c45f2f42e6578258e74c790d063b8639ec58bc6.tar.bz2
dexon-1c45f2f42e6578258e74c790d063b8639ec58bc6.tar.lz
dexon-1c45f2f42e6578258e74c790d063b8639ec58bc6.tar.xz
dexon-1c45f2f42e6578258e74c790d063b8639ec58bc6.tar.zst
dexon-1c45f2f42e6578258e74c790d063b8639ec58bc6.zip
core: fix txpool journal and test races
-rw-r--r--core/tx_pool.go19
-rw-r--r--core/tx_pool_test.go54
2 files changed, 43 insertions, 30 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 16f774265..b0c251f92 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -207,7 +207,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e
}
pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(&pool.all)
- pool.resetState()
+ pool.reset()
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
@@ -261,7 +261,7 @@ func (pool *TxPool) loop() {
pool.homestead = true
}
}
- pool.resetState()
+ pool.reset()
pool.mu.Unlock()
case RemovedTransactionEvent:
@@ -300,15 +300,28 @@ func (pool *TxPool) loop() {
// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
+ pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
+ pool.mu.Unlock()
}
}
}
}
-func (pool *TxPool) resetState() {
+// lockedReset is a wrapper around reset to allow calling it in a thread safe
+// manner. This method is only ever used in the tester!
+func (pool *TxPool) lockedReset() {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pool.reset()
+}
+
+// reset retrieves the current state of the blockchain and ensures the content
+// of the transaction pool is valid with regard to the chain state.
+func (pool *TxPool) reset() {
currentState, err := pool.currentState()
if err != nil {
log.Error("Failed reset txpool state", "err", err)
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 020d6bedd..fcb330051 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -153,7 +153,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
// trigger state change in the background
trigger = true
- pool.resetState()
+ pool.lockedReset()
pendingTx, err := pool.Pending()
if err != nil {
@@ -213,7 +213,7 @@ func TestTransactionQueue(t *testing.T) {
from, _ := deriveSender(tx)
currentState, _ := pool.currentState()
currentState.AddBalance(from, big.NewInt(1000))
- pool.resetState()
+ pool.lockedReset()
pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables(currentState, []common.Address{from})
@@ -243,7 +243,7 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx1)
currentState, _ = pool.currentState()
currentState.AddBalance(from, big.NewInt(1000))
- pool.resetState()
+ pool.lockedReset()
pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2)
@@ -314,7 +314,7 @@ func TestTransactionChainFork(t *testing.T) {
pool.currentState = func() (*state.StateDB, error) { return statedb, nil }
currentState, _ := pool.currentState()
currentState.AddBalance(addr, big.NewInt(100000000000000))
- pool.resetState()
+ pool.lockedReset()
}
resetState()
@@ -342,7 +342,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
pool.currentState = func() (*state.StateDB, error) { return statedb, nil }
currentState, _ := pool.currentState()
currentState.AddBalance(addr, big.NewInt(100000000000000))
- pool.resetState()
+ pool.lockedReset()
}
resetState()
@@ -412,14 +412,14 @@ func TestNonceRecovery(t *testing.T) {
currentState, _ := pool.currentState()
currentState.SetNonce(addr, n)
currentState.AddBalance(addr, big.NewInt(100000000000000))
- pool.resetState()
+ pool.lockedReset()
tx := transaction(n, big.NewInt(100000), key)
if err := pool.AddRemote(tx); err != nil {
t.Error(err)
}
// simulate some weird re-order of transactions and missing nonce(s)
currentState.SetNonce(addr, n-1)
- pool.resetState()
+ pool.lockedReset()
if fn := pool.pendingState.GetNonce(addr); fn != n+1 {
t.Errorf("expected nonce to be %d, got %d", n+1, fn)
}
@@ -433,7 +433,7 @@ func TestRemovedTxEvent(t *testing.T) {
from, _ := deriveSender(tx)
currentState, _ := pool.currentState()
currentState.AddBalance(from, big.NewInt(1000000000000))
- pool.resetState()
+ pool.lockedReset()
pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
pool.eventMux.Post(ChainHeadEvent{nil})
if pool.pending[from].Len() != 1 {
@@ -482,7 +482,7 @@ func TestTransactionDropping(t *testing.T) {
if len(pool.all) != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
}
- pool.resetState()
+ pool.lockedReset()
if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
}
@@ -494,7 +494,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the balance of the account, and check that invalidated transactions are dropped
state.AddBalance(account, big.NewInt(-650))
- pool.resetState()
+ pool.lockedReset()
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@@ -519,7 +519,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the block gas limit, check that invalidated transactions are dropped
pool.gasLimit = func() *big.Int { return big.NewInt(100) }
- pool.resetState()
+ pool.lockedReset()
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@@ -573,7 +573,7 @@ func TestTransactionPostponing(t *testing.T) {
if len(pool.all) != len(txns) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns))
}
- pool.resetState()
+ pool.lockedReset()
if pool.pending[account].Len() != len(txns) {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns))
}
@@ -585,7 +585,7 @@ func TestTransactionPostponing(t *testing.T) {
}
// Reduce the balance of the account, and check that transactions are reorganised
state.AddBalance(account, big.NewInt(-750))
- pool.resetState()
+ pool.lockedReset()
if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok {
t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0])
@@ -626,7 +626,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
state, _ := pool.currentState()
state.AddBalance(account, big.NewInt(1000000))
- pool.resetState()
+ pool.lockedReset()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ {
@@ -780,7 +780,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
if err := pool.AddRemote(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
- pending, queued := pool.stats()
+ pending, queued := pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
@@ -793,7 +793,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
// Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains
time.Sleep(2 * config.Lifetime)
- pending, queued = pool.stats()
+ pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
@@ -823,7 +823,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
state, _ := pool.currentState()
state.AddBalance(account, big.NewInt(1000000))
- pool.resetState()
+ pool.lockedReset()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
@@ -1057,7 +1057,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
pool.AddRemotes(txs)
pool.AddLocal(ltx)
- pending, queued := pool.stats()
+ pending, queued := pool.Stats()
if pending != 4 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
}
@@ -1070,7 +1070,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
// Reprice the pool and check that underpriced transactions get dropped
pool.SetGasPrice(big.NewInt(2))
- pending, queued = pool.stats()
+ pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
@@ -1095,7 +1095,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
if err := pool.AddLocal(tx); err != nil {
t.Fatalf("failed to add underpriced local transaction: %v", err)
}
- if pending, _ = pool.stats(); pending != 3 {
+ if pending, _ = pool.Stats(); pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
if err := validateTxPoolInternals(pool); err != nil {
@@ -1142,7 +1142,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
pool.AddRemotes(txs)
pool.AddLocal(ltx)
- pending, queued := pool.stats()
+ pending, queued := pool.Stats()
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
@@ -1166,7 +1166,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(3, big.NewInt(100000), big.NewInt(5), keys[1])); err != nil {
t.Fatalf("failed to add well priced transaction: %v", err)
}
- pending, queued = pool.stats()
+ pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
@@ -1181,7 +1181,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if err := pool.AddLocal(tx); err != nil {
t.Fatalf("failed to add underpriced local transaction: %v", err)
}
- pending, queued = pool.stats()
+ pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
@@ -1307,7 +1307,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
- pending, queued := pool.stats()
+ pending, queued := pool.Stats()
if pending != 4 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
}
@@ -1322,7 +1322,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
- pending, queued = pool.stats()
+ pending, queued = pool.Stats()
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
@@ -1340,13 +1340,13 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
}
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
- pool.resetState()
+ pool.lockedReset()
time.Sleep(2 * config.Rejournal)
pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
- pending, queued = pool.stats()
+ pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}