diff options
author | Miya Chen <miyatlchen@gmail.com> | 2017-08-18 18:58:36 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-08-18 18:58:36 +0800 |
commit | bf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch) | |
tree | a8b86720edf085a6531e7042ef33f36a993540d5 /core/tx_pool.go | |
parent | a4da8416eec6a00c358b6a612d21e7cdf859d588 (diff) | |
download | go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.bz2 go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.lz go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.xz go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip |
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 101 |
1 files changed, 66 insertions, 35 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index b0c251f92..d835c94d1 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -34,6 +34,13 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // rmTxChanSize is the size of channel listening to RemovedTransactionEvent. + rmTxChanSize = 10 +) + var ( // ErrInvalidSender is returned if the transaction contains an invalid signature. ErrInvalidSender = errors.New("invalid sender") @@ -95,7 +102,14 @@ var ( underpricedTxCounter = metrics.NewCounter("txpool/underpriced") ) -type stateFn func() (*state.StateDB, error) +// blockChain provides the state of blockchain and current gas limit to do +// some pre checks in tx pool and event subscribers. +type blockChain interface { + State() (*state.StateDB, error) + GasLimit() *big.Int + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription + SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription +} // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { @@ -160,12 +174,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { type TxPool struct { config TxPoolConfig chainconfig *params.ChainConfig - currentState stateFn // The state function which will allow us to do some pre checks + blockChain blockChain pendingState *state.ManagedState - gasLimit func() *big.Int // The current gas limit function callback gasPrice *big.Int - eventMux *event.TypeMux - events *event.TypeMuxSubscription + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + rmTxCh chan RemovedTransactionEvent + rmTxSub event.Subscription signer types.Signer mu sync.RWMutex @@ -185,7 +202,7 @@ type TxPool struct { // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() @@ -193,17 +210,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e pool := &TxPool{ config: config, chainconfig: chainconfig, + blockChain: blockChain, signer: types.NewEIP155Signer(chainconfig.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: make(map[common.Hash]*types.Transaction), - eventMux: eventMux, - currentState: currentStateFn, - gasLimit: gasLimitFn, + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) @@ -220,6 +236,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e log.Warn("Failed to rotate transaction journal", "err", err) } } + // Subscribe events from blockchain + pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh) + pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh) // Start the event loop and return pool.wg.Add(1) go pool.loop() @@ -248,25 +267,27 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { - // Handle any events fired by the system - case ev, ok := <-pool.events.Chan(): - if !ok { - return - } - switch ev := ev.Data.(type) { - case ChainHeadEvent: - pool.mu.Lock() - if ev.Block != nil { - if pool.chainconfig.IsHomestead(ev.Block.Number()) { - pool.homestead = true - } + // Handle ChainHeadEvent + case ev := <-pool.chainHeadCh: + pool.mu.Lock() + if ev.Block != nil { + if pool.chainconfig.IsHomestead(ev.Block.Number()) { + pool.homestead = true } - pool.reset() - pool.mu.Unlock() - case RemovedTransactionEvent: - pool.addTxs(ev.Txs, false) } + pool.reset() + pool.mu.Unlock() + // Be unsubscribed due to system stopped + case <-pool.chainHeadSub.Err(): + return + + // Handle RemovedTransactionEvent + case ev := <-pool.rmTxCh: + pool.addTxs(ev.Txs, false) + // Be unsubscribed due to system stopped + case <-pool.rmTxSub.Err(): + return // Handle stats reporting ticks case <-report.C: @@ -322,7 +343,7 @@ func (pool *TxPool) lockedReset() { // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. func (pool *TxPool) reset() { - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { log.Error("Failed reset txpool state", "err", err) return @@ -347,7 +368,11 @@ func (pool *TxPool) reset() { // Stop terminates the transaction pool. func (pool *TxPool) Stop() { - pool.events.Unsubscribe() + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() + pool.rmTxSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { @@ -356,6 +381,12 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // GasPrice returns the current gas price enforced by the transaction pool. func (pool *TxPool) GasPrice() *big.Int { pool.mu.RLock() @@ -468,7 +499,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. - if pool.gasLimit().Cmp(tx.Gas()) < 0 { + if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 { return ErrGasLimit } // Make sure the transaction is signed properly @@ -482,7 +513,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { return err } @@ -647,7 +678,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - go pool.eventMux.Post(TxPreEvent{tx}) + go pool.txFeed.Send(TxPreEvent{tx}) } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -690,7 +721,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // If we added a new transaction, run promotion checks and return if !replace { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -717,7 +748,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { } // Only reprocess the internal state if something was actually added if len(dirty) > 0 { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -804,7 +835,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Gather all the accounts potentially needing updates if accounts == nil { @@ -973,7 +1004,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { |