aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters
diff options
context:
space:
mode:
authorMiya Chen <miyatlchen@gmail.com>2017-08-18 18:58:36 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-08-18 18:58:36 +0800
commitbf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch)
treea8b86720edf085a6531e7042ef33f36a993540d5 /eth/filters
parenta4da8416eec6a00c358b6a612d21e7cdf859d588 (diff)
downloadgo-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.bz2
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.lz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.xz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'eth/filters')
-rw-r--r--eth/filters/filter.go4
-rw-r--r--eth/filters/filter_system.go98
-rw-r--r--eth/filters/filter_system_test.go117
-rw-r--r--eth/filters/filter_test.go34
4 files changed, 185 insertions, 68 deletions
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index f27b76929..f848bc6af 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -34,6 +34,10 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+ SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+ SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
}
// Filter can be used to retrieve and filter logs.
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ab0b7473e..00ade0ffb 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -54,6 +54,19 @@ const (
LastIndexSubscription
)
+const (
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+ rmLogsChanSize = 10
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
@@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
if ev == nil {
return
}
- switch e := ev.Data.(type) {
+ switch e := ev.(type) {
case []*types.Log:
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- case core.PendingLogsEvent:
- for _, f := range filters[PendingLogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
+ case *event.TypeMuxEvent:
+ switch muxe := e.Data.(type) {
+ case core.PendingLogsEvent:
+ for _, f := range filters[PendingLogsSubscription] {
+ if e.Time.After(f.created) {
+ if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
}
}
}
case core.TxPreEvent:
for _, f := range filters[PendingTransactionsSubscription] {
- if ev.Time.After(f.created) {
- f.hashes <- e.Tx.Hash()
- }
+ f.hashes <- e.Tx.Hash()
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
- if ev.Time.After(f.created) {
- f.headers <- e.Block.Header()
- }
+ f.headers <- e.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
})
@@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
- sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{})
+ sub = es.mux.Subscribe(core.PendingLogsEvent{})
+ // Subscribe TxPreEvent form txpool
+ txCh = make(chan core.TxPreEvent, txChanSize)
+ txSub = es.backend.SubscribeTxPreEvent(txCh)
+ // Subscribe RemovedLogsEvent
+ rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
+ rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
+ // Subscribe []*types.Log
+ logsCh = make(chan []*types.Log, logsChanSize)
+ logsSub = es.backend.SubscribeLogsEvent(logsCh)
+ // Subscribe ChainEvent
+ chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
+ chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
)
+ // Unsubscribe all events
+ defer sub.Unsubscribe()
+ defer txSub.Unsubscribe()
+ defer rmLogsSub.Unsubscribe()
+ defer logsSub.Unsubscribe()
+ defer chainEvSub.Unsubscribe()
+
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
@@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() {
return
}
es.broadcast(index, ev)
+
+ // Handle subscribed events
+ case ev := <-txCh:
+ es.broadcast(index, ev)
+ case ev := <-rmLogsCh:
+ es.broadcast(index, ev)
+ case ev := <-logsCh:
+ es.broadcast(index, ev)
+ case ev := <-chainEvCh:
+ es.broadcast(index, ev)
+
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
@@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() {
delete(index[f.typ], f.id)
}
close(f.err)
+
+ // System stopped
+ case <-txSub.Err():
+ return
+ case <-rmLogsSub.Err():
+ return
+ case <-logsSub.Err():
+ return
+ case <-chainEvSub.Err():
+ return
}
}
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 23e6d66e1..fcc888b8c 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -34,8 +34,12 @@ import (
)
type testBackend struct {
- mux *event.TypeMux
- db ethdb.Database
+ mux *event.TypeMux
+ db ethdb.Database
+ txFeed *event.Feed
+ rmLogsFeed *event.Feed
+ logsFeed *event.Feed
+ chainFeed *event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@@ -64,6 +68,22 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t
return core.GetBlockReceipts(b.db, blockHash, num), nil
}
+func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.txFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.rmLogsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.logsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.chainFeed.Subscribe(ch)
+}
+
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -75,7 +95,11 @@ func TestBlockSubscription(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@@ -114,7 +138,7 @@ func TestBlockSubscription(t *testing.T) {
time.Sleep(1 * time.Second)
for _, e := range chainEvents {
- mux.Post(e)
+ chainFeed.Send(e)
}
<-sub0.Err()
@@ -126,10 +150,14 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -147,9 +175,10 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(1 * time.Second)
for _, tx := range transactions {
ev := core.TxPreEvent{Tx: tx}
- mux.Post(ev)
+ txFeed.Send(ev)
}
+ timeout := time.Now().Add(1 * time.Second)
for {
results, err := api.GetFilterChanges(fid0)
if err != nil {
@@ -161,10 +190,18 @@ func TestPendingTxFilter(t *testing.T) {
if len(hashes) >= len(transactions) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
+ if len(hashes) != len(transactions) {
+ t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
+ return
+ }
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
@@ -176,10 +213,14 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@@ -221,10 +262,14 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@@ -242,15 +287,19 @@ func TestInvalidLogFilterCreation(t *testing.T) {
}
}
-// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
func TestLogFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -311,8 +360,8 @@ func TestLogFilter(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
- if err := mux.Post(allLogs); err != nil {
- t.Fatal(err)
+ if nsend := logsFeed.Send(allLogs); nsend == 0 {
+ t.Fatal("Shoud have at least one subscription")
}
if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err)
@@ -320,6 +369,7 @@ func TestLogFilter(t *testing.T) {
for i, tt := range testCases {
var fetched []*types.Log
+ timeout := time.Now().Add(1 * time.Second)
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
@@ -330,6 +380,10 @@ func TestLogFilter(t *testing.T) {
if len(fetched) >= len(tt.expected) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
@@ -350,15 +404,19 @@ func TestLogFilter(t *testing.T) {
}
}
-// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -456,6 +514,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
+ // allLogs are type of core.PendingLogsEvent
for _, l := range allLogs {
if err := mux.Post(l); err != nil {
t.Fatal(err)
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index b6cfd4bbc..3244c04d7 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -49,14 +49,18 @@ func BenchmarkMipmaps(b *testing.B) {
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr1 = crypto.PubkeyToAddress(key1.PublicKey)
- addr2 = common.BytesToAddress([]byte("jeff"))
- addr3 = common.BytesToAddress([]byte("ethereum"))
- addr4 = common.BytesToAddress([]byte("random addresses please"))
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr1 = crypto.PubkeyToAddress(key1.PublicKey)
+ addr2 = common.BytesToAddress([]byte("jeff"))
+ addr3 = common.BytesToAddress([]byte("ethereum"))
+ addr4 = common.BytesToAddress([]byte("random addresses please"))
)
defer db.Close()
@@ -119,11 +123,15 @@ func TestFilters(t *testing.T) {
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr = crypto.PubkeyToAddress(key1.PublicKey)
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr = crypto.PubkeyToAddress(key1.PublicKey)
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))