aboutsummaryrefslogtreecommitdiffstats
path: root/light/txpool.go
diff options
context:
space:
mode:
authorMiya Chen <miyatlchen@gmail.com>2017-08-18 18:58:36 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-08-18 18:58:36 +0800
commitbf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch)
treea8b86720edf085a6531e7042ef33f36a993540d5 /light/txpool.go
parenta4da8416eec6a00c358b6a612d21e7cdf859d588 (diff)
downloaddexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.bz2
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.lz
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.xz
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'light/txpool.go')
-rw-r--r--light/txpool.go95
1 files changed, 58 insertions, 37 deletions
diff --git a/light/txpool.go b/light/txpool.go
index 7cbb991e8..bd215b992 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -33,6 +33,11 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
+const (
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+)
+
// txPermanent is the number of mined blocks after a mined transaction is
// considered permanent and no rollback is expected
var txPermanent = uint64(500)
@@ -43,21 +48,23 @@ var txPermanent = uint64(500)
// always receive all locally signed transactions in the same order as they are
// created.
type TxPool struct {
- config *params.ChainConfig
- signer types.Signer
- quit chan bool
- eventMux *event.TypeMux
- events *event.TypeMuxSubscription
- mu sync.RWMutex
- chain *LightChain
- odr OdrBackend
- chainDb ethdb.Database
- relay TxRelayBackend
- head common.Hash
- nonce map[common.Address]uint64 // "pending" nonce
- pending map[common.Hash]*types.Transaction // pending transactions by tx hash
- mined map[common.Hash][]*types.Transaction // mined transactions by block hash
- clearIdx uint64 // earliest block nr that can contain mined tx info
+ config *params.ChainConfig
+ signer types.Signer
+ quit chan bool
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ chainHeadCh chan core.ChainHeadEvent
+ chainHeadSub event.Subscription
+ mu sync.RWMutex
+ chain *LightChain
+ odr OdrBackend
+ chainDb ethdb.Database
+ relay TxRelayBackend
+ head common.Hash
+ nonce map[common.Address]uint64 // "pending" nonce
+ pending map[common.Hash]*types.Transaction // pending transactions by tx hash
+ mined map[common.Hash][]*types.Transaction // mined transactions by block hash
+ clearIdx uint64 // earliest block nr that can contain mined tx info
homestead bool
}
@@ -78,23 +85,24 @@ type TxRelayBackend interface {
}
// NewTxPool creates a new light transaction pool
-func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool {
+func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool {
pool := &TxPool{
- config: config,
- signer: types.NewEIP155Signer(config.ChainId),
- nonce: make(map[common.Address]uint64),
- pending: make(map[common.Hash]*types.Transaction),
- mined: make(map[common.Hash][]*types.Transaction),
- quit: make(chan bool),
- eventMux: eventMux,
- events: eventMux.Subscribe(core.ChainHeadEvent{}),
- chain: chain,
- relay: relay,
- odr: chain.Odr(),
- chainDb: chain.Odr().Database(),
- head: chain.CurrentHeader().Hash(),
- clearIdx: chain.CurrentHeader().Number.Uint64(),
- }
+ config: config,
+ signer: types.NewEIP155Signer(config.ChainId),
+ nonce: make(map[common.Address]uint64),
+ pending: make(map[common.Hash]*types.Transaction),
+ mined: make(map[common.Hash][]*types.Transaction),
+ quit: make(chan bool),
+ chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
+ chain: chain,
+ relay: relay,
+ odr: chain.Odr(),
+ chainDb: chain.Odr().Database(),
+ head: chain.CurrentHeader().Hash(),
+ clearIdx: chain.CurrentHeader().Number.Uint64(),
+ }
+ // Subscribe events from blockchain
+ pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
go pool.eventLoop()
return pool
@@ -274,13 +282,17 @@ const blockCheckTimeout = time.Second * 3
// eventLoop processes chain head events and also notifies the tx relay backend
// about the new head hash and tx state changes
func (pool *TxPool) eventLoop() {
- for ev := range pool.events.Chan() {
- switch ev.Data.(type) {
- case core.ChainHeadEvent:
- pool.setNewHead(ev.Data.(core.ChainHeadEvent).Block.Header())
+ for {
+ select {
+ case ev := <-pool.chainHeadCh:
+ pool.setNewHead(ev.Block.Header())
// hack in order to avoid hogging the lock; this part will
// be replaced by a subsequent PR.
time.Sleep(time.Millisecond)
+
+ // System stopped
+ case <-pool.chainHeadSub.Err():
+ return
}
}
}
@@ -301,11 +313,20 @@ func (pool *TxPool) setNewHead(head *types.Header) {
// Stop stops the light transaction pool
func (pool *TxPool) Stop() {
+ // Unsubscribe all subscriptions registered from txpool
+ pool.scope.Close()
+ // Unsubscribe subscriptions registered from blockchain
+ pool.chainHeadSub.Unsubscribe()
close(pool.quit)
- pool.events.Unsubscribe()
log.Info("Transaction pool stopped")
}
+// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
// Stats returns the number of currently pending (locally created) transactions
func (pool *TxPool) Stats() (pending int) {
pool.mu.RLock()
@@ -388,7 +409,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error {
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
- go self.eventMux.Post(core.TxPreEvent{Tx: tx})
+ go self.txFeed.Send(core.TxPreEvent{Tx: tx})
}
// Print a log message if low enough level is set