aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/tx_pool.go65
-rw-r--r--core/tx_pool_test.go95
-rw-r--r--eth/api_backend.go11
-rw-r--r--eth/helper_test.go8
-rw-r--r--eth/protocol.go4
-rw-r--r--eth/sync.go3
-rw-r--r--internal/ethapi/api.go16
-rw-r--r--internal/ethapi/backend.go2
-rw-r--r--les/api_backend.go2
-rw-r--r--les/handler.go8
-rw-r--r--light/txpool.go4
-rw-r--r--miner/worker.go9
12 files changed, 171 insertions, 56 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index edcbc21eb..b805cf226 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -176,7 +176,7 @@ func (pool *TxPool) resetState() {
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
// higher gas price)
- pool.demoteUnexecutables()
+ pool.demoteUnexecutables(currentState)
// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
@@ -185,7 +185,7 @@ func (pool *TxPool) resetState() {
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
}
func (pool *TxPool) Stop() {
@@ -196,8 +196,12 @@ func (pool *TxPool) Stop() {
}
func (pool *TxPool) State() *state.ManagedState {
- pool.mu.RLock()
- defer pool.mu.RUnlock()
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ if pool.pendingState == nil {
+ pool.resetState()
+ }
return pool.pendingState
}
@@ -237,21 +241,26 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common
// Pending retrieves all currently processable transactions, groupped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
-func (pool *TxPool) Pending() map[common.Address]types.Transactions {
+func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
pool.mu.Lock()
defer pool.mu.Unlock()
+ state, err := pool.currentState()
+ if err != nil {
+ return nil, err
+ }
+
// check queue first
- pool.promoteExecutables()
+ pool.promoteExecutables(state)
// invalidate any txs
- pool.demoteUnexecutables()
+ pool.demoteUnexecutables(state)
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
- return pending
+ return pending, nil
}
// SetLocal marks a transaction as local, skipping gas price
@@ -410,13 +419,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
if err := pool.add(tx); err != nil {
return err
}
- pool.promoteExecutables()
+
+ state, err := pool.currentState()
+ if err != nil {
+ return err
+ }
+
+ pool.promoteExecutables(state)
return nil
}
// AddBatch attempts to queue a batch of transactions.
-func (pool *TxPool) AddBatch(txs []*types.Transaction) {
+func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -425,7 +440,15 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) {
glog.V(logger.Debug).Infoln("tx error:", err)
}
}
- pool.promoteExecutables()
+
+ state, err := pool.currentState()
+ if err != nil {
+ return err
+ }
+
+ pool.promoteExecutables(state)
+
+ return nil
}
// Get returns a transaction if it is contained in the pool
@@ -499,17 +522,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
-func (pool *TxPool) promoteExecutables() {
- // Init delayed since tx pool could have been started before any state sync
- if pool.pendingState == nil {
- pool.resetState()
- }
- // Retrieve the current state to allow nonce and balance checking
- state, err := pool.currentState()
- if err != nil {
- glog.Errorf("Could not get current state: %v", err)
- return
- }
+func (pool *TxPool) promoteExecutables(state *state.StateDB) {
// Iterate over all accounts and promote any executable transactions
queued := uint64(0)
for addr, list := range pool.queue {
@@ -645,13 +658,7 @@ func (pool *TxPool) promoteExecutables() {
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
-func (pool *TxPool) demoteUnexecutables() {
- // Retrieve the current state to allow nonce and balance checking
- state, err := pool.currentState()
- if err != nil {
- glog.V(logger.Info).Infoln("failed to get current state: %v", err)
- return
- }
+func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {
nonce := state.GetNonce(addr)
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 009d19886..3e516735b 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -51,6 +51,80 @@ func deriveSender(tx *types.Transaction) (common.Address, error) {
return types.Sender(types.HomesteadSigner{}, tx)
}
+// This test simulates a scenario where a new block is imported during a
+// state reset and tests whether the pending state is in sync with the
+// block head event that initiated the resetState().
+func TestStateChangeDuringPoolReset(t *testing.T) {
+ var (
+ db, _ = ethdb.NewMemDatabase()
+ key, _ = crypto.GenerateKey()
+ address = crypto.PubkeyToAddress(key.PublicKey)
+ mux = new(event.TypeMux)
+ statedb, _ = state.New(common.Hash{}, db)
+ trigger = false
+ )
+
+ // setup pool with 2 transaction in it
+ statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
+
+ tx0 := transaction(0, big.NewInt(100000), key)
+ tx1 := transaction(1, big.NewInt(100000), key)
+
+ // stateFunc is used multiple times to reset the pending state.
+ // when simulate is true it will create a state that indicates
+ // that tx0 and tx1 are included in the chain.
+ stateFunc := func() (*state.StateDB, error) {
+ // delay "state change" by one. The tx pool fetches the
+ // state multiple times and by delaying it a bit we simulate
+ // a state change between those fetches.
+ stdb := statedb
+ if trigger {
+ statedb, _ = state.New(common.Hash{}, db)
+ // simulate that the new head block included tx0 and tx1
+ statedb.SetNonce(address, 2)
+ statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
+ trigger = false
+ }
+ return stdb, nil
+ }
+
+ gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) }
+
+ txpool := NewTxPool(testChainConfig(), mux, stateFunc, gasLimitFunc)
+ txpool.resetState()
+
+ nonce := txpool.State().GetNonce(address)
+ if nonce != 0 {
+ t.Fatalf("Invalid nonce, want 0, got %d", nonce)
+ }
+
+ txpool.AddBatch(types.Transactions{tx0, tx1})
+
+ nonce = txpool.State().GetNonce(address)
+ if nonce != 2 {
+ t.Fatalf("Invalid nonce, want 2, got %d", nonce)
+ }
+
+ // trigger state change in the background
+ trigger = true
+
+ txpool.resetState()
+
+ pendingTx, err := txpool.Pending()
+ if err != nil {
+ t.Fatalf("Could not fetch pending transactions: %v", err)
+ }
+
+ for addr, txs := range pendingTx {
+ t.Logf("%0x: %d\n", addr, len(txs))
+ }
+
+ nonce = txpool.State().GetNonce(address)
+ if nonce != 2 {
+ t.Fatalf("Invalid nonce, want 2, got %d", nonce)
+ }
+}
+
func TestInvalidTransactions(t *testing.T) {
pool, key := setupTxPool()
@@ -97,9 +171,10 @@ func TestTransactionQueue(t *testing.T) {
from, _ := deriveSender(tx)
currentState, _ := pool.currentState()
currentState.AddBalance(from, big.NewInt(1000))
+ pool.resetState()
pool.enqueueTx(tx.Hash(), tx)
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending))
}
@@ -108,7 +183,7 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx)
currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx)
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool")
}
@@ -124,11 +199,13 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx1)
currentState, _ = pool.currentState()
currentState.AddBalance(from, big.NewInt(1000))
+ pool.resetState()
+
pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3)
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending))
@@ -225,7 +302,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
if err := pool.add(tx2); err != nil {
t.Error("didn't expect error", err)
}
- pool.promoteExecutables()
+ state, _ := pool.currentState()
+ pool.promoteExecutables(state)
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@@ -236,7 +314,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
if err := pool.add(tx3); err != nil {
t.Error("didn't expect error", err)
}
- pool.promoteExecutables()
+ pool.promoteExecutables(state)
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@@ -295,6 +373,7 @@ func TestRemovedTxEvent(t *testing.T) {
from, _ := deriveSender(tx)
currentState, _ := pool.currentState()
currentState.AddBalance(from, big.NewInt(1000000000000))
+ pool.resetState()
pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
pool.eventMux.Post(ChainHeadEvent{nil})
if pool.pending[from].Len() != 1 {
@@ -452,6 +531,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
state, _ := pool.currentState()
state.AddBalance(account, big.NewInt(1000000))
+ pool.resetState()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= maxQueuedPerAccount+5; i++ {
@@ -564,6 +644,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
state, _ := pool.currentState()
state.AddBalance(account, big.NewInt(1000000))
+ pool.resetState()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < maxQueuedPerAccount+5; i++ {
@@ -733,7 +814,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
- pool.demoteUnexecutables()
+ pool.demoteUnexecutables(state)
}
}
@@ -757,7 +838,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
- pool.promoteExecutables()
+ pool.promoteExecutables(state)
}
}
diff --git a/eth/api_backend.go b/eth/api_backend.go
index b95ef79c5..f33b6f7e1 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -131,15 +131,20 @@ func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.Remove(txHash)
}
-func (b *EthApiBackend) GetPoolTransactions() types.Transactions {
+func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
+ pending, err := b.eth.txPool.Pending()
+ if err != nil {
+ return nil, err
+ }
+
var txs types.Transactions
- for _, batch := range b.eth.txPool.Pending() {
+ for _, batch := range pending {
txs = append(txs, batch...)
}
- return txs
+ return txs, nil
}
func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
diff --git a/eth/helper_test.go b/eth/helper_test.go
index f23976785..bd6b2d0da 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -93,7 +93,7 @@ type testTxPool struct {
// AddBatch appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
-func (p *testTxPool) AddBatch(txs []*types.Transaction) {
+func (p *testTxPool) AddBatch(txs []*types.Transaction) error {
p.lock.Lock()
defer p.lock.Unlock()
@@ -101,10 +101,12 @@ func (p *testTxPool) AddBatch(txs []*types.Transaction) {
if p.added != nil {
p.added <- txs
}
+
+ return nil
}
// Pending returns all the transactions known to the pool
-func (p *testTxPool) Pending() map[common.Address]types.Transactions {
+func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -116,7 +118,7 @@ func (p *testTxPool) Pending() map[common.Address]types.Transactions {
for _, batch := range batches {
sort.Sort(types.TxByNonce(batch))
}
- return batches
+ return batches, nil
}
// newTestTransaction create a new dummy transaction.
diff --git a/eth/protocol.go b/eth/protocol.go
index 3f65c204b..7d22b33de 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -98,11 +98,11 @@ var errorToString = map[int]string{
type txPool interface {
// AddBatch should add the given transactions to the pool.
- AddBatch([]*types.Transaction)
+ AddBatch([]*types.Transaction) error
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
- Pending() map[common.Address]types.Transactions
+ Pending() (map[common.Address]types.Transactions, error)
}
// statusData is the network packet for the status message.
diff --git a/eth/sync.go b/eth/sync.go
index 6584bb1e2..373cc2054 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -46,7 +46,8 @@ type txsync struct {
// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer) {
var txs types.Transactions
- for _, batch := range pm.txpool.Pending() {
+ pending, _ := pm.txpool.Pending()
+ for _, batch := range pending {
txs = append(txs, batch...)
}
if len(txs) == 0 {
diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go
index a25eff5ed..fd86b6465 100644
--- a/internal/ethapi/api.go
+++ b/internal/ethapi/api.go
@@ -1273,8 +1273,12 @@ func (s *PublicTransactionPoolAPI) SignTransaction(ctx context.Context, args Sig
// PendingTransactions returns the transactions that are in the transaction pool and have a from address that is one of
// the accounts this node manages.
-func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
- pending := s.b.GetPoolTransactions()
+func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, error) {
+ pending, err := s.b.GetPoolTransactions()
+ if err != nil {
+ return nil, err
+ }
+
transactions := make([]*RPCTransaction, 0, len(pending))
for _, tx := range pending {
var signer types.Signer = types.HomesteadSigner{}
@@ -1286,13 +1290,17 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
transactions = append(transactions, newRPCPendingTransaction(tx))
}
}
- return transactions
+ return transactions, nil
}
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
// pool and reinsert it with the new gas price and limit.
func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) {
- pending := s.b.GetPoolTransactions()
+ pending, err := s.b.GetPoolTransactions()
+ if err != nil {
+ return common.Hash{}, err
+ }
+
for _, p := range pending {
var signer types.Signer = types.HomesteadSigner{}
if p.Protected() {
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index 77df7eb8d..36d7e754b 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -55,7 +55,7 @@ type Backend interface {
// TxPool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
RemoveTx(txHash common.Hash)
- GetPoolTransactions() types.Transactions
+ GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
Stats() (pending int, queued int)
diff --git a/les/api_backend.go b/les/api_backend.go
index 8df963f6e..7dc548ec3 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -110,7 +110,7 @@ func (b *LesApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.RemoveTx(txHash)
}
-func (b *LesApiBackend) GetPoolTransactions() types.Transactions {
+func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) {
return b.eth.txPool.GetTransactions()
}
diff --git a/les/handler.go b/les/handler.go
index 83d73666f..fdf4e6e8a 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -88,7 +88,7 @@ type BlockChain interface {
type txPool interface {
// AddTransactions should add the given transactions to the pool.
- AddBatch([]*types.Transaction)
+ AddBatch([]*types.Transaction) error
}
type ProtocolManager struct {
@@ -879,7 +879,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if reqCnt > maxReqs || reqCnt > MaxTxSend {
return errResp(ErrRequestRejected, "")
}
- pm.txpool.AddBatch(txs)
+
+ if err := pm.txpool.AddBatch(txs); err != nil {
+ return errResp(ErrUnexpectedResponse, "msg: %v", err)
+ }
+
_, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
diff --git a/light/txpool.go b/light/txpool.go
index 309bc3a32..4a06d317d 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -500,7 +500,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// GetTransactions returns all currently processable transactions.
// The returned slice may be modified by the caller.
-func (self *TxPool) GetTransactions() (txs types.Transactions) {
+func (self *TxPool) GetTransactions() (txs types.Transactions, err error) {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -510,7 +510,7 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
txs[i] = tx
i++
}
- return txs
+ return txs, nil
}
// Content retrieves the data content of the transaction pool, returning all the
diff --git a/miner/worker.go b/miner/worker.go
index edbd502c1..5fa7c4115 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -519,7 +519,14 @@ func (self *worker) commitNewWork() {
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
core.ApplyDAOHardFork(work.state)
}
- txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending())
+
+ pending, err := self.eth.TxPool().Pending()
+ if err != nil {
+ glog.Errorf("Could not fetch pending transactions: %v", err)
+ return
+ }
+
+ txs := types.NewTransactionsByPriceAndNonce(pending)
work.commitTransactions(self.mux, txs, self.gasPrice, self.chain)
self.eth.TxPool().RemoveBatch(work.lowGasTxs)