From c33cc382b3561ca91871111933f81653bfd8532f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 21 Sep 2015 15:36:29 +0300 Subject: core: support inserting pure header chains --- core/blockchain.go | 298 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 233 insertions(+), 65 deletions(-) (limited to 'core/blockchain.go') diff --git a/core/blockchain.go b/core/blockchain.go index 7bfe13a11..48c935eac 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -67,9 +67,9 @@ type BlockChain struct { chainmu sync.RWMutex tsmu sync.RWMutex - td *big.Int - currentBlock *types.Block - currentGasLimit *big.Int + checkpoint int // checkpoint counts towards the new checkpoint + currentHeader *types.Header // Current head of the header chain (may be above the block chain!) + currentBlock *types.Block // Current head of the block chain headerCache *lru.Cache // Cache for the most recent block headers bodyCache *lru.Cache // Cache for the most recent block bodies @@ -120,20 +120,15 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl } glog.V(logger.Info).Infoln("WARNING: Wrote default ethereum genesis block") } - if err := bc.setLastState(); err != nil { + if err := bc.loadLastState(); err != nil { return nil, err } // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain for hash, _ := range BadHashes { - if block := bc.GetBlock(hash); block != nil { - glog.V(logger.Error).Infof("Found bad hash. Reorganising chain to state %x\n", block.ParentHash().Bytes()[:4]) - block = bc.GetBlock(block.ParentHash()) - if block == nil { - glog.Fatal("Unable to complete. Parent block not found. Corrupted DB?") - } - bc.SetHead(block) - - glog.V(logger.Error).Infoln("Chain reorg was successfull. Resuming normal operation") + if header := bc.GetHeader(hash); header != nil { + glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4]) + bc.SetHead(header.Number.Uint64() - 1) + glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation") } } // Take ownership of this particular state @@ -141,46 +136,104 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl return bc, nil } -func (bc *BlockChain) SetHead(head *types.Block) { +// loadLastState loads the last known chain state from the database. This method +// assumes that the chain manager mutex is held. +func (self *BlockChain) loadLastState() error { + // Restore the last known head block + head := GetHeadBlockHash(self.chainDb) + if head == (common.Hash{}) { + // Corrupt or empty database, init from scratch + self.Reset() + } else { + if block := self.GetBlock(head); block != nil { + // Block found, set as the current head + self.currentBlock = block + } else { + // Corrupt or empty database, init from scratch + self.Reset() + } + } + // Restore the last known head header + self.currentHeader = self.currentBlock.Header() + if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) { + if header := self.GetHeader(head); header != nil { + self.currentHeader = header + } + } + // Issue a status log and return + headerTd := self.GetTd(self.currentHeader.Hash()) + blockTd := self.GetTd(self.currentBlock.Hash()) + + glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash(), headerTd) + glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash(), blockTd) + + return nil +} + +// SetHead rewind the local chain to a new head entity. In the case of headers, +// everything above the new head will be deleted and the new one set. In the case +// of blocks though, the head may be further rewound if block bodies are missing +// (non-archive nodes after a fast sync). +func (bc *BlockChain) SetHead(head uint64) { bc.mu.Lock() defer bc.mu.Unlock() - for block := bc.currentBlock; block != nil && block.Hash() != head.Hash(); block = bc.GetBlock(block.ParentHash()) { - DeleteBlock(bc.chainDb, block.Hash()) + // Delete everything from the current header head (is above block head) + for i := bc.currentHeader.Number.Uint64(); i > head; i-- { + if hash := GetCanonicalHash(bc.chainDb, i); hash != (common.Hash{}) { + DeleteCanonicalHash(bc.chainDb, i) + DeleteHeader(bc.chainDb, hash) + DeleteBody(bc.chainDb, hash) + DeleteTd(bc.chainDb, hash) + } + } + bc.currentHeader = GetHeader(bc.chainDb, GetCanonicalHash(bc.chainDb, head)) + + // Rewind the block chain until a whole block is found + for bc.GetBlockByNumber(head) == nil { + head-- } + bc.currentBlock = bc.GetBlockByNumber(head) + + // Clear out any stale content from the caches bc.headerCache.Purge() bc.bodyCache.Purge() bc.bodyRLPCache.Purge() bc.blockCache.Purge() bc.futureBlocks.Purge() - bc.currentBlock = head - bc.setTotalDifficulty(bc.GetTd(head.Hash())) - bc.insert(head) - bc.setLastState() + // Update all computed fields to the new head + bc.insert(bc.currentBlock) + bc.loadLastState() } -func (self *BlockChain) Td() *big.Int { +func (self *BlockChain) GasLimit() *big.Int { self.mu.RLock() defer self.mu.RUnlock() - return new(big.Int).Set(self.td) + return self.currentBlock.GasLimit() } -func (self *BlockChain) GasLimit() *big.Int { +func (self *BlockChain) LastBlockHash() common.Hash { self.mu.RLock() defer self.mu.RUnlock() - return self.currentBlock.GasLimit() + return self.currentBlock.Hash() } -func (self *BlockChain) LastBlockHash() common.Hash { +// CurrentHeader retrieves the current head header of the canonical chain. The +// header is retrieved from the chain manager's internal cache, involving no +// database operations. +func (self *BlockChain) CurrentHeader() *types.Header { self.mu.RLock() defer self.mu.RUnlock() - return self.currentBlock.Hash() + return self.currentHeader } +// CurrentBlock retrieves the current head block of the canonical chain. The +// block is retrieved from the chain manager's internal cache, involving no +// database operations. func (self *BlockChain) CurrentBlock() *types.Block { self.mu.RLock() defer self.mu.RUnlock() @@ -192,7 +245,7 @@ func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesis self.mu.RLock() defer self.mu.RUnlock() - return new(big.Int).Set(self.td), self.currentBlock.Hash(), self.genesisBlock.Hash() + return self.GetTd(self.currentBlock.Hash()), self.currentBlock.Hash(), self.genesisBlock.Hash() } func (self *BlockChain) SetProcessor(proc types.BlockProcessor) { @@ -203,26 +256,6 @@ func (self *BlockChain) State() (*state.StateDB, error) { return state.New(self.CurrentBlock().Root(), self.chainDb) } -func (bc *BlockChain) setLastState() error { - head := GetHeadBlockHash(bc.chainDb) - if head != (common.Hash{}) { - block := bc.GetBlock(head) - if block != nil { - bc.currentBlock = block - } - } else { - bc.Reset() - } - bc.td = bc.GetTd(bc.currentBlock.Hash()) - bc.currentGasLimit = CalcGasLimit(bc.currentBlock) - - if glog.V(logger.Info) { - glog.Infof("Last block (#%v) %x TD=%v\n", bc.currentBlock.Number(), bc.currentBlock.Hash(), bc.td) - } - - return nil -} - // Reset purges the entire blockchain, restoring it to its genesis state. func (bc *BlockChain) Reset() { bc.ResetWithGenesisBlock(bc.genesisBlock) @@ -238,6 +271,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) { for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.ParentHash()) { DeleteBlock(bc.chainDb, block.Hash()) } + for header := bc.currentHeader; header != nil; header = bc.GetHeader(header.ParentHash) { + DeleteBlock(bc.chainDb, header.Hash()) + } bc.headerCache.Purge() bc.bodyCache.Purge() bc.bodyRLPCache.Purge() @@ -254,7 +290,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) { bc.genesisBlock = genesis bc.insert(bc.genesisBlock) bc.currentBlock = bc.genesisBlock - bc.setTotalDifficulty(genesis.Difficulty()) + bc.currentHeader = bc.genesisBlock.Header() } // Export writes the active chain to the given writer. @@ -290,17 +326,26 @@ func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return nil } -// insert injects a block into the current chain block chain. Note, this function -// assumes that the `mu` mutex is held! +// insert injects a new head block into the current block chain. This method +// assumes that the block is indeed a true head. It will also reset the head +// header to this very same block to prevent the headers from diverging on a +// different header chain. +// +// Note, this function assumes that the `mu` mutex is held! func (bc *BlockChain) insert(block *types.Block) { // Add the block to the canonical chain number scheme and mark as the head if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil { glog.Fatalf("failed to insert block number: %v", err) } if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil { - glog.Fatalf("failed to insert block number: %v", err) + glog.Fatalf("failed to insert head block hash: %v", err) } + if err := WriteHeadHeaderHash(bc.chainDb, block.Hash()); err != nil { + glog.Fatalf("failed to insert head header hash: %v", err) + } + // Update the internal state with the head block bc.currentBlock = block + bc.currentHeader = block.Header() } // Accessors @@ -456,19 +501,15 @@ func (self *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*ty return } -func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) (uncles []*types.Header) { +// GetUnclesInChain retrieves all the uncles from a given block backwards until +// a specific distance is reached. +func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header { + uncles := []*types.Header{} for i := 0; block != nil && i < length; i++ { uncles = append(uncles, block.Uncles()...) block = self.GetBlock(block.ParentHash()) } - - return -} - -// setTotalDifficulty updates the TD of the chain manager. Note, this function -// assumes that the `mu` mutex is held! -func (bc *BlockChain) setTotalDifficulty(td *big.Int) { - bc.td = new(big.Int).Set(td) + return uncles } func (bc *BlockChain) Stop() { @@ -504,6 +545,135 @@ const ( SideStatTy ) +// writeHeader writes a header into the local chain, given that its parent is +// already known. If the total difficulty of the newly inserted header becomes +// greater than the old known TD, the canonical chain is re-routed. +// +// Note: This method is not concurrent-safe with inserting blocks simultaneously +// into the chain, as side effects caused by reorganizations cannot be emulated +// without the real blocks. Hence, writing headers directly should only be done +// in two scenarios: pure-header mode of operation (light clients), or properly +// separated header/block phases (non-archive clients). +func (self *BlockChain) writeHeader(header *types.Header) error { + self.wg.Add(1) + defer self.wg.Done() + + // Calculate the total difficulty of the header + ptd := self.GetTd(header.ParentHash) + if ptd == nil { + return ParentError(header.ParentHash) + } + td := new(big.Int).Add(header.Difficulty, ptd) + + // Make sure no inconsistent state is leaked during insertion + self.mu.Lock() + defer self.mu.Unlock() + + // If the total difficulty is higher than our known, add it to the canonical chain + if td.Cmp(self.GetTd(self.currentHeader.Hash())) > 0 { + // Delete any canonical number assignments above the new head + for i := header.Number.Uint64() + 1; GetCanonicalHash(self.chainDb, i) != (common.Hash{}); i++ { + DeleteCanonicalHash(self.chainDb, i) + } + // Overwrite any stale canonical number assignments + head := self.GetHeader(header.ParentHash) + for GetCanonicalHash(self.chainDb, head.Number.Uint64()) != head.Hash() { + WriteCanonicalHash(self.chainDb, head.Hash(), head.Number.Uint64()) + head = self.GetHeader(head.ParentHash) + } + // Extend the canonical chain with the new header + if err := WriteCanonicalHash(self.chainDb, header.Hash(), header.Number.Uint64()); err != nil { + glog.Fatalf("failed to insert header number: %v", err) + } + if err := WriteHeadHeaderHash(self.chainDb, header.Hash()); err != nil { + glog.Fatalf("failed to insert head header hash: %v", err) + } + self.currentHeader = types.CopyHeader(header) + } + // Irrelevant of the canonical status, write the header itself to the database + if err := WriteTd(self.chainDb, header.Hash(), td); err != nil { + glog.Fatalf("failed to write header total difficulty: %v", err) + } + if err := WriteHeader(self.chainDb, header); err != nil { + glog.Fatalf("filed to write header contents: %v", err) + } + return nil +} + +// InsertHeaderChain will attempt to insert the given header chain in to the +// local chain, possibly creating a dork. If an error is returned, it will +// return the index number of the failing header as well an error describing +// what went wrong. +// +// The verify parameter can be used to fine tune whether nonce verification +// should be done or not. The reason behind the optional check is because some +// of the header retrieval mechanisms already need to verfy nonces, as well as +// because nonces can be verified sparsely, not needing to check each. +func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (int, error) { + self.wg.Add(1) + defer self.wg.Done() + + // Make sure only one thread manipulates the chain at once + self.chainmu.Lock() + defer self.chainmu.Unlock() + + // Collect some import statistics to report on + stats := struct{ processed, ignored int }{} + start := time.Now() + + // Start the parallel nonce verifier, with a fake nonce if not requested + verifier := self.pow + if !verify { + verifier = FakePow{} + } + nonceAbort, nonceResults := verifyNoncesFromHeaders(verifier, chain) + defer close(nonceAbort) + + // Iterate over the headers, inserting any new ones + complete := make([]bool, len(chain)) + for i, header := range chain { + // Short circuit insertion if shutting down + if atomic.LoadInt32(&self.procInterrupt) == 1 { + glog.V(logger.Debug).Infoln("Premature abort during header chain processing") + break + } + hash := header.Hash() + + // Accumulate verification results until the next header is verified + for !complete[i] { + if res := <-nonceResults; res.valid { + complete[res.index] = true + } else { + header := chain[res.index] + return res.index, &BlockNonceErr{ + Hash: header.Hash(), + Number: new(big.Int).Set(header.Number), + Nonce: header.Nonce.Uint64(), + } + } + } + if BadHashes[hash] { + glog.V(logger.Error).Infof("Bad header %d [%x…], known bad hash", header.Number, hash) + return i, BadHashError(hash) + } + // Write the header to the chain and get the status + if self.HasHeader(hash) { + stats.ignored++ + continue + } + if err := self.writeHeader(header); err != nil { + return i, err + } + stats.processed++ + } + // Report some public statistics so the user has a clue what's going on + first, last := chain[0], chain[len(chain)-1] + glog.V(logger.Info).Infof("imported %d header(s) (%d ignored) in %v. #%v [%x… / %x…]", stats.processed, stats.ignored, + time.Since(start), last.Number, first.Hash().Bytes()[:4], last.Hash().Bytes()[:4]) + + return 0, nil +} + // WriteBlock writes the block to the chain. func (self *BlockChain) WriteBlock(block *types.Block) (status writeStatus, err error) { self.wg.Add(1) @@ -522,7 +692,7 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status writeStatus, err // Compare the TD of the last known block in the canonical chain to make sure it's greater. // At this point it's possible that a different chain (fork) becomes the new canonical chain. - if td.Cmp(self.Td()) > 0 { + if td.Cmp(self.GetTd(self.currentBlock.Hash())) > 0 { // chain fork if block.ParentHash() != cblock.Hash() { // during split we merge two different chains and create the new canonical chain @@ -534,7 +704,6 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status writeStatus, err status = CanonStatTy self.mu.Lock() - self.setTotalDifficulty(td) self.insert(block) self.mu.Unlock() } else { @@ -580,7 +749,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { txcount := 0 for i, block := range chain { if atomic.LoadInt32(&self.procInterrupt) == 1 { - glog.V(logger.Debug).Infoln("Premature abort during chain processing") + glog.V(logger.Debug).Infoln("Premature abort during block chain processing") break } @@ -788,8 +957,7 @@ func (self *BlockChain) postChainEvents(events []interface{}) { if event, ok := event.(ChainEvent); ok { // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long // and in most cases isn't even necessary. - if self.currentBlock.Hash() == event.Hash { - self.currentGasLimit = CalcGasLimit(event.Block) + if self.LastBlockHash() == event.Hash { self.eventMux.Post(ChainHeadEvent{event.Block}) } } -- cgit v1.2.3