aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-06-12 22:45:53 +0800
committerobscuren <geffobscura@gmail.com>2015-06-12 22:52:54 +0800
commit645dfd96932c87e256c3edc9035843c6baf4a2e8 (patch)
treea6354a42229b30c60b760589196bcc9af6fc7788
parent90c4493a105ef33c1d10735489dce5a42c30b282 (diff)
downloadgo-tangerine-645dfd96932c87e256c3edc9035843c6baf4a2e8.tar
go-tangerine-645dfd96932c87e256c3edc9035843c6baf4a2e8.tar.gz
go-tangerine-645dfd96932c87e256c3edc9035843c6baf4a2e8.tar.bz2
go-tangerine-645dfd96932c87e256c3edc9035843c6baf4a2e8.tar.lz
go-tangerine-645dfd96932c87e256c3edc9035843c6baf4a2e8.tar.xz
go-tangerine-645dfd96932c87e256c3edc9035843c6baf4a2e8.tar.zst
go-tangerine-645dfd96932c87e256c3edc9035843c6baf4a2e8.zip
core: changed interrupt strategy
Removed chain manager's select/channel approach when checking for interrupts. Now using an atomic int32 instead which checked for every block processed.
-rw-r--r--core/chain_manager.go203
1 files changed, 101 insertions, 102 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 8629fb4ce..e56d82cce 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -8,6 +8,7 @@ import (
"os"
"runtime"
"sync"
+ "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -100,9 +101,10 @@ type ChainManager struct {
cache *BlockCache
futureBlocks *BlockCache
- quit chan struct{}
- procInterupt chan struct{} // interupt signaler for block processing
- wg sync.WaitGroup
+ quit chan struct{}
+ // procInterrupt must be atomically called
+ procInterrupt int32 // interrupt signaler for block processing
+ wg sync.WaitGroup
pow pow.PoW
}
@@ -114,7 +116,6 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
genesisBlock: GenesisBlock(42, stateDb),
eventMux: mux,
quit: make(chan struct{}),
- procInterupt: make(chan struct{}),
cache: NewBlockCache(blockCacheLimit),
pow: pow,
}
@@ -518,7 +519,7 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
func (bc *ChainManager) Stop() {
close(bc.quit)
- close(bc.procInterupt)
+ atomic.StoreInt32(&bc.procInterrupt, 1)
bc.wg.Wait()
@@ -571,126 +572,124 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
defer close(nonceQuit)
txcount := 0
-done:
for i, block := range chain {
- select {
- case <-self.procInterupt:
+ if atomic.LoadInt32(&self.procInterrupt) == 1 {
glog.V(logger.Debug).Infoln("Premature abort during chain processing")
- break done
- default:
- bstart := time.Now()
- // Wait for block i's nonce to be verified before processing
- // its state transition.
- for !nonceChecked[i] {
- r := <-nonceDone
- nonceChecked[r.i] = true
- if !r.valid {
- block := chain[r.i]
- return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
- }
- }
+ break
+ }
- if BadHashes[block.Hash()] {
- err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
- blockErr(block, err)
- return i, err
+ bstart := time.Now()
+ // Wait for block i's nonce to be verified before processing
+ // its state transition.
+ for !nonceChecked[i] {
+ r := <-nonceDone
+ nonceChecked[r.i] = true
+ if !r.valid {
+ block := chain[r.i]
+ return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
}
+ }
- // Setting block.Td regardless of error (known for example) prevents errors down the line
- // in the protocol handler
- block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
-
- // Call in to the block processor and check for errors. It's likely that if one block fails
- // all others will fail too (unless a known block is returned).
- logs, err := self.processor.Process(block)
- if err != nil {
- if IsKnownBlockErr(err) {
- stats.ignored++
- continue
- }
-
- if err == BlockFutureErr {
- // Allow up to MaxFuture second in the future blocks. If this limit
- // is exceeded the chain is discarded and processed at a later time
- // if given.
- if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
- return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
- }
+ if BadHashes[block.Hash()] {
+ err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
+ blockErr(block, err)
+ return i, err
+ }
- block.SetQueued(true)
- self.futureBlocks.Push(block)
- stats.queued++
- continue
- }
+ // Setting block.Td regardless of error (known for example) prevents errors down the line
+ // in the protocol handler
+ block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
+
+ // Call in to the block processor and check for errors. It's likely that if one block fails
+ // all others will fail too (unless a known block is returned).
+ logs, err := self.processor.Process(block)
+ if err != nil {
+ if IsKnownBlockErr(err) {
+ stats.ignored++
+ continue
+ }
- if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
- block.SetQueued(true)
- self.futureBlocks.Push(block)
- stats.queued++
- continue
+ if err == BlockFutureErr {
+ // Allow up to MaxFuture second in the future blocks. If this limit
+ // is exceeded the chain is discarded and processed at a later time
+ // if given.
+ if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
+ return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
}
- blockErr(block, err)
+ block.SetQueued(true)
+ self.futureBlocks.Push(block)
+ stats.queued++
+ continue
+ }
- return i, err
+ if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
+ block.SetQueued(true)
+ self.futureBlocks.Push(block)
+ stats.queued++
+ continue
}
- txcount += len(block.Transactions())
-
- cblock := self.currentBlock
- // Compare the TD of the last known block in the canonical chain to make sure it's greater.
- // At this point it's possible that a different chain (fork) becomes the new canonical chain.
- if block.Td.Cmp(self.Td()) > 0 {
- // chain fork
- if block.ParentHash() != cblock.Hash() {
- // during split we merge two different chains and create the new canonical chain
- err := self.merge(cblock, block)
- if err != nil {
- return i, err
- }
+ blockErr(block, err)
+
+ return i, err
+ }
- queue[i] = ChainSplitEvent{block, logs}
- queueEvent.splitCount++
+ txcount += len(block.Transactions())
+
+ cblock := self.currentBlock
+ // Compare the TD of the last known block in the canonical chain to make sure it's greater.
+ // At this point it's possible that a different chain (fork) becomes the new canonical chain.
+ if block.Td.Cmp(self.Td()) > 0 {
+ // chain fork
+ if block.ParentHash() != cblock.Hash() {
+ // during split we merge two different chains and create the new canonical chain
+ err := self.merge(cblock, block)
+ if err != nil {
+ return i, err
}
- self.mu.Lock()
- self.setTotalDifficulty(block.Td)
- self.insert(block)
- self.mu.Unlock()
+ queue[i] = ChainSplitEvent{block, logs}
+ queueEvent.splitCount++
+ }
- jsonlogger.LogJson(&logger.EthChainNewHead{
- BlockHash: block.Hash().Hex(),
- BlockNumber: block.Number(),
- ChainHeadHash: cblock.Hash().Hex(),
- BlockPrevHash: block.ParentHash().Hex(),
- })
+ self.mu.Lock()
+ self.setTotalDifficulty(block.Td)
+ self.insert(block)
+ self.mu.Unlock()
- self.setTransState(state.New(block.Root(), self.stateDb))
- self.txState.SetState(state.New(block.Root(), self.stateDb))
+ jsonlogger.LogJson(&logger.EthChainNewHead{
+ BlockHash: block.Hash().Hex(),
+ BlockNumber: block.Number(),
+ ChainHeadHash: cblock.Hash().Hex(),
+ BlockPrevHash: block.ParentHash().Hex(),
+ })
- queue[i] = ChainEvent{block, block.Hash(), logs}
- queueEvent.canonicalCount++
+ self.setTransState(state.New(block.Root(), self.stateDb))
+ self.txState.SetState(state.New(block.Root(), self.stateDb))
- if glog.V(logger.Debug) {
- glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
- }
- } else {
- if glog.V(logger.Detail) {
- glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
- }
+ queue[i] = ChainEvent{block, block.Hash(), logs}
+ queueEvent.canonicalCount++
- queue[i] = ChainSideEvent{block, logs}
- queueEvent.sideCount++
+ if glog.V(logger.Debug) {
+ glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
+ }
+ } else {
+ if glog.V(logger.Detail) {
+ glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
- // Write block to database. Eventually we'll have to improve on this and throw away blocks that are
- // not in the canonical chain.
- self.write(block)
- // Delete from future blocks
- self.futureBlocks.Delete(block.Hash())
-
- stats.processed++
+ queue[i] = ChainSideEvent{block, logs}
+ queueEvent.sideCount++
}
+ // Write block to database. Eventually we'll have to improve on this and throw away blocks that are
+ // not in the canonical chain.
+ self.write(block)
+ // Delete from future blocks
+ self.futureBlocks.Delete(block.Hash())
+
+ stats.processed++
+
}
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {