From bf1e2631281e1e439533f2abcf1e99a7b2f9552a Mon Sep 17 00:00:00 2001 From: Miya Chen Date: Fri, 18 Aug 2017 18:58:36 +0800 Subject: core, light: send chain events using event.Feed (#14865) --- core/blockchain.go | 92 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 23 deletions(-) (limited to 'core/blockchain.go') diff --git a/core/blockchain.go b/core/blockchain.go index 3fb8be15f..f3ca4e08c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -79,10 +79,16 @@ const ( type BlockChain struct { config *params.ChainConfig // chain & network configuration - hc *HeaderChain - chainDb ethdb.Database - eventMux *event.TypeMux - genesisBlock *types.Block + hc *HeaderChain + chainDb ethdb.Database + rmTxFeed event.Feed + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -115,7 +121,7 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux, vmConfig vm.Config) (*BlockChain, error) { +func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -126,7 +132,6 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co config: config, chainDb: chainDb, stateCache: state.NewDatabase(chainDb), - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -594,6 +599,8 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } + // Unsubscribe all subscriptions registered from blockchain + bc.scope.Close() close(bc.quit) atomic.StoreInt32(&bc.procInterrupt, 1) @@ -1000,6 +1007,12 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) + // We need some control over the mining operation. Acquiring locks and waiting + // for the miner to create new block takes too long and in most cases isn't + // even necessary. + if bc.LastBlockHash() == block.Hash() { + events = append(events, ChainHeadEvent{block}) + } // Write the positional metadata for transaction and receipt lookups if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { @@ -1024,7 +1037,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { stats.usedGas += usedGas.Uint64() stats.report(chain, i) } - go bc.postChainEvents(events, coalescedLogs) + go bc.PostChainEvents(events, coalescedLogs) return 0, nil } @@ -1184,16 +1197,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Must be posted in a goroutine because of the transaction pool trying // to acquire the chain manager lock if len(diff) > 0 { - go bc.eventMux.Post(RemovedTransactionEvent{diff}) + go bc.rmTxFeed.Send(RemovedTransactionEvent{diff}) } if len(deletedLogs) > 0 { - go bc.eventMux.Post(RemovedLogsEvent{deletedLogs}) + go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } if len(oldChain) > 0 { go func() { for _, block := range oldChain { - bc.eventMux.Post(ChainSideEvent{Block: block}) + bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } }() } @@ -1201,22 +1214,25 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -// postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. -func (bc *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) { +// PostChainEvents iterates over the events generated by a chain insertion and +// posts them into the event feed. +// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. +func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { // post event logs for further processing - bc.eventMux.Post(logs) + if logs != nil { + bc.logsFeed.Send(logs) + } for _, event := range events { - if event, ok := event.(ChainEvent); ok { - // We need some control over the mining operation. Acquiring locks and waiting - // for the miner to create new block takes too long and in most cases isn't - // even necessary. - if bc.LastBlockHash() == event.Hash { - bc.eventMux.Post(ChainHeadEvent{event.Block}) - } + switch ev := event.(type) { + case ChainEvent: + bc.chainFeed.Send(ev) + + case ChainHeadEvent: + bc.chainHeadFeed.Send(ev) + + case ChainSideEvent: + bc.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - bc.eventMux.Post(event) } } @@ -1384,3 +1400,33 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config } // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } + +// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent. +func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { + return bc.scope.Track(bc.rmTxFeed.Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent. +func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription { + return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch)) +} + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription { + return bc.scope.Track(bc.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { + return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent registers a subscription of []*types.Log. +func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return bc.scope.Track(bc.logsFeed.Subscribe(ch)) +} -- cgit v1.2.3