diff options
author | bojie <bojie@dexon.org> | 2019-01-14 20:42:15 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-01-14 20:42:15 +0800 |
commit | 899022022f7f922b43e739e844800362587d5d56 (patch) | |
tree | 9719615c15a7964cd503fd0156189a366ad85eb9 /core | |
parent | 5823540202423aed2cbdf8c125e95d8b1a20e245 (diff) | |
download | dexon-899022022f7f922b43e739e844800362587d5d56.tar dexon-899022022f7f922b43e739e844800362587d5d56.tar.gz dexon-899022022f7f922b43e739e844800362587d5d56.tar.bz2 dexon-899022022f7f922b43e739e844800362587d5d56.tar.lz dexon-899022022f7f922b43e739e844800362587d5d56.tar.xz dexon-899022022f7f922b43e739e844800362587d5d56.tar.zst dexon-899022022f7f922b43e739e844800362587d5d56.zip |
app: remove pending block logic (#149)
Diffstat (limited to 'core')
-rw-r--r-- | core/block_validator.go | 7 | ||||
-rw-r--r-- | core/blockchain.go | 331 | ||||
-rw-r--r-- | core/blockchain_test.go | 38 | ||||
-rw-r--r-- | core/events.go | 2 | ||||
-rw-r--r-- | core/tx_pool.go | 73 | ||||
-rw-r--r-- | core/tx_pool_test.go | 4 |
6 files changed, 179 insertions, 276 deletions
diff --git a/core/block_validator.go b/core/block_validator.go index 660bd09f8..c6038381f 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -102,12 +102,9 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat } func (v *BlockValidator) ValidateWitnessData(height uint64, data types.WitnessData) error { - b := v.bc.GetPendingBlockByNumber(height) + b := v.bc.GetBlockByNumber(height) if b == nil { - b = v.bc.GetBlockByNumber(height) - if b == nil { - return fmt.Errorf("can not find block %v either pending or confirmed block", height) - } + return fmt.Errorf("can not find block %v either pending or confirmed block", height) } if b.Root() != data.Root { diff --git a/core/blockchain.go b/core/blockchain.go index 4ddc2aadb..b69b6c681 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -105,15 +105,14 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - blockConfirmedFeed event.Feed - logsFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -154,14 +153,6 @@ type BlockChain struct { addressCost map[uint32]map[common.Address]*big.Int addressCounter map[uint32]map[common.Address]uint64 chainLastHeight sync.Map - - pendingBlockMu sync.RWMutex - lastPendingHeight uint64 - pendingBlocks map[uint64]struct { - block *types.Block - receipts types.Receipts - proctime time.Duration - } } // NewBlockChain returns a fully initialised block chain using information @@ -183,25 +174,20 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par badBlocks, _ := lru.New(badBlockLimit) bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(nil), - stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), - quit: make(chan struct{}), - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - pendingBlocks: make(map[uint64]struct { - block *types.Block - receipts types.Receipts - proctime time.Duration - }), + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(nil), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), + quit: make(chan struct{}), + bodyCache: bodyCache, + bodyRLPCache: bodyRLPCache, + receiptsCache: receiptsCache, + blockCache: blockCache, + futureBlocks: futureBlocks, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, confirmedBlocks: make(map[uint32]map[coreCommon.Hash]*blockInfo), addressNonce: make(map[uint32]map[common.Address]uint64), addressCost: make(map[uint32]map[common.Address]*big.Int), @@ -1758,6 +1744,16 @@ func (bc *BlockChain) insertDexonChain(chain types.Blocks) (int, []interface{}, } proctime := time.Since(bstart) + chainBlock := bc.GetBlockByNumber(block.NumberU64()) + if chainBlock != nil { + if chainBlock.Hash() != block.Hash() { + return i, nil, nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", block.NumberU64(), + chainBlock.Hash(), block.Hash()) + } + + continue + } + // Write the block to the chain and get the status. status, err := bc.WriteBlockWithState(block, receipts, state) if err != nil { @@ -1808,13 +1804,13 @@ func (bc *BlockChain) VerifyDexonHeader(header *types.Header) error { return bc.hc.verifyTSig(header, bc.verifierCache) } -func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) { - n, events, logs, err := bc.processPendingBlock(block, witness) +func (bc *BlockChain) ProcessBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) { + root, events, logs, err := bc.processBlock(block, witness) bc.PostChainEvents(events, logs) - return n, err + return root, err } -func (bc *BlockChain) processPendingBlock( +func (bc *BlockChain) processBlock( block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { // Pre-checks passed, start the full block imports bc.wg.Add(1) @@ -1829,7 +1825,6 @@ func (bc *BlockChain) processPendingBlock( var ( stats = insertStats{startTime: mclock.Now()} events = make([]interface{}, 0, 2) - lastCanon *types.Block coalescedLogs []*types.Log ) @@ -1852,28 +1847,23 @@ func (bc *BlockChain) processPendingBlock( ) var parentBlock *types.Block - var pendingState *state.StateDB + var currentState *state.StateDB var err error - parent, exist := bc.pendingBlocks[block.NumberU64()-1] - if !exist { - parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) - if parentBlock == nil { - return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) - } - } else { - parentBlock = parent.block + parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) + if parentBlock == nil { + return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } header.ParentHash = parentBlock.Hash() - pendingState, err = state.New(parentBlock.Root(), bc.stateCache) + currentState, err = state.New(parentBlock.Root(), bc.stateCache) if err != nil { return nil, nil, nil, err } // Iterate over and process the individual transactions. for i, tx := range block.Transactions() { - pendingState.Prepare(tx.Hash(), block.Hash(), i) - receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig) + currentState.Prepare(tx.Hash(), block.Hash(), i) + receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, currentState, header, tx, usedGas, bc.vmConfig) if err != nil { return nil, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) } @@ -1882,189 +1872,148 @@ func (bc *BlockChain) processPendingBlock( } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) header.GasUsed = *usedGas - newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts) + newBlock, err := bc.engine.Finalize(bc, header, currentState, block.Transactions(), block.Uncles(), receipts) + root := newBlock.Root() if err != nil { return nil, nil, nil, fmt.Errorf("finalize error: %v", err) } - if _, ok := bc.GetRoundHeight(newPendingBlock.Round()); !ok { - bc.storeRoundHeight(newPendingBlock.Round(), newPendingBlock.NumberU64()) + if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok { + bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64()) } - proctime := time.Since(bstart) - // Commit state to refresh stateCache. - _, err = pendingState.Commit(true) - if err != nil { - return nil, nil, nil, fmt.Errorf("pendingState commit error: %v", err) - } - - // Add into pending blocks. - bc.addPendingBlock(newPendingBlock, receipts, proctime) - events = append(events, BlockConfirmedEvent{newPendingBlock}) - - log.Debug("Inserted pending block", "height", newPendingBlock.Number(), "hash", newPendingBlock.Hash()) - - // Start insert available pending blocks into db - for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ { - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") - return nil, nil, nil, fmt.Errorf("interrupt") - } - - pendingIns, exist := bc.pendingBlocks[pendingHeight] - if !exist { - log.Error("Block has already inserted", "height", pendingHeight) - continue + chainBlock := bc.GetBlockByNumber(newBlock.NumberU64()) + if chainBlock != nil { + if chainBlock.Hash() != newBlock.Hash() { + return nil, nil, nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", newBlock.NumberU64(), + chainBlock.Hash(), newBlock.Hash()) } - s, err := state.New(pendingIns.block.Root(), bc.stateCache) - if err != nil { - return nil, events, coalescedLogs, err - } + return &root, nil, nil, nil + } - // Write the block to the chain and get the status. - insertTime := time.Now() - status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s) - if err != nil { - return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) - } + // Write the block to the chain and get the status. + status, err := bc.WriteBlockWithState(newBlock, receipts, currentState) + if err != nil { + return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) + } - switch status { - case CanonStatTy: - log.Debug("Inserted new block", "number", pendingIns.block.Number(), "hash", pendingIns.block.Hash(), - "uncles", len(pendingIns.block.Uncles()), "txs", len(pendingIns.block.Transactions()), - "gas", pendingIns.block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + switch status { + case CanonStatTy: + log.Debug("Inserted new block", "number", newBlock.Number(), "hash", newBlock.Hash(), + "uncles", len(newBlock.Uncles()), "txs", len(newBlock.Transactions()), + "gas", newBlock.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) - var allLogs []*types.Log - for _, r := range pendingIns.receipts { - allLogs = append(allLogs, r.Logs...) - } - coalescedLogs = append(coalescedLogs, allLogs...) - blockInsertTimer.UpdateSince(insertTime) - events = append(events, ChainEvent{pendingIns.block, pendingIns.block.Hash(), allLogs}) - lastCanon = pendingIns.block + var allLogs []*types.Log + for _, r := range receipts { + allLogs = append(allLogs, r.Logs...) + } + coalescedLogs = append(coalescedLogs, allLogs...) + blockInsertTimer.UpdateSince(bstart) + events = append(events, ChainEvent{newBlock, newBlock.Hash(), allLogs}, ChainHeadEvent{newBlock}) - // Only count canonical blocks for GC processing time - bc.gcproc += pendingIns.proctime + // Only count canonical blocks for GC processing time + bc.gcproc += proctime - case SideStatTy: - return nil, nil, nil, fmt.Errorf("insert pending block and fork found") - } - bc.removePendingBlock(pendingHeight) + case SideStatTy: + return nil, nil, nil, fmt.Errorf("insert pending block and fork found") + } - stats.processed++ - stats.usedGas += pendingIns.block.GasUsed() + stats.processed++ + stats.usedGas += newBlock.GasUsed() - cache, _ := bc.stateCache.TrieDB().Size() - stats.report([]*types.Block{pendingIns.block}, 0, cache) + cache, _ := bc.stateCache.TrieDB().Size() + stats.report([]*types.Block{newBlock}, 0, cache) - if pendingHeight == witness.Height { - err = bc.updateLastRoundNumber(pendingIns.block.Round()) - if err != nil { - return nil, nil, nil, err - } - } - } - // Append a single chain head event if we've progressed the chain - if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { - events = append(events, ChainHeadEvent{lastCanon}) + err = bc.updateLastRoundNumber(newBlock.Round()) + if err != nil { + return nil, nil, nil, err } - root := newPendingBlock.Root() return &root, events, coalescedLogs, nil } -func (bc *BlockChain) ProcessEmptyBlock(block *types.Block) error { +func (bc *BlockChain) ProcessEmptyBlock(block *types.Block) (*common.Hash, error) { bstart := time.Now() + var stats = insertStats{startTime: mclock.Now()} var header = block.Header() var parentBlock *types.Block - var pendingState *state.StateDB + var currentState *state.StateDB var err error - parent, exist := bc.pendingBlocks[block.NumberU64()-1] - if !exist { - parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) - if parentBlock == nil { - return fmt.Errorf("parent block %d not exist", block.NumberU64()-1) - } - } else { - parentBlock = parent.block + + parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) + if parentBlock == nil { + return nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } - pendingState, err = state.New(parentBlock.Root(), bc.stateCache) + currentState, err = state.New(parentBlock.Root(), bc.stateCache) if err != nil { - return err + return nil, err } header.ParentHash = parentBlock.Hash() header.GasUsed = 0 - header.Root = pendingState.IntermediateRoot(true) + header.Root = currentState.IntermediateRoot(true) if header.Root != parentBlock.Root() { - return fmt.Errorf("empty block state root must same as parent") + return nil, fmt.Errorf("empty block state root must same as parent") } - newPendingBlock := types.NewBlock(header, nil, nil, nil) - if _, ok := bc.GetRoundHeight(newPendingBlock.Round()); !ok { - bc.storeRoundHeight(newPendingBlock.Round(), newPendingBlock.NumberU64()) + newBlock := types.NewBlock(header, nil, nil, nil) + root := newBlock.Root() + if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok { + bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64()) } + if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok { + bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64()) + } proctime := time.Since(bstart) - bc.addPendingBlock(newPendingBlock, nil, proctime) - bc.PostChainEvents([]interface{}{BlockConfirmedEvent{newPendingBlock}}, nil) - return nil -} - -func (bc *BlockChain) removePendingBlock(height uint64) { - bc.pendingBlockMu.Lock() - defer bc.pendingBlockMu.Unlock() - - delete(bc.pendingBlocks, height) -} - -func (bc *BlockChain) addPendingBlock(block *types.Block, receipts types.Receipts, proctime time.Duration) { - bc.pendingBlockMu.Lock() - defer bc.pendingBlockMu.Unlock() - - bc.pendingBlocks[block.NumberU64()] = struct { - block *types.Block - receipts types.Receipts - proctime time.Duration - }{block: block, receipts: receipts, proctime: proctime} - bc.lastPendingHeight = block.NumberU64() -} + chainBlock := bc.GetBlockByNumber(newBlock.NumberU64()) + if chainBlock != nil { + if chainBlock.Hash() != newBlock.Hash() { + return nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", newBlock.NumberU64(), + chainBlock.Hash(), newBlock.Hash()) + } -func (bc *BlockChain) GetPendingHeight() uint64 { - bc.pendingBlockMu.RLock() - defer bc.pendingBlockMu.RUnlock() + return &root, nil + } - return bc.lastPendingHeight -} + // Write the block to the chain and get the status. + status, err := bc.WriteBlockWithState(newBlock, nil, currentState) + if err != nil { + return nil, fmt.Errorf("WriteBlockWithState error: %v", err) + } -func (bc *BlockChain) PendingBlock() *types.Block { - bc.pendingBlockMu.RLock() - defer bc.pendingBlockMu.RUnlock() + switch status { + case CanonStatTy: + log.Debug("Inserted new block", "number", newBlock.Number(), "hash", newBlock.Hash(), + "uncles", len(newBlock.Uncles()), "txs", len(newBlock.Transactions()), + "gas", newBlock.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + blockInsertTimer.UpdateSince(bstart) + // Only count canonical blocks for GC processing time + bc.gcproc += proctime - return bc.pendingBlocks[bc.lastPendingHeight].block -} + case SideStatTy: + return nil, fmt.Errorf("insert pending block and fork found") + } -func (bc *BlockChain) GetPendingBlockByNumber(number uint64) *types.Block { - bc.pendingBlockMu.RLock() - defer bc.pendingBlockMu.RUnlock() + stats.processed++ + stats.usedGas += newBlock.GasUsed() - return bc.pendingBlocks[number].block -} + cache, _ := bc.stateCache.TrieDB().Size() + stats.report([]*types.Block{newBlock}, 0, cache) -func (bc *BlockChain) GetPending() (*types.Block, *state.StateDB) { - block := bc.PendingBlock() - if block == nil { - block = bc.CurrentBlock() - } - s, err := state.New(block.Header().Root, bc.stateCache) + err = bc.updateLastRoundNumber(newBlock.Round()) if err != nil { - panic(err) + return nil, err } - return block, s + + bc.PostChainEvents([]interface{}{ChainEvent{newBlock, newBlock.Hash(), nil}, + ChainHeadEvent{newBlock}}, nil) + + return &root, nil } // GetGovStateByHash extracts the governance contract's state from state trie @@ -2225,9 +2174,6 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { case ChainHeadEvent: bc.chainHeadFeed.Send(ev) - case BlockConfirmedEvent: - bc.blockConfirmedFeed.Send(ev) - case ChainSideEvent: bc.chainSideFeed.Send(ev) } @@ -2447,11 +2393,6 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } -// SubscribeBlockConfirmedEvent registers a subscription of ChainHeadEvent. -func (bc *BlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription { - return bc.scope.Track(bc.blockConfirmedFeed.Subscribe(ch)) -} - // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index f83356bff..183534b89 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1420,7 +1420,7 @@ func (d *dexconTest) Finalize(chain consensus.ChainReader, header *types.Header, return types.NewBlock(header, txs, uncles, receipts), nil } -func TestProcessPendingBlock(t *testing.T) { +func TestProcessBlock(t *testing.T) { db := ethdb.NewMemDatabase() key, err := crypto.GenerateKey() @@ -1466,8 +1466,8 @@ func TestProcessPendingBlock(t *testing.T) { } } else { witnessData := types.WitnessData{ - Root: chain.pendingBlocks[uint64(i)].block.Root(), - ReceiptHash: chain.pendingBlocks[uint64(i)].block.ReceiptHash(), + Root: chain.CurrentBlock().Root(), + ReceiptHash: chain.CurrentBlock().ReceiptHash(), } witnessDataBytes, err = rlp.EncodeToBytes(&witnessData) if err != nil { @@ -1483,7 +1483,7 @@ func TestProcessPendingBlock(t *testing.T) { t.Fatalf("sign tx error: %v", err) } - _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{ + _, err = chain.ProcessBlock(types.NewBlock(&types.Header{ Number: new(big.Int).SetUint64(uint64(i) + 1), Time: big.NewInt(time.Now().UnixNano() / 1000000), GasLimit: 10000, @@ -1497,13 +1497,13 @@ func TestProcessPendingBlock(t *testing.T) { t.Fatalf("process pending block error: %v", err) } - if chain.CurrentBlock().NumberU64() != uint64(i) { - t.Fatalf("expect current height %v but %v", uint64(i), chain.CurrentBlock().NumberU64()) + if chain.CurrentBlock().NumberU64() != uint64(i+1) { + t.Fatalf("expect current height %v but %v", uint64(i+1), chain.CurrentBlock().NumberU64()) } } // Witness rlp decode fail. - _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{ + _, err = chain.ProcessBlock(types.NewBlock(&types.Header{ Number: new(big.Int).SetUint64(processNum + 1), Time: big.NewInt(time.Now().UnixNano() / 1000000), GasLimit: 10000, @@ -1518,14 +1518,14 @@ func TestProcessPendingBlock(t *testing.T) { // Validate witness fail with unknown block. witnessData := types.WitnessData{ - Root: chain.pendingBlocks[processNum].block.Root(), - ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(), + Root: chain.CurrentBlock().Root(), + ReceiptHash: chain.CurrentBlock().ReceiptHash(), } witnessDataBytes, err := rlp.EncodeToBytes(&witnessData) if err != nil { t.Fatalf("rlp encode fail: %v", err) } - _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{ + _, err = chain.ProcessBlock(types.NewBlock(&types.Header{ Number: new(big.Int).SetUint64(processNum + 1), Time: big.NewInt(time.Now().UnixNano() / 1000000), GasLimit: 10000, @@ -1541,14 +1541,14 @@ func TestProcessPendingBlock(t *testing.T) { // Validate witness fail with unexpected root. witnessData = types.WitnessData{ - Root: chain.pendingBlocks[processNum].block.Root(), - ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(), + Root: chain.CurrentBlock().Root(), + ReceiptHash: chain.CurrentBlock().ReceiptHash(), } witnessDataBytes, err = rlp.EncodeToBytes(&witnessData) if err != nil { t.Fatalf("rlp encode fail: %v", err) } - _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{ + _, err = chain.ProcessBlock(types.NewBlock(&types.Header{ Number: new(big.Int).SetUint64(processNum + 1), Time: big.NewInt(time.Now().UnixNano() / 1000000), GasLimit: 10000, @@ -1564,8 +1564,8 @@ func TestProcessPendingBlock(t *testing.T) { // Apply transaction fail with insufficient fund. witnessData = types.WitnessData{ - Root: chain.pendingBlocks[processNum].block.Root(), - ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(), + Root: chain.CurrentBlock().Root(), + ReceiptHash: chain.CurrentBlock().ReceiptHash(), } witnessDataBytes, err = rlp.EncodeToBytes(&witnessData) if err != nil { @@ -1580,7 +1580,7 @@ func TestProcessPendingBlock(t *testing.T) { t.Fatalf("sign tx error: %v", err) } - _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{ + _, err = chain.ProcessBlock(types.NewBlock(&types.Header{ Number: new(big.Int).SetUint64(processNum + 1), Time: big.NewInt(time.Now().UnixNano() / 1000000), GasLimit: 10000, @@ -1596,8 +1596,8 @@ func TestProcessPendingBlock(t *testing.T) { // Apply transaction fail with nonce too height. witnessData = types.WitnessData{ - Root: chain.pendingBlocks[processNum].block.Root(), - ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(), + Root: chain.CurrentBlock().Root(), + ReceiptHash: chain.CurrentBlock().ReceiptHash(), } witnessDataBytes, err = rlp.EncodeToBytes(&witnessData) if err != nil { @@ -1612,7 +1612,7 @@ func TestProcessPendingBlock(t *testing.T) { t.Fatalf("sign tx error: %v", err) } - _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{ + _, err = chain.ProcessBlock(types.NewBlock(&types.Header{ Number: new(big.Int).SetUint64(processNum + 1), Time: big.NewInt(time.Now().UnixNano() / 1000000), GasLimit: 10000, diff --git a/core/events.go b/core/events.go index e174e8aad..c2cabafd3 100644 --- a/core/events.go +++ b/core/events.go @@ -50,8 +50,6 @@ type ChainSideEvent struct { type ChainHeadEvent struct{ Block *types.Block } -type BlockConfirmedEvent struct{ Block *types.Block } - type NewNotarySetEvent struct { Round uint64 Pubkeys map[string]struct{} // pubkeys in hex format diff --git a/core/tx_pool.go b/core/tx_pool.go index f4961a034..d1827257e 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -37,9 +37,6 @@ import ( const ( // chainHeadChanSize is the size of channel listening to ChainHeadEvent. chainHeadChanSize = 10 - - // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent. - blockConfirmedChanSize = 10 ) var ( @@ -121,7 +118,6 @@ type blockChain interface { StateAt(root common.Hash) (*state.StateDB, error) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription - SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription } // TxPoolConfig are the configuration parameters of the transaction pool. @@ -186,18 +182,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - txFeed event.Feed - scope event.SubscriptionScope - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - blockConfirmedCh chan BlockConfirmedEvent - blockConfirmedSub event.Subscription - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + signer types.Signer + mu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingState *state.ManagedState // Pending state tracking virtual nonces @@ -226,18 +220,17 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.NewEIP155Signer(chainconfig.ChainID), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), - isBlockProposer: isBlockProposer, + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainID), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), + isBlockProposer: isBlockProposer, } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -259,7 +252,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block } } // Subscribe events from blockchain - pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh) pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) // Start the event loop and return @@ -295,9 +287,6 @@ func (pool *TxPool) loop() { select { // Handle ChainHeadEvent case ev := <-pool.chainHeadCh: - if pool.isBlockProposer { - break - } if ev.Block != nil { pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { @@ -311,24 +300,6 @@ func (pool *TxPool) loop() { // Be unsubscribed due to system stopped case <-pool.chainHeadSub.Err(): return - // Handle BlockConfirmedEvent - case ev := <-pool.blockConfirmedCh: - if !pool.isBlockProposer { - break - } - if ev.Block != nil { - pool.mu.Lock() - if pool.chainconfig.IsHomestead(ev.Block.Number()) { - pool.homestead = true - } - pool.reset(head.Header(), ev.Block.Header()) - head = ev.Block - - pool.mu.Unlock() - } - // Be unsubscribed due to system stopped - case <-pool.blockConfirmedSub.Err(): - return // Handle stats reporting ticks case <-report.C: @@ -419,7 +390,7 @@ func (pool *TxPool) Stop() { pool.scope.Close() // Unsubscribe subscriptions registered from blockchain - pool.blockConfirmedSub.Unsubscribe() + pool.chainHeadSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index abb13010d..6024453b4 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -69,10 +69,6 @@ func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) even return bc.chainHeadFeed.Subscribe(ch) } -func (bc *testBlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription { - return bc.blockConfirmedFeed.Subscribe(ch) -} - func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) *types.Transaction { return pricedTransaction(nonce, gaslimit, big.NewInt(1), key) } |