aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter_system_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/filters/filter_system_test.go')
-rw-r--r--eth/filters/filter_system_test.go117
1 files changed, 88 insertions, 29 deletions
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 23e6d66e1..fcc888b8c 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -34,8 +34,12 @@ import (
)
type testBackend struct {
- mux *event.TypeMux
- db ethdb.Database
+ mux *event.TypeMux
+ db ethdb.Database
+ txFeed *event.Feed
+ rmLogsFeed *event.Feed
+ logsFeed *event.Feed
+ chainFeed *event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@@ -64,6 +68,22 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t
return core.GetBlockReceipts(b.db, blockHash, num), nil
}
+func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.txFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.rmLogsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.logsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.chainFeed.Subscribe(ch)
+}
+
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -75,7 +95,11 @@ func TestBlockSubscription(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@@ -114,7 +138,7 @@ func TestBlockSubscription(t *testing.T) {
time.Sleep(1 * time.Second)
for _, e := range chainEvents {
- mux.Post(e)
+ chainFeed.Send(e)
}
<-sub0.Err()
@@ -126,10 +150,14 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -147,9 +175,10 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(1 * time.Second)
for _, tx := range transactions {
ev := core.TxPreEvent{Tx: tx}
- mux.Post(ev)
+ txFeed.Send(ev)
}
+ timeout := time.Now().Add(1 * time.Second)
for {
results, err := api.GetFilterChanges(fid0)
if err != nil {
@@ -161,10 +190,18 @@ func TestPendingTxFilter(t *testing.T) {
if len(hashes) >= len(transactions) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
+ if len(hashes) != len(transactions) {
+ t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
+ return
+ }
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
@@ -176,10 +213,14 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@@ -221,10 +262,14 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@@ -242,15 +287,19 @@ func TestInvalidLogFilterCreation(t *testing.T) {
}
}
-// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
func TestLogFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -311,8 +360,8 @@ func TestLogFilter(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
- if err := mux.Post(allLogs); err != nil {
- t.Fatal(err)
+ if nsend := logsFeed.Send(allLogs); nsend == 0 {
+ t.Fatal("Shoud have at least one subscription")
}
if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err)
@@ -320,6 +369,7 @@ func TestLogFilter(t *testing.T) {
for i, tt := range testCases {
var fetched []*types.Log
+ timeout := time.Now().Add(1 * time.Second)
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
@@ -330,6 +380,10 @@ func TestLogFilter(t *testing.T) {
if len(fetched) >= len(tt.expected) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
@@ -350,15 +404,19 @@ func TestLogFilter(t *testing.T) {
}
}
-// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -456,6 +514,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
+ // allLogs are type of core.PendingLogsEvent
for _, l := range allLogs {
if err := mux.Post(l); err != nil {
t.Fatal(err)