aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorMiya Chen <miyatlchen@gmail.com>2017-08-18 18:58:36 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-08-18 18:58:36 +0800
commitbf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch)
treea8b86720edf085a6531e7042ef33f36a993540d5 /eth
parenta4da8416eec6a00c358b6a612d21e7cdf859d588 (diff)
downloadgo-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.bz2
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.lz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.xz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'eth')
-rw-r--r--eth/api_backend.go28
-rw-r--r--eth/backend.go4
-rw-r--r--eth/filters/filter.go4
-rw-r--r--eth/filters/filter_system.go98
-rw-r--r--eth/filters/filter_system_test.go117
-rw-r--r--eth/filters/filter_test.go34
-rw-r--r--eth/handler.go23
-rw-r--r--eth/handler_test.go2
-rw-r--r--eth/helper_test.go11
-rw-r--r--eth/protocol.go6
10 files changed, 247 insertions, 80 deletions
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 7ef7c030d..abf52326b 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -115,6 +115,30 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta
return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil
}
+func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeRemovedTxEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainSideEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.eth.BlockChain().SubscribeLogsEvent(ch)
+}
+
func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}
@@ -151,6 +175,10 @@ func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
return b.eth.TxPool().Content()
}
+func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.eth.TxPool().SubscribeTxPreEvent(ch)
+}
+
func (b *EthApiBackend) Downloader() *downloader.Downloader {
return b.eth.Downloader()
}
diff --git a/eth/backend.go b/eth/backend.go
index 8a837f7b8..5f4f2097a 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -137,7 +137,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
- eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig)
+ eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
@@ -151,7 +151,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
- eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit)
+ eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
maxPeers := config.MaxPeers
if config.LightServ > 0 {
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index f27b76929..f848bc6af 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -34,6 +34,10 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+ SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+ SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
}
// Filter can be used to retrieve and filter logs.
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ab0b7473e..00ade0ffb 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -54,6 +54,19 @@ const (
LastIndexSubscription
)
+const (
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+ rmLogsChanSize = 10
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
@@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
if ev == nil {
return
}
- switch e := ev.Data.(type) {
+ switch e := ev.(type) {
case []*types.Log:
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- case core.PendingLogsEvent:
- for _, f := range filters[PendingLogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
+ case *event.TypeMuxEvent:
+ switch muxe := e.Data.(type) {
+ case core.PendingLogsEvent:
+ for _, f := range filters[PendingLogsSubscription] {
+ if e.Time.After(f.created) {
+ if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
}
}
}
case core.TxPreEvent:
for _, f := range filters[PendingTransactionsSubscription] {
- if ev.Time.After(f.created) {
- f.hashes <- e.Tx.Hash()
- }
+ f.hashes <- e.Tx.Hash()
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
- if ev.Time.After(f.created) {
- f.headers <- e.Block.Header()
- }
+ f.headers <- e.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
})
@@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
- sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{})
+ sub = es.mux.Subscribe(core.PendingLogsEvent{})
+ // Subscribe TxPreEvent form txpool
+ txCh = make(chan core.TxPreEvent, txChanSize)
+ txSub = es.backend.SubscribeTxPreEvent(txCh)
+ // Subscribe RemovedLogsEvent
+ rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
+ rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
+ // Subscribe []*types.Log
+ logsCh = make(chan []*types.Log, logsChanSize)
+ logsSub = es.backend.SubscribeLogsEvent(logsCh)
+ // Subscribe ChainEvent
+ chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
+ chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
)
+ // Unsubscribe all events
+ defer sub.Unsubscribe()
+ defer txSub.Unsubscribe()
+ defer rmLogsSub.Unsubscribe()
+ defer logsSub.Unsubscribe()
+ defer chainEvSub.Unsubscribe()
+
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
@@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() {
return
}
es.broadcast(index, ev)
+
+ // Handle subscribed events
+ case ev := <-txCh:
+ es.broadcast(index, ev)
+ case ev := <-rmLogsCh:
+ es.broadcast(index, ev)
+ case ev := <-logsCh:
+ es.broadcast(index, ev)
+ case ev := <-chainEvCh:
+ es.broadcast(index, ev)
+
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
@@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() {
delete(index[f.typ], f.id)
}
close(f.err)
+
+ // System stopped
+ case <-txSub.Err():
+ return
+ case <-rmLogsSub.Err():
+ return
+ case <-logsSub.Err():
+ return
+ case <-chainEvSub.Err():
+ return
}
}
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 23e6d66e1..fcc888b8c 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -34,8 +34,12 @@ import (
)
type testBackend struct {
- mux *event.TypeMux
- db ethdb.Database
+ mux *event.TypeMux
+ db ethdb.Database
+ txFeed *event.Feed
+ rmLogsFeed *event.Feed
+ logsFeed *event.Feed
+ chainFeed *event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@@ -64,6 +68,22 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t
return core.GetBlockReceipts(b.db, blockHash, num), nil
}
+func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.txFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.rmLogsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.logsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.chainFeed.Subscribe(ch)
+}
+
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -75,7 +95,11 @@ func TestBlockSubscription(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@@ -114,7 +138,7 @@ func TestBlockSubscription(t *testing.T) {
time.Sleep(1 * time.Second)
for _, e := range chainEvents {
- mux.Post(e)
+ chainFeed.Send(e)
}
<-sub0.Err()
@@ -126,10 +150,14 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -147,9 +175,10 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(1 * time.Second)
for _, tx := range transactions {
ev := core.TxPreEvent{Tx: tx}
- mux.Post(ev)
+ txFeed.Send(ev)
}
+ timeout := time.Now().Add(1 * time.Second)
for {
results, err := api.GetFilterChanges(fid0)
if err != nil {
@@ -161,10 +190,18 @@ func TestPendingTxFilter(t *testing.T) {
if len(hashes) >= len(transactions) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
+ if len(hashes) != len(transactions) {
+ t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
+ return
+ }
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
@@ -176,10 +213,14 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@@ -221,10 +262,14 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@@ -242,15 +287,19 @@ func TestInvalidLogFilterCreation(t *testing.T) {
}
}
-// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
func TestLogFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -311,8 +360,8 @@ func TestLogFilter(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
- if err := mux.Post(allLogs); err != nil {
- t.Fatal(err)
+ if nsend := logsFeed.Send(allLogs); nsend == 0 {
+ t.Fatal("Shoud have at least one subscription")
}
if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err)
@@ -320,6 +369,7 @@ func TestLogFilter(t *testing.T) {
for i, tt := range testCases {
var fetched []*types.Log
+ timeout := time.Now().Add(1 * time.Second)
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
@@ -330,6 +380,10 @@ func TestLogFilter(t *testing.T) {
if len(fetched) >= len(tt.expected) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
@@ -350,15 +404,19 @@ func TestLogFilter(t *testing.T) {
}
}
-// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -456,6 +514,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
+ // allLogs are type of core.PendingLogsEvent
for _, l := range allLogs {
if err := mux.Post(l); err != nil {
t.Fatal(err)
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index b6cfd4bbc..3244c04d7 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -49,14 +49,18 @@ func BenchmarkMipmaps(b *testing.B) {
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr1 = crypto.PubkeyToAddress(key1.PublicKey)
- addr2 = common.BytesToAddress([]byte("jeff"))
- addr3 = common.BytesToAddress([]byte("ethereum"))
- addr4 = common.BytesToAddress([]byte("random addresses please"))
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr1 = crypto.PubkeyToAddress(key1.PublicKey)
+ addr2 = common.BytesToAddress([]byte("jeff"))
+ addr3 = common.BytesToAddress([]byte("ethereum"))
+ addr4 = common.BytesToAddress([]byte("random addresses please"))
)
defer db.Close()
@@ -119,11 +123,15 @@ func TestFilters(t *testing.T) {
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr = crypto.PubkeyToAddress(key1.PublicKey)
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr = crypto.PubkeyToAddress(key1.PublicKey)
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))
diff --git a/eth/handler.go b/eth/handler.go
index e6a9c86d7..9d230a4ad 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,6 +45,10 @@ import (
const (
softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
)
var (
@@ -78,7 +82,8 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
- txSub *event.TypeMuxSubscription
+ txCh chan core.TxPreEvent
+ txSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
@@ -200,7 +205,8 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start() {
// broadcast transactions
- pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
+ pm.txCh = make(chan core.TxPreEvent, txChanSize)
+ pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
@@ -724,10 +730,15 @@ func (self *ProtocolManager) minedBroadcastLoop() {
}
func (self *ProtocolManager) txBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range self.txSub.Chan() {
- event := obj.Data.(core.TxPreEvent)
- self.BroadcastTx(event.Tx.Hash(), event.Tx)
+ for {
+ select {
+ case event := <-self.txCh:
+ self.BroadcastTx(event.Tx.Hash(), event.Tx)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-self.txSub.Err():
+ return
+ }
}
}
diff --git a/eth/handler_test.go b/eth/handler_test.go
index ca9c9e1b4..aba277444 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -474,7 +474,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
config = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
gspec = &core.Genesis{Config: config}
genesis = gspec.MustCommit(db)
- blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{})
+ blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{})
)
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db)
if err != nil {
diff --git a/eth/helper_test.go b/eth/helper_test.go
index 546478a3e..f1dab9528 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -59,7 +59,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}},
}
genesis = gspec.MustCommit(db)
- blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{})
+ blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{})
)
chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator)
if _, err := blockchain.InsertChain(chain); err != nil {
@@ -88,8 +88,9 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i
// testTxPool is a fake, helper transaction pool for testing purposes
type testTxPool struct {
- pool []*types.Transaction // Collection of all transactions
- added chan<- []*types.Transaction // Notification channel for new transactions
+ txFeed event.Feed
+ pool []*types.Transaction // Collection of all transactions
+ added chan<- []*types.Transaction // Notification channel for new transactions
lock sync.RWMutex // Protects the transaction pool
}
@@ -124,6 +125,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
return batches, nil
}
+func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return p.txFeed.Subscribe(ch)
+}
+
// newTestTransaction create a new dummy transaction.
func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize))
diff --git a/eth/protocol.go b/eth/protocol.go
index 376e4663e..2c41376fa 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -22,7 +22,9 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -100,6 +102,10 @@ type txPool interface {
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending() (map[common.Address]types.Transactions, error)
+
+ // SubscribeTxPreEvent should return an event subscription of
+ // TxPreEvent and send events to the given channel.
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
}
// statusData is the network packet for the status message.