From bf1e2631281e1e439533f2abcf1e99a7b2f9552a Mon Sep 17 00:00:00 2001 From: Miya Chen Date: Fri, 18 Aug 2017 18:58:36 +0800 Subject: core, light: send chain events using event.Feed (#14865) --- core/bench_test.go | 6 +- core/block_validator_test.go | 9 +-- core/blockchain.go | 92 ++++++++++++++++------ core/blockchain_test.go | 53 ++++++------- core/chain_makers.go | 3 +- core/chain_makers_test.go | 5 +- core/dao_test.go | 17 ++-- core/genesis_test.go | 4 +- core/tx_pool.go | 101 +++++++++++++++--------- core/tx_pool_test.go | 181 ++++++++++++++++++++++++++----------------- 10 files changed, 289 insertions(+), 182 deletions(-) (limited to 'core') diff --git a/core/bench_test.go b/core/bench_test.go index b9250f7d3..ab25c27d3 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -175,8 +174,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { // Time the insertion of the new chain. // State and blocks are stored in the same DB. - evmux := new(event.TypeMux) - chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer chainman.Stop() b.ReportAllocs() b.ResetTimer() @@ -286,7 +284,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) { if err != nil { b.Fatalf("error opening database at %v: %v", dir, err) } - chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) if err != nil { b.Fatalf("error creating chain: %v", err) } diff --git a/core/block_validator_test.go b/core/block_validator_test.go index c0afc2955..6d54c2b93 100644 --- a/core/block_validator_test.go +++ b/core/block_validator_test.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -43,7 +42,7 @@ func TestHeaderVerification(t *testing.T) { headers[i] = block.Header() } // Run the header checker for blocks one-by-one, checking for both valid and invalid nonces - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) defer chain.Stop() for i := 0; i < len(blocks); i++ { @@ -107,11 +106,11 @@ func testHeaderConcurrentVerification(t *testing.T, threads int) { var results <-chan error if valid { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } else { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } @@ -174,7 +173,7 @@ func testHeaderConcurrentAbortion(t *testing.T, threads int) { defer runtime.GOMAXPROCS(old) // Start the verifications and immediately abort - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{}) defer chain.Stop() abort, results := chain.engine.VerifyHeaders(chain, headers, seals) diff --git a/core/blockchain.go b/core/blockchain.go index 3fb8be15f..f3ca4e08c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -79,10 +79,16 @@ const ( type BlockChain struct { config *params.ChainConfig // chain & network configuration - hc *HeaderChain - chainDb ethdb.Database - eventMux *event.TypeMux - genesisBlock *types.Block + hc *HeaderChain + chainDb ethdb.Database + rmTxFeed event.Feed + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -115,7 +121,7 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux, vmConfig vm.Config) (*BlockChain, error) { +func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -126,7 +132,6 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co config: config, chainDb: chainDb, stateCache: state.NewDatabase(chainDb), - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -594,6 +599,8 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } + // Unsubscribe all subscriptions registered from blockchain + bc.scope.Close() close(bc.quit) atomic.StoreInt32(&bc.procInterrupt, 1) @@ -1000,6 +1007,12 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) + // We need some control over the mining operation. Acquiring locks and waiting + // for the miner to create new block takes too long and in most cases isn't + // even necessary. + if bc.LastBlockHash() == block.Hash() { + events = append(events, ChainHeadEvent{block}) + } // Write the positional metadata for transaction and receipt lookups if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { @@ -1024,7 +1037,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { stats.usedGas += usedGas.Uint64() stats.report(chain, i) } - go bc.postChainEvents(events, coalescedLogs) + go bc.PostChainEvents(events, coalescedLogs) return 0, nil } @@ -1184,16 +1197,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Must be posted in a goroutine because of the transaction pool trying // to acquire the chain manager lock if len(diff) > 0 { - go bc.eventMux.Post(RemovedTransactionEvent{diff}) + go bc.rmTxFeed.Send(RemovedTransactionEvent{diff}) } if len(deletedLogs) > 0 { - go bc.eventMux.Post(RemovedLogsEvent{deletedLogs}) + go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } if len(oldChain) > 0 { go func() { for _, block := range oldChain { - bc.eventMux.Post(ChainSideEvent{Block: block}) + bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } }() } @@ -1201,22 +1214,25 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -// postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. -func (bc *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) { +// PostChainEvents iterates over the events generated by a chain insertion and +// posts them into the event feed. +// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. +func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { // post event logs for further processing - bc.eventMux.Post(logs) + if logs != nil { + bc.logsFeed.Send(logs) + } for _, event := range events { - if event, ok := event.(ChainEvent); ok { - // We need some control over the mining operation. Acquiring locks and waiting - // for the miner to create new block takes too long and in most cases isn't - // even necessary. - if bc.LastBlockHash() == event.Hash { - bc.eventMux.Post(ChainHeadEvent{event.Block}) - } + switch ev := event.(type) { + case ChainEvent: + bc.chainFeed.Send(ev) + + case ChainHeadEvent: + bc.chainHeadFeed.Send(ev) + + case ChainSideEvent: + bc.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - bc.eventMux.Post(event) } } @@ -1384,3 +1400,33 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config } // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } + +// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent. +func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { + return bc.scope.Track(bc.rmTxFeed.Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent. +func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription { + return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch)) +} + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription { + return bc.scope.Track(bc.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { + return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent registers a subscription of []*types.Log. +func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return bc.scope.Track(bc.logsFeed.Subscribe(ch)) +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4a0f44940..470974a0a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -47,7 +46,7 @@ func newTestBlockChain(fake bool) *BlockChain { if !fake { engine = ethash.NewTester() } - blockchain, err := NewBlockChain(db, gspec.Config, engine, new(event.TypeMux), vm.Config{}) + blockchain, err := NewBlockChain(db, gspec.Config, engine, vm.Config{}) if err != nil { panic(err) } @@ -497,7 +496,7 @@ func testReorgBadHashes(t *testing.T, full bool) { } // Create a new BlockChain and check that it rolled back the state. - ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), vm.Config{}) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } @@ -610,7 +609,7 @@ func TestFastVsFullChains(t *testing.T) { // Import the chain as an archive node for the comparison baseline archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer archive.Stop() if n, err := archive.InsertChain(blocks); err != nil { @@ -619,7 +618,7 @@ func TestFastVsFullChains(t *testing.T) { // Fast import the chain as a non-archive node to test fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -697,7 +696,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := archive.InsertChain(blocks); err != nil { t.Fatalf("failed to process block %d: %v", n, err) } @@ -710,7 +709,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a non-archive node and ensure all pointers are updated fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -731,7 +730,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { lightDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(lightDb) - light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } @@ -800,8 +799,7 @@ func TestChainTxReorgs(t *testing.T) { } }) // Import the chain. This runs all block validation rules. - evmux := &event.TypeMux{} - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) if i, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert original chain[%d]: %v", i, err) } @@ -872,11 +870,11 @@ func TestLogReorgs(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - var evmux event.TypeMux - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() - subs := evmux.Subscribe(RemovedLogsEvent{}) + rmLogsCh := make(chan RemovedLogsEvent) + blockchain.SubscribeRemovedLogsEvent(rmLogsCh) chain, _ := GenerateChain(params.TestChainConfig, genesis, db, 2, func(i int, gen *BlockGen) { if i == 1 { tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code), signer, key1) @@ -895,9 +893,14 @@ func TestLogReorgs(t *testing.T) { t.Fatalf("failed to insert forked chain: %v", err) } - ev := <-subs.Chan() - if len(ev.Data.(RemovedLogsEvent).Logs) == 0 { - t.Error("expected logs") + timeout := time.NewTimer(1 * time.Second) + select { + case ev := <-rmLogsCh: + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + case <-timeout.C: + t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") } } @@ -914,8 +917,7 @@ func TestReorgSideEvent(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - evmux := &event.TypeMux{} - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() chain, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, gen *BlockGen) {}) @@ -933,7 +935,8 @@ func TestReorgSideEvent(t *testing.T) { } gen.AddTx(tx) }) - subs := evmux.Subscribe(ChainSideEvent{}) + chainSideCh := make(chan ChainSideEvent) + blockchain.SubscribeChainSideEvent(chainSideCh) if _, err := blockchain.InsertChain(replacementBlocks); err != nil { t.Fatalf("failed to insert chain: %v", err) } @@ -956,8 +959,8 @@ func TestReorgSideEvent(t *testing.T) { done: for { select { - case ev := <-subs.Chan(): - block := ev.Data.(ChainSideEvent).Block + case ev := <-chainSideCh: + block := ev.Block if _, ok := expectedSideHashes[block.Hash()]; !ok { t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash()) } @@ -977,7 +980,7 @@ done: // make sure no more events are fired select { - case e := <-subs.Chan(): + case e := <-chainSideCh: t.Errorf("unexpected event fired: %v", e) case <-time.After(250 * time.Millisecond): } @@ -1038,10 +1041,9 @@ func TestEIP155Transition(t *testing.T) { Alloc: GenesisAlloc{address: {Balance: funds}, deleteAddr: {Balance: new(big.Int)}}, } genesis = gspec.MustCommit(db) - mux event.TypeMux ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, db, 4, func(i int, block *BlockGen) { @@ -1148,9 +1150,8 @@ func TestEIP161AccountRemoval(t *testing.T) { Alloc: GenesisAlloc{address: {Balance: funds}}, } genesis = gspec.MustCommit(db) - mux event.TypeMux ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, block *BlockGen) { diff --git a/core/chain_makers.go b/core/chain_makers.go index 976a8114d..cb5825d18 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -236,7 +235,7 @@ func newCanonical(n int, full bool) (ethdb.Database, *BlockChain, error) { db, _ := ethdb.NewMemDatabase() genesis := gspec.MustCommit(db) - blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), vm.Config{}) // Create and inject the requested chain if n == 0 { return db, blockchain, nil diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index 28eb76c63..2260c62fb 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -80,9 +79,7 @@ func ExampleGenerateChain() { }) // Import the chain. This runs all block validation rules. - evmux := &event.TypeMux{} - - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() if i, err := blockchain.InsertChain(chain); err != nil { diff --git a/core/dao_test.go b/core/dao_test.go index 99bf1ecae..d6e11d78a 100644 --- a/core/dao_test.go +++ b/core/dao_test.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -42,13 +41,13 @@ func TestDAOForkRangeExtradata(t *testing.T) { proDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(proDb) proConf := ¶ms.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: true} - proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), vm.Config{}) defer proBc.Stop() conDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(conDb) conConf := ¶ms.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: false} - conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), vm.Config{}) defer conBc.Stop() if _, err := proBc.InsertChain(prefix); err != nil { @@ -62,8 +61,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a pro-fork block, and try to feed into the no-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -85,8 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a no-fork block, and try to feed into the pro-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) @@ -109,8 +106,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that contra-forkers accept pro-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -127,8 +123,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that pro-forkers accept contra-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) diff --git a/core/genesis_test.go b/core/genesis_test.go index 8b193759f..482f86190 100644 --- a/core/genesis_test.go +++ b/core/genesis_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -119,9 +118,8 @@ func TestSetupGenesis(t *testing.T) { // Commit the 'old' genesis block with Homestead transition at #2. // Advance to block #4, past the homestead transition block of customg. genesis := oldcustomg.MustCommit(db) - bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{}) defer bc.Stop() - bc.SetValidator(bproc{}) bc.InsertChain(makeBlockChainWithDiff(genesis, []int{2, 3, 4, 5}, 0)) bc.CurrentBlock() diff --git a/core/tx_pool.go b/core/tx_pool.go index b0c251f92..d835c94d1 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -34,6 +34,13 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // rmTxChanSize is the size of channel listening to RemovedTransactionEvent. + rmTxChanSize = 10 +) + var ( // ErrInvalidSender is returned if the transaction contains an invalid signature. ErrInvalidSender = errors.New("invalid sender") @@ -95,7 +102,14 @@ var ( underpricedTxCounter = metrics.NewCounter("txpool/underpriced") ) -type stateFn func() (*state.StateDB, error) +// blockChain provides the state of blockchain and current gas limit to do +// some pre checks in tx pool and event subscribers. +type blockChain interface { + State() (*state.StateDB, error) + GasLimit() *big.Int + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription + SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription +} // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { @@ -160,12 +174,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { type TxPool struct { config TxPoolConfig chainconfig *params.ChainConfig - currentState stateFn // The state function which will allow us to do some pre checks + blockChain blockChain pendingState *state.ManagedState - gasLimit func() *big.Int // The current gas limit function callback gasPrice *big.Int - eventMux *event.TypeMux - events *event.TypeMuxSubscription + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + rmTxCh chan RemovedTransactionEvent + rmTxSub event.Subscription signer types.Signer mu sync.RWMutex @@ -185,7 +202,7 @@ type TxPool struct { // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() @@ -193,17 +210,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e pool := &TxPool{ config: config, chainconfig: chainconfig, + blockChain: blockChain, signer: types.NewEIP155Signer(chainconfig.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: make(map[common.Hash]*types.Transaction), - eventMux: eventMux, - currentState: currentStateFn, - gasLimit: gasLimitFn, + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) @@ -220,6 +236,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e log.Warn("Failed to rotate transaction journal", "err", err) } } + // Subscribe events from blockchain + pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh) + pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh) // Start the event loop and return pool.wg.Add(1) go pool.loop() @@ -248,25 +267,27 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { - // Handle any events fired by the system - case ev, ok := <-pool.events.Chan(): - if !ok { - return - } - switch ev := ev.Data.(type) { - case ChainHeadEvent: - pool.mu.Lock() - if ev.Block != nil { - if pool.chainconfig.IsHomestead(ev.Block.Number()) { - pool.homestead = true - } + // Handle ChainHeadEvent + case ev := <-pool.chainHeadCh: + pool.mu.Lock() + if ev.Block != nil { + if pool.chainconfig.IsHomestead(ev.Block.Number()) { + pool.homestead = true } - pool.reset() - pool.mu.Unlock() - case RemovedTransactionEvent: - pool.addTxs(ev.Txs, false) } + pool.reset() + pool.mu.Unlock() + // Be unsubscribed due to system stopped + case <-pool.chainHeadSub.Err(): + return + + // Handle RemovedTransactionEvent + case ev := <-pool.rmTxCh: + pool.addTxs(ev.Txs, false) + // Be unsubscribed due to system stopped + case <-pool.rmTxSub.Err(): + return // Handle stats reporting ticks case <-report.C: @@ -322,7 +343,7 @@ func (pool *TxPool) lockedReset() { // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. func (pool *TxPool) reset() { - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { log.Error("Failed reset txpool state", "err", err) return @@ -347,7 +368,11 @@ func (pool *TxPool) reset() { // Stop terminates the transaction pool. func (pool *TxPool) Stop() { - pool.events.Unsubscribe() + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() + pool.rmTxSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { @@ -356,6 +381,12 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // GasPrice returns the current gas price enforced by the transaction pool. func (pool *TxPool) GasPrice() *big.Int { pool.mu.RLock() @@ -468,7 +499,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. - if pool.gasLimit().Cmp(tx.Gas()) < 0 { + if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 { return ErrGasLimit } // Make sure the transaction is signed properly @@ -482,7 +513,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { return err } @@ -647,7 +678,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - go pool.eventMux.Post(TxPreEvent{tx}) + go pool.txFeed.Send(TxPreEvent{tx}) } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -690,7 +721,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // If we added a new transaction, run promotion checks and return if !replace { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -717,7 +748,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { } // Only reprocess the internal state if something was actually added if len(dirty) > 0 { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -804,7 +835,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Gather all the accounts potentially needing updates if accounts == nil { @@ -973,7 +1004,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index fcb330051..ee1ddd4bb 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -44,6 +44,29 @@ func init() { testTxPoolConfig.Journal = "" } +type testBlockChain struct { + statedb *state.StateDB + gasLimit *big.Int + chainHeadFeed *event.Feed + rmTxFeed *event.Feed +} + +func (bc *testBlockChain) State() (*state.StateDB, error) { + return bc.statedb, nil +} + +func (bc *testBlockChain) GasLimit() *big.Int { + return new(big.Int).Set(bc.gasLimit) +} + +func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.chainHeadFeed.Subscribe(ch) +} + +func (bc *testBlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { + return bc.rmTxFeed.Subscribe(ch) +} + func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction { return pricedTransaction(nonce, gaslimit, big.NewInt(1), key) } @@ -56,9 +79,10 @@ func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.Pri func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} key, _ := crypto.GenerateKey() - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) return pool, key } @@ -96,6 +120,31 @@ func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } +type testChain struct { + *testBlockChain + address common.Address + trigger *bool +} + +// testChain.State() is used multiple times to reset the pending state. +// when simulate is true it will create a state that indicates +// that tx0 and tx1 are included in the chain. +func (c *testChain) State() (*state.StateDB, error) { + // delay "state change" by one. The tx pool fetches the + // state multiple times and by delaying it a bit we simulate + // a state change between those fetches. + stdb := c.statedb + if *c.trigger { + db, _ := ethdb.NewMemDatabase() + c.statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) + // simulate that the new head block included tx0 and tx1 + c.statedb.SetNonce(c.address, 2) + c.statedb.SetBalance(c.address, new(big.Int).SetUint64(params.Ether)) + *c.trigger = false + } + return stdb, nil +} + // This test simulates a scenario where a new block is imported during a // state reset and tests whether the pending state is in sync with the // block head event that initiated the resetState(). @@ -104,38 +153,18 @@ func TestStateChangeDuringPoolReset(t *testing.T) { db, _ = ethdb.NewMemDatabase() key, _ = crypto.GenerateKey() address = crypto.PubkeyToAddress(key.PublicKey) - mux = new(event.TypeMux) statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) trigger = false ) // setup pool with 2 transaction in it statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) + blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed), new(event.Feed)}, address, &trigger} tx0 := transaction(0, big.NewInt(100000), key) tx1 := transaction(1, big.NewInt(100000), key) - // stateFunc is used multiple times to reset the pending state. - // when simulate is true it will create a state that indicates - // that tx0 and tx1 are included in the chain. - stateFunc := func() (*state.StateDB, error) { - // delay "state change" by one. The tx pool fetches the - // state multiple times and by delaying it a bit we simulate - // a state change between those fetches. - stdb := statedb - if trigger { - statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) - // simulate that the new head block included tx0 and tx1 - statedb.SetNonce(address, 2) - statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) - trigger = false - } - return stdb, nil - } - - gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) } - - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() nonce := pool.State().GetNonce(address) @@ -176,7 +205,7 @@ func TestInvalidTransactions(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) @@ -211,7 +240,7 @@ func TestTransactionQueue(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset() pool.enqueueTx(tx.Hash(), tx) @@ -241,7 +270,7 @@ func TestTransactionQueue(t *testing.T) { tx2 := transaction(10, big.NewInt(100), key) tx3 := transaction(11, big.NewInt(100), key) from, _ = deriveSender(tx1) - currentState, _ = pool.currentState() + currentState, _ = pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset() @@ -264,7 +293,7 @@ func TestRemoveTx(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(1)) tx1 := transaction(0, big.NewInt(100), key) @@ -296,7 +325,7 @@ func TestNegativeValue(t *testing.T) { tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) @@ -311,8 +340,8 @@ func TestTransactionChainFork(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.currentState = func() (*state.StateDB, error) { return statedb, nil } - currentState, _ := pool.currentState() + pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() } @@ -339,8 +368,8 @@ func TestTransactionDoubleNonce(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.currentState = func() (*state.StateDB, error) { return statedb, nil } - currentState, _ := pool.currentState() + pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() } @@ -358,7 +387,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if replace, err := pool.add(tx2, false); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } - state, _ := pool.currentState() + state, _ := pool.blockChain.State() pool.promoteExecutables(state, []common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) @@ -386,7 +415,7 @@ func TestMissingNonce(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) tx := transaction(1, big.NewInt(100000), key) if _, err := pool.add(tx, false); err != nil { @@ -409,7 +438,7 @@ func TestNonceRecovery(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.SetNonce(addr, n) currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() @@ -431,11 +460,14 @@ func TestRemovedTxEvent(t *testing.T) { tx := transaction(0, big.NewInt(1000000), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000000000000)) pool.lockedReset() - pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) - pool.eventMux.Post(ChainHeadEvent{nil}) + blockChain, _ := pool.blockChain.(*testBlockChain) + blockChain.rmTxFeed.Send(RemovedTransactionEvent{types.Transactions{tx}}) + blockChain.chainHeadFeed.Send(ChainHeadEvent{nil}) + // wait for handling events + <-time.After(500 * time.Millisecond) if pool.pending[from].Len() != 1 { t.Error("expected 1 pending tx, got", pool.pending[from].Len()) } @@ -453,7 +485,7 @@ func TestTransactionDropping(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000)) // Add some pending and some queued transactions @@ -518,7 +550,7 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - pool.gasLimit = func() *big.Int { return big.NewInt(100) } + pool.blockChain.(*testBlockChain).gasLimit = big.NewInt(100) pool.lockedReset() if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { @@ -548,7 +580,7 @@ func TestTransactionPostponing(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000)) // Add a batch consecutive pending transactions for validation @@ -624,7 +656,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) pool.lockedReset() @@ -667,16 +699,17 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them (last one will be the local) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -757,19 +790,20 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // Create the pool to test the non-expiration enforcement db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.Lifetime = time.Second config.NoLocals = nolocals - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) @@ -821,7 +855,7 @@ func TestTransactionPendingLimiting(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) pool.lockedReset() @@ -853,7 +887,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool1.Stop() account1, _ := deriveSender(transaction(0, big.NewInt(0), key1)) - state1, _ := pool1.currentState() + state1, _ := pool1.blockChain.State() state1.AddBalance(account1, big.NewInt(1000000)) for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { @@ -866,7 +900,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool2.Stop() account2, _ := deriveSender(transaction(0, big.NewInt(0), key2)) - state2, _ := pool2.currentState() + state2, _ := pool2.blockChain.State() state2.AddBalance(account2, big.NewInt(1000000)) txns := []*types.Transaction{} @@ -900,15 +934,16 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -946,17 +981,18 @@ func TestTransactionCapClearsFromAll(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.AccountSlots = 2 config.AccountQueue = 2 config.GlobalSlots = 8 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() key, _ := crypto.GenerateKey() addr := crypto.PubkeyToAddress(key.PublicKey) @@ -980,15 +1016,16 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 0 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -1028,12 +1065,13 @@ func TestTransactionPoolRepricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1112,16 +1150,17 @@ func TestTransactionPoolUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 2 config.GlobalQueue = 2 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1199,14 +1238,15 @@ func TestTransactionReplacement(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a test account to add transactions with key, _ := crypto.GenerateKey() - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000)) // Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) @@ -1278,19 +1318,20 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Create the original pool to inject transaction into the journal db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals config.Journal = journal config.Rejournal = time.Second - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - statedb, _ = pool.currentState() + statedb, _ = pool.blockChain.State() statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) @@ -1320,7 +1361,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() if queued != 0 { @@ -1344,7 +1386,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { time.Sleep(2 * config.Rejournal) pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() if pending != 0 { @@ -1377,7 +1420,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1403,7 +1446,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1424,7 +1467,7 @@ func BenchmarkPoolInsert(b *testing.B) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) txs := make(types.Transactions, b.N) @@ -1449,7 +1492,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) batches := make([]types.Transactions, b.N) -- cgit v1.2.3