aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-10-16 01:44:30 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-10-16 01:44:30 +0800
commitcefe5c80b1cdcab606a169c0be65d9d2ba9bc941 (patch)
treedbbb116fa71ee396fdeb743a725b14007b43e845 /core
parent2f1f2e4811a6f3094f99b55f6553fe27d83f9aad (diff)
parent402fd6e8c6a2e379351e0aae10a833fae6bcae6c (diff)
downloaddexon-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar
dexon-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.gz
dexon-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.bz2
dexon-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.lz
dexon-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.xz
dexon-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.zst
dexon-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.zip
Merge pull request #1898 from karalabe/eventmux-post-race
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'core')
-rw-r--r--core/blockchain.go65
-rw-r--r--core/transaction_pool.go2
2 files changed, 28 insertions, 39 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index ad545cf69..6c555e9ee 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -483,13 +483,6 @@ func (bc *BlockChain) Stop() {
glog.V(logger.Info).Infoln("Chain manager stopped")
}
-type queueEvent struct {
- queue []interface{}
- canonicalCount int
- sideCount int
- splitCount int
-}
-
func (self *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, self.futureBlocks.Len())
for i, hash := range self.futureBlocks.Keys() {
@@ -573,10 +566,9 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// faster than direct delivery and requires much less mutex
// acquiring.
var (
- queue = make([]interface{}, len(chain))
- queueEvent = queueEvent{queue: queue}
- stats struct{ queued, processed, ignored int }
- tstart = time.Now()
+ stats struct{ queued, processed, ignored int }
+ events = make([]interface{}, 0, len(chain))
+ tstart = time.Now()
nonceChecked = make([]bool, len(chain))
)
@@ -659,22 +651,21 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if glog.V(logger.Debug) {
glog.Infof("[%v] inserted block #%d (%d TXs %v G %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), block.GasUsed(), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
- queue[i] = ChainEvent{block, block.Hash(), logs}
- queueEvent.canonicalCount++
+ events = append(events, ChainEvent{block, block.Hash(), logs})
// This puts transactions in a extra db for rpc
PutTransactions(self.chainDb, block, block.Transactions())
// store the receipts
PutReceipts(self.chainDb, receipts)
+
case SideStatTy:
if glog.V(logger.Detail) {
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
- queue[i] = ChainSideEvent{block, logs}
- queueEvent.sideCount++
+ events = append(events, ChainSideEvent{block, logs})
+
case SplitStatTy:
- queue[i] = ChainSplitEvent{block, logs}
- queueEvent.splitCount++
+ events = append(events, ChainSplitEvent{block, logs})
}
stats.processed++
}
@@ -684,8 +675,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
start, end := chain[0], chain[len(chain)-1]
glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
}
-
- go self.eventMux.Post(queueEvent)
+ go self.postChainEvents(events)
return 0, nil
}
@@ -774,32 +764,31 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return nil
}
+// postChainEvents iterates over the events generated by a chain insertion and
+// posts them into the event mux.
+func (self *BlockChain) postChainEvents(events []interface{}) {
+ for _, event := range events {
+ if event, ok := event.(ChainEvent); ok {
+ // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
+ // and in most cases isn't even necessary.
+ if self.currentBlock.Hash() == event.Hash {
+ self.currentGasLimit = CalcGasLimit(event.Block)
+ self.eventMux.Post(ChainHeadEvent{event.Block})
+ }
+ }
+ // Fire the insertion events individually too
+ self.eventMux.Post(event)
+ }
+}
+
func (self *BlockChain) update() {
- events := self.eventMux.Subscribe(queueEvent{})
futureTimer := time.Tick(5 * time.Second)
-out:
for {
select {
- case ev := <-events.Chan():
- switch ev := ev.(type) {
- case queueEvent:
- for _, event := range ev.queue {
- switch event := event.(type) {
- case ChainEvent:
- // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
- // and in most cases isn't even necessary.
- if self.currentBlock.Hash() == event.Hash {
- self.currentGasLimit = CalcGasLimit(event.Block)
- self.eventMux.Post(ChainHeadEvent{event.Block})
- }
- }
- self.eventMux.Post(event)
- }
- }
case <-futureTimer:
self.procFutureBlocks()
case <-self.quit:
- break out
+ return
}
}
}
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 11d0cb490..a4e6ce3e2 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -93,7 +93,7 @@ func (pool *TxPool) eventLoop() {
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
for ev := range pool.events.Chan() {
- switch ev := ev.(type) {
+ switch ev := ev.Data.(type) {
case ChainHeadEvent:
pool.mu.Lock()
pool.resetState()