diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/chain_manager.go | 21 | ||||
-rw-r--r-- | core/transaction_pool.go | 88 | ||||
-rw-r--r-- | core/transaction_pool_test.go | 20 |
3 files changed, 62 insertions, 67 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index d58c0d504..d14a19fea 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -214,19 +214,6 @@ func (self *ChainManager) TransState() *state.StateDB { return self.transState } -func (self *ChainManager) TxState() *state.ManagedState { - self.tsmu.RLock() - defer self.tsmu.RUnlock() - - return self.txState -} - -func (self *ChainManager) setTxState(statedb *state.StateDB) { - self.tsmu.Lock() - defer self.tsmu.Unlock() - self.txState = state.ManageState(statedb) -} - func (self *ChainManager) setTransState(statedb *state.StateDB) { self.transState = statedb } @@ -751,7 +738,7 @@ out: case ev := <-events.Chan(): switch ev := ev.(type) { case queueEvent: - for i, event := range ev.queue { + for _, event := range ev.queue { switch event := event.(type) { case ChainEvent: // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long @@ -760,12 +747,6 @@ out: self.currentGasLimit = CalcGasLimit(event.Block) self.eventMux.Post(ChainHeadEvent{event.Block}) } - case ChainSplitEvent: - // On chain splits we need to reset the transaction state. We can't be sure whether the actual - // state of the accounts are still valid. - if i == ev.splitCount { - self.setTxState(state.New(event.Block.Root(), self.stateDb)) - } } self.eventMux.Post(event) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 7d58ffbd9..5e6f2c6a4 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -6,7 +6,6 @@ import ( "math/big" "sort" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -38,10 +37,12 @@ type stateFn func() *state.StateDB // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - quit chan bool // Quiting channel - currentState stateFn // The state function which will allow us to do some pre checkes + quit chan bool // Quiting channel + currentState stateFn // The state function which will allow us to do some pre checkes + state *state.ManagedState gasLimit func() *big.Int // The current gas limit function callback eventMux *event.TypeMux + events event.Subscription mu sync.RWMutex txs map[common.Hash]*types.Transaction // processable transactions @@ -56,28 +57,41 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, + state: state.ManageState(currentStateFn()), } } func (pool *TxPool) Start() { - // Queue timer will tick so we can attempt to move items from the queue to the - // main transaction pool. - queueTimer := time.NewTicker(300 * time.Millisecond) - // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) - removalTimer := time.NewTicker(1 * time.Second) -done: - for { - select { - case <-queueTimer.C: - pool.checkQueue() - case <-removalTimer.C: - pool.validatePool() - case <-pool.quit: - break done + pool.events = pool.eventMux.Subscribe(ChainEvent{}) + for _ = range pool.events.Chan() { + pool.mu.Lock() + pool.state = state.ManageState(pool.currentState()) + + for _, tx := range pool.txs { + if addr, err := tx.From(); err == nil { + pool.state.SetNonce(addr, tx.Nonce()) + } } + + pool.checkQueue() + pool.mu.Unlock() } } +func (pool *TxPool) Stop() { + pool.txs = make(map[common.Hash]*types.Transaction) + close(pool.quit) + pool.events.Unsubscribe() + glog.V(logger.Info).Infoln("TX Pool stopped") +} + +func (pool *TxPool) State() *state.ManagedState { + pool.mu.RLock() + defer pool.mu.RUnlock() + + return pool.state +} + // validateTx checks whether a transaction is valid according // to the consensus rules. func (pool *TxPool) validateTx(tx *types.Transaction) error { @@ -152,6 +166,9 @@ func (self *TxPool) add(tx *types.Transaction) error { glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) } + // check and validate the queueue + self.checkQueue() + return nil } @@ -196,8 +213,13 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { // GetTransactions returns all currently processable transactions. func (self *TxPool) GetTransactions() (txs types.Transactions) { - self.mu.RLock() - defer self.mu.RUnlock() + self.mu.Lock() + defer self.mu.Unlock() + + // check queue first + self.checkQueue() + // invalidate any txs + self.validatePool() txs = make(types.Transactions, len(self.txs)) i := 0 @@ -232,12 +254,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { } } -func (pool *TxPool) Stop() { - pool.txs = make(map[common.Hash]*types.Transaction) - close(pool.quit) - glog.V(logger.Info).Infoln("TX Pool stopped") -} - func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { from, _ := tx.From() // already validated if self.queue[from] == nil { @@ -246,9 +262,11 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { self.queue[from][hash] = tx } -func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) { +func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { if _, ok := pool.txs[hash]; !ok { pool.txs[hash] = tx + + pool.state.SetNonce(addr, tx.AccountNonce) // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. @@ -258,34 +276,32 @@ func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) { // checkQueue moves transactions that have become processable to main pool. func (pool *TxPool) checkQueue() { - pool.mu.Lock() - defer pool.mu.Unlock() + state := pool.state - statedb := pool.currentState() var addq txQueue for address, txs := range pool.queue { - curnonce := statedb.GetNonce(address) + curnonce := state.GetNonce(address) addq := addq[:0] for hash, tx := range txs { if tx.AccountNonce < curnonce { + fmt.Println("delete the tx", tx.AccountNonce, curnonce) // Drop queued transactions whose nonce is lower than // the account nonce because they have been processed. delete(txs, hash) } else { // Collect the remaining transactions for the next pass. - addq = append(addq, txQueueEntry{hash, tx}) + addq = append(addq, txQueueEntry{hash, address, tx}) } } // Find the next consecutive nonce range starting at the // current account nonce. sort.Sort(addq) for _, e := range addq { - if e.AccountNonce != curnonce { + if e.AccountNonce > curnonce+1 { break } - curnonce++ delete(txs, e.hash) - pool.addTx(e.hash, e.Transaction) + pool.addTx(e.hash, address, e.Transaction) } // Delete the entire queue entry if it became empty. if len(txs) == 0 { @@ -313,9 +329,6 @@ func (pool *TxPool) removeTx(hash common.Hash) { // validatePool removes invalid and processed transactions from the main pool. func (pool *TxPool) validatePool() { - pool.mu.Lock() - defer pool.mu.Unlock() - for hash, tx := range pool.txs { if err := pool.validateTx(tx); err != nil { if glog.V(logger.Info) { @@ -330,6 +343,7 @@ type txQueue []txQueueEntry type txQueueEntry struct { hash common.Hash + addr common.Address *types.Transaction } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index 600fd9b4f..170bdfa72 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -37,21 +37,21 @@ func TestInvalidTransactions(t *testing.T) { } from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) err = pool.Add(tx) if err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) } balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice())) - pool.currentState().AddBalance(from, balance) + pool.state.AddBalance(from, balance) err = pool.Add(tx) if err != ErrIntrinsicGas { t.Error("expected", ErrIntrinsicGas, "got", err) } - pool.currentState().SetNonce(from, 1) - pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff)) + pool.state.SetNonce(from, 1) + pool.state.AddBalance(from, big.NewInt(0xffffffffffffff)) tx.GasLimit = big.NewInt(100000) tx.Price = big.NewInt(1) tx.SignECDSA(key) @@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) { tx := transaction() tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) pool.queueTx(tx.Hash(), tx) pool.checkQueue() @@ -76,17 +76,17 @@ func TestTransactionQueue(t *testing.T) { } tx = transaction() + tx.SetNonce(1) tx.SignECDSA(key) from, _ = tx.From() - pool.currentState().SetNonce(from, 10) - tx.SetNonce(1) + pool.state.SetNonce(from, 2) pool.queueTx(tx.Hash(), tx) pool.checkQueue() if _, ok := pool.txs[tx.Hash()]; ok { t.Error("expected transaction to be in tx pool") } - if len(pool.queue[from]) != 0 { + if len(pool.queue[from]) > 0 { t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } @@ -117,7 +117,7 @@ func TestRemoveTx(t *testing.T) { tx := transaction() tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) pool.queueTx(tx.Hash(), tx) pool.addTx(tx.Hash(), tx) if len(pool.queue) != 1 { @@ -146,7 +146,7 @@ func TestNegativeValue(t *testing.T) { tx.Value().Set(big.NewInt(-1)) tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) err := pool.Add(tx) if err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) |