aboutsummaryrefslogtreecommitdiffstats
path: root/core/tx_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r--core/tx_pool.go101
1 files changed, 66 insertions, 35 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index b0c251f92..d835c94d1 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -34,6 +34,13 @@ import (
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
+const (
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+ // rmTxChanSize is the size of channel listening to RemovedTransactionEvent.
+ rmTxChanSize = 10
+)
+
var (
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")
@@ -95,7 +102,14 @@ var (
underpricedTxCounter = metrics.NewCounter("txpool/underpriced")
)
-type stateFn func() (*state.StateDB, error)
+// blockChain provides the state of blockchain and current gas limit to do
+// some pre checks in tx pool and event subscribers.
+type blockChain interface {
+ State() (*state.StateDB, error)
+ GasLimit() *big.Int
+ SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
+ SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription
+}
// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
@@ -160,12 +174,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
- currentState stateFn // The state function which will allow us to do some pre checks
+ blockChain blockChain
pendingState *state.ManagedState
- gasLimit func() *big.Int // The current gas limit function callback
gasPrice *big.Int
- eventMux *event.TypeMux
- events *event.TypeMuxSubscription
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ chainHeadCh chan ChainHeadEvent
+ chainHeadSub event.Subscription
+ rmTxCh chan RemovedTransactionEvent
+ rmTxSub event.Subscription
signer types.Signer
mu sync.RWMutex
@@ -185,7 +202,7 @@ type TxPool struct {
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// trnsactions from the network.
-func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
@@ -193,17 +210,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e
pool := &TxPool{
config: config,
chainconfig: chainconfig,
+ blockChain: blockChain,
signer: types.NewEIP155Signer(chainconfig.ChainId),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: make(map[common.Hash]*types.Transaction),
- eventMux: eventMux,
- currentState: currentStateFn,
- gasLimit: gasLimitFn,
+ chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
+ rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
pendingState: nil,
- events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}),
}
pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(&pool.all)
@@ -220,6 +236,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
+ // Subscribe events from blockchain
+ pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh)
+ pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh)
// Start the event loop and return
pool.wg.Add(1)
go pool.loop()
@@ -248,25 +267,27 @@ func (pool *TxPool) loop() {
// Keep waiting for and reacting to the various events
for {
select {
- // Handle any events fired by the system
- case ev, ok := <-pool.events.Chan():
- if !ok {
- return
- }
- switch ev := ev.Data.(type) {
- case ChainHeadEvent:
- pool.mu.Lock()
- if ev.Block != nil {
- if pool.chainconfig.IsHomestead(ev.Block.Number()) {
- pool.homestead = true
- }
+ // Handle ChainHeadEvent
+ case ev := <-pool.chainHeadCh:
+ pool.mu.Lock()
+ if ev.Block != nil {
+ if pool.chainconfig.IsHomestead(ev.Block.Number()) {
+ pool.homestead = true
}
- pool.reset()
- pool.mu.Unlock()
- case RemovedTransactionEvent:
- pool.addTxs(ev.Txs, false)
}
+ pool.reset()
+ pool.mu.Unlock()
+ // Be unsubscribed due to system stopped
+ case <-pool.chainHeadSub.Err():
+ return
+
+ // Handle RemovedTransactionEvent
+ case ev := <-pool.rmTxCh:
+ pool.addTxs(ev.Txs, false)
+ // Be unsubscribed due to system stopped
+ case <-pool.rmTxSub.Err():
+ return
// Handle stats reporting ticks
case <-report.C:
@@ -322,7 +343,7 @@ func (pool *TxPool) lockedReset() {
// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset() {
- currentState, err := pool.currentState()
+ currentState, err := pool.blockChain.State()
if err != nil {
log.Error("Failed reset txpool state", "err", err)
return
@@ -347,7 +368,11 @@ func (pool *TxPool) reset() {
// Stop terminates the transaction pool.
func (pool *TxPool) Stop() {
- pool.events.Unsubscribe()
+ // Unsubscribe all subscriptions registered from txpool
+ pool.scope.Close()
+ // Unsubscribe subscriptions registered from blockchain
+ pool.chainHeadSub.Unsubscribe()
+ pool.rmTxSub.Unsubscribe()
pool.wg.Wait()
if pool.journal != nil {
@@ -356,6 +381,12 @@ func (pool *TxPool) Stop() {
log.Info("Transaction pool stopped")
}
+// SubscribeTxPreEvent registers a subscription of TxPreEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription {
+ return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
@@ -468,7 +499,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
- if pool.gasLimit().Cmp(tx.Gas()) < 0 {
+ if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 {
return ErrGasLimit
}
// Make sure the transaction is signed properly
@@ -482,7 +513,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
- currentState, err := pool.currentState()
+ currentState, err := pool.blockChain.State()
if err != nil {
return err
}
@@ -647,7 +678,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
- go pool.eventMux.Post(TxPreEvent{tx})
+ go pool.txFeed.Send(TxPreEvent{tx})
}
// AddLocal enqueues a single transaction into the pool if it is valid, marking
@@ -690,7 +721,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
}
// If we added a new transaction, run promotion checks and return
if !replace {
- state, err := pool.currentState()
+ state, err := pool.blockChain.State()
if err != nil {
return err
}
@@ -717,7 +748,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
}
// Only reprocess the internal state if something was actually added
if len(dirty) > 0 {
- state, err := pool.currentState()
+ state, err := pool.blockChain.State()
if err != nil {
return err
}
@@ -804,7 +835,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
- gaslimit := pool.gasLimit()
+ gaslimit := pool.blockChain.GasLimit()
// Gather all the accounts potentially needing updates
if accounts == nil {
@@ -973,7 +1004,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
- gaslimit := pool.gasLimit()
+ gaslimit := pool.blockChain.GasLimit()
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {