aboutsummaryrefslogtreecommitdiffstats
path: root/core/transaction_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/transaction_pool.go')
-rw-r--r--core/transaction_pool.go88
1 files changed, 51 insertions, 37 deletions
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 7d58ffbd9..5e6f2c6a4 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -6,7 +6,6 @@ import (
"math/big"
"sort"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -38,10 +37,12 @@ type stateFn func() *state.StateDB
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
- quit chan bool // Quiting channel
- currentState stateFn // The state function which will allow us to do some pre checkes
+ quit chan bool // Quiting channel
+ currentState stateFn // The state function which will allow us to do some pre checkes
+ state *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
eventMux *event.TypeMux
+ events event.Subscription
mu sync.RWMutex
txs map[common.Hash]*types.Transaction // processable transactions
@@ -56,28 +57,41 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
+ state: state.ManageState(currentStateFn()),
}
}
func (pool *TxPool) Start() {
- // Queue timer will tick so we can attempt to move items from the queue to the
- // main transaction pool.
- queueTimer := time.NewTicker(300 * time.Millisecond)
- // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
- removalTimer := time.NewTicker(1 * time.Second)
-done:
- for {
- select {
- case <-queueTimer.C:
- pool.checkQueue()
- case <-removalTimer.C:
- pool.validatePool()
- case <-pool.quit:
- break done
+ pool.events = pool.eventMux.Subscribe(ChainEvent{})
+ for _ = range pool.events.Chan() {
+ pool.mu.Lock()
+ pool.state = state.ManageState(pool.currentState())
+
+ for _, tx := range pool.txs {
+ if addr, err := tx.From(); err == nil {
+ pool.state.SetNonce(addr, tx.Nonce())
+ }
}
+
+ pool.checkQueue()
+ pool.mu.Unlock()
}
}
+func (pool *TxPool) Stop() {
+ pool.txs = make(map[common.Hash]*types.Transaction)
+ close(pool.quit)
+ pool.events.Unsubscribe()
+ glog.V(logger.Info).Infoln("TX Pool stopped")
+}
+
+func (pool *TxPool) State() *state.ManagedState {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.state
+}
+
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
@@ -152,6 +166,9 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
}
+ // check and validate the queueue
+ self.checkQueue()
+
return nil
}
@@ -196,8 +213,13 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// GetTransactions returns all currently processable transactions.
func (self *TxPool) GetTransactions() (txs types.Transactions) {
- self.mu.RLock()
- defer self.mu.RUnlock()
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ // check queue first
+ self.checkQueue()
+ // invalidate any txs
+ self.validatePool()
txs = make(types.Transactions, len(self.txs))
i := 0
@@ -232,12 +254,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
}
}
-func (pool *TxPool) Stop() {
- pool.txs = make(map[common.Hash]*types.Transaction)
- close(pool.quit)
- glog.V(logger.Info).Infoln("TX Pool stopped")
-}
-
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
from, _ := tx.From() // already validated
if self.queue[from] == nil {
@@ -246,9 +262,11 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
self.queue[from][hash] = tx
}
-func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
if _, ok := pool.txs[hash]; !ok {
pool.txs[hash] = tx
+
+ pool.state.SetNonce(addr, tx.AccountNonce)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
@@ -258,34 +276,32 @@ func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
+ state := pool.state
- statedb := pool.currentState()
var addq txQueue
for address, txs := range pool.queue {
- curnonce := statedb.GetNonce(address)
+ curnonce := state.GetNonce(address)
addq := addq[:0]
for hash, tx := range txs {
if tx.AccountNonce < curnonce {
+ fmt.Println("delete the tx", tx.AccountNonce, curnonce)
// Drop queued transactions whose nonce is lower than
// the account nonce because they have been processed.
delete(txs, hash)
} else {
// Collect the remaining transactions for the next pass.
- addq = append(addq, txQueueEntry{hash, tx})
+ addq = append(addq, txQueueEntry{hash, address, tx})
}
}
// Find the next consecutive nonce range starting at the
// current account nonce.
sort.Sort(addq)
for _, e := range addq {
- if e.AccountNonce != curnonce {
+ if e.AccountNonce > curnonce+1 {
break
}
- curnonce++
delete(txs, e.hash)
- pool.addTx(e.hash, e.Transaction)
+ pool.addTx(e.hash, address, e.Transaction)
}
// Delete the entire queue entry if it became empty.
if len(txs) == 0 {
@@ -313,9 +329,6 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// validatePool removes invalid and processed transactions from the main pool.
func (pool *TxPool) validatePool() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
for hash, tx := range pool.txs {
if err := pool.validateTx(tx); err != nil {
if glog.V(logger.Info) {
@@ -330,6 +343,7 @@ type txQueue []txQueueEntry
type txQueueEntry struct {
hash common.Hash
+ addr common.Address
*types.Transaction
}