aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/chain_manager.go21
-rw-r--r--core/transaction_pool.go88
-rw-r--r--core/transaction_pool_test.go20
3 files changed, 62 insertions, 67 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index d58c0d504..d14a19fea 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -214,19 +214,6 @@ func (self *ChainManager) TransState() *state.StateDB {
return self.transState
}
-func (self *ChainManager) TxState() *state.ManagedState {
- self.tsmu.RLock()
- defer self.tsmu.RUnlock()
-
- return self.txState
-}
-
-func (self *ChainManager) setTxState(statedb *state.StateDB) {
- self.tsmu.Lock()
- defer self.tsmu.Unlock()
- self.txState = state.ManageState(statedb)
-}
-
func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb
}
@@ -751,7 +738,7 @@ out:
case ev := <-events.Chan():
switch ev := ev.(type) {
case queueEvent:
- for i, event := range ev.queue {
+ for _, event := range ev.queue {
switch event := event.(type) {
case ChainEvent:
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
@@ -760,12 +747,6 @@ out:
self.currentGasLimit = CalcGasLimit(event.Block)
self.eventMux.Post(ChainHeadEvent{event.Block})
}
- case ChainSplitEvent:
- // On chain splits we need to reset the transaction state. We can't be sure whether the actual
- // state of the accounts are still valid.
- if i == ev.splitCount {
- self.setTxState(state.New(event.Block.Root(), self.stateDb))
- }
}
self.eventMux.Post(event)
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 7d58ffbd9..5e6f2c6a4 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -6,7 +6,6 @@ import (
"math/big"
"sort"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -38,10 +37,12 @@ type stateFn func() *state.StateDB
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
- quit chan bool // Quiting channel
- currentState stateFn // The state function which will allow us to do some pre checkes
+ quit chan bool // Quiting channel
+ currentState stateFn // The state function which will allow us to do some pre checkes
+ state *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
eventMux *event.TypeMux
+ events event.Subscription
mu sync.RWMutex
txs map[common.Hash]*types.Transaction // processable transactions
@@ -56,28 +57,41 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
+ state: state.ManageState(currentStateFn()),
}
}
func (pool *TxPool) Start() {
- // Queue timer will tick so we can attempt to move items from the queue to the
- // main transaction pool.
- queueTimer := time.NewTicker(300 * time.Millisecond)
- // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
- removalTimer := time.NewTicker(1 * time.Second)
-done:
- for {
- select {
- case <-queueTimer.C:
- pool.checkQueue()
- case <-removalTimer.C:
- pool.validatePool()
- case <-pool.quit:
- break done
+ pool.events = pool.eventMux.Subscribe(ChainEvent{})
+ for _ = range pool.events.Chan() {
+ pool.mu.Lock()
+ pool.state = state.ManageState(pool.currentState())
+
+ for _, tx := range pool.txs {
+ if addr, err := tx.From(); err == nil {
+ pool.state.SetNonce(addr, tx.Nonce())
+ }
}
+
+ pool.checkQueue()
+ pool.mu.Unlock()
}
}
+func (pool *TxPool) Stop() {
+ pool.txs = make(map[common.Hash]*types.Transaction)
+ close(pool.quit)
+ pool.events.Unsubscribe()
+ glog.V(logger.Info).Infoln("TX Pool stopped")
+}
+
+func (pool *TxPool) State() *state.ManagedState {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.state
+}
+
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
@@ -152,6 +166,9 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
}
+ // check and validate the queueue
+ self.checkQueue()
+
return nil
}
@@ -196,8 +213,13 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// GetTransactions returns all currently processable transactions.
func (self *TxPool) GetTransactions() (txs types.Transactions) {
- self.mu.RLock()
- defer self.mu.RUnlock()
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ // check queue first
+ self.checkQueue()
+ // invalidate any txs
+ self.validatePool()
txs = make(types.Transactions, len(self.txs))
i := 0
@@ -232,12 +254,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
}
}
-func (pool *TxPool) Stop() {
- pool.txs = make(map[common.Hash]*types.Transaction)
- close(pool.quit)
- glog.V(logger.Info).Infoln("TX Pool stopped")
-}
-
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
from, _ := tx.From() // already validated
if self.queue[from] == nil {
@@ -246,9 +262,11 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
self.queue[from][hash] = tx
}
-func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
if _, ok := pool.txs[hash]; !ok {
pool.txs[hash] = tx
+
+ pool.state.SetNonce(addr, tx.AccountNonce)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
@@ -258,34 +276,32 @@ func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
+ state := pool.state
- statedb := pool.currentState()
var addq txQueue
for address, txs := range pool.queue {
- curnonce := statedb.GetNonce(address)
+ curnonce := state.GetNonce(address)
addq := addq[:0]
for hash, tx := range txs {
if tx.AccountNonce < curnonce {
+ fmt.Println("delete the tx", tx.AccountNonce, curnonce)
// Drop queued transactions whose nonce is lower than
// the account nonce because they have been processed.
delete(txs, hash)
} else {
// Collect the remaining transactions for the next pass.
- addq = append(addq, txQueueEntry{hash, tx})
+ addq = append(addq, txQueueEntry{hash, address, tx})
}
}
// Find the next consecutive nonce range starting at the
// current account nonce.
sort.Sort(addq)
for _, e := range addq {
- if e.AccountNonce != curnonce {
+ if e.AccountNonce > curnonce+1 {
break
}
- curnonce++
delete(txs, e.hash)
- pool.addTx(e.hash, e.Transaction)
+ pool.addTx(e.hash, address, e.Transaction)
}
// Delete the entire queue entry if it became empty.
if len(txs) == 0 {
@@ -313,9 +329,6 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// validatePool removes invalid and processed transactions from the main pool.
func (pool *TxPool) validatePool() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
for hash, tx := range pool.txs {
if err := pool.validateTx(tx); err != nil {
if glog.V(logger.Info) {
@@ -330,6 +343,7 @@ type txQueue []txQueueEntry
type txQueueEntry struct {
hash common.Hash
+ addr common.Address
*types.Transaction
}
diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go
index 600fd9b4f..170bdfa72 100644
--- a/core/transaction_pool_test.go
+++ b/core/transaction_pool_test.go
@@ -37,21 +37,21 @@ func TestInvalidTransactions(t *testing.T) {
}
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
err = pool.Add(tx)
if err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds)
}
balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice()))
- pool.currentState().AddBalance(from, balance)
+ pool.state.AddBalance(from, balance)
err = pool.Add(tx)
if err != ErrIntrinsicGas {
t.Error("expected", ErrIntrinsicGas, "got", err)
}
- pool.currentState().SetNonce(from, 1)
- pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff))
+ pool.state.SetNonce(from, 1)
+ pool.state.AddBalance(from, big.NewInt(0xffffffffffffff))
tx.GasLimit = big.NewInt(100000)
tx.Price = big.NewInt(1)
tx.SignECDSA(key)
@@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx)
pool.checkQueue()
@@ -76,17 +76,17 @@ func TestTransactionQueue(t *testing.T) {
}
tx = transaction()
+ tx.SetNonce(1)
tx.SignECDSA(key)
from, _ = tx.From()
- pool.currentState().SetNonce(from, 10)
- tx.SetNonce(1)
+ pool.state.SetNonce(from, 2)
pool.queueTx(tx.Hash(), tx)
pool.checkQueue()
if _, ok := pool.txs[tx.Hash()]; ok {
t.Error("expected transaction to be in tx pool")
}
- if len(pool.queue[from]) != 0 {
+ if len(pool.queue[from]) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}
@@ -117,7 +117,7 @@ func TestRemoveTx(t *testing.T) {
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx)
pool.addTx(tx.Hash(), tx)
if len(pool.queue) != 1 {
@@ -146,7 +146,7 @@ func TestNegativeValue(t *testing.T) {
tx.Value().Set(big.NewInt(-1))
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
err := pool.Add(tx)
if err != ErrNegativeValue {
t.Error("expected", ErrNegativeValue, "got", err)