From 6feecf26f7a292cf384616153c8e5bf0617a12bf Mon Sep 17 00:00:00 2001 From: Wei-Ning Huang Date: Wed, 31 Oct 2018 18:21:55 +0800 Subject: core: tx_pool: remove transactions on BlockConfirmed event --- core/blockchain.go | 26 ++++++++++++++++-------- core/events.go | 2 ++ core/tx_pool.go | 56 ++++++++++++++++++++++++++-------------------------- core/tx_pool_test.go | 10 +++++----- 4 files changed, 53 insertions(+), 41 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 0e86f2b59..264617857 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -104,14 +104,15 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - logsFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + blockConfirmedFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -1610,6 +1611,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes // add into pending blocks bc.addPendingBlock(newPendingBlock, receipts) + events = append(events, BlockConfirmedEvent{newPendingBlock}) // start insert available pending blocks into db for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ { @@ -1852,6 +1854,9 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { case ChainHeadEvent: bc.chainHeadFeed.Send(ev) + case BlockConfirmedEvent: + bc.blockConfirmedFeed.Send(ev) + case ChainSideEvent: bc.chainSideFeed.Send(ev) } @@ -2044,6 +2049,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } +// SubscribeBlockConfirmedEvent registers a subscription of ChainHeadEvent. +func (bc *BlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription { + return bc.scope.Track(bc.blockConfirmedFeed.Subscribe(ch)) +} + // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) diff --git a/core/events.go b/core/events.go index e76aa4784..1231daa37 100644 --- a/core/events.go +++ b/core/events.go @@ -47,6 +47,8 @@ type ChainSideEvent struct { type ChainHeadEvent struct{ Block *types.Block } +type BlockConfirmedEvent struct{ Block *types.Block } + type NewNotarySetEvent struct { Round uint64 Pubkeys map[string]struct{} // pubkeys in hex format diff --git a/core/tx_pool.go b/core/tx_pool.go index eccf82e93..622c8ce9d 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -36,8 +36,8 @@ import ( ) const ( - // chainHeadChanSize is the size of channel listening to ChainHeadEvent. - chainHeadChanSize = 10 + // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent. + blockConfirmedChanSize = 10 ) var ( @@ -118,7 +118,7 @@ type blockChain interface { GetBlock(hash common.Hash, number uint64) *types.Block StateAt(root common.Hash) (*state.StateDB, error) - SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription + SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription } // TxPoolConfig are the configuration parameters of the transaction pool. @@ -203,16 +203,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - txFeed event.Feed - scope event.SubscriptionScope - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + txFeed event.Feed + scope event.SubscriptionScope + blockConfirmedCh chan BlockConfirmedEvent + blockConfirmedSub event.Subscription + signer types.Signer + mu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingState *state.ManagedState // Pending state tracking virtual nonces @@ -240,16 +240,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.NewEIP155Signer(chainconfig.ChainID), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainID), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -271,7 +271,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block } } // Subscribe events from blockchain - pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) + pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh) // Start the event loop and return pool.wg.Add(1) @@ -304,8 +304,8 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { - // Handle ChainHeadEvent - case ev := <-pool.chainHeadCh: + // Handle BlockConfirmedEvent + case ev := <-pool.blockConfirmedCh: if ev.Block != nil { pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { @@ -317,7 +317,7 @@ func (pool *TxPool) loop() { pool.mu.Unlock() } // Be unsubscribed due to system stopped - case <-pool.chainHeadSub.Err(): + case <-pool.blockConfirmedSub.Err(): return // Handle stats reporting ticks @@ -461,7 +461,7 @@ func (pool *TxPool) Stop() { pool.scope.Close() // Unsubscribe subscriptions registered from blockchain - pool.chainHeadSub.Unsubscribe() + pool.blockConfirmedSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f43ba9e58..0ef926d1d 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -45,9 +45,9 @@ func init() { } type testBlockChain struct { - statedb *state.StateDB - gasLimit uint64 - chainHeadFeed *event.Feed + statedb *state.StateDB + gasLimit uint64 + blockConfirmedFeed *event.Feed } func (bc *testBlockChain) CurrentBlock() *types.Block { @@ -64,8 +64,8 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { return bc.statedb, nil } -func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { - return bc.chainHeadFeed.Subscribe(ch) +func (bc *testBlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription { + return bc.blockConfirmedFeed.Subscribe(ch) } func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) *types.Transaction { -- cgit v1.2.3