From 65a48f9cd8461917d8047b1cd4901d068b61ff00 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 9 Jun 2015 23:46:56 +0200 Subject: core: fixed race condition in the transaction pool Removed `Stop/Start` mechanism from the transaction pool. --- core/transaction_pool.go | 60 ++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 30 deletions(-) (limited to 'core') diff --git a/core/transaction_pool.go b/core/transaction_pool.go index a2f970195..b63a4dcab 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -50,7 +50,7 @@ type TxPool struct { } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { - return &TxPool{ + pool := &TxPool{ pending: make(map[common.Hash]*types.Transaction), queue: make(map[common.Address]map[common.Hash]*types.Transaction), quit: make(chan bool), @@ -59,9 +59,12 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( gasLimit: gasLimitFn, pendingState: state.ManageState(currentStateFn()), } + go pool.eventLoop() + + return pool } -func (pool *TxPool) Start() { +func (pool *TxPool) eventLoop() { // Track chain events. When a chain events occurs (new chain canon block) // we need to know the new state. The new state will help us determine // the nonces in the managed state @@ -169,15 +172,10 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { return nil } +// validate and queue transactions. func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() - /* XXX I'm unsure about this. This is extremely dangerous and may result - in total black listing of certain transactions - if self.invalidHashes.Has(hash) { - return fmt.Errorf("Invalid transaction (%x)", hash[:4]) - } - */ if self.pending[hash] != nil { return fmt.Errorf("Known transaction (%x)", hash[:4]) } @@ -207,6 +205,30 @@ func (self *TxPool) add(tx *types.Transaction) error { return nil } +// queueTx will queue an unknown transaction +func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { + from, _ := tx.From() // already validated + if self.queue[from] == nil { + self.queue[from] = make(map[common.Hash]*types.Transaction) + } + self.queue[from][hash] = tx +} + +// addTx will add a transaction to the pending (processable queue) list of transactions +func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { + if _, ok := pool.pending[hash]; !ok { + pool.pending[hash] = tx + + // Increment the nonce on the pending state. This can only happen if + // the nonce is +1 to the previous one. + pool.pendingState.SetNonce(addr, tx.AccountNonce+1) + // Notify the subscribers. This event is posted in a goroutine + // because it's possible that somewhere during the post "Remove transaction" + // gets called which will then wait for the global tx pool lock and deadlock. + go pool.eventMux.Post(TxPreEvent{tx}) + } +} + // Add queues a single transaction in the pool if it is valid. func (self *TxPool) Add(tx *types.Transaction) error { self.mu.Lock() @@ -290,28 +312,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { } } -func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { - from, _ := tx.From() // already validated - if self.queue[from] == nil { - self.queue[from] = make(map[common.Hash]*types.Transaction) - } - self.queue[from][hash] = tx -} - -func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { - if _, ok := pool.pending[hash]; !ok { - pool.pending[hash] = tx - - // Increment the nonce on the pending state. This can only happen if - // the nonce is +1 to the previous one. - pool.pendingState.SetNonce(addr, tx.AccountNonce+1) - // Notify the subscribers. This event is posted in a goroutine - // because it's possible that somewhere during the post "Remove transaction" - // gets called which will then wait for the global tx pool lock and deadlock. - go pool.eventMux.Post(TxPreEvent{tx}) - } -} - // checkQueue moves transactions that have become processable to main pool. func (pool *TxPool) checkQueue() { state := pool.pendingState -- cgit v1.2.3