diff options
author | obscuren <geffobscura@gmail.com> | 2015-06-10 03:12:25 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-06-10 03:12:25 +0800 |
commit | bac9a94ddf20dc530966cbf6cd384aaf94aedc77 (patch) | |
tree | 0ced967e60315698cc5056a984d7678c417bc1ce /core/chain_manager.go | |
parent | 0e703d92ac9df61e2ededa8c895c70ded101a607 (diff) | |
parent | 14994fa21bf6f05554ff370d41005d06b68d20a5 (diff) | |
download | go-tangerine-0.9.28.tar go-tangerine-0.9.28.tar.gz go-tangerine-0.9.28.tar.bz2 go-tangerine-0.9.28.tar.lz go-tangerine-0.9.28.tar.xz go-tangerine-0.9.28.tar.zst go-tangerine-0.9.28.zip |
Merge branch 'release/0.9.28'v0.9.28
Diffstat (limited to 'core/chain_manager.go')
-rw-r--r-- | core/chain_manager.go | 214 |
1 files changed, 105 insertions, 109 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index 3e8ef6fd8..6897c453c 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -30,8 +30,9 @@ var ( ) const ( - blockCacheLimit = 10000 - maxFutureBlocks = 256 + blockCacheLimit = 10000 + maxFutureBlocks = 256 + maxTimeFutureBlocks = 30 ) func CalcDifficulty(block, parent *types.Header) *big.Int { @@ -55,10 +56,7 @@ func CalcTD(block, parent *types.Block) *big.Int { if parent == nil { return block.Difficulty() } - - td := new(big.Int).Add(parent.Td, block.Header().Difficulty) - - return td + return new(big.Int).Add(parent.Td, block.Header().Difficulty) } func CalcGasLimit(parent *types.Block) *big.Int { @@ -108,16 +106,23 @@ type ChainManager struct { pow pow.PoW } -func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) *ChainManager { +func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) (*ChainManager, error) { bc := &ChainManager{ blockDb: blockDb, stateDb: stateDb, - genesisBlock: GenesisBlock(stateDb), + genesisBlock: GenesisBlock(42, stateDb), eventMux: mux, quit: make(chan struct{}), cache: NewBlockCache(blockCacheLimit), pow: pow, } + + // Check the genesis block given to the chain manager. If the genesis block mismatches block number 0 + // throw an error. If no block or the same block's found continue. + if g := bc.GetBlockByNumber(0); g != nil && g.Hash() != genesis.Hash() { + return nil, fmt.Errorf("Genesis mismatch. Maybe different nonce (%d vs %d)? %x / %x", g.Nonce(), genesis.Nonce(), g.Hash().Bytes()[:4], genesis.Hash().Bytes()[:4]) + } + bc.genesisBlock = genesis bc.setLastState() // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain @@ -143,7 +148,7 @@ func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.T go bc.update() - return bc + return bc, nil } func (bc *ChainManager) SetHead(head *types.Block) { @@ -170,11 +175,13 @@ func (self *ChainManager) Td() *big.Int { self.mu.RLock() defer self.mu.RUnlock() - return self.td + return new(big.Int).Set(self.td) } func (self *ChainManager) GasLimit() *big.Int { - // return self.currentGasLimit + self.mu.RLock() + defer self.mu.RUnlock() + return self.currentBlock.GasLimit() } @@ -196,7 +203,7 @@ func (self *ChainManager) Status() (td *big.Int, currentBlock common.Hash, genes self.mu.RLock() defer self.mu.RUnlock() - return self.td, self.currentBlock.Hash(), self.genesisBlock.Hash() + return new(big.Int).Set(self.td), self.currentBlock.Hash(), self.genesisBlock.Hash() } func (self *ChainManager) SetProcessor(proc types.BlockProcessor) { @@ -214,19 +221,6 @@ func (self *ChainManager) TransState() *state.StateDB { return self.transState } -func (self *ChainManager) TxState() *state.ManagedState { - self.tsmu.RLock() - defer self.tsmu.RUnlock() - - return self.txState -} - -func (self *ChainManager) setTxState(statedb *state.StateDB) { - self.tsmu.Lock() - defer self.tsmu.Unlock() - self.txState = state.ManageState(statedb) -} - func (self *ChainManager) setTransState(statedb *state.StateDB) { self.transState = statedb } @@ -353,13 +347,24 @@ func (bc *ChainManager) ResetWithGenesisBlock(gb *types.Block) { // Export writes the active chain to the given writer. func (self *ChainManager) Export(w io.Writer) error { + if err := self.ExportN(w, uint64(0), self.currentBlock.NumberU64()); err != nil { + return err + } + return nil +} + +// ExportN writes a subset of the active chain to the given writer. +func (self *ChainManager) ExportN(w io.Writer, first uint64, last uint64) error { self.mu.RLock() defer self.mu.RUnlock() - glog.V(logger.Info).Infof("exporting %v blocks...\n", self.currentBlock.Header().Number) - last := self.currentBlock.NumberU64() + if first > last { + return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) + } + + glog.V(logger.Info).Infof("exporting %d blocks...\n", last-first+1) - for nr := uint64(1); nr <= last; nr++ { + for nr := first; nr <= last; nr++ { block := self.GetBlockByNumber(nr) if block == nil { return fmt.Errorf("export failed on #%d: not found", nr) @@ -373,11 +378,13 @@ func (self *ChainManager) Export(w io.Writer) error { return nil } +// insert injects a block into the current chain block chain. Note, this function +// assumes that the `mu` mutex is held! func (bc *ChainManager) insert(block *types.Block) { key := append(blockNumPre, block.Number().Bytes()...) bc.blockDb.Put(key, block.Hash().Bytes()) - bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) + bc.currentBlock = block bc.lastBlockHash = block.Hash() } @@ -481,9 +488,10 @@ func (self *ChainManager) GetAncestors(block *types.Block, length int) (blocks [ return } +// setTotalDifficulty updates the TD of the chain manager. Note, this function +// assumes that the `mu` mutex is held! func (bc *ChainManager) setTotalDifficulty(td *big.Int) { - //bc.blockDb.Put([]byte("LTD"), td.Bytes()) - bc.td = td + bc.td = new(big.Int).Set(td) } func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) { @@ -522,13 +530,14 @@ type queueEvent struct { } func (self *ChainManager) procFutureBlocks() { - blocks := make([]*types.Block, len(self.futureBlocks.blocks)) + var blocks []*types.Block self.futureBlocks.Each(func(i int, block *types.Block) { - blocks[i] = block + blocks = append(blocks, block) }) - - types.BlockBy(types.Number).Sort(blocks) - self.InsertChain(blocks) + if len(blocks) > 0 { + types.BlockBy(types.Number).Sort(blocks) + self.InsertChain(blocks) + } } // InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned @@ -540,17 +549,35 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { self.chainmu.Lock() defer self.chainmu.Unlock() - // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. + // A queued approach to delivering events. This is generally + // faster than direct delivery and requires much less mutex + // acquiring. var ( queue = make([]interface{}, len(chain)) queueEvent = queueEvent{queue: queue} stats struct{ queued, processed, ignored int } tstart = time.Now() + + nonceDone = make(chan nonceResult, len(chain)) + nonceQuit = make(chan struct{}) + nonceChecked = make([]bool, len(chain)) ) + // Start the parallel nonce verifier. + go verifyNonces(self.pow, chain, nonceQuit, nonceDone) + defer close(nonceQuit) + for i, block := range chain { - if block == nil { - continue + bstart := time.Now() + // Wait for block i's nonce to be verified before processing + // its state transition. + for !nonceChecked[i] { + r := <-nonceDone + nonceChecked[r.i] = true + if !r.valid { + block := chain[r.i] + return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()} + } } if BadHashes[block.Hash()] { @@ -559,10 +586,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } - // create a nonce channel for parallisation of the nonce check - nonceErrCh := make(chan error) - go verifyBlockNonce(self.pow, block, nonceErrCh) - // Setting block.Td regardless of error (known for example) prevents errors down the line // in the protocol handler block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) @@ -571,15 +594,19 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // all others will fail too (unless a known block is returned). logs, err := self.processor.Process(block) if err != nil { - // empty the nonce channel - <-nonceErrCh - if IsKnownBlockErr(err) { stats.ignored++ continue } if err == BlockFutureErr { + // Allow up to MaxFuture second in the future blocks. If this limit + // is exceeded the chain is discarded and processed at a later time + // if given. + if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max { + return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max) + } + block.SetQueued(true) self.futureBlocks.Push(block) stats.queued++ @@ -597,16 +624,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } - // Wait and check nonce channel and make sure it checks out fine - // otherwise return the error - if err := <-nonceErrCh; err != nil { - return i, err - } cblock := self.currentBlock // Compare the TD of the last known block in the canonical chain to make sure it's greater. // At this point it's possible that a different chain (fork) becomes the new canonical chain. - if block.Td.Cmp(self.td) > 0 { + if block.Td.Cmp(self.Td()) > 0 { // chain fork if block.ParentHash() != cblock.Hash() { // during split we merge two different chains and create the new canonical chain @@ -619,8 +641,10 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { queueEvent.splitCount++ } + self.mu.Lock() self.setTotalDifficulty(block.Td) self.insert(block) + self.mu.Unlock() jsonlogger.LogJson(&logger.EthChainNewHead{ BlockHash: block.Hash().Hex(), @@ -636,11 +660,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { queueEvent.canonicalCount++ if glog.V(logger.Debug) { - glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...)\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) + glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } } else { if glog.V(logger.Detail) { - glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) + glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } queue[i] = ChainSideEvent{block, logs} @@ -728,9 +752,11 @@ func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error { } // insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly + self.mu.Lock() for _, block := range newChain { self.insert(block) } + self.mu.Unlock() return nil } @@ -744,7 +770,7 @@ out: case ev := <-events.Chan(): switch ev := ev.(type) { case queueEvent: - for i, event := range ev.queue { + for _, event := range ev.queue { switch event := event.(type) { case ChainEvent: // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long @@ -753,12 +779,6 @@ out: self.currentGasLimit = CalcGasLimit(event.Block) self.eventMux.Post(ChainHeadEvent{event.Block}) } - case ChainSplitEvent: - // On chain splits we need to reset the transaction state. We can't be sure whether the actual - // state of the accounts are still valid. - if i == ev.splitCount { - self.setTxState(state.New(event.Block.Root(), self.stateDb)) - } } self.eventMux.Post(event) @@ -776,66 +796,42 @@ func blockErr(block *types.Block, err error) { h := block.Header() glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes()) glog.V(logger.Error).Infoln(err) - glog.V(logger.Debug).Infoln(block) + glog.V(logger.Debug).Infoln(verifyNonces) +} + +type nonceResult struct { + i int + valid bool } -// verifyNonces verifies nonces of the given blocks in parallel and returns +// block verifies nonces of the given blocks in parallel and returns // an error if one of the blocks nonce verifications failed. -func verifyNonces(pow pow.PoW, blocks []*types.Block) error { +func verifyNonces(pow pow.PoW, blocks []*types.Block, quit <-chan struct{}, done chan<- nonceResult) { // Spawn a few workers. They listen for blocks on the in channel // and send results on done. The workers will exit in the // background when in is closed. var ( - in = make(chan *types.Block) - done = make(chan error, runtime.GOMAXPROCS(0)) + in = make(chan int) + nworkers = runtime.GOMAXPROCS(0) ) defer close(in) - for i := 0; i < cap(done); i++ { - go verifyNonce(pow, in, done) + if len(blocks) < nworkers { + nworkers = len(blocks) } - // Feed blocks to the workers, aborting at the first invalid nonce. - var ( - running, i int - block *types.Block - sendin = in - ) - for i < len(blocks) || running > 0 { - if i == len(blocks) { - // Disable sending to in. - sendin = nil - } else { - block = blocks[i] - i++ - } - select { - case sendin <- block: - running++ - case err := <-done: - running-- - if err != nil { - return err + for i := 0; i < nworkers; i++ { + go func() { + for i := range in { + done <- nonceResult{i: i, valid: pow.Verify(blocks[i])} } - } + }() } - return nil -} - -// verifyNonce is a worker for the verifyNonces method. It will run until -// in is closed. -func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) { - for block := range in { - if !pow.Verify(block) { - done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) - } else { - done <- nil + // Feed block indices to the workers. + for i := range blocks { + select { + case in <- i: + continue + case <-quit: + return } } } - -func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) { - if !pow.Verify(block) { - done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) - } else { - done <- nil - } -} |