diff options
author | obscuren <geffobscura@gmail.com> | 2015-06-10 03:12:25 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-06-10 03:12:25 +0800 |
commit | bac9a94ddf20dc530966cbf6cd384aaf94aedc77 (patch) | |
tree | 0ced967e60315698cc5056a984d7678c417bc1ce /core/transaction_pool.go | |
parent | 0e703d92ac9df61e2ededa8c895c70ded101a607 (diff) | |
parent | 14994fa21bf6f05554ff370d41005d06b68d20a5 (diff) | |
download | go-tangerine-0.9.28.tar go-tangerine-0.9.28.tar.gz go-tangerine-0.9.28.tar.bz2 go-tangerine-0.9.28.tar.lz go-tangerine-0.9.28.tar.xz go-tangerine-0.9.28.tar.zst go-tangerine-0.9.28.zip |
Merge branch 'release/0.9.28'v0.9.28
Diffstat (limited to 'core/transaction_pool.go')
-rw-r--r-- | core/transaction_pool.go | 371 |
1 files changed, 197 insertions, 174 deletions
diff --git a/core/transaction_pool.go b/core/transaction_pool.go index c896488d1..a2f970195 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -6,7 +6,6 @@ import ( "math/big" "sort" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -14,10 +13,10 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" - "gopkg.in/fatih/set.v0" ) var ( + // Transaction Pool Errors ErrInvalidSender = errors.New("Invalid sender") ErrNonce = errors.New("Nonce too low") ErrBalance = errors.New("Insufficient balance") @@ -28,118 +27,141 @@ var ( ErrNegativeValue = errors.New("Negative value") ) -const txPoolQueueSize = 50 - -type TxPoolHook chan *types.Transaction -type TxMsg struct{ Tx *types.Transaction } - type stateFn func() *state.StateDB -const ( - minGasPrice = 1000000 -) - -type TxProcessor interface { - ProcessTransaction(tx *types.Transaction) -} - -// The tx pool a thread safe transaction pool handler. In order to -// guarantee a non blocking pool we use a queue channel which can be -// independently read without needing access to the actual pool. +// TxPool contains all currently known transactions. Transactions +// enter the pool when they are received from the network or submitted +// locally. They exit the pool when they are included in the blockchain. +// +// The pool separates processable transactions (which can be applied to the +// current state) and future transactions. Transactions move between those +// two states over time as they are received and processed. type TxPool struct { - mu sync.RWMutex - // Queueing channel for reading and writing incoming - // transactions to - queueChan chan *types.Transaction - // Quiting channel - quit chan bool - // The state function which will allow us to do some pre checkes - currentState stateFn - // The current gas limit function callback - gasLimit func() *big.Int - // The actual pool - txs map[common.Hash]*types.Transaction - invalidHashes *set.Set - - queue map[common.Address]types.Transactions - - subscribers []chan TxMsg - - eventMux *event.TypeMux + quit chan bool // Quiting channel + currentState stateFn // The state function which will allow us to do some pre checkes + pendingState *state.ManagedState + gasLimit func() *big.Int // The current gas limit function callback + eventMux *event.TypeMux + events event.Subscription + + mu sync.RWMutex + pending map[common.Hash]*types.Transaction // processable transactions + queue map[common.Address]map[common.Hash]*types.Transaction } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { - txPool := &TxPool{ - txs: make(map[common.Hash]*types.Transaction), - queue: make(map[common.Address]types.Transactions), - queueChan: make(chan *types.Transaction, txPoolQueueSize), - quit: make(chan bool), - eventMux: eventMux, - invalidHashes: set.New(), - currentState: currentStateFn, - gasLimit: gasLimitFn, + return &TxPool{ + pending: make(map[common.Hash]*types.Transaction), + queue: make(map[common.Address]map[common.Hash]*types.Transaction), + quit: make(chan bool), + eventMux: eventMux, + currentState: currentStateFn, + gasLimit: gasLimitFn, + pendingState: state.ManageState(currentStateFn()), } - return txPool } func (pool *TxPool) Start() { - // Queue timer will tick so we can attempt to move items from the queue to the - // main transaction pool. - queueTimer := time.NewTicker(300 * time.Millisecond) - // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) - removalTimer := time.NewTicker(1 * time.Second) -done: - for { - select { - case <-queueTimer.C: - pool.checkQueue() - case <-removalTimer.C: - pool.validatePool() - case <-pool.quit: - break done + // Track chain events. When a chain events occurs (new chain canon block) + // we need to know the new state. The new state will help us determine + // the nonces in the managed state + pool.events = pool.eventMux.Subscribe(ChainEvent{}) + for _ = range pool.events.Chan() { + pool.mu.Lock() + + pool.resetState() + + pool.mu.Unlock() + } +} + +func (pool *TxPool) resetState() { + pool.pendingState = state.ManageState(pool.currentState()) + + // validate the pool of pending transactions, this will remove + // any transactions that have been included in the block or + // have been invalidated because of another transaction (e.g. + // higher gas price) + pool.validatePool() + + // Loop over the pending transactions and base the nonce of the new + // pending transaction set. + for _, tx := range pool.pending { + if addr, err := tx.From(); err == nil { + // Set the nonce. Transaction nonce can never be lower + // than the state nonce; validatePool took care of that. + pool.pendingState.SetNonce(addr, tx.Nonce()) } } + + // Check the queue and move transactions over to the pending if possible + // or remove those that have become invalid + pool.checkQueue() +} + +func (pool *TxPool) Stop() { + pool.pending = make(map[common.Hash]*types.Transaction) + close(pool.quit) + pool.events.Unsubscribe() + glog.V(logger.Info).Infoln("TX Pool stopped") } -func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { +func (pool *TxPool) State() *state.ManagedState { + pool.mu.RLock() + defer pool.mu.RUnlock() + + return pool.pendingState +} + +// validateTx checks whether a transaction is valid according +// to the consensus rules. +func (pool *TxPool) validateTx(tx *types.Transaction) error { // Validate sender var ( from common.Address err error ) + // Validate the transaction sender and it's sig. Throw + // if the from fields is invalid. if from, err = tx.From(); err != nil { return ErrInvalidSender } - // Validate curve param - v, _, _ := tx.Curve() - if v > 28 || v < 27 { - return fmt.Errorf("tx.v != (28 || 27) => %v", v) - } - + // Make sure the account exist. Non existant accounts + // haven't got funds and well therefor never pass. if !pool.currentState().HasAccount(from) { return ErrNonExistentAccount } + // Check the transaction doesn't exceed the current + // block limit gas. if pool.gasLimit().Cmp(tx.GasLimit) < 0 { return ErrGasLimit } + // Transactions can't be negative. This may never happen + // using RLP decoded transactions but may occur if you create + // a transaction using the RPC for example. if tx.Amount.Cmp(common.Big0) < 0 { return ErrNegativeValue } + // Transactor should have enough funds to cover the costs + // cost == V + GP * GL total := new(big.Int).Mul(tx.Price, tx.GasLimit) total.Add(total, tx.Value()) if pool.currentState().GetBalance(from).Cmp(total) < 0 { return ErrInsufficientFunds } + // Should supply enough intrinsic gas if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 { return ErrIntrinsicGas } + // Last but not least check for nonce errors (intensive + // operation, saved for last) if pool.currentState().GetNonce(from) > tx.Nonce() { return ErrNonce } @@ -156,38 +178,36 @@ func (self *TxPool) add(tx *types.Transaction) error { return fmt.Errorf("Invalid transaction (%x)", hash[:4]) } */ - if self.txs[hash] != nil { + if self.pending[hash] != nil { return fmt.Errorf("Known transaction (%x)", hash[:4]) } - err := self.ValidateTransaction(tx) + err := self.validateTx(tx) if err != nil { return err } - - self.queueTx(tx) - - var toname string - if to := tx.To(); to != nil { - toname = common.Bytes2Hex(to[:4]) - } else { - toname = "[NEW_CONTRACT]" - } - // we can ignore the error here because From is - // verified in ValidateTransaction. - f, _ := tx.From() - from := common.Bytes2Hex(f[:4]) + self.queueTx(hash, tx) if glog.V(logger.Debug) { - glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) + var toname string + if to := tx.To(); to != nil { + toname = common.Bytes2Hex(to[:4]) + } else { + toname = "[NEW_CONTRACT]" + } + // we can ignore the error here because From is + // verified in ValidateTransaction. + f, _ := tx.From() + from := common.Bytes2Hex(f[:4]) + glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) } - return nil -} + // check and validate the queueue + self.checkQueue() -func (self *TxPool) Size() int { - return len(self.txs) + return nil } +// Add queues a single transaction in the pool if it is valid. func (self *TxPool) Add(tx *types.Transaction) error { self.mu.Lock() defer self.mu.Unlock() @@ -195,6 +215,7 @@ func (self *TxPool) Add(tx *types.Transaction) error { return self.add(tx) } +// AddTransactions attempts to queue all valid transactions in txs. func (self *TxPool) AddTransactions(txs []*types.Transaction) { self.mu.Lock() defer self.mu.Unlock() @@ -209,81 +230,81 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) { } } -// GetTransaction allows you to check the pending and queued transaction in the -// transaction pool. -// It has two stategies, first check the pool (map) then check the queue +// GetTransaction returns a transaction if it is contained in the pool +// and nil otherwise. func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { // check the txs first - if tx, ok := tp.txs[hash]; ok { + if tx, ok := tp.pending[hash]; ok { return tx } - // check queue for _, txs := range tp.queue { - for _, tx := range txs { - if tx.Hash() == hash { - return tx - } + if tx, ok := txs[hash]; ok { + return tx } } - return nil } +// GetTransactions returns all currently processable transactions. +// The returned slice may be modified by the caller. func (self *TxPool) GetTransactions() (txs types.Transactions) { - self.mu.RLock() - defer self.mu.RUnlock() + self.mu.Lock() + defer self.mu.Unlock() + + // check queue first + self.checkQueue() + // invalidate any txs + self.validatePool() - txs = make(types.Transactions, self.Size()) + txs = make(types.Transactions, len(self.pending)) i := 0 - for _, tx := range self.txs { + for _, tx := range self.pending { txs[i] = tx i++ } - - return + return txs } +// GetQueuedTransactions returns all non-processable transactions. func (self *TxPool) GetQueuedTransactions() types.Transactions { self.mu.RLock() defer self.mu.RUnlock() - var txs types.Transactions - for _, ts := range self.queue { - txs = append(txs, ts...) + var ret types.Transactions + for _, txs := range self.queue { + for _, tx := range txs { + ret = append(ret, tx) + } } - - return txs + sort.Sort(types.TxByNonce{ret}) + return ret } +// RemoveTransactions removes all given transactions from the pool. func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() - for _, tx := range txs { self.removeTx(tx.Hash()) } } -func (pool *TxPool) Flush() { - pool.txs = make(map[common.Hash]*types.Transaction) -} - -func (pool *TxPool) Stop() { - pool.Flush() - close(pool.quit) - - glog.V(logger.Info).Infoln("TX Pool stopped") +func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { + from, _ := tx.From() // already validated + if self.queue[from] == nil { + self.queue[from] = make(map[common.Hash]*types.Transaction) + } + self.queue[from][hash] = tx } -func (self *TxPool) queueTx(tx *types.Transaction) { - from, _ := tx.From() - self.queue[from] = append(self.queue[from], tx) -} +func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { + if _, ok := pool.pending[hash]; !ok { + pool.pending[hash] = tx -func (pool *TxPool) addTx(tx *types.Transaction) { - if _, ok := pool.txs[tx.Hash()]; !ok { - pool.txs[tx.Hash()] = tx + // Increment the nonce on the pending state. This can only happen if + // the nonce is +1 to the previous one. + pool.pendingState.SetNonce(addr, tx.AccountNonce+1) // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. @@ -291,42 +312,39 @@ func (pool *TxPool) addTx(tx *types.Transaction) { } } -// check queue will attempt to insert +// checkQueue moves transactions that have become processable to main pool. func (pool *TxPool) checkQueue() { - pool.mu.Lock() - defer pool.mu.Unlock() + state := pool.pendingState - statedb := pool.currentState() + var addq txQueue for address, txs := range pool.queue { - sort.Sort(types.TxByNonce{txs}) - - var ( - nonce = statedb.GetNonce(address) - start int - ) - // Clean up the transactions first and determine the start of the nonces - for _, tx := range txs { - if tx.Nonce() >= nonce { - break + // guessed nonce is the nonce currently kept by the tx pool (pending state) + guessedNonce := state.GetNonce(address) + // true nonce is the nonce known by the last state + trueNonce := pool.currentState().GetNonce(address) + addq := addq[:0] + for hash, tx := range txs { + if tx.AccountNonce < trueNonce { + // Drop queued transactions whose nonce is lower than + // the account nonce because they have been processed. + delete(txs, hash) + } else { + // Collect the remaining transactions for the next pass. + addq = append(addq, txQueueEntry{hash, address, tx}) } - start++ } - pool.queue[address] = txs[start:] - - // expected nonce - enonce := nonce - for _, tx := range pool.queue[address] { - // If the expected nonce does not match up with the next one - // (i.e. a nonce gap), we stop the loop - if enonce != tx.Nonce() { + // Find the next consecutive nonce range starting at the + // current account nonce. + sort.Sort(addq) + for _, e := range addq { + if e.AccountNonce > guessedNonce { break } - enonce++ - - pool.addTx(tx) + delete(txs, e.hash) + pool.addTx(e.hash, address, e.Transaction) } - // delete the entire queue entry if it's empty. There's no need to keep it - if len(pool.queue[address]) == 0 { + // Delete the entire queue entry if it became empty. + if len(txs) == 0 { delete(pool.queue, address) } } @@ -334,36 +352,41 @@ func (pool *TxPool) checkQueue() { func (pool *TxPool) removeTx(hash common.Hash) { // delete from pending pool - delete(pool.txs, hash) - + delete(pool.pending, hash) // delete from queue -out: for address, txs := range pool.queue { - for i, tx := range txs { - if tx.Hash() == hash { - if len(txs) == 1 { - // if only one tx, remove entire address entry - delete(pool.queue, address) - } else { - pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...) - } - break out + if _, ok := txs[hash]; ok { + if len(txs) == 1 { + // if only one tx, remove entire address entry. + delete(pool.queue, address) + } else { + delete(txs, hash) } + break } } } +// validatePool removes invalid and processed transactions from the main pool. func (pool *TxPool) validatePool() { - pool.mu.Lock() - defer pool.mu.Unlock() - - for hash, tx := range pool.txs { - if err := pool.ValidateTransaction(tx); err != nil { - if glog.V(logger.Info) { + for hash, tx := range pool.pending { + if err := pool.validateTx(tx); err != nil { + if glog.V(logger.Core) { glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err) } - - pool.removeTx(hash) + delete(pool.pending, hash) } } } + +type txQueue []txQueueEntry + +type txQueueEntry struct { + hash common.Hash + addr common.Address + *types.Transaction +} + +func (q txQueue) Len() int { return len(q) } +func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } +func (q txQueue) Less(i, j int) bool { return q[i].AccountNonce < q[j].AccountNonce } |