aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorbojie <bojie@dexon.org>2019-01-14 20:42:15 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:27:21 +0800
commitf587289bc3d5f679c6e92acb817a40a88a19e970 (patch)
tree55b30e09ee4222973fc935ba83c08ac6f13a24dd /core
parent2b03b31d46e166e70ea82b806db47bb6408282e3 (diff)
downloadgo-tangerine-f587289bc3d5f679c6e92acb817a40a88a19e970.tar
go-tangerine-f587289bc3d5f679c6e92acb817a40a88a19e970.tar.gz
go-tangerine-f587289bc3d5f679c6e92acb817a40a88a19e970.tar.bz2
go-tangerine-f587289bc3d5f679c6e92acb817a40a88a19e970.tar.lz
go-tangerine-f587289bc3d5f679c6e92acb817a40a88a19e970.tar.xz
go-tangerine-f587289bc3d5f679c6e92acb817a40a88a19e970.tar.zst
go-tangerine-f587289bc3d5f679c6e92acb817a40a88a19e970.zip
app: remove pending block logic (#149)
Diffstat (limited to 'core')
-rw-r--r--core/block_validator.go7
-rw-r--r--core/blockchain.go331
-rw-r--r--core/blockchain_test.go38
-rw-r--r--core/events.go2
-rw-r--r--core/tx_pool.go77
-rw-r--r--core/tx_pool_test.go38
6 files changed, 197 insertions, 296 deletions
diff --git a/core/block_validator.go b/core/block_validator.go
index 660bd09f8..c6038381f 100644
--- a/core/block_validator.go
+++ b/core/block_validator.go
@@ -102,12 +102,9 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat
}
func (v *BlockValidator) ValidateWitnessData(height uint64, data types.WitnessData) error {
- b := v.bc.GetPendingBlockByNumber(height)
+ b := v.bc.GetBlockByNumber(height)
if b == nil {
- b = v.bc.GetBlockByNumber(height)
- if b == nil {
- return fmt.Errorf("can not find block %v either pending or confirmed block", height)
- }
+ return fmt.Errorf("can not find block %v either pending or confirmed block", height)
}
if b.Root() != data.Root {
diff --git a/core/blockchain.go b/core/blockchain.go
index 089f1c2fa..6aee356d2 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -105,15 +105,14 @@ type BlockChain struct {
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
- hc *HeaderChain
- rmLogsFeed event.Feed
- chainFeed event.Feed
- chainSideFeed event.Feed
- chainHeadFeed event.Feed
- blockConfirmedFeed event.Feed
- logsFeed event.Feed
- scope event.SubscriptionScope
- genesisBlock *types.Block
+ hc *HeaderChain
+ rmLogsFeed event.Feed
+ chainFeed event.Feed
+ chainSideFeed event.Feed
+ chainHeadFeed event.Feed
+ logsFeed event.Feed
+ scope event.SubscriptionScope
+ genesisBlock *types.Block
mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
@@ -154,14 +153,6 @@ type BlockChain struct {
addressCost map[uint32]map[common.Address]*big.Int
addressCounter map[uint32]map[common.Address]uint64
chainLastHeight sync.Map
-
- pendingBlockMu sync.RWMutex
- lastPendingHeight uint64
- pendingBlocks map[uint64]struct {
- block *types.Block
- receipts types.Receipts
- proctime time.Duration
- }
}
// NewBlockChain returns a fully initialised block chain using information
@@ -183,25 +174,20 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
badBlocks, _ := lru.New(badBlockLimit)
bc := &BlockChain{
- chainConfig: chainConfig,
- cacheConfig: cacheConfig,
- db: db,
- triegc: prque.New(nil),
- stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
- quit: make(chan struct{}),
- bodyCache: bodyCache,
- bodyRLPCache: bodyRLPCache,
- receiptsCache: receiptsCache,
- blockCache: blockCache,
- futureBlocks: futureBlocks,
- engine: engine,
- vmConfig: vmConfig,
- badBlocks: badBlocks,
- pendingBlocks: make(map[uint64]struct {
- block *types.Block
- receipts types.Receipts
- proctime time.Duration
- }),
+ chainConfig: chainConfig,
+ cacheConfig: cacheConfig,
+ db: db,
+ triegc: prque.New(nil),
+ stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
+ quit: make(chan struct{}),
+ bodyCache: bodyCache,
+ bodyRLPCache: bodyRLPCache,
+ receiptsCache: receiptsCache,
+ blockCache: blockCache,
+ futureBlocks: futureBlocks,
+ engine: engine,
+ vmConfig: vmConfig,
+ badBlocks: badBlocks,
confirmedBlocks: make(map[uint32]map[coreCommon.Hash]*blockInfo),
addressNonce: make(map[uint32]map[common.Address]uint64),
addressCost: make(map[uint32]map[common.Address]*big.Int),
@@ -1764,6 +1750,16 @@ func (bc *BlockChain) insertDexonChain(chain types.Blocks) (int, []interface{},
}
proctime := time.Since(bstart)
+ chainBlock := bc.GetBlockByNumber(block.NumberU64())
+ if chainBlock != nil {
+ if chainBlock.Hash() != block.Hash() {
+ return i, nil, nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", block.NumberU64(),
+ chainBlock.Hash(), block.Hash())
+ }
+
+ continue
+ }
+
// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, state)
if err != nil {
@@ -1814,13 +1810,13 @@ func (bc *BlockChain) VerifyDexonHeader(header *types.Header) error {
return bc.hc.verifyTSig(header, bc.verifierCache)
}
-func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) {
- n, events, logs, err := bc.processPendingBlock(block, witness)
+func (bc *BlockChain) ProcessBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) {
+ root, events, logs, err := bc.processBlock(block, witness)
bc.PostChainEvents(events, logs)
- return n, err
+ return root, err
}
-func (bc *BlockChain) processPendingBlock(
+func (bc *BlockChain) processBlock(
block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) {
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
@@ -1835,7 +1831,6 @@ func (bc *BlockChain) processPendingBlock(
var (
stats = insertStats{startTime: mclock.Now()}
events = make([]interface{}, 0, 2)
- lastCanon *types.Block
coalescedLogs []*types.Log
)
@@ -1858,28 +1853,23 @@ func (bc *BlockChain) processPendingBlock(
)
var parentBlock *types.Block
- var pendingState *state.StateDB
+ var currentState *state.StateDB
var err error
- parent, exist := bc.pendingBlocks[block.NumberU64()-1]
- if !exist {
- parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1)
- if parentBlock == nil {
- return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
- }
- } else {
- parentBlock = parent.block
+ parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1)
+ if parentBlock == nil {
+ return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
}
header.ParentHash = parentBlock.Hash()
- pendingState, err = state.New(parentBlock.Root(), bc.stateCache)
+ currentState, err = state.New(parentBlock.Root(), bc.stateCache)
if err != nil {
return nil, nil, nil, err
}
// Iterate over and process the individual transactions.
for i, tx := range block.Transactions() {
- pendingState.Prepare(tx.Hash(), block.Hash(), i)
- receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig)
+ currentState.Prepare(tx.Hash(), block.Hash(), i)
+ receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, currentState, header, tx, usedGas, bc.vmConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce())
}
@@ -1888,189 +1878,148 @@ func (bc *BlockChain) processPendingBlock(
}
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
header.GasUsed = *usedGas
- newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts)
+ newBlock, err := bc.engine.Finalize(bc, header, currentState, block.Transactions(), block.Uncles(), receipts)
+ root := newBlock.Root()
if err != nil {
return nil, nil, nil, fmt.Errorf("finalize error: %v", err)
}
- if _, ok := bc.GetRoundHeight(newPendingBlock.Round()); !ok {
- bc.storeRoundHeight(newPendingBlock.Round(), newPendingBlock.NumberU64())
+ if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok {
+ bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64())
}
-
proctime := time.Since(bstart)
- // Commit state to refresh stateCache.
- _, err = pendingState.Commit(true)
- if err != nil {
- return nil, nil, nil, fmt.Errorf("pendingState commit error: %v", err)
- }
-
- // Add into pending blocks.
- bc.addPendingBlock(newPendingBlock, receipts, proctime)
- events = append(events, BlockConfirmedEvent{newPendingBlock})
-
- log.Debug("Inserted pending block", "height", newPendingBlock.Number(), "hash", newPendingBlock.Hash())
-
- // Start insert available pending blocks into db
- for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ {
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
- log.Debug("Premature abort during blocks processing")
- return nil, nil, nil, fmt.Errorf("interrupt")
- }
-
- pendingIns, exist := bc.pendingBlocks[pendingHeight]
- if !exist {
- log.Error("Block has already inserted", "height", pendingHeight)
- continue
+ chainBlock := bc.GetBlockByNumber(newBlock.NumberU64())
+ if chainBlock != nil {
+ if chainBlock.Hash() != newBlock.Hash() {
+ return nil, nil, nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", newBlock.NumberU64(),
+ chainBlock.Hash(), newBlock.Hash())
}
- s, err := state.New(pendingIns.block.Root(), bc.stateCache)
- if err != nil {
- return nil, events, coalescedLogs, err
- }
+ return &root, nil, nil, nil
+ }
- // Write the block to the chain and get the status.
- insertTime := time.Now()
- status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s)
- if err != nil {
- return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err)
- }
+ // Write the block to the chain and get the status.
+ status, err := bc.WriteBlockWithState(newBlock, receipts, currentState)
+ if err != nil {
+ return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err)
+ }
- switch status {
- case CanonStatTy:
- log.Debug("Inserted new block", "number", pendingIns.block.Number(), "hash", pendingIns.block.Hash(),
- "uncles", len(pendingIns.block.Uncles()), "txs", len(pendingIns.block.Transactions()),
- "gas", pendingIns.block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
+ switch status {
+ case CanonStatTy:
+ log.Debug("Inserted new block", "number", newBlock.Number(), "hash", newBlock.Hash(),
+ "uncles", len(newBlock.Uncles()), "txs", len(newBlock.Transactions()),
+ "gas", newBlock.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
- var allLogs []*types.Log
- for _, r := range pendingIns.receipts {
- allLogs = append(allLogs, r.Logs...)
- }
- coalescedLogs = append(coalescedLogs, allLogs...)
- blockInsertTimer.UpdateSince(insertTime)
- events = append(events, ChainEvent{pendingIns.block, pendingIns.block.Hash(), allLogs})
- lastCanon = pendingIns.block
+ var allLogs []*types.Log
+ for _, r := range receipts {
+ allLogs = append(allLogs, r.Logs...)
+ }
+ coalescedLogs = append(coalescedLogs, allLogs...)
+ blockInsertTimer.UpdateSince(bstart)
+ events = append(events, ChainEvent{newBlock, newBlock.Hash(), allLogs}, ChainHeadEvent{newBlock})
- // Only count canonical blocks for GC processing time
- bc.gcproc += pendingIns.proctime
+ // Only count canonical blocks for GC processing time
+ bc.gcproc += proctime
- case SideStatTy:
- return nil, nil, nil, fmt.Errorf("insert pending block and fork found")
- }
- bc.removePendingBlock(pendingHeight)
+ case SideStatTy:
+ return nil, nil, nil, fmt.Errorf("insert pending block and fork found")
+ }
- stats.processed++
- stats.usedGas += pendingIns.block.GasUsed()
+ stats.processed++
+ stats.usedGas += newBlock.GasUsed()
- cache, _ := bc.stateCache.TrieDB().Size()
- stats.report([]*types.Block{pendingIns.block}, 0, cache)
+ cache, _ := bc.stateCache.TrieDB().Size()
+ stats.report([]*types.Block{newBlock}, 0, cache)
- if pendingHeight == witness.Height {
- err = bc.updateLastRoundNumber(pendingIns.block.Round())
- if err != nil {
- return nil, nil, nil, err
- }
- }
- }
- // Append a single chain head event if we've progressed the chain
- if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
- events = append(events, ChainHeadEvent{lastCanon})
+ err = bc.updateLastRoundNumber(newBlock.Round())
+ if err != nil {
+ return nil, nil, nil, err
}
- root := newPendingBlock.Root()
return &root, events, coalescedLogs, nil
}
-func (bc *BlockChain) ProcessEmptyBlock(block *types.Block) error {
+func (bc *BlockChain) ProcessEmptyBlock(block *types.Block) (*common.Hash, error) {
bstart := time.Now()
+ var stats = insertStats{startTime: mclock.Now()}
var header = block.Header()
var parentBlock *types.Block
- var pendingState *state.StateDB
+ var currentState *state.StateDB
var err error
- parent, exist := bc.pendingBlocks[block.NumberU64()-1]
- if !exist {
- parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1)
- if parentBlock == nil {
- return fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
- }
- } else {
- parentBlock = parent.block
+
+ parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1)
+ if parentBlock == nil {
+ return nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
}
- pendingState, err = state.New(parentBlock.Root(), bc.stateCache)
+ currentState, err = state.New(parentBlock.Root(), bc.stateCache)
if err != nil {
- return err
+ return nil, err
}
header.ParentHash = parentBlock.Hash()
header.GasUsed = 0
- header.Root = pendingState.IntermediateRoot(true)
+ header.Root = currentState.IntermediateRoot(true)
if header.Root != parentBlock.Root() {
- return fmt.Errorf("empty block state root must same as parent")
+ return nil, fmt.Errorf("empty block state root must same as parent")
}
- newPendingBlock := types.NewBlock(header, nil, nil, nil)
- if _, ok := bc.GetRoundHeight(newPendingBlock.Round()); !ok {
- bc.storeRoundHeight(newPendingBlock.Round(), newPendingBlock.NumberU64())
+ newBlock := types.NewBlock(header, nil, nil, nil)
+ root := newBlock.Root()
+ if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok {
+ bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64())
}
+ if _, ok := bc.GetRoundHeight(newBlock.Round()); !ok {
+ bc.storeRoundHeight(newBlock.Round(), newBlock.NumberU64())
+ }
proctime := time.Since(bstart)
- bc.addPendingBlock(newPendingBlock, nil, proctime)
- bc.PostChainEvents([]interface{}{BlockConfirmedEvent{newPendingBlock}}, nil)
- return nil
-}
-
-func (bc *BlockChain) removePendingBlock(height uint64) {
- bc.pendingBlockMu.Lock()
- defer bc.pendingBlockMu.Unlock()
-
- delete(bc.pendingBlocks, height)
-}
-
-func (bc *BlockChain) addPendingBlock(block *types.Block, receipts types.Receipts, proctime time.Duration) {
- bc.pendingBlockMu.Lock()
- defer bc.pendingBlockMu.Unlock()
-
- bc.pendingBlocks[block.NumberU64()] = struct {
- block *types.Block
- receipts types.Receipts
- proctime time.Duration
- }{block: block, receipts: receipts, proctime: proctime}
- bc.lastPendingHeight = block.NumberU64()
-}
+ chainBlock := bc.GetBlockByNumber(newBlock.NumberU64())
+ if chainBlock != nil {
+ if chainBlock.Hash() != newBlock.Hash() {
+ return nil, fmt.Errorf("block %v exist but hash is not equal: exist %v expect %v", newBlock.NumberU64(),
+ chainBlock.Hash(), newBlock.Hash())
+ }
-func (bc *BlockChain) GetPendingHeight() uint64 {
- bc.pendingBlockMu.RLock()
- defer bc.pendingBlockMu.RUnlock()
+ return &root, nil
+ }
- return bc.lastPendingHeight
-}
+ // Write the block to the chain and get the status.
+ status, err := bc.WriteBlockWithState(newBlock, nil, currentState)
+ if err != nil {
+ return nil, fmt.Errorf("WriteBlockWithState error: %v", err)
+ }
-func (bc *BlockChain) PendingBlock() *types.Block {
- bc.pendingBlockMu.RLock()
- defer bc.pendingBlockMu.RUnlock()
+ switch status {
+ case CanonStatTy:
+ log.Debug("Inserted new block", "number", newBlock.Number(), "hash", newBlock.Hash(),
+ "uncles", len(newBlock.Uncles()), "txs", len(newBlock.Transactions()),
+ "gas", newBlock.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
+ blockInsertTimer.UpdateSince(bstart)
+ // Only count canonical blocks for GC processing time
+ bc.gcproc += proctime
- return bc.pendingBlocks[bc.lastPendingHeight].block
-}
+ case SideStatTy:
+ return nil, fmt.Errorf("insert pending block and fork found")
+ }
-func (bc *BlockChain) GetPendingBlockByNumber(number uint64) *types.Block {
- bc.pendingBlockMu.RLock()
- defer bc.pendingBlockMu.RUnlock()
+ stats.processed++
+ stats.usedGas += newBlock.GasUsed()
- return bc.pendingBlocks[number].block
-}
+ cache, _ := bc.stateCache.TrieDB().Size()
+ stats.report([]*types.Block{newBlock}, 0, cache)
-func (bc *BlockChain) GetPending() (*types.Block, *state.StateDB) {
- block := bc.PendingBlock()
- if block == nil {
- block = bc.CurrentBlock()
- }
- s, err := state.New(block.Header().Root, bc.stateCache)
+ err = bc.updateLastRoundNumber(newBlock.Round())
if err != nil {
- panic(err)
+ return nil, err
}
- return block, s
+
+ bc.PostChainEvents([]interface{}{ChainEvent{newBlock, newBlock.Hash(), nil},
+ ChainHeadEvent{newBlock}}, nil)
+
+ return &root, nil
}
// GetGovStateByHash extracts the governance contract's state from state trie
@@ -2253,9 +2202,6 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
case ChainHeadEvent:
bc.chainHeadFeed.Send(ev)
- case BlockConfirmedEvent:
- bc.blockConfirmedFeed.Send(ev)
-
case ChainSideEvent:
bc.chainSideFeed.Send(ev)
}
@@ -2475,11 +2421,6 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}
-// SubscribeBlockConfirmedEvent registers a subscription of ChainHeadEvent.
-func (bc *BlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription {
- return bc.scope.Track(bc.blockConfirmedFeed.Subscribe(ch))
-}
-
// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index a902f0032..45eb191f2 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -1626,7 +1626,7 @@ func (d *dexconTest) Finalize(chain consensus.ChainReader, header *types.Header,
return types.NewBlock(header, txs, uncles, receipts), nil
}
-func TestProcessPendingBlock(t *testing.T) {
+func TestProcessBlock(t *testing.T) {
db := ethdb.NewMemDatabase()
key, err := crypto.GenerateKey()
@@ -1672,8 +1672,8 @@ func TestProcessPendingBlock(t *testing.T) {
}
} else {
witnessData := types.WitnessData{
- Root: chain.pendingBlocks[uint64(i)].block.Root(),
- ReceiptHash: chain.pendingBlocks[uint64(i)].block.ReceiptHash(),
+ Root: chain.CurrentBlock().Root(),
+ ReceiptHash: chain.CurrentBlock().ReceiptHash(),
}
witnessDataBytes, err = rlp.EncodeToBytes(&witnessData)
if err != nil {
@@ -1689,7 +1689,7 @@ func TestProcessPendingBlock(t *testing.T) {
t.Fatalf("sign tx error: %v", err)
}
- _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{
+ _, err = chain.ProcessBlock(types.NewBlock(&types.Header{
Number: new(big.Int).SetUint64(uint64(i) + 1),
Time: uint64(time.Now().UnixNano() / 1000000),
GasLimit: 10000,
@@ -1703,13 +1703,13 @@ func TestProcessPendingBlock(t *testing.T) {
t.Fatalf("process pending block error: %v", err)
}
- if chain.CurrentBlock().NumberU64() != uint64(i) {
- t.Fatalf("expect current height %v but %v", uint64(i), chain.CurrentBlock().NumberU64())
+ if chain.CurrentBlock().NumberU64() != uint64(i+1) {
+ t.Fatalf("expect current height %v but %v", uint64(i+1), chain.CurrentBlock().NumberU64())
}
}
// Witness rlp decode fail.
- _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{
+ _, err = chain.ProcessBlock(types.NewBlock(&types.Header{
Number: new(big.Int).SetUint64(processNum + 1),
Time: uint64(time.Now().UnixNano() / 1000000),
GasLimit: 10000,
@@ -1724,14 +1724,14 @@ func TestProcessPendingBlock(t *testing.T) {
// Validate witness fail with unknown block.
witnessData := types.WitnessData{
- Root: chain.pendingBlocks[processNum].block.Root(),
- ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(),
+ Root: chain.CurrentBlock().Root(),
+ ReceiptHash: chain.CurrentBlock().ReceiptHash(),
}
witnessDataBytes, err := rlp.EncodeToBytes(&witnessData)
if err != nil {
t.Fatalf("rlp encode fail: %v", err)
}
- _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{
+ _, err = chain.ProcessBlock(types.NewBlock(&types.Header{
Number: new(big.Int).SetUint64(processNum + 1),
Time: uint64(time.Now().UnixNano() / 1000000),
GasLimit: 10000,
@@ -1747,14 +1747,14 @@ func TestProcessPendingBlock(t *testing.T) {
// Validate witness fail with unexpected root.
witnessData = types.WitnessData{
- Root: chain.pendingBlocks[processNum].block.Root(),
- ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(),
+ Root: chain.CurrentBlock().Root(),
+ ReceiptHash: chain.CurrentBlock().ReceiptHash(),
}
witnessDataBytes, err = rlp.EncodeToBytes(&witnessData)
if err != nil {
t.Fatalf("rlp encode fail: %v", err)
}
- _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{
+ _, err = chain.ProcessBlock(types.NewBlock(&types.Header{
Number: new(big.Int).SetUint64(processNum + 1),
Time: uint64(time.Now().UnixNano() / 1000000),
GasLimit: 10000,
@@ -1770,8 +1770,8 @@ func TestProcessPendingBlock(t *testing.T) {
// Apply transaction fail with insufficient fund.
witnessData = types.WitnessData{
- Root: chain.pendingBlocks[processNum].block.Root(),
- ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(),
+ Root: chain.CurrentBlock().Root(),
+ ReceiptHash: chain.CurrentBlock().ReceiptHash(),
}
witnessDataBytes, err = rlp.EncodeToBytes(&witnessData)
if err != nil {
@@ -1786,7 +1786,7 @@ func TestProcessPendingBlock(t *testing.T) {
t.Fatalf("sign tx error: %v", err)
}
- _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{
+ _, err = chain.ProcessBlock(types.NewBlock(&types.Header{
Number: new(big.Int).SetUint64(processNum + 1),
Time: uint64(time.Now().UnixNano() / 1000000),
GasLimit: 10000,
@@ -1802,8 +1802,8 @@ func TestProcessPendingBlock(t *testing.T) {
// Apply transaction fail with nonce too height.
witnessData = types.WitnessData{
- Root: chain.pendingBlocks[processNum].block.Root(),
- ReceiptHash: chain.pendingBlocks[processNum].block.ReceiptHash(),
+ Root: chain.CurrentBlock().Root(),
+ ReceiptHash: chain.CurrentBlock().ReceiptHash(),
}
witnessDataBytes, err = rlp.EncodeToBytes(&witnessData)
if err != nil {
@@ -1818,7 +1818,7 @@ func TestProcessPendingBlock(t *testing.T) {
t.Fatalf("sign tx error: %v", err)
}
- _, err = chain.ProcessPendingBlock(types.NewBlock(&types.Header{
+ _, err = chain.ProcessBlock(types.NewBlock(&types.Header{
Number: new(big.Int).SetUint64(processNum + 1),
Time: uint64(time.Now().UnixNano() / 1000000),
GasLimit: 10000,
diff --git a/core/events.go b/core/events.go
index e174e8aad..c2cabafd3 100644
--- a/core/events.go
+++ b/core/events.go
@@ -50,8 +50,6 @@ type ChainSideEvent struct {
type ChainHeadEvent struct{ Block *types.Block }
-type BlockConfirmedEvent struct{ Block *types.Block }
-
type NewNotarySetEvent struct {
Round uint64
Pubkeys map[string]struct{} // pubkeys in hex format
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 546ef0a99..871b50be1 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -37,9 +37,6 @@ import (
const (
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10
-
- // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent.
- blockConfirmedChanSize = 10
)
var (
@@ -121,7 +118,6 @@ type blockChain interface {
StateAt(root common.Hash) (*state.StateDB, error)
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
- SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription
}
// TxPoolConfig are the configuration parameters of the transaction pool.
@@ -206,18 +202,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
- config TxPoolConfig
- chainconfig *params.ChainConfig
- chain blockChain
- gasPrice *big.Int
- txFeed event.Feed
- scope event.SubscriptionScope
- chainHeadCh chan ChainHeadEvent
- chainHeadSub event.Subscription
- blockConfirmedCh chan BlockConfirmedEvent
- blockConfirmedSub event.Subscription
- signer types.Signer
- mu sync.RWMutex
+ config TxPoolConfig
+ chainconfig *params.ChainConfig
+ chain blockChain
+ gasPrice *big.Int
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ chainHeadCh chan ChainHeadEvent
+ chainHeadSub event.Subscription
+ signer types.Signer
+ mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
@@ -234,30 +228,27 @@ type TxPool struct {
wg sync.WaitGroup // for shutdown sync
- homestead bool
- isBlockProposer bool
+ homestead bool
}
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
-func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain, isBlockProposer bool) *TxPool {
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
- config: config,
- chainconfig: chainconfig,
- chain: chain,
- signer: types.NewEIP155Signer(chainconfig.ChainID),
- pending: make(map[common.Address]*txList),
- queue: make(map[common.Address]*txList),
- beats: make(map[common.Address]time.Time),
- all: newTxLookup(),
- chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
- blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize),
- gasPrice: new(big.Int).SetUint64(config.PriceLimit),
- isBlockProposer: isBlockProposer,
+ config: config,
+ chainconfig: chainconfig,
+ chain: chain,
+ signer: types.NewEIP155Signer(chainconfig.ChainID),
+ pending: make(map[common.Address]*txList),
+ queue: make(map[common.Address]*txList),
+ beats: make(map[common.Address]time.Time),
+ all: newTxLookup(),
+ chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
+ gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
@@ -279,7 +270,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
}
}
// Subscribe events from blockchain
- pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh)
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return
@@ -315,9 +305,6 @@ func (pool *TxPool) loop() {
select {
// Handle ChainHeadEvent
case ev := <-pool.chainHeadCh:
- if pool.isBlockProposer {
- break
- }
if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
@@ -331,24 +318,6 @@ func (pool *TxPool) loop() {
// Be unsubscribed due to system stopped
case <-pool.chainHeadSub.Err():
return
- // Handle BlockConfirmedEvent
- case ev := <-pool.blockConfirmedCh:
- if !pool.isBlockProposer {
- break
- }
- if ev.Block != nil {
- pool.mu.Lock()
- if pool.chainconfig.IsHomestead(ev.Block.Number()) {
- pool.homestead = true
- }
- pool.reset(head.Header(), ev.Block.Header())
- head = ev.Block
-
- pool.mu.Unlock()
- }
- // Be unsubscribed due to system stopped
- case <-pool.blockConfirmedSub.Err():
- return
// Handle stats reporting ticks
case <-report.C:
@@ -439,7 +408,7 @@ func (pool *TxPool) Stop() {
pool.scope.Close()
// Unsubscribe subscriptions registered from blockchain
- pool.blockConfirmedSub.Unsubscribe()
+ pool.chainHeadSub.Unsubscribe()
pool.wg.Wait()
if pool.journal != nil {
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index dc664eedd..96151850d 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -69,10 +69,6 @@ func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) even
return bc.chainHeadFeed.Subscribe(ch)
}
-func (bc *testBlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription {
- return bc.blockConfirmedFeed.Subscribe(ch)
-}
-
func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) *types.Transaction {
return pricedTransaction(nonce, gaslimit, big.NewInt(1), key)
}
@@ -87,7 +83,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
key, _ := crypto.GenerateKey()
- pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
return pool, key
}
@@ -197,7 +193,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
tx0 := transaction(0, 100000, key)
tx1 := transaction(1, 100000, key)
- pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
nonce := pool.State().GetNonce(address)
@@ -562,7 +558,7 @@ func TestTransactionPostponing(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
- pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create two test accounts to produce different gap profiles with
@@ -781,7 +777,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
config.NoLocals = nolocals
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create a number of test accounts and fund them (last one will be the local)
@@ -869,7 +865,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
config.Lifetime = time.Second
config.NoLocals = nolocals
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create two test accounts to ensure remotes expire but locals do not
@@ -1022,7 +1018,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
config := testTxPoolConfig
config.GlobalSlots = config.AccountSlots * 10
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create a number of test accounts and fund them
@@ -1070,7 +1066,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
config.AccountQueue = 2
config.GlobalSlots = 8
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create a number of test accounts and fund them
@@ -1102,7 +1098,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
config := testTxPoolConfig
config.GlobalSlots = 1
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create a number of test accounts and fund them
@@ -1147,7 +1143,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
- pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
@@ -1268,7 +1264,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
- pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create a number of test accounts and fund them
@@ -1334,7 +1330,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
config.GlobalSlots = 2
config.GlobalQueue = 2
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
@@ -1442,7 +1438,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
config.AccountQueue = 1024
config.GlobalQueue = 0
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
@@ -1504,7 +1500,7 @@ func TestTransactionReplacement(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
- pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
@@ -1603,7 +1599,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
config.Journal = journal
config.Rejournal = time.Second
- pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(config, params.TestChainConfig, blockchain)
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
@@ -1640,7 +1636,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
- pool = NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool = NewTxPool(config, params.TestChainConfig, blockchain)
pending, queued = pool.Stats()
if queued != 0 {
@@ -1666,7 +1662,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
- pool = NewTxPool(config, params.TestChainConfig, blockchain, false)
+ pool = NewTxPool(config, params.TestChainConfig, blockchain)
pending, queued = pool.Stats()
if pending != 0 {
@@ -1696,7 +1692,7 @@ func TestTransactionStatusCheck(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
- pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
+ pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
// Create the test accounts to check various transaction statuses with