| author | Sonic <sonic@dexon.org> | 2018-11-20 12:12:39 +0800 |
|---|---|---|
| committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:53 +0800 |
| commit | 2113837c006aad6af75c09d37514591fd6863dbc (patch) | |
| tree | 0197452a9dfa7670666ca3b4d82a6078916d5515 /core/blockchain.go | |
| parent | db18632dd211238fadcdec0fab643698be534b62 (diff) | |
core: implement insert chain, headerchain logic
Diffstat (limited to 'core/blockchain.go')
-rw-r--r--  core/blockchain.go  226
1 file changed, 226 insertions, 0 deletions
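For orientation before the patch itself: the commit threads a dexon-consensus threshold-signature (TSig) verifier cache through the blockchain, so blocks and headers can be checked against the current round's verifier before they are executed. The sketch below condenses that wiring into a single hypothetical helper; `NewGovernance`, `dexCore.NewTSigVerifierCache`, and the `BlockChain` type come from the patch, while the helper itself, its placement in package `core`, and the reading of the second argument are assumptions.

```go
package core

import (
	dexCore "github.com/dexon-foundation/dexon-consensus/core"
)

// newVerifierCache mirrors what NewBlockChain does in this patch: expose the
// chain itself as a governance backend and wrap it in a TSigVerifierCache.
// The second argument (5 in the patch) is the cache size, presumably the
// number of rounds whose verifiers are retained. Illustrative only; the
// patch inlines these two calls inside NewBlockChain.
func newVerifierCache(bc *BlockChain) *dexCore.TSigVerifierCache {
	gov := NewGovernance(bc)
	return dexCore.NewTSigVerifierCache(gov, 5)
}
```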
diff --git a/core/blockchain.go b/core/blockchain.go
index 8bf680c38..293bb88f4 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -29,6 +29,7 @@ import (
 	"time"
 
 	coreCommon "github.com/dexon-foundation/dexon-consensus/common"
+	dexCore "github.com/dexon-foundation/dexon-consensus/core"
 	coreTypes "github.com/dexon-foundation/dexon-consensus/core/types"
 	"github.com/hashicorp/golang-lru"
@@ -145,6 +146,8 @@ type BlockChain struct {
 	roundHeightMap sync.Map
 
+	verifierCache *dexCore.TSigVerifierCache
+
 	confirmedBlockInitMu sync.Mutex
 	confirmedBlocks      map[uint32]map[coreCommon.Hash]*blockInfo
 	addressNonce         map[uint32]map[common.Address]uint64
@@ -231,6 +234,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 		}
 	}
 
+	gov := NewGovernance(bc)
+	bc.verifierCache = dexCore.NewTSigVerifierCache(gov, 5)
+
 	// Take ownership of this particular state
 	go bc.update()
 	return bc, nil
@@ -1007,6 +1013,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 			bytes += batch.ValueSize()
 			batch.Reset()
 		}
+		if _, ok := bc.GetRoundHeight(block.Round()); !ok {
+			bc.storeRoundHeight(block.Round(), block.NumberU64())
+		}
 	}
 	if batch.ValueSize() > 0 {
 		bytes += batch.ValueSize()
@@ -1527,6 +1536,205 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
 	return 0, nil, nil, nil
 }
 
+// InsertChain2 attempts to insert the given batch of blocks in to the canonical
+// chain or, otherwise, create a fork. If an error is returned it will return
+// the index number of the failing block as well an error describing what went
+// wrong.
+//
+// After insertion is done, all accumulated events will be fired.
+func (bc *BlockChain) InsertChain2(chain types.Blocks) (int, error) {
+	n, events, logs, err := bc.insertChain2(chain)
+	bc.PostChainEvents(events, logs)
+	return n, err
+}
+
+// insertChain2 will execute the actual chain insertion and event aggregation. The
+// only reason this method exists as a separate one is to make locking cleaner
+// with deferred statements.
+func (bc *BlockChain) insertChain2(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
+	// Sanity check that we have something meaningful to import
+	if len(chain) == 0 {
+		return 0, nil, nil, nil
+	}
+	// Do a sanity check that the provided chain is actually ordered and linked
+	for i := 1; i < len(chain); i++ {
+		if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
+			// Chain broke ancestry, log a message (programming error) and skip insertion
+			log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
+				"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
+
+			return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
+				chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
+		}
+	}
+	// Pre-checks passed, start the full block imports
+	bc.wg.Add(1)
+	defer bc.wg.Done()
+
+	bc.chainmu.Lock()
+	defer bc.chainmu.Unlock()
+
+	// A queued approach to delivering events. This is generally
+	// faster than direct delivery and requires much less mutex
+	// acquiring.
+	var (
+		stats         = insertStats{startTime: mclock.Now()}
+		events        = make([]interface{}, 0, len(chain))
+		lastCanon     *types.Block
+		coalescedLogs []*types.Log
+	)
+
+	// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
+	senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
+
+	// Iterate over the blocks and insert when the verifier permits
+	for i, block := range chain {
+		// If the chain is terminating, stop processing blocks
+		if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+			log.Debug("Premature abort during blocks processing")
+			break
+		}
+		// If the header is a banned one, straight out abort
+		if BadHashes[block.Hash()] {
+			bc.reportBlock(block, nil, ErrBlacklistedHash)
+			return i, events, coalescedLogs, ErrBlacklistedHash
+		}
+		// Wait for the block's verification to complete
+		bstart := time.Now()
+
+		err := bc.hc.verifyTSig(block.Header(), bc.verifierCache)
+		if err == nil {
+			err = bc.Validator().ValidateBody(block)
+		}
+		switch {
+		case err == ErrKnownBlock:
+			// Block and state both already known. However if the current block is below
+			// this number we did a rollback and we should reimport it nonetheless.
+			if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
+				stats.ignored++
+				continue
+			}
+
+		case err == consensus.ErrFutureBlock:
+			// Allow up to MaxFuture second in the future blocks. If this limit is exceeded
+			// the chain is discarded and processed at a later time if given.
+			max := time.Now().Unix() + maxTimeFutureBlocks
+			if block.Time() > uint64(max) {
+				return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
+			}
+			bc.futureBlocks.Add(block.Hash(), block)
+			stats.queued++
+			continue
+
+		case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
+			bc.futureBlocks.Add(block.Hash(), block)
+			stats.queued++
+			continue
+
+		case err == consensus.ErrPrunedAncestor:
+			// Block competing with the canonical chain, store in the db, but don't process
+			// until the competitor TD goes above the canonical TD
+			currentBlock := bc.CurrentBlock()
+			localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
+			externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
+			if localTd.Cmp(externTd) > 0 {
+				if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
+					return i, events, coalescedLogs, err
+				}
+				continue
+			}
+			// Competitor chain beat canonical, gather all blocks from the common ancestor
+			var winner []*types.Block
+
+			parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
+			for !bc.HasState(parent.Root()) {
+				winner = append(winner, parent)
+				parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
+			}
+			for j := 0; j < len(winner)/2; j++ {
+				winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
+			}
+			// Import all the pruned blocks to make the state available
+			bc.chainmu.Unlock()
+			_, evs, logs, err := bc.insertChain2(winner)
+			bc.chainmu.Lock()
+			events, coalescedLogs = evs, logs
+
+			if err != nil {
+				return i, events, coalescedLogs, err
+			}
+
+		case err != nil:
+			bc.reportBlock(block, nil, err)
+			return i, events, coalescedLogs, err
+		}
+		// Create a new statedb using the parent block and report an
+		// error if it fails.
+		var parent *types.Block
+		if i == 0 {
+			parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
+		} else {
+			parent = chain[i-1]
+		}
+		state, err := state.New(parent.Root(), bc.stateCache)
+		if err != nil {
+			return i, events, coalescedLogs, err
+		}
+		// Process block using the parent state as reference point.
+		receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
+		if err != nil {
+			bc.reportBlock(block, receipts, err)
+			return i, events, coalescedLogs, err
+		}
+		// Validate the state using the default validator
+		err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
+		if err != nil {
+			bc.reportBlock(block, receipts, err)
+			return i, events, coalescedLogs, err
+		}
+		proctime := time.Since(bstart)
+
+		// Write the block to the chain and get the status.
+		status, err := bc.WriteBlockWithState(block, receipts, state)
+		if err != nil {
+			return i, events, coalescedLogs, err
+		}
+		switch status {
+		case CanonStatTy:
+			log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
+				"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
+
+			coalescedLogs = append(coalescedLogs, logs...)
+			blockInsertTimer.UpdateSince(bstart)
+			events = append(events, ChainEvent{block, block.Hash(), logs})
+			lastCanon = block
+
+			// Only count canonical blocks for GC processing time
+			bc.gcproc += proctime
+
+		case SideStatTy:
+			log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
+				common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
+
+			blockInsertTimer.UpdateSince(bstart)
+			events = append(events, ChainSideEvent{block})
+		}
+		stats.processed++
+		stats.usedGas += usedGas
+
+		cache, _ := bc.stateCache.TrieDB().Size()
+		stats.report(chain, i, cache)
+		if _, ok := bc.GetRoundHeight(block.Round()); !ok {
+			bc.storeRoundHeight(block.Round(), block.NumberU64())
+		}
+	}
+	// Append a single chain head event if we've progressed the chain
+	if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
+		events = append(events, ChainHeadEvent{lastCanon})
+	}
+	return 0, events, coalescedLogs, nil
+}
+
 func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) {
 	n, events, logs, err := bc.processPendingBlock(block, witness)
 	bc.PostChainEvents(events, logs)
@@ -2014,6 +2222,24 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
 	return bc.hc.InsertHeaderChain(chain, whFunc, start)
 }
 
+func (bc *BlockChain) InsertHeaderChain2(chain []*types.HeaderWithGovState, verifierCache *dexCore.TSigVerifierCache) (int, error) {
+	start := time.Now()
+	bc.chainmu.Lock()
+	defer bc.chainmu.Unlock()
+
+	bc.wg.Add(1)
+	defer bc.wg.Done()
+
+	whFunc := func(header *types.HeaderWithGovState) error {
+		bc.mu.Lock()
+		defer bc.mu.Unlock()
+		_, err := bc.hc.WriteHeader2(header)
+		return err
+	}
+
+	return bc.hc.InsertHeaderChain2(chain, whFunc, start)
+}
+
 // writeHeader writes a header into the local chain, given that its parent is
 // already known. If the total difficulty of the newly inserted header becomes
 // greater than the current known TD, the canonical chain is re-routed.
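Read as an API, the patch adds two verification-first entry points alongside the existing InsertChain and InsertHeaderChain. The hypothetical callers below sketch how they might be used; only the two method signatures, `types.HeaderWithGovState`, and `dexCore.TSigVerifierCache` come from the patch, while the function names, the error wrapping, the placement in package `core`, and the `core/types` import path (guessed from the dexon-foundation layout used inside the patch) are assumptions.

```go
package core

import (
	"fmt"

	"github.com/dexon-foundation/dexon/core/types" // import path assumed, not confirmed by the patch
	dexCore "github.com/dexon-foundation/dexon-consensus/core"
)

// importConfirmedBlocks is a hypothetical caller of InsertChain2, which
// TSig-verifies, executes, and commits each block and, on failure, returns
// the index of the block that could not be inserted.
func importConfirmedBlocks(bc *BlockChain, blocks types.Blocks) error {
	if failedAt, err := bc.InsertChain2(blocks); err != nil {
		return fmt.Errorf("import aborted at block #%d: %v",
			blocks[failedAt].NumberU64(), err)
	}
	return nil
}

// importHeadersWithGovState is a hypothetical caller of InsertHeaderChain2,
// which writes headers carrying their governance state through the header
// chain; the verifier cache argument is presumably intended for TSig checks
// on that path.
func importHeadersWithGovState(bc *BlockChain, headers []*types.HeaderWithGovState,
	cache *dexCore.TSigVerifierCache) error {
	_, err := bc.InsertHeaderChain2(headers, cache)
	return err
}
```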