aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/chain_manager.go21
-rw-r--r--core/transaction_pool.go88
-rw-r--r--core/transaction_pool_test.go20
-rw-r--r--eth/backend.go29
-rw-r--r--miner/worker.go4
-rw-r--r--xeth/xeth.go8
6 files changed, 66 insertions, 104 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index d58c0d504..d14a19fea 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -214,19 +214,6 @@ func (self *ChainManager) TransState() *state.StateDB {
return self.transState
}
-func (self *ChainManager) TxState() *state.ManagedState {
- self.tsmu.RLock()
- defer self.tsmu.RUnlock()
-
- return self.txState
-}
-
-func (self *ChainManager) setTxState(statedb *state.StateDB) {
- self.tsmu.Lock()
- defer self.tsmu.Unlock()
- self.txState = state.ManageState(statedb)
-}
-
func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb
}
@@ -751,7 +738,7 @@ out:
case ev := <-events.Chan():
switch ev := ev.(type) {
case queueEvent:
- for i, event := range ev.queue {
+ for _, event := range ev.queue {
switch event := event.(type) {
case ChainEvent:
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
@@ -760,12 +747,6 @@ out:
self.currentGasLimit = CalcGasLimit(event.Block)
self.eventMux.Post(ChainHeadEvent{event.Block})
}
- case ChainSplitEvent:
- // On chain splits we need to reset the transaction state. We can't be sure whether the actual
- // state of the accounts are still valid.
- if i == ev.splitCount {
- self.setTxState(state.New(event.Block.Root(), self.stateDb))
- }
}
self.eventMux.Post(event)
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 7d58ffbd9..5e6f2c6a4 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -6,7 +6,6 @@ import (
"math/big"
"sort"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -38,10 +37,12 @@ type stateFn func() *state.StateDB
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
- quit chan bool // Quiting channel
- currentState stateFn // The state function which will allow us to do some pre checkes
+ quit chan bool // Quiting channel
+ currentState stateFn // The state function which will allow us to do some pre checkes
+ state *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
eventMux *event.TypeMux
+ events event.Subscription
mu sync.RWMutex
txs map[common.Hash]*types.Transaction // processable transactions
@@ -56,28 +57,41 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
+ state: state.ManageState(currentStateFn()),
}
}
func (pool *TxPool) Start() {
- // Queue timer will tick so we can attempt to move items from the queue to the
- // main transaction pool.
- queueTimer := time.NewTicker(300 * time.Millisecond)
- // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
- removalTimer := time.NewTicker(1 * time.Second)
-done:
- for {
- select {
- case <-queueTimer.C:
- pool.checkQueue()
- case <-removalTimer.C:
- pool.validatePool()
- case <-pool.quit:
- break done
+ pool.events = pool.eventMux.Subscribe(ChainEvent{})
+ for _ = range pool.events.Chan() {
+ pool.mu.Lock()
+ pool.state = state.ManageState(pool.currentState())
+
+ for _, tx := range pool.txs {
+ if addr, err := tx.From(); err == nil {
+ pool.state.SetNonce(addr, tx.Nonce())
+ }
}
+
+ pool.checkQueue()
+ pool.mu.Unlock()
}
}
+func (pool *TxPool) Stop() {
+ pool.txs = make(map[common.Hash]*types.Transaction)
+ close(pool.quit)
+ pool.events.Unsubscribe()
+ glog.V(logger.Info).Infoln("TX Pool stopped")
+}
+
+func (pool *TxPool) State() *state.ManagedState {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.state
+}
+
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
@@ -152,6 +166,9 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
}
+ // check and validate the queueue
+ self.checkQueue()
+
return nil
}
@@ -196,8 +213,13 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// GetTransactions returns all currently processable transactions.
func (self *TxPool) GetTransactions() (txs types.Transactions) {
- self.mu.RLock()
- defer self.mu.RUnlock()
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ // check queue first
+ self.checkQueue()
+ // invalidate any txs
+ self.validatePool()
txs = make(types.Transactions, len(self.txs))
i := 0
@@ -232,12 +254,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
}
}
-func (pool *TxPool) Stop() {
- pool.txs = make(map[common.Hash]*types.Transaction)
- close(pool.quit)
- glog.V(logger.Info).Infoln("TX Pool stopped")
-}
-
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
from, _ := tx.From() // already validated
if self.queue[from] == nil {
@@ -246,9 +262,11 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
self.queue[from][hash] = tx
}
-func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
if _, ok := pool.txs[hash]; !ok {
pool.txs[hash] = tx
+
+ pool.state.SetNonce(addr, tx.AccountNonce)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
@@ -258,34 +276,32 @@ func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
+ state := pool.state
- statedb := pool.currentState()
var addq txQueue
for address, txs := range pool.queue {
- curnonce := statedb.GetNonce(address)
+ curnonce := state.GetNonce(address)
addq := addq[:0]
for hash, tx := range txs {
if tx.AccountNonce < curnonce {
+ fmt.Println("delete the tx", tx.AccountNonce, curnonce)
// Drop queued transactions whose nonce is lower than
// the account nonce because they have been processed.
delete(txs, hash)
} else {
// Collect the remaining transactions for the next pass.
- addq = append(addq, txQueueEntry{hash, tx})
+ addq = append(addq, txQueueEntry{hash, address, tx})
}
}
// Find the next consecutive nonce range starting at the
// current account nonce.
sort.Sort(addq)
for _, e := range addq {
- if e.AccountNonce != curnonce {
+ if e.AccountNonce > curnonce+1 {
break
}
- curnonce++
delete(txs, e.hash)
- pool.addTx(e.hash, e.Transaction)
+ pool.addTx(e.hash, address, e.Transaction)
}
// Delete the entire queue entry if it became empty.
if len(txs) == 0 {
@@ -313,9 +329,6 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// validatePool removes invalid and processed transactions from the main pool.
func (pool *TxPool) validatePool() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
for hash, tx := range pool.txs {
if err := pool.validateTx(tx); err != nil {
if glog.V(logger.Info) {
@@ -330,6 +343,7 @@ type txQueue []txQueueEntry
type txQueueEntry struct {
hash common.Hash
+ addr common.Address
*types.Transaction
}
diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go
index 600fd9b4f..170bdfa72 100644
--- a/core/transaction_pool_test.go
+++ b/core/transaction_pool_test.go
@@ -37,21 +37,21 @@ func TestInvalidTransactions(t *testing.T) {
}
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
err = pool.Add(tx)
if err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds)
}
balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice()))
- pool.currentState().AddBalance(from, balance)
+ pool.state.AddBalance(from, balance)
err = pool.Add(tx)
if err != ErrIntrinsicGas {
t.Error("expected", ErrIntrinsicGas, "got", err)
}
- pool.currentState().SetNonce(from, 1)
- pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff))
+ pool.state.SetNonce(from, 1)
+ pool.state.AddBalance(from, big.NewInt(0xffffffffffffff))
tx.GasLimit = big.NewInt(100000)
tx.Price = big.NewInt(1)
tx.SignECDSA(key)
@@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx)
pool.checkQueue()
@@ -76,17 +76,17 @@ func TestTransactionQueue(t *testing.T) {
}
tx = transaction()
+ tx.SetNonce(1)
tx.SignECDSA(key)
from, _ = tx.From()
- pool.currentState().SetNonce(from, 10)
- tx.SetNonce(1)
+ pool.state.SetNonce(from, 2)
pool.queueTx(tx.Hash(), tx)
pool.checkQueue()
if _, ok := pool.txs[tx.Hash()]; ok {
t.Error("expected transaction to be in tx pool")
}
- if len(pool.queue[from]) != 0 {
+ if len(pool.queue[from]) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}
@@ -117,7 +117,7 @@ func TestRemoveTx(t *testing.T) {
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx)
pool.addTx(tx.Hash(), tx)
if len(pool.queue) != 1 {
@@ -146,7 +146,7 @@ func TestNegativeValue(t *testing.T) {
tx.Value().Set(big.NewInt(-1))
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
err := pool.Add(tx)
if err != ErrNegativeValue {
t.Error("expected", ErrNegativeValue, "got", err)
diff --git a/eth/backend.go b/eth/backend.go
index 724763a52..3956dfcaa 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -198,7 +198,6 @@ type Ethereum struct {
net *p2p.Server
eventMux *event.TypeMux
- txSub event.Subscription
miner *miner.Miner
// logger logger.LogSystem
@@ -470,10 +469,6 @@ func (s *Ethereum) Start() error {
s.whisper.Start()
}
- // broadcast transactions
- s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
- go s.txBroadcastLoop()
-
glog.V(logger.Info).Infoln("Server started")
return nil
}
@@ -531,8 +526,6 @@ func (self *Ethereum) AddPeer(nodeURL string) error {
}
func (s *Ethereum) Stop() {
- s.txSub.Unsubscribe() // quits txBroadcastLoop
-
s.net.Stop()
s.protocolManager.Stop()
s.chainManager.Stop()
@@ -552,28 +545,6 @@ func (s *Ethereum) WaitForShutdown() {
<-s.shutdownChan
}
-func (self *Ethereum) txBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range self.txSub.Chan() {
- event := obj.(core.TxPreEvent)
- self.syncAccounts(event.Tx)
- }
-}
-
-// keep accounts synced up
-func (self *Ethereum) syncAccounts(tx *types.Transaction) {
- from, err := tx.From()
- if err != nil {
- return
- }
-
- if self.accountManager.HasAccount(from) {
- if self.chainManager.TxState().GetNonce(from) < tx.Nonce() {
- self.chainManager.TxState().SetNonce(from, tx.Nonce())
- }
- }
-}
-
// StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval
// by default that is 10 times per epoch
// in epoch n, if we past autoDAGepochHeight within-epoch blocks,
diff --git a/miner/worker.go b/miner/worker.go
index 58efd61db..1580d4d42 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -494,10 +494,6 @@ func (self *worker) commitTransactions(transactions types.Transactions) {
err := self.commitTransaction(tx)
switch {
case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
- // Remove invalid transactions
- from, _ := tx.From()
-
- self.chain.TxState().RemoveNonce(from, tx.Nonce())
current.remove.Add(tx.Hash())
if glog.V(logger.Detail) {
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 157fe76c7..187892a49 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -936,22 +936,22 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS
tx = types.NewTransactionMessage(to, value, gas, price, data)
}
- state := self.backend.ChainManager().TxState()
+ state := self.backend.TxPool().State()
var nonce uint64
if len(nonceStr) != 0 {
nonce = common.Big(nonceStr).Uint64()
} else {
- nonce = state.NewNonce(from)
+ nonce = state.GetNonce(from) + 1 //state.NewNonce(from)
}
tx.SetNonce(nonce)
if err := self.sign(tx, from, false); err != nil {
- state.RemoveNonce(from, tx.Nonce())
+ //state.RemoveNonce(from, tx.Nonce())
return "", err
}
if err := self.backend.TxPool().Add(tx); err != nil {
- state.RemoveNonce(from, tx.Nonce())
+ //state.RemoveNonce(from, tx.Nonce())
return "", err
}