diff options
author | obscuren <geffobscura@gmail.com> | 2015-06-12 19:36:38 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-06-12 19:41:34 +0800 |
commit | 90c4493a105ef33c1d10735489dce5a42c30b282 (patch) | |
tree | 83f5dbea8df1584afabb366ca217c26ba088d2a0 | |
parent | e2c2d8e15ebef85c77f7486f92c6430ca6f30785 (diff) | |
download | dexon-90c4493a105ef33c1d10735489dce5a42c30b282.tar dexon-90c4493a105ef33c1d10735489dce5a42c30b282.tar.gz dexon-90c4493a105ef33c1d10735489dce5a42c30b282.tar.bz2 dexon-90c4493a105ef33c1d10735489dce5a42c30b282.tar.lz dexon-90c4493a105ef33c1d10735489dce5a42c30b282.tar.xz dexon-90c4493a105ef33c1d10735489dce5a42c30b282.tar.zst dexon-90c4493a105ef33c1d10735489dce5a42c30b282.zip |
eth, core: interupt the chain processing on stop
Added an additional channel which is used to interupt the chain manager
when it's processing blocks.
-rw-r--r-- | core/chain_manager.go | 198 | ||||
-rw-r--r-- | eth/backend.go | 2 |
2 files changed, 105 insertions, 95 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index be64b54f4..8629fb4ce 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -100,8 +100,9 @@ type ChainManager struct { cache *BlockCache futureBlocks *BlockCache - quit chan struct{} - wg sync.WaitGroup + quit chan struct{} + procInterupt chan struct{} // interupt signaler for block processing + wg sync.WaitGroup pow pow.PoW } @@ -113,6 +114,7 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow genesisBlock: GenesisBlock(42, stateDb), eventMux: mux, quit: make(chan struct{}), + procInterupt: make(chan struct{}), cache: NewBlockCache(blockCacheLimit), pow: pow, } @@ -516,6 +518,7 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) { func (bc *ChainManager) Stop() { close(bc.quit) + close(bc.procInterupt) bc.wg.Wait() @@ -568,119 +571,126 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { defer close(nonceQuit) txcount := 0 +done: for i, block := range chain { - bstart := time.Now() - // Wait for block i's nonce to be verified before processing - // its state transition. - for !nonceChecked[i] { - r := <-nonceDone - nonceChecked[r.i] = true - if !r.valid { - block := chain[r.i] - return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()} + select { + case <-self.procInterupt: + glog.V(logger.Debug).Infoln("Premature abort during chain processing") + break done + default: + bstart := time.Now() + // Wait for block i's nonce to be verified before processing + // its state transition. + for !nonceChecked[i] { + r := <-nonceDone + nonceChecked[r.i] = true + if !r.valid { + block := chain[r.i] + return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()} + } } - } - if BadHashes[block.Hash()] { - err := fmt.Errorf("Found known bad hash in chain %x", block.Hash()) - blockErr(block, err) - return i, err - } - - // Setting block.Td regardless of error (known for example) prevents errors down the line - // in the protocol handler - block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) - - // Call in to the block processor and check for errors. It's likely that if one block fails - // all others will fail too (unless a known block is returned). - logs, err := self.processor.Process(block) - if err != nil { - if IsKnownBlockErr(err) { - stats.ignored++ - continue + if BadHashes[block.Hash()] { + err := fmt.Errorf("Found known bad hash in chain %x", block.Hash()) + blockErr(block, err) + return i, err } - if err == BlockFutureErr { - // Allow up to MaxFuture second in the future blocks. If this limit - // is exceeded the chain is discarded and processed at a later time - // if given. - if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max { - return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max) + // Setting block.Td regardless of error (known for example) prevents errors down the line + // in the protocol handler + block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) + + // Call in to the block processor and check for errors. It's likely that if one block fails + // all others will fail too (unless a known block is returned). + logs, err := self.processor.Process(block) + if err != nil { + if IsKnownBlockErr(err) { + stats.ignored++ + continue } - block.SetQueued(true) - self.futureBlocks.Push(block) - stats.queued++ - continue - } + if err == BlockFutureErr { + // Allow up to MaxFuture second in the future blocks. If this limit + // is exceeded the chain is discarded and processed at a later time + // if given. + if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max { + return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max) + } - if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) { - block.SetQueued(true) - self.futureBlocks.Push(block) - stats.queued++ - continue - } + block.SetQueued(true) + self.futureBlocks.Push(block) + stats.queued++ + continue + } - blockErr(block, err) + if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) { + block.SetQueued(true) + self.futureBlocks.Push(block) + stats.queued++ + continue + } - return i, err - } + blockErr(block, err) - txcount += len(block.Transactions()) - - cblock := self.currentBlock - // Compare the TD of the last known block in the canonical chain to make sure it's greater. - // At this point it's possible that a different chain (fork) becomes the new canonical chain. - if block.Td.Cmp(self.Td()) > 0 { - // chain fork - if block.ParentHash() != cblock.Hash() { - // during split we merge two different chains and create the new canonical chain - err := self.merge(cblock, block) - if err != nil { - return i, err + return i, err + } + + txcount += len(block.Transactions()) + + cblock := self.currentBlock + // Compare the TD of the last known block in the canonical chain to make sure it's greater. + // At this point it's possible that a different chain (fork) becomes the new canonical chain. + if block.Td.Cmp(self.Td()) > 0 { + // chain fork + if block.ParentHash() != cblock.Hash() { + // during split we merge two different chains and create the new canonical chain + err := self.merge(cblock, block) + if err != nil { + return i, err + } + + queue[i] = ChainSplitEvent{block, logs} + queueEvent.splitCount++ } - queue[i] = ChainSplitEvent{block, logs} - queueEvent.splitCount++ - } + self.mu.Lock() + self.setTotalDifficulty(block.Td) + self.insert(block) + self.mu.Unlock() - self.mu.Lock() - self.setTotalDifficulty(block.Td) - self.insert(block) - self.mu.Unlock() + jsonlogger.LogJson(&logger.EthChainNewHead{ + BlockHash: block.Hash().Hex(), + BlockNumber: block.Number(), + ChainHeadHash: cblock.Hash().Hex(), + BlockPrevHash: block.ParentHash().Hex(), + }) - jsonlogger.LogJson(&logger.EthChainNewHead{ - BlockHash: block.Hash().Hex(), - BlockNumber: block.Number(), - ChainHeadHash: cblock.Hash().Hex(), - BlockPrevHash: block.ParentHash().Hex(), - }) + self.setTransState(state.New(block.Root(), self.stateDb)) + self.txState.SetState(state.New(block.Root(), self.stateDb)) - self.setTransState(state.New(block.Root(), self.stateDb)) - self.txState.SetState(state.New(block.Root(), self.stateDb)) + queue[i] = ChainEvent{block, block.Hash(), logs} + queueEvent.canonicalCount++ - queue[i] = ChainEvent{block, block.Hash(), logs} - queueEvent.canonicalCount++ + if glog.V(logger.Debug) { + glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) + } + } else { + if glog.V(logger.Detail) { + glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) + } - if glog.V(logger.Debug) { - glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) - } - } else { - if glog.V(logger.Detail) { - glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) + queue[i] = ChainSideEvent{block, logs} + queueEvent.sideCount++ } + // Write block to database. Eventually we'll have to improve on this and throw away blocks that are + // not in the canonical chain. + self.write(block) + // Delete from future blocks + self.futureBlocks.Delete(block.Hash()) - queue[i] = ChainSideEvent{block, logs} - queueEvent.sideCount++ - } - // Write block to database. Eventually we'll have to improve on this and throw away blocks that are - // not in the canonical chain. - self.write(block) - // Delete from future blocks - self.futureBlocks.Delete(block.Hash()) - - stats.processed++ + stats.processed++ + } } if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) { diff --git a/eth/backend.go b/eth/backend.go index 60e9359dc..d2ec0cc62 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -527,8 +527,8 @@ func (self *Ethereum) AddPeer(nodeURL string) error { func (s *Ethereum) Stop() { s.net.Stop() - s.protocolManager.Stop() s.chainManager.Stop() + s.protocolManager.Stop() s.txPool.Stop() s.eventMux.Stop() if s.whisper != nil { |