aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-06-04 04:22:20 +0800
committerobscuren <geffobscura@gmail.com>2015-06-04 04:43:23 +0800
commitd09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8 (patch)
treeec8f89f562417a379c45c25d58b5de9b45bc0b04
parent5197aed7dbba2ac19d99221efe33fede82007f5d (diff)
downloaddexon-d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8.tar
dexon-d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8.tar.gz
dexon-d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8.tar.bz2
dexon-d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8.tar.lz
dexon-d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8.tar.xz
dexon-d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8.tar.zst
dexon-d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8.zip
core, eth, miner: moved nonce management to tx pool.
Removed the managed tx state from the chain manager to the transaction pool where it's much easier to keep track of nonces (and manage them). The transaction pool now also uses the queue and pending txs differently where queued txs are now moved over to the pending queue (i.e. txs ready for processing and propagation).
-rw-r--r--core/chain_manager.go21
-rw-r--r--core/transaction_pool.go88
-rw-r--r--core/transaction_pool_test.go20
-rw-r--r--eth/backend.go29
-rw-r--r--miner/worker.go4
-rw-r--r--xeth/xeth.go8
6 files changed, 66 insertions, 104 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index d58c0d504..d14a19fea 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -214,19 +214,6 @@ func (self *ChainManager) TransState() *state.StateDB {
return self.transState
}
-func (self *ChainManager) TxState() *state.ManagedState {
- self.tsmu.RLock()
- defer self.tsmu.RUnlock()
-
- return self.txState
-}
-
-func (self *ChainManager) setTxState(statedb *state.StateDB) {
- self.tsmu.Lock()
- defer self.tsmu.Unlock()
- self.txState = state.ManageState(statedb)
-}
-
func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb
}
@@ -751,7 +738,7 @@ out:
case ev := <-events.Chan():
switch ev := ev.(type) {
case queueEvent:
- for i, event := range ev.queue {
+ for _, event := range ev.queue {
switch event := event.(type) {
case ChainEvent:
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
@@ -760,12 +747,6 @@ out:
self.currentGasLimit = CalcGasLimit(event.Block)
self.eventMux.Post(ChainHeadEvent{event.Block})
}
- case ChainSplitEvent:
- // On chain splits we need to reset the transaction state. We can't be sure whether the actual
- // state of the accounts are still valid.
- if i == ev.splitCount {
- self.setTxState(state.New(event.Block.Root(), self.stateDb))
- }
}
self.eventMux.Post(event)
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 7d58ffbd9..5e6f2c6a4 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -6,7 +6,6 @@ import (
"math/big"
"sort"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -38,10 +37,12 @@ type stateFn func() *state.StateDB
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
- quit chan bool // Quiting channel
- currentState stateFn // The state function which will allow us to do some pre checkes
+ quit chan bool // Quiting channel
+ currentState stateFn // The state function which will allow us to do some pre checkes
+ state *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
eventMux *event.TypeMux
+ events event.Subscription
mu sync.RWMutex
txs map[common.Hash]*types.Transaction // processable transactions
@@ -56,28 +57,41 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
+ state: state.ManageState(currentStateFn()),
}
}
func (pool *TxPool) Start() {
- // Queue timer will tick so we can attempt to move items from the queue to the
- // main transaction pool.
- queueTimer := time.NewTicker(300 * time.Millisecond)
- // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
- removalTimer := time.NewTicker(1 * time.Second)
-done:
- for {
- select {
- case <-queueTimer.C:
- pool.checkQueue()
- case <-removalTimer.C:
- pool.validatePool()
- case <-pool.quit:
- break done
+ pool.events = pool.eventMux.Subscribe(ChainEvent{})
+ for _ = range pool.events.Chan() {
+ pool.mu.Lock()
+ pool.state = state.ManageState(pool.currentState())
+
+ for _, tx := range pool.txs {
+ if addr, err := tx.From(); err == nil {
+ pool.state.SetNonce(addr, tx.Nonce())
+ }
}
+
+ pool.checkQueue()
+ pool.mu.Unlock()
}
}
+func (pool *TxPool) Stop() {
+ pool.txs = make(map[common.Hash]*types.Transaction)
+ close(pool.quit)
+ pool.events.Unsubscribe()
+ glog.V(logger.Info).Infoln("TX Pool stopped")
+}
+
+func (pool *TxPool) State() *state.ManagedState {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.state
+}
+
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
@@ -152,6 +166,9 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
}
+ // check and validate the queueue
+ self.checkQueue()
+
return nil
}
@@ -196,8 +213,13 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// GetTransactions returns all currently processable transactions.
func (self *TxPool) GetTransactions() (txs types.Transactions) {
- self.mu.RLock()
- defer self.mu.RUnlock()
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ // check queue first
+ self.checkQueue()
+ // invalidate any txs
+ self.validatePool()
txs = make(types.Transactions, len(self.txs))
i := 0
@@ -232,12 +254,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
}
}
-func (pool *TxPool) Stop() {
- pool.txs = make(map[common.Hash]*types.Transaction)
- close(pool.quit)
- glog.V(logger.Info).Infoln("TX Pool stopped")
-}
-
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
from, _ := tx.From() // already validated
if self.queue[from] == nil {
@@ -246,9 +262,11 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
self.queue[from][hash] = tx
}
-func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
if _, ok := pool.txs[hash]; !ok {
pool.txs[hash] = tx
+
+ pool.state.SetNonce(addr, tx.AccountNonce)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
@@ -258,34 +276,32 @@ func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
+ state := pool.state
- statedb := pool.currentState()
var addq txQueue
for address, txs := range pool.queue {
- curnonce := statedb.GetNonce(address)
+ curnonce := state.GetNonce(address)
addq := addq[:0]
for hash, tx := range txs {
if tx.AccountNonce < curnonce {
+ fmt.Println("delete the tx", tx.AccountNonce, curnonce)
// Drop queued transactions whose nonce is lower than
// the account nonce because they have been processed.
delete(txs, hash)
} else {
// Collect the remaining transactions for the next pass.
- addq = append(addq, txQueueEntry{hash, tx})
+ addq = append(addq, txQueueEntry{hash, address, tx})
}
}
// Find the next consecutive nonce range starting at the
// current account nonce.
sort.Sort(addq)
for _, e := range addq {
- if e.AccountNonce != curnonce {
+ if e.AccountNonce > curnonce+1 {
break
}
- curnonce++
delete(txs, e.hash)
- pool.addTx(e.hash, e.Transaction)
+ pool.addTx(e.hash, address, e.Transaction)
}
// Delete the entire queue entry if it became empty.
if len(txs) == 0 {
@@ -313,9 +329,6 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// validatePool removes invalid and processed transactions from the main pool.
func (pool *TxPool) validatePool() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
for hash, tx := range pool.txs {
if err := pool.validateTx(tx); err != nil {
if glog.V(logger.Info) {
@@ -330,6 +343,7 @@ type txQueue []txQueueEntry
type txQueueEntry struct {
hash common.Hash
+ addr common.Address
*types.Transaction
}
diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go
index 600fd9b4f..170bdfa72 100644
--- a/core/transaction_pool_test.go
+++ b/core/transaction_pool_test.go
@@ -37,21 +37,21 @@ func TestInvalidTransactions(t *testing.T) {
}
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
err = pool.Add(tx)
if err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds)
}
balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice()))
- pool.currentState().AddBalance(from, balance)
+ pool.state.AddBalance(from, balance)
err = pool.Add(tx)
if err != ErrIntrinsicGas {
t.Error("expected", ErrIntrinsicGas, "got", err)
}
- pool.currentState().SetNonce(from, 1)
- pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff))
+ pool.state.SetNonce(from, 1)
+ pool.state.AddBalance(from, big.NewInt(0xffffffffffffff))
tx.GasLimit = big.NewInt(100000)
tx.Price = big.NewInt(1)
tx.SignECDSA(key)
@@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx)
pool.checkQueue()
@@ -76,17 +76,17 @@ func TestTransactionQueue(t *testing.T) {
}
tx = transaction()
+ tx.SetNonce(1)
tx.SignECDSA(key)
from, _ = tx.From()
- pool.currentState().SetNonce(from, 10)
- tx.SetNonce(1)
+ pool.state.SetNonce(from, 2)
pool.queueTx(tx.Hash(), tx)
pool.checkQueue()
if _, ok := pool.txs[tx.Hash()]; ok {
t.Error("expected transaction to be in tx pool")
}
- if len(pool.queue[from]) != 0 {
+ if len(pool.queue[from]) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}
@@ -117,7 +117,7 @@ func TestRemoveTx(t *testing.T) {
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx)
pool.addTx(tx.Hash(), tx)
if len(pool.queue) != 1 {
@@ -146,7 +146,7 @@ func TestNegativeValue(t *testing.T) {
tx.Value().Set(big.NewInt(-1))
tx.SignECDSA(key)
from, _ := tx.From()
- pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.state.AddBalance(from, big.NewInt(1))
err := pool.Add(tx)
if err != ErrNegativeValue {
t.Error("expected", ErrNegativeValue, "got", err)
diff --git a/eth/backend.go b/eth/backend.go
index 724763a52..3956dfcaa 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -198,7 +198,6 @@ type Ethereum struct {
net *p2p.Server
eventMux *event.TypeMux
- txSub event.Subscription
miner *miner.Miner
// logger logger.LogSystem
@@ -470,10 +469,6 @@ func (s *Ethereum) Start() error {
s.whisper.Start()
}
- // broadcast transactions
- s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
- go s.txBroadcastLoop()
-
glog.V(logger.Info).Infoln("Server started")
return nil
}
@@ -531,8 +526,6 @@ func (self *Ethereum) AddPeer(nodeURL string) error {
}
func (s *Ethereum) Stop() {
- s.txSub.Unsubscribe() // quits txBroadcastLoop
-
s.net.Stop()
s.protocolManager.Stop()
s.chainManager.Stop()
@@ -552,28 +545,6 @@ func (s *Ethereum) WaitForShutdown() {
<-s.shutdownChan
}
-func (self *Ethereum) txBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range self.txSub.Chan() {
- event := obj.(core.TxPreEvent)
- self.syncAccounts(event.Tx)
- }
-}
-
-// keep accounts synced up
-func (self *Ethereum) syncAccounts(tx *types.Transaction) {
- from, err := tx.From()
- if err != nil {
- return
- }
-
- if self.accountManager.HasAccount(from) {
- if self.chainManager.TxState().GetNonce(from) < tx.Nonce() {
- self.chainManager.TxState().SetNonce(from, tx.Nonce())
- }
- }
-}
-
// StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval
// by default that is 10 times per epoch
// in epoch n, if we past autoDAGepochHeight within-epoch blocks,
diff --git a/miner/worker.go b/miner/worker.go
index 58efd61db..1580d4d42 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -494,10 +494,6 @@ func (self *worker) commitTransactions(transactions types.Transactions) {
err := self.commitTransaction(tx)
switch {
case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
- // Remove invalid transactions
- from, _ := tx.From()
-
- self.chain.TxState().RemoveNonce(from, tx.Nonce())
current.remove.Add(tx.Hash())
if glog.V(logger.Detail) {
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 157fe76c7..187892a49 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -936,22 +936,22 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS
tx = types.NewTransactionMessage(to, value, gas, price, data)
}
- state := self.backend.ChainManager().TxState()
+ state := self.backend.TxPool().State()
var nonce uint64
if len(nonceStr) != 0 {
nonce = common.Big(nonceStr).Uint64()
} else {
- nonce = state.NewNonce(from)
+ nonce = state.GetNonce(from) + 1 //state.NewNonce(from)
}
tx.SetNonce(nonce)
if err := self.sign(tx, from, false); err != nil {
- state.RemoveNonce(from, tx.Nonce())
+ //state.RemoveNonce(from, tx.Nonce())
return "", err
}
if err := self.backend.TxPool().Add(tx); err != nil {
- state.RemoveNonce(from, tx.Nonce())
+ //state.RemoveNonce(from, tx.Nonce())
return "", err
}