diff options
author | Miya Chen <miyatlchen@gmail.com> | 2017-08-18 18:58:36 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-08-18 18:58:36 +0800 |
commit | bf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch) | |
tree | a8b86720edf085a6531e7042ef33f36a993540d5 /light/txpool.go | |
parent | a4da8416eec6a00c358b6a612d21e7cdf859d588 (diff) | |
download | dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.bz2 dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.lz dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.xz dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip |
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'light/txpool.go')
-rw-r--r-- | light/txpool.go | 95 |
1 files changed, 58 insertions, 37 deletions
diff --git a/light/txpool.go b/light/txpool.go index 7cbb991e8..bd215b992 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -33,6 +33,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 +) + // txPermanent is the number of mined blocks after a mined transaction is // considered permanent and no rollback is expected var txPermanent = uint64(500) @@ -43,21 +48,23 @@ var txPermanent = uint64(500) // always receive all locally signed transactions in the same order as they are // created. type TxPool struct { - config *params.ChainConfig - signer types.Signer - quit chan bool - eventMux *event.TypeMux - events *event.TypeMuxSubscription - mu sync.RWMutex - chain *LightChain - odr OdrBackend - chainDb ethdb.Database - relay TxRelayBackend - head common.Hash - nonce map[common.Address]uint64 // "pending" nonce - pending map[common.Hash]*types.Transaction // pending transactions by tx hash - mined map[common.Hash][]*types.Transaction // mined transactions by block hash - clearIdx uint64 // earliest block nr that can contain mined tx info + config *params.ChainConfig + signer types.Signer + quit chan bool + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + mu sync.RWMutex + chain *LightChain + odr OdrBackend + chainDb ethdb.Database + relay TxRelayBackend + head common.Hash + nonce map[common.Address]uint64 // "pending" nonce + pending map[common.Hash]*types.Transaction // pending transactions by tx hash + mined map[common.Hash][]*types.Transaction // mined transactions by block hash + clearIdx uint64 // earliest block nr that can contain mined tx info homestead bool } @@ -78,23 +85,24 @@ type TxRelayBackend interface { } // NewTxPool creates a new light transaction pool -func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool { +func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool { pool := &TxPool{ - config: config, - signer: types.NewEIP155Signer(config.ChainId), - nonce: make(map[common.Address]uint64), - pending: make(map[common.Hash]*types.Transaction), - mined: make(map[common.Hash][]*types.Transaction), - quit: make(chan bool), - eventMux: eventMux, - events: eventMux.Subscribe(core.ChainHeadEvent{}), - chain: chain, - relay: relay, - odr: chain.Odr(), - chainDb: chain.Odr().Database(), - head: chain.CurrentHeader().Hash(), - clearIdx: chain.CurrentHeader().Number.Uint64(), - } + config: config, + signer: types.NewEIP155Signer(config.ChainId), + nonce: make(map[common.Address]uint64), + pending: make(map[common.Hash]*types.Transaction), + mined: make(map[common.Hash][]*types.Transaction), + quit: make(chan bool), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chain: chain, + relay: relay, + odr: chain.Odr(), + chainDb: chain.Odr().Database(), + head: chain.CurrentHeader().Hash(), + clearIdx: chain.CurrentHeader().Number.Uint64(), + } + // Subscribe events from blockchain + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) go pool.eventLoop() return pool @@ -274,13 +282,17 @@ const blockCheckTimeout = time.Second * 3 // eventLoop processes chain head events and also notifies the tx relay backend // about the new head hash and tx state changes func (pool *TxPool) eventLoop() { - for ev := range pool.events.Chan() { - switch ev.Data.(type) { - case core.ChainHeadEvent: - pool.setNewHead(ev.Data.(core.ChainHeadEvent).Block.Header()) + for { + select { + case ev := <-pool.chainHeadCh: + pool.setNewHead(ev.Block.Header()) // hack in order to avoid hogging the lock; this part will // be replaced by a subsequent PR. time.Sleep(time.Millisecond) + + // System stopped + case <-pool.chainHeadSub.Err(): + return } } } @@ -301,11 +313,20 @@ func (pool *TxPool) setNewHead(head *types.Header) { // Stop stops the light transaction pool func (pool *TxPool) Stop() { + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() close(pool.quit) - pool.events.Unsubscribe() log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // Stats returns the number of currently pending (locally created) transactions func (pool *TxPool) Stats() (pending int) { pool.mu.RLock() @@ -388,7 +409,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error { // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. - go self.eventMux.Post(core.TxPreEvent{Tx: tx}) + go self.txFeed.Send(core.TxPreEvent{Tx: tx}) } // Print a log message if low enough level is set |