diff options
author | bojie <bojie@dexon.org> | 2019-01-14 20:42:15 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:56 +0800 |
commit | 2155bbd85373a8527e3b565f4316bc4f1517650a (patch) | |
tree | 2f64cbc5a6810aa496524c7509a4998847526123 /core/blockchain.go | |
parent | 3fcb8b6825b3c62c9890a5ca59594f1503e66893 (diff) | |
download | dexon-2155bbd85373a8527e3b565f4316bc4f1517650a.tar dexon-2155bbd85373a8527e3b565f4316bc4f1517650a.tar.gz dexon-2155bbd85373a8527e3b565f4316bc4f1517650a.tar.bz2 dexon-2155bbd85373a8527e3b565f4316bc4f1517650a.tar.lz dexon-2155bbd85373a8527e3b565f4316bc4f1517650a.tar.xz dexon-2155bbd85373a8527e3b565f4316bc4f1517650a.tar.zst dexon-2155bbd85373a8527e3b565f4316bc4f1517650a.zip |
app: remove pending block logic (#149)
Diffstat (limited to 'core/blockchain.go')
-rw-r--r-- | core/blockchain.go | 331 |
1 files changed, 136 insertions, 195 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 089f1c2fa..6aee356d2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -105,15 +105,14 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - blockConfirmedFeed event.Feed - logsFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -154,14 +153,6 @@ type BlockChain struct { addressCost map[uint32]map[common.Address]*big.Int addressCounter map[uint32]map[common.Address]uint64 chainLastHeight sync.Map - - pendingBlockMu sync.RWMutex - lastPendingHeight uint64 - pendingBlocks map[uint64]struct { - block *types.Block - receipts types.Receipts - proctime time.Duration - } } // NewBlockChain returns a fully initialised block chain using information @@ -183,25 +174,20 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par badBlocks, _ := lru.New(badBlockLimit) bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(nil), - stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), - quit: make(chan struct{}), - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - pendingBlocks: make(map[uint64]struct { - block *types.Block - receipts types.Receipts - proctime time.Duration - }), + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(nil), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), + quit: make(chan struct{}), + bodyCache: bodyCache, + bodyRLPCache: bodyRLPCache, + receiptsCache: receiptsCache, + blockCache: blockCache, + futureBlocks: futureBlocks, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, confirmedBlocks: make(map[uint32]map[coreCommon.Hash]*blockInfo), addressNonce: make(map[uint32]map[common.Address]uint64), addressCost: make(map[uint32]map[common.Address]*big.Int), @@ -1764,6 +1750,16 @@ func (bc *BlockChain) insertDexonChain(chain types.Blocks) (int, []interface{}, } proctime := time.Since(bstart) + chainBlock := bc.GetBlockByNumber(block.NumberU64()) + if chainBlock != nil { + if chainBlock.Hash() != block.Hash() { + return i, nil, nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", block.NumberU64(), + chainBlock.Hash(), block.Hash()) + } + + continue + } + // Write the block to the chain and get the status. status, err := bc.WriteBlockWithState(block, receipts, state) if err != nil { @@ -1814,13 +1810,13 @@ func (bc *BlockChain) VerifyDexonHeader(header *types.Header) error { return bc.hc.verifyTSig(header, bc.verifierCache) } -func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) { - n, events, logs, err := bc.processPendingBlock(block, witness) +func (bc *BlockChain) ProcessBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) { + root, events, logs, err := bc.processBlock(block, witness) bc.PostChainEvents(events, logs) - return n, err + return root, err } -func (bc *BlockChain) processPendingBlock( +func (bc *BlockChain) processBlock( block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { // Pre-checks passed, start the full block imports bc.wg.Add(1) @@ -1835,7 +1831,6 @@ func (bc *BlockChain) processPendingBlock( var ( stats = insertStats{startTime: mclock.Now()} events = make([]interface{}, 0, 2) - lastCanon *types.Block coalescedLogs []*types.Log ) @@ -1858,28 +1853,23 @@ func (bc *BlockChain) processPendingBlock( ) var parentBlock *types.Block - var pendingState *state.StateDB + var currentState *state.StateDB var err error - parent, exist := bc.pendingBlocks[block.NumberU64()-1] - if !exist { - parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) - if parentBlock == nil { - return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) - } - } else { - parentBlock = parent.block + parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) + if parentBlock == nil { + return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } header.ParentHash = parentBlock.Hash() - pendingState, err = state.New(parentBlock.Root(), bc.stateCache) + currentState, err = state.New(parentBlock.Root(), bc.stateCache) if err != nil { return nil, nil, nil, err } // Iterate over and process the individual transactions. for i, tx := range block.Transactions() { - pendingState.Prepare(tx.Hash(), block.Hash(), i) - receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig) + currentState.Prepare(tx.Hash(), block.Hash(), i) + receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, currentState, header, tx, usedGas, bc.vmConfig) if err != nil { return nil, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) } @@ -1888,189 +1878,148 @@ func (bc *BlockChain) processPendingBlock( } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) header.GasUsed = *usedGas - newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts) + newBlock, err := bc.engine.Finalize(bc, header, currentState, block.Transactions(), block.Uncles(), receipts) + root := newBlock.Root() if err != nil { return nil, nil, nil, fmt.Errorf("finalize error: %v", err) } - if _, ok := bc.GetRoundHeight(newPendingBlock.Round()); !ok { - bc.storeRoundHeight(newPendingBlock.Round(), newPendingBlock.NumberU64()) + if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok { + bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64()) } - proctime := time.Since(bstart) - // Commit state to refresh stateCache. - _, err = pendingState.Commit(true) - if err != nil { - return nil, nil, nil, fmt.Errorf("pendingState commit error: %v", err) - } - - // Add into pending blocks. - bc.addPendingBlock(newPendingBlock, receipts, proctime) - events = append(events, BlockConfirmedEvent{newPendingBlock}) - - log.Debug("Inserted pending block", "height", newPendingBlock.Number(), "hash", newPendingBlock.Hash()) - - // Start insert available pending blocks into db - for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ { - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") - return nil, nil, nil, fmt.Errorf("interrupt") - } - - pendingIns, exist := bc.pendingBlocks[pendingHeight] - if !exist { - log.Error("Block has already inserted", "height", pendingHeight) - continue + chainBlock := bc.GetBlockByNumber(newBlock.NumberU64()) + if chainBlock != nil { + if chainBlock.Hash() != newBlock.Hash() { + return nil, nil, nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", newBlock.NumberU64(), + chainBlock.Hash(), newBlock.Hash()) } - s, err := state.New(pendingIns.block.Root(), bc.stateCache) - if err != nil { - return nil, events, coalescedLogs, err - } + return &root, nil, nil, nil + } - // Write the block to the chain and get the status. - insertTime := time.Now() - status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s) - if err != nil { - return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) - } + // Write the block to the chain and get the status. + status, err := bc.WriteBlockWithState(newBlock, receipts, currentState) + if err != nil { + return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) + } - switch status { - case CanonStatTy: - log.Debug("Inserted new block", "number", pendingIns.block.Number(), "hash", pendingIns.block.Hash(), - "uncles", len(pendingIns.block.Uncles()), "txs", len(pendingIns.block.Transactions()), - "gas", pendingIns.block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + switch status { + case CanonStatTy: + log.Debug("Inserted new block", "number", newBlock.Number(), "hash", newBlock.Hash(), + "uncles", len(newBlock.Uncles()), "txs", len(newBlock.Transactions()), + "gas", newBlock.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) - var allLogs []*types.Log - for _, r := range pendingIns.receipts { - allLogs = append(allLogs, r.Logs...) - } - coalescedLogs = append(coalescedLogs, allLogs...) - blockInsertTimer.UpdateSince(insertTime) - events = append(events, ChainEvent{pendingIns.block, pendingIns.block.Hash(), allLogs}) - lastCanon = pendingIns.block + var allLogs []*types.Log + for _, r := range receipts { + allLogs = append(allLogs, r.Logs...) + } + coalescedLogs = append(coalescedLogs, allLogs...) + blockInsertTimer.UpdateSince(bstart) + events = append(events, ChainEvent{newBlock, newBlock.Hash(), allLogs}, ChainHeadEvent{newBlock}) - // Only count canonical blocks for GC processing time - bc.gcproc += pendingIns.proctime + // Only count canonical blocks for GC processing time + bc.gcproc += proctime - case SideStatTy: - return nil, nil, nil, fmt.Errorf("insert pending block and fork found") - } - bc.removePendingBlock(pendingHeight) + case SideStatTy: + return nil, nil, nil, fmt.Errorf("insert pending block and fork found") + } - stats.processed++ - stats.usedGas += pendingIns.block.GasUsed() + stats.processed++ + stats.usedGas += newBlock.GasUsed() - cache, _ := bc.stateCache.TrieDB().Size() - stats.report([]*types.Block{pendingIns.block}, 0, cache) + cache, _ := bc.stateCache.TrieDB().Size() + stats.report([]*types.Block{newBlock}, 0, cache) - if pendingHeight == witness.Height { - err = bc.updateLastRoundNumber(pendingIns.block.Round()) - if err != nil { - return nil, nil, nil, err - } - } - } - // Append a single chain head event if we've progressed the chain - if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { - events = append(events, ChainHeadEvent{lastCanon}) + err = bc.updateLastRoundNumber(newBlock.Round()) + if err != nil { + return nil, nil, nil, err } - root := newPendingBlock.Root() return &root, events, coalescedLogs, nil } -func (bc *BlockChain) ProcessEmptyBlock(block *types.Block) error { +func (bc *BlockChain) ProcessEmptyBlock(block *types.Block) (*common.Hash, error) { bstart := time.Now() + var stats = insertStats{startTime: mclock.Now()} var header = block.Header() var parentBlock *types.Block - var pendingState *state.StateDB + var currentState *state.StateDB var err error - parent, exist := bc.pendingBlocks[block.NumberU64()-1] - if !exist { - parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) - if parentBlock == nil { - return fmt.Errorf("parent block %d not exist", block.NumberU64()-1) - } - } else { - parentBlock = parent.block + + parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) + if parentBlock == nil { + return nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } - pendingState, err = state.New(parentBlock.Root(), bc.stateCache) + currentState, err = state.New(parentBlock.Root(), bc.stateCache) if err != nil { - return err + return nil, err } header.ParentHash = parentBlock.Hash() header.GasUsed = 0 - header.Root = pendingState.IntermediateRoot(true) + header.Root = currentState.IntermediateRoot(true) if header.Root != parentBlock.Root() { - return fmt.Errorf("empty block state root must same as parent") + return nil, fmt.Errorf("empty block state root must same as parent") } - newPendingBlock := types.NewBlock(header, nil, nil, nil) - if _, ok := bc.GetRoundHeight(newPendingBlock.Round()); !ok { - bc.storeRoundHeight(newPendingBlock.Round(), newPendingBlock.NumberU64()) + newBlock := types.NewBlock(header, nil, nil, nil) + root := newBlock.Root() + if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok { + bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64()) } + if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok { + bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64()) + } proctime := time.Since(bstart) - bc.addPendingBlock(newPendingBlock, nil, proctime) - bc.PostChainEvents([]interface{}{BlockConfirmedEvent{newPendingBlock}}, nil) - return nil -} - -func (bc *BlockChain) removePendingBlock(height uint64) { - bc.pendingBlockMu.Lock() - defer bc.pendingBlockMu.Unlock() - - delete(bc.pendingBlocks, height) -} - -func (bc *BlockChain) addPendingBlock(block *types.Block, receipts types.Receipts, proctime time.Duration) { - bc.pendingBlockMu.Lock() - defer bc.pendingBlockMu.Unlock() - - bc.pendingBlocks[block.NumberU64()] = struct { - block *types.Block - receipts types.Receipts - proctime time.Duration - }{block: block, receipts: receipts, proctime: proctime} - bc.lastPendingHeight = block.NumberU64() -} + chainBlock := bc.GetBlockByNumber(newBlock.NumberU64()) + if chainBlock != nil { + if chainBlock.Hash() != newBlock.Hash() { + return nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", newBlock.NumberU64(), + chainBlock.Hash(), newBlock.Hash()) + } -func (bc *BlockChain) GetPendingHeight() uint64 { - bc.pendingBlockMu.RLock() - defer bc.pendingBlockMu.RUnlock() + return &root, nil + } - return bc.lastPendingHeight -} + // Write the block to the chain and get the status. + status, err := bc.WriteBlockWithState(newBlock, nil, currentState) + if err != nil { + return nil, fmt.Errorf("WriteBlockWithState error: %v", err) + } -func (bc *BlockChain) PendingBlock() *types.Block { - bc.pendingBlockMu.RLock() - defer bc.pendingBlockMu.RUnlock() + switch status { + case CanonStatTy: + log.Debug("Inserted new block", "number", newBlock.Number(), "hash", newBlock.Hash(), + "uncles", len(newBlock.Uncles()), "txs", len(newBlock.Transactions()), + "gas", newBlock.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + blockInsertTimer.UpdateSince(bstart) + // Only count canonical blocks for GC processing time + bc.gcproc += proctime - return bc.pendingBlocks[bc.lastPendingHeight].block -} + case SideStatTy: + return nil, fmt.Errorf("insert pending block and fork found") + } -func (bc *BlockChain) GetPendingBlockByNumber(number uint64) *types.Block { - bc.pendingBlockMu.RLock() - defer bc.pendingBlockMu.RUnlock() + stats.processed++ + stats.usedGas += newBlock.GasUsed() - return bc.pendingBlocks[number].block -} + cache, _ := bc.stateCache.TrieDB().Size() + stats.report([]*types.Block{newBlock}, 0, cache) -func (bc *BlockChain) GetPending() (*types.Block, *state.StateDB) { - block := bc.PendingBlock() - if block == nil { - block = bc.CurrentBlock() - } - s, err := state.New(block.Header().Root, bc.stateCache) + err = bc.updateLastRoundNumber(newBlock.Round()) if err != nil { - panic(err) + return nil, err } - return block, s + + bc.PostChainEvents([]interface{}{ChainEvent{newBlock, newBlock.Hash(), nil}, + ChainHeadEvent{newBlock}}, nil) + + return &root, nil } // GetGovStateByHash extracts the governance contract's state from state trie @@ -2253,9 +2202,6 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { case ChainHeadEvent: bc.chainHeadFeed.Send(ev) - case BlockConfirmedEvent: - bc.blockConfirmedFeed.Send(ev) - case ChainSideEvent: bc.chainSideFeed.Send(ev) } @@ -2475,11 +2421,6 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } -// SubscribeBlockConfirmedEvent registers a subscription of ChainHeadEvent. -func (bc *BlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription { - return bc.scope.Track(bc.blockConfirmedFeed.Subscribe(ch)) -} - // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) |