aboutsummaryrefslogtreecommitdiffstats
path: root/core/transaction_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/transaction_pool.go')
-rw-r--r--core/transaction_pool.go95
1 files changed, 60 insertions, 35 deletions
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index a2f970195..e31f5c6b3 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -19,6 +19,7 @@ var (
// Transaction Pool Errors
ErrInvalidSender = errors.New("Invalid sender")
ErrNonce = errors.New("Nonce too low")
+ ErrCheap = errors.New("Gas price too low for acceptance")
ErrBalance = errors.New("Insufficient balance")
ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
@@ -27,6 +28,10 @@ var (
ErrNegativeValue = errors.New("Negative value")
)
+const (
+ maxQueued = 200 // max limit of queued txs per address
+)
+
type stateFn func() *state.StateDB
// TxPool contains all currently known transactions. Transactions
@@ -41,6 +46,7 @@ type TxPool struct {
currentState stateFn // The state function which will allow us to do some pre checkes
pendingState *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
+ minGasPrice *big.Int
eventMux *event.TypeMux
events event.Subscription
@@ -50,26 +56,35 @@ type TxPool struct {
}
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
- return &TxPool{
+ pool := &TxPool{
pending: make(map[common.Hash]*types.Transaction),
queue: make(map[common.Address]map[common.Hash]*types.Transaction),
quit: make(chan bool),
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
+ minGasPrice: new(big.Int),
pendingState: state.ManageState(currentStateFn()),
+ events: eventMux.Subscribe(ChainEvent{}, GasPriceChanged{}),
}
+ go pool.eventLoop()
+
+ return pool
}
-func (pool *TxPool) Start() {
+func (pool *TxPool) eventLoop() {
// Track chain events. When a chain events occurs (new chain canon block)
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
- pool.events = pool.eventMux.Subscribe(ChainEvent{})
- for _ = range pool.events.Chan() {
+ for ev := range pool.events.Chan() {
pool.mu.Lock()
- pool.resetState()
+ switch ev := ev.(type) {
+ case ChainEvent:
+ pool.resetState()
+ case GasPriceChanged:
+ pool.minGasPrice = ev.Price
+ }
pool.mu.Unlock()
}
@@ -100,7 +115,6 @@ func (pool *TxPool) resetState() {
}
func (pool *TxPool) Stop() {
- pool.pending = make(map[common.Hash]*types.Transaction)
close(pool.quit)
pool.events.Unsubscribe()
glog.V(logger.Info).Infoln("TX Pool stopped")
@@ -122,6 +136,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
err error
)
+ // Drop transactions under our own minimal accepted gas price
+ if pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
+ return ErrCheap
+ }
+
// Validate the transaction sender and it's sig. Throw
// if the from fields is invalid.
if from, err = tx.From(); err != nil {
@@ -169,15 +188,10 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
return nil
}
+// validate and queue transactions.
func (self *TxPool) add(tx *types.Transaction) error {
hash := tx.Hash()
- /* XXX I'm unsure about this. This is extremely dangerous and may result
- in total black listing of certain transactions
- if self.invalidHashes.Has(hash) {
- return fmt.Errorf("Invalid transaction (%x)", hash[:4])
- }
- */
if self.pending[hash] != nil {
return fmt.Errorf("Known transaction (%x)", hash[:4])
}
@@ -207,6 +221,30 @@ func (self *TxPool) add(tx *types.Transaction) error {
return nil
}
+// queueTx will queue an unknown transaction
+func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
+ from, _ := tx.From() // already validated
+ if self.queue[from] == nil {
+ self.queue[from] = make(map[common.Hash]*types.Transaction)
+ }
+ self.queue[from][hash] = tx
+}
+
+// addTx will add a transaction to the pending (processable queue) list of transactions
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
+ if _, ok := pool.pending[hash]; !ok {
+ pool.pending[hash] = tx
+
+ // Increment the nonce on the pending state. This can only happen if
+ // the nonce is +1 to the previous one.
+ pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
+ // Notify the subscribers. This event is posted in a goroutine
+ // because it's possible that somewhere during the post "Remove transaction"
+ // gets called which will then wait for the global tx pool lock and deadlock.
+ go pool.eventMux.Post(TxPreEvent{tx})
+ }
+}
+
// Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) error {
self.mu.Lock()
@@ -290,28 +328,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
}
}
-func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
- from, _ := tx.From() // already validated
- if self.queue[from] == nil {
- self.queue[from] = make(map[common.Hash]*types.Transaction)
- }
- self.queue[from][hash] = tx
-}
-
-func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
- if _, ok := pool.pending[hash]; !ok {
- pool.pending[hash] = tx
-
- // Increment the nonce on the pending state. This can only happen if
- // the nonce is +1 to the previous one.
- pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
- // Notify the subscribers. This event is posted in a goroutine
- // because it's possible that somewhere during the post "Remove transaction"
- // gets called which will then wait for the global tx pool lock and deadlock.
- go pool.eventMux.Post(TxPreEvent{tx})
- }
-}
-
// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
state := pool.pendingState
@@ -336,7 +352,16 @@ func (pool *TxPool) checkQueue() {
// Find the next consecutive nonce range starting at the
// current account nonce.
sort.Sort(addq)
- for _, e := range addq {
+ for i, e := range addq {
+ // start deleting the transactions from the queue if they exceed the limit
+ if i > maxQueued {
+ if glog.V(logger.Debug) {
+ glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:]))
+ }
+ delete(pool.queue[address], e.hash)
+ continue
+ }
+
if e.AccountNonce > guessedNonce {
break
}