diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/api_backend.go | 28 | ||||
-rw-r--r-- | eth/backend.go | 4 | ||||
-rw-r--r-- | eth/filters/filter.go | 4 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 98 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 117 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 34 | ||||
-rw-r--r-- | eth/handler.go | 23 | ||||
-rw-r--r-- | eth/handler_test.go | 2 | ||||
-rw-r--r-- | eth/helper_test.go | 11 | ||||
-rw-r--r-- | eth/protocol.go | 6 |
10 files changed, 247 insertions, 80 deletions
diff --git a/eth/api_backend.go b/eth/api_backend.go index 7ef7c030d..abf52326b 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -115,6 +115,30 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil } +func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedTxEvent(ch) +} + +func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainHeadEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainSideEvent(ch) +} + +func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.BlockChain().SubscribeLogsEvent(ch) +} + func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { return b.eth.txPool.AddLocal(signedTx) } @@ -151,6 +175,10 @@ func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.TxPool().Content() } +func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.eth.TxPool().SubscribeTxPreEvent(ch) +} + func (b *EthApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/eth/backend.go b/eth/backend.go index 8a837f7b8..5f4f2097a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -137,7 +137,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} - eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig) + eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } @@ -151,7 +151,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } - eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) maxPeers := config.MaxPeers if config.LightServ > 0 { diff --git a/eth/filters/filter.go b/eth/filters/filter.go index f27b76929..f848bc6af 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -34,6 +34,10 @@ type Backend interface { EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription } // Filter can be used to retrieve and filter logs. diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ab0b7473e..00ade0ffb 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -54,6 +54,19 @@ const ( LastIndexSubscription ) +const ( + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. + rmLogsChanSize = 10 + // logsChanSize is the size of channel listening to LogsEvent. + logsChanSize = 10 + // chainEvChanSize is the size of channel listening to ChainEvent. + chainEvChanSize = 10 +) + var ( ErrInvalidSubscriptionID = errors.New("invalid id") ) @@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) { +func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { if ev == nil { return } - switch e := ev.Data.(type) { + switch e := ev.(type) { case []*types.Log: if len(e) > 0 { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } } case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - case core.PendingLogsEvent: - for _, f := range filters[PendingLogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs + case *event.TypeMuxEvent: + switch muxe := e.Data.(type) { + case core.PendingLogsEvent: + for _, f := range filters[PendingLogsSubscription] { + if e.Time.After(f.created) { + if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } } } } case core.TxPreEvent: for _, f := range filters[PendingTransactionsSubscription] { - if ev.Time.After(f.created) { - f.hashes <- e.Tx.Hash() - } + f.hashes <- e.Tx.Hash() } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { - if ev.Time.After(f.created) { - f.headers <- e.Block.Header() - } + f.headers <- e.Block.Header() } if es.lightMode && len(filters[LogsSubscription]) > 0 { es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } }) @@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. func (es *EventSystem) eventLoop() { var ( index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{}) + sub = es.mux.Subscribe(core.PendingLogsEvent{}) + // Subscribe TxPreEvent form txpool + txCh = make(chan core.TxPreEvent, txChanSize) + txSub = es.backend.SubscribeTxPreEvent(txCh) + // Subscribe RemovedLogsEvent + rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) + rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) + // Subscribe []*types.Log + logsCh = make(chan []*types.Log, logsChanSize) + logsSub = es.backend.SubscribeLogsEvent(logsCh) + // Subscribe ChainEvent + chainEvCh = make(chan core.ChainEvent, chainEvChanSize) + chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) ) + // Unsubscribe all events + defer sub.Unsubscribe() + defer txSub.Unsubscribe() + defer rmLogsSub.Unsubscribe() + defer logsSub.Unsubscribe() + defer chainEvSub.Unsubscribe() + for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } @@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() { return } es.broadcast(index, ev) + + // Handle subscribed events + case ev := <-txCh: + es.broadcast(index, ev) + case ev := <-rmLogsCh: + es.broadcast(index, ev) + case ev := <-logsCh: + es.broadcast(index, ev) + case ev := <-chainEvCh: + es.broadcast(index, ev) + case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() { delete(index[f.typ], f.id) } close(f.err) + + // System stopped + case <-txSub.Err(): + return + case <-rmLogsSub.Err(): + return + case <-logsSub.Err(): + return + case <-chainEvSub.Err(): + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 23e6d66e1..fcc888b8c 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -34,8 +34,12 @@ import ( ) type testBackend struct { - mux *event.TypeMux - db ethdb.Database + mux *event.TypeMux + db ethdb.Database + txFeed *event.Feed + rmLogsFeed *event.Feed + logsFeed *event.Feed + chainFeed *event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -64,6 +68,22 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t return core.GetBlockReceipts(b.db, blockHash, num), nil } +func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.txFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.rmLogsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.logsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.chainFeed.Subscribe(ch) +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) @@ -75,7 +95,11 @@ func TestBlockSubscription(t *testing.T) { var ( mux = new(event.TypeMux) db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} api = NewPublicFilterAPI(backend, false) genesis = new(core.Genesis).MustCommit(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) @@ -114,7 +138,7 @@ func TestBlockSubscription(t *testing.T) { time.Sleep(1 * time.Second) for _, e := range chainEvents { - mux.Post(e) + chainFeed.Send(e) } <-sub0.Err() @@ -126,10 +150,14 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), @@ -147,9 +175,10 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(1 * time.Second) for _, tx := range transactions { ev := core.TxPreEvent{Tx: tx} - mux.Post(ev) + txFeed.Send(ev) } + timeout := time.Now().Add(1 * time.Second) for { results, err := api.GetFilterChanges(fid0) if err != nil { @@ -161,10 +190,18 @@ func TestPendingTxFilter(t *testing.T) { if len(hashes) >= len(transactions) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } + if len(hashes) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + return + } for i := range hashes { if hashes[i] != transactions[i].Hash() { t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) @@ -176,10 +213,14 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -221,10 +262,14 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -242,15 +287,19 @@ func TestInvalidLogFilterCreation(t *testing.T) { } } -// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. +// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed. func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -311,8 +360,8 @@ func TestLogFilter(t *testing.T) { // raise events time.Sleep(1 * time.Second) - if err := mux.Post(allLogs); err != nil { - t.Fatal(err) + if nsend := logsFeed.Send(allLogs); nsend == 0 { + t.Fatal("Shoud have at least one subscription") } if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil { t.Fatal(err) @@ -320,6 +369,7 @@ func TestLogFilter(t *testing.T) { for i, tt := range testCases { var fetched []*types.Log + timeout := time.Now().Add(1 * time.Second) for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { @@ -330,6 +380,10 @@ func TestLogFilter(t *testing.T) { if len(fetched) >= len(tt.expected) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } @@ -350,15 +404,19 @@ func TestLogFilter(t *testing.T) { } } -// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux. +// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed. func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -456,6 +514,7 @@ func TestPendingLogsSubscription(t *testing.T) { // raise events time.Sleep(1 * time.Second) + // allLogs are type of core.PendingLogsEvent for _, l := range allLogs { if err := mux.Post(l); err != nil { t.Fatal(err) diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index b6cfd4bbc..3244c04d7 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -49,14 +49,18 @@ func BenchmarkMipmaps(b *testing.B) { defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) ) defer db.Close() @@ -119,11 +123,15 @@ func TestFilters(t *testing.T) { defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr = crypto.PubkeyToAddress(key1.PublicKey) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) diff --git a/eth/handler.go b/eth/handler.go index e6a9c86d7..9d230a4ad 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,6 +45,10 @@ import ( const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 ) var ( @@ -78,7 +82,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txSub *event.TypeMuxSubscription + txCh chan core.TxPreEvent + txSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -200,7 +205,8 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start() { // broadcast transactions - pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + pm.txCh = make(chan core.TxPreEvent, txChanSize) + pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) @@ -724,10 +730,15 @@ func (self *ProtocolManager) minedBroadcastLoop() { } func (self *ProtocolManager) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.Data.(core.TxPreEvent) - self.BroadcastTx(event.Tx.Hash(), event.Tx) + for { + select { + case event := <-self.txCh: + self.BroadcastTx(event.Tx.Hash(), event.Tx) + + // Err() channel will be closed when unsubscribing. + case <-self.txSub.Err(): + return + } } } diff --git a/eth/handler_test.go b/eth/handler_test.go index ca9c9e1b4..aba277444 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -474,7 +474,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} gspec = &core.Genesis{Config: config} genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{}) ) pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db) if err != nil { diff --git a/eth/helper_test.go b/eth/helper_test.go index 546478a3e..f1dab9528 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -59,7 +59,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, } genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{}) ) chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator) if _, err := blockchain.InsertChain(chain); err != nil { @@ -88,8 +88,9 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i // testTxPool is a fake, helper transaction pool for testing purposes type testTxPool struct { - pool []*types.Transaction // Collection of all transactions - added chan<- []*types.Transaction // Notification channel for new transactions + txFeed event.Feed + pool []*types.Transaction // Collection of all transactions + added chan<- []*types.Transaction // Notification channel for new transactions lock sync.RWMutex // Protects the transaction pool } @@ -124,6 +125,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } +func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return p.txFeed.Subscribe(ch) +} + // newTestTransaction create a new dummy transaction. func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize)) diff --git a/eth/protocol.go b/eth/protocol.go index 376e4663e..2c41376fa 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -22,7 +22,9 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" ) @@ -100,6 +102,10 @@ type txPool interface { // Pending should return pending transactions. // The slice should be modifiable by the caller. Pending() (map[common.Address]types.Transactions, error) + + // SubscribeTxPreEvent should return an event subscription of + // TxPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription } // statusData is the network packet for the status message. |