diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-10-16 01:44:30 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-10-16 01:44:30 +0800 |
commit | cefe5c80b1cdcab606a169c0be65d9d2ba9bc941 (patch) | |
tree | dbbb116fa71ee396fdeb743a725b14007b43e845 /core | |
parent | 2f1f2e4811a6f3094f99b55f6553fe27d83f9aad (diff) | |
parent | 402fd6e8c6a2e379351e0aae10a833fae6bcae6c (diff) | |
download | go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.gz go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.bz2 go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.lz go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.xz go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.zst go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.zip |
Merge pull request #1898 from karalabe/eventmux-post-race
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'core')
-rw-r--r-- | core/blockchain.go | 65 | ||||
-rw-r--r-- | core/transaction_pool.go | 2 |
2 files changed, 28 insertions, 39 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index ad545cf69..6c555e9ee 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -483,13 +483,6 @@ func (bc *BlockChain) Stop() { glog.V(logger.Info).Infoln("Chain manager stopped") } -type queueEvent struct { - queue []interface{} - canonicalCount int - sideCount int - splitCount int -} - func (self *BlockChain) procFutureBlocks() { blocks := make([]*types.Block, self.futureBlocks.Len()) for i, hash := range self.futureBlocks.Keys() { @@ -573,10 +566,9 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { // faster than direct delivery and requires much less mutex // acquiring. var ( - queue = make([]interface{}, len(chain)) - queueEvent = queueEvent{queue: queue} - stats struct{ queued, processed, ignored int } - tstart = time.Now() + stats struct{ queued, processed, ignored int } + events = make([]interface{}, 0, len(chain)) + tstart = time.Now() nonceChecked = make([]bool, len(chain)) ) @@ -659,22 +651,21 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { if glog.V(logger.Debug) { glog.Infof("[%v] inserted block #%d (%d TXs %v G %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), block.GasUsed(), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } - queue[i] = ChainEvent{block, block.Hash(), logs} - queueEvent.canonicalCount++ + events = append(events, ChainEvent{block, block.Hash(), logs}) // This puts transactions in a extra db for rpc PutTransactions(self.chainDb, block, block.Transactions()) // store the receipts PutReceipts(self.chainDb, receipts) + case SideStatTy: if glog.V(logger.Detail) { glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } - queue[i] = ChainSideEvent{block, logs} - queueEvent.sideCount++ + events = append(events, ChainSideEvent{block, logs}) + case SplitStatTy: - queue[i] = ChainSplitEvent{block, logs} - queueEvent.splitCount++ + events = append(events, ChainSplitEvent{block, logs}) } stats.processed++ } @@ -684,8 +675,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { start, end := chain[0], chain[len(chain)-1] glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4]) } - - go self.eventMux.Post(queueEvent) + go self.postChainEvents(events) return 0, nil } @@ -774,32 +764,31 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } +// postChainEvents iterates over the events generated by a chain insertion and +// posts them into the event mux. +func (self *BlockChain) postChainEvents(events []interface{}) { + for _, event := range events { + if event, ok := event.(ChainEvent); ok { + // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long + // and in most cases isn't even necessary. + if self.currentBlock.Hash() == event.Hash { + self.currentGasLimit = CalcGasLimit(event.Block) + self.eventMux.Post(ChainHeadEvent{event.Block}) + } + } + // Fire the insertion events individually too + self.eventMux.Post(event) + } +} + func (self *BlockChain) update() { - events := self.eventMux.Subscribe(queueEvent{}) futureTimer := time.Tick(5 * time.Second) -out: for { select { - case ev := <-events.Chan(): - switch ev := ev.(type) { - case queueEvent: - for _, event := range ev.queue { - switch event := event.(type) { - case ChainEvent: - // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long - // and in most cases isn't even necessary. - if self.currentBlock.Hash() == event.Hash { - self.currentGasLimit = CalcGasLimit(event.Block) - self.eventMux.Post(ChainHeadEvent{event.Block}) - } - } - self.eventMux.Post(event) - } - } case <-futureTimer: self.procFutureBlocks() case <-self.quit: - break out + return } } } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 11d0cb490..a4e6ce3e2 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -93,7 +93,7 @@ func (pool *TxPool) eventLoop() { // we need to know the new state. The new state will help us determine // the nonces in the managed state for ev := range pool.events.Chan() { - switch ev := ev.(type) { + switch ev := ev.Data.(type) { case ChainHeadEvent: pool.mu.Lock() pool.resetState() |