From bf1e2631281e1e439533f2abcf1e99a7b2f9552a Mon Sep 17 00:00:00 2001 From: Miya Chen Date: Fri, 18 Aug 2017 18:58:36 +0800 Subject: core, light: send chain events using event.Feed (#14865) --- light/lightchain.go | 57 ++++++++++++++++++++++------- light/lightchain_test.go | 7 ++-- light/odr_test.go | 6 +-- light/trie_test.go | 3 +- light/txpool.go | 95 +++++++++++++++++++++++++++++------------------- light/txpool_test.go | 8 ++-- 6 files changed, 111 insertions(+), 65 deletions(-) (limited to 'light') diff --git a/light/lightchain.go b/light/lightchain.go index a51043975..df194ecad 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -44,11 +44,14 @@ var ( // headers, downloading block bodies and receipts on demand through an ODR // interface. It only does header validation during chain insertion. type LightChain struct { - hc *core.HeaderChain - chainDb ethdb.Database - odr OdrBackend - eventMux *event.TypeMux - genesisBlock *types.Block + hc *core.HeaderChain + chainDb ethdb.Database + odr OdrBackend + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex chainmu sync.RWMutex @@ -69,7 +72,7 @@ type LightChain struct { // NewLightChain returns a fully initialised light chain using information // available in the database. It initialises the default Ethereum header // validator. -func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux) (*LightChain, error) { +func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine) (*LightChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -77,7 +80,6 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. bc := &LightChain{ chainDb: odr.Database(), odr: odr, - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -316,16 +318,18 @@ func (self *LightChain) Rollback(chain []common.Hash) { } // postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. +// posts them into the event feed. func (self *LightChain) postChainEvents(events []interface{}) { for _, event := range events { - if event, ok := event.(core.ChainEvent); ok { - if self.LastBlockHash() == event.Hash { - self.eventMux.Post(core.ChainHeadEvent{Block: event.Block}) + switch ev := event.(type) { + case core.ChainEvent: + if self.LastBlockHash() == ev.Hash { + self.chainHeadFeed.Send(core.ChainHeadEvent{Block: ev.Block}) } + self.chainFeed.Send(ev) + case core.ChainSideEvent: + self.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - self.eventMux.Post(event) } } @@ -467,3 +471,30 @@ func (self *LightChain) LockChain() { func (self *LightChain) UnlockChain() { self.chainmu.RUnlock() } + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (self *LightChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return self.scope.Track(self.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (self *LightChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return self.scope.Track(self.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (self *LightChain) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return self.scope.Track(self.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent implements the interface of filters.Backend +// LightChain does not send logs events, so return an empty subscription. +func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent implements the interface of filters.Backend +// LightChain does not send core.RemovedLogsEvent, so return an empty subscription. +func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} diff --git a/light/lightchain_test.go b/light/lightchain_test.go index 0ad640525..40a4d396a 100644 --- a/light/lightchain_test.go +++ b/light/lightchain_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -55,7 +54,7 @@ func newCanonical(n int) (ethdb.Database, *LightChain, error) { db, _ := ethdb.NewMemDatabase() gspec := core.Genesis{Config: params.TestChainConfig} genesis := gspec.MustCommit(db) - blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker(), new(event.TypeMux)) + blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker()) // Create and inject the requested chain if n == 0 { @@ -75,7 +74,7 @@ func newTestLightChain() *LightChain { Config: params.TestChainConfig, } gspec.MustCommit(db) - lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker(), new(event.TypeMux)) + lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker()) if err != nil { panic(err) } @@ -339,7 +338,7 @@ func TestReorgBadHeaderHashes(t *testing.T) { defer func() { delete(core.BadHashes, headers[3].Hash()) }() // Create a new LightChain and check that it rolled back the state. - ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux)) + ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker()) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } diff --git a/light/odr_test.go b/light/odr_test.go index 544b64eff..bd1e976e8 100644 --- a/light/odr_test.go +++ b/light/odr_test.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -233,7 +232,6 @@ func testChainGen(i int, block *core.BlockGen) { func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { var ( - evmux = new(event.TypeMux) sdb, _ = ethdb.NewMemDatabase() ldb, _ = ethdb.NewMemDatabase() gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}} @@ -241,14 +239,14 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { t.Fatal(err) } odr := &testOdr{sdb: sdb, ldb: ldb} - lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux) + lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker()) if err != nil { t.Fatal(err) } diff --git a/light/trie_test.go b/light/trie_test.go index 9b2cf7c2b..5f45c01af 100644 --- a/light/trie_test.go +++ b/light/trie_test.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" ) @@ -41,7 +40,7 @@ func TestNodeIterator(t *testing.T) { genesis = gspec.MustCommit(fulldb) ) gspec.MustCommit(lightdb) - blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, fulldb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) diff --git a/light/txpool.go b/light/txpool.go index 7cbb991e8..bd215b992 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -33,6 +33,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 +) + // txPermanent is the number of mined blocks after a mined transaction is // considered permanent and no rollback is expected var txPermanent = uint64(500) @@ -43,21 +48,23 @@ var txPermanent = uint64(500) // always receive all locally signed transactions in the same order as they are // created. type TxPool struct { - config *params.ChainConfig - signer types.Signer - quit chan bool - eventMux *event.TypeMux - events *event.TypeMuxSubscription - mu sync.RWMutex - chain *LightChain - odr OdrBackend - chainDb ethdb.Database - relay TxRelayBackend - head common.Hash - nonce map[common.Address]uint64 // "pending" nonce - pending map[common.Hash]*types.Transaction // pending transactions by tx hash - mined map[common.Hash][]*types.Transaction // mined transactions by block hash - clearIdx uint64 // earliest block nr that can contain mined tx info + config *params.ChainConfig + signer types.Signer + quit chan bool + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + mu sync.RWMutex + chain *LightChain + odr OdrBackend + chainDb ethdb.Database + relay TxRelayBackend + head common.Hash + nonce map[common.Address]uint64 // "pending" nonce + pending map[common.Hash]*types.Transaction // pending transactions by tx hash + mined map[common.Hash][]*types.Transaction // mined transactions by block hash + clearIdx uint64 // earliest block nr that can contain mined tx info homestead bool } @@ -78,23 +85,24 @@ type TxRelayBackend interface { } // NewTxPool creates a new light transaction pool -func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool { +func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool { pool := &TxPool{ - config: config, - signer: types.NewEIP155Signer(config.ChainId), - nonce: make(map[common.Address]uint64), - pending: make(map[common.Hash]*types.Transaction), - mined: make(map[common.Hash][]*types.Transaction), - quit: make(chan bool), - eventMux: eventMux, - events: eventMux.Subscribe(core.ChainHeadEvent{}), - chain: chain, - relay: relay, - odr: chain.Odr(), - chainDb: chain.Odr().Database(), - head: chain.CurrentHeader().Hash(), - clearIdx: chain.CurrentHeader().Number.Uint64(), - } + config: config, + signer: types.NewEIP155Signer(config.ChainId), + nonce: make(map[common.Address]uint64), + pending: make(map[common.Hash]*types.Transaction), + mined: make(map[common.Hash][]*types.Transaction), + quit: make(chan bool), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chain: chain, + relay: relay, + odr: chain.Odr(), + chainDb: chain.Odr().Database(), + head: chain.CurrentHeader().Hash(), + clearIdx: chain.CurrentHeader().Number.Uint64(), + } + // Subscribe events from blockchain + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) go pool.eventLoop() return pool @@ -274,13 +282,17 @@ const blockCheckTimeout = time.Second * 3 // eventLoop processes chain head events and also notifies the tx relay backend // about the new head hash and tx state changes func (pool *TxPool) eventLoop() { - for ev := range pool.events.Chan() { - switch ev.Data.(type) { - case core.ChainHeadEvent: - pool.setNewHead(ev.Data.(core.ChainHeadEvent).Block.Header()) + for { + select { + case ev := <-pool.chainHeadCh: + pool.setNewHead(ev.Block.Header()) // hack in order to avoid hogging the lock; this part will // be replaced by a subsequent PR. time.Sleep(time.Millisecond) + + // System stopped + case <-pool.chainHeadSub.Err(): + return } } } @@ -301,11 +313,20 @@ func (pool *TxPool) setNewHead(head *types.Header) { // Stop stops the light transaction pool func (pool *TxPool) Stop() { + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() close(pool.quit) - pool.events.Unsubscribe() log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // Stats returns the number of currently pending (locally created) transactions func (pool *TxPool) Stats() (pending int) { pool.mu.RLock() @@ -388,7 +409,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error { // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. - go self.eventMux.Post(core.TxPreEvent{Tx: tx}) + go self.txFeed.Send(core.TxPreEvent{Tx: tx}) } // Print a log message if low enough level is set diff --git a/light/txpool_test.go b/light/txpool_test.go index f23832a41..fe7936ac2 100644 --- a/light/txpool_test.go +++ b/light/txpool_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -82,7 +81,6 @@ func TestTxPool(t *testing.T) { } var ( - evmux = new(event.TypeMux) sdb, _ = ethdb.NewMemDatabase() ldb, _ = ethdb.NewMemDatabase() gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}} @@ -90,7 +88,7 @@ func TestTxPool(t *testing.T) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, poolTestBlocks, txPoolTestChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) @@ -102,9 +100,9 @@ func TestTxPool(t *testing.T) { discard: make(chan int, 1), mined: make(chan int, 1), } - lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux) + lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker()) txPermanent = 50 - pool := NewTxPool(params.TestChainConfig, evmux, lightchain, relay) + pool := NewTxPool(params.TestChainConfig, lightchain, relay) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() -- cgit v1.2.3