diff options
Diffstat (limited to 'eth/filters/filter_system_test.go')
-rw-r--r-- | eth/filters/filter_system_test.go | 349 |
1 files changed, 279 insertions, 70 deletions
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 72824cb08..9e6fde1c6 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -17,101 +17,310 @@ package filters import ( + "math/big" + "reflect" "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" ) -func TestCallbacks(t *testing.T) { +var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + api = NewPublicFilterAPI(db, mux) +) + +// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. +// It creates multiple subscriptions: +// - one at the start and should receive all posted chain events and a second (blockHashes) +// - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2]) +// - one that is created after the second cutoff moment (blockHashes[cutoff2:]) +func TestBlockSubscription(t *testing.T) { + t.Parallel() + var ( - mux event.TypeMux - fs = NewFilterSystem(&mux) - blockDone = make(chan struct{}) - txDone = make(chan struct{}) - logDone = make(chan struct{}) - removedLogDone = make(chan struct{}) - pendingLogDone = make(chan struct{}) + genesis = core.WriteGenesisBlockForTesting(db) + chain, _ = core.GenerateChain(nil, genesis, db, 10, func(i int, gen *core.BlockGen) {}) + chainEvents = []core.ChainEvent{} ) - blockFilter := &Filter{ - BlockCallback: func(*types.Block, vm.Logs) { - close(blockDone) - }, - } - txFilter := &Filter{ - TransactionCallback: func(*types.Transaction) { - close(txDone) - }, + for _, blk := range chain { + chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk}) } - logFilter := &Filter{ - LogCallback: func(l *vm.Log, oob bool) { - if !oob { - close(logDone) + + chan0 := make(chan *types.Header) + sub0 := api.events.SubscribeNewHeads(chan0) + chan1 := make(chan *types.Header) + sub1 := api.events.SubscribeNewHeads(chan1) + + go func() { // simulate client + i1, i2 := 0, 0 + for i1 != len(chainEvents) || i2 != len(chainEvents) { + select { + case header := <-chan0: + if chainEvents[i1].Hash != header.Hash() { + t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash()) + } + i1++ + case header := <-chan1: + if chainEvents[i2].Hash != header.Hash() { + t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash()) + } + i2++ } - }, + } + + sub0.Unsubscribe() + sub1.Unsubscribe() + }() + + time.Sleep(1 * time.Second) + for _, e := range chainEvents { + mux.Post(e) } - removedLogFilter := &Filter{ - LogCallback: func(l *vm.Log, oob bool) { - if oob { - close(removedLogDone) - } - }, + + <-sub0.Err() + <-sub1.Err() +} + +// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux. +func TestPendingTxFilter(t *testing.T) { + t.Parallel() + + var ( + transactions = []*types.Transaction{ + types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + } + + hashes []common.Hash + ) + + fid0 := api.NewPendingTransactionFilter() + + time.Sleep(1 * time.Second) + for _, tx := range transactions { + ev := core.TxPreEvent{Tx: tx} + mux.Post(ev) } - pendingLogFilter := &Filter{ - LogCallback: func(*vm.Log, bool) { - close(pendingLogDone) - }, + + for { + h := api.GetFilterChanges(fid0).([]common.Hash) + hashes = append(hashes, h...) + + if len(hashes) >= len(transactions) { + break + } + + time.Sleep(100 * time.Millisecond) + } + + for i := range hashes { + if hashes[i] != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) + } + } +} + +// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. +func TestLogFilter(t *testing.T) { + t.Parallel() + + var ( + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") + secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") + thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") + notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") + firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") + notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") + + allLogs = vm.Logs{ + // Note, these are used for comparison of the test cases. + vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0), + vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1), + vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1), + vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 2), + vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3), + } + + testCases = []struct { + crit FilterCriteria + expected vm.Logs + id rpc.ID + }{ + // match all + {FilterCriteria{}, allLogs, ""}, + // match none due to no matching addresses + {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""}, + // match logs based on addresses, ignore topics + {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, + // match none due to no matching topics (match with address) + {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""}, + // match logs based on addresses and topics + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""}, + // match logs based on multiple addresses and "or" topics + {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""}, + // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes + {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""}, + } + + err error + ) + + // create all filters + for i := range testCases { + testCases[i].id = api.NewFilter(testCases[i].crit) } - fs.Add(blockFilter, ChainFilter) - fs.Add(txFilter, PendingTxFilter) - fs.Add(logFilter, LogFilter) - fs.Add(removedLogFilter, LogFilter) - fs.Add(pendingLogFilter, PendingLogFilter) - - mux.Post(core.ChainEvent{}) - mux.Post(core.TxPreEvent{}) - mux.Post(vm.Logs{&vm.Log{}}) - mux.Post(core.RemovedLogsEvent{Logs: vm.Logs{&vm.Log{}}}) - mux.Post(core.PendingLogsEvent{Logs: vm.Logs{&vm.Log{}}}) - - const dura = 5 * time.Second - failTimer := time.NewTimer(dura) - select { - case <-blockDone: - case <-failTimer.C: - t.Error("block filter failed to trigger (timeout)") + // raise events + time.Sleep(1 * time.Second) + if err = mux.Post(allLogs); err != nil { + t.Fatal(err) } - failTimer.Reset(dura) - select { - case <-txDone: - case <-failTimer.C: - t.Error("transaction filter failed to trigger (timeout)") + for i, tt := range testCases { + var fetched []Log + for { // fetch all expected logs + fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...) + if len(fetched) >= len(tt.expected) { + break + } + + time.Sleep(100 * time.Millisecond) + } + + if len(fetched) != len(tt.expected) { + t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + return + } + + for l := range fetched { + if fetched[l].Removed { + t.Errorf("expected log not to be removed for log %d in case %d", l, i) + } + if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { + t.Errorf("invalid log on index %d for case %d", l, i) + } + + } } +} - failTimer.Reset(dura) - select { - case <-logDone: - case <-failTimer.C: - t.Error("log filter failed to trigger (timeout)") +// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux. +func TestPendingLogsSubscription(t *testing.T) { + t.Parallel() + + var ( + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") + secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") + thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") + notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") + firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") + thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333") + forthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") + notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") + + allLogs = []core.PendingLogsEvent{ + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 2)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 4)}}, + core.PendingLogsEvent{Logs: vm.Logs{ + vm.NewLog(thirdAddress, []common.Hash{firstTopic}, []byte(""), 5), + vm.NewLog(thirdAddress, []common.Hash{thirdTopic}, []byte(""), 5), + vm.NewLog(thirdAddress, []common.Hash{forthTopic}, []byte(""), 5), + vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 5), + }}, + } + + convertLogs = func(pl []core.PendingLogsEvent) vm.Logs { + var logs vm.Logs + for _, l := range pl { + logs = append(logs, l.Logs...) + } + return logs + } + + testCases = []struct { + crit FilterCriteria + expected vm.Logs + c chan []Log + sub *Subscription + }{ + // match all + {FilterCriteria{}, convertLogs(allLogs), nil, nil}, + // match none due to no matching addresses + {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{[]common.Hash{}}}, vm.Logs{}, nil, nil}, + // match logs based on addresses, ignore topics + {FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + // match none due to no matching topics (match with address) + {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, nil, nil}, + // match logs based on addresses and topics + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil}, + // match logs based on multiple addresses and "or" topics + {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil}, + // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes + {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + // multiple pending logs, should match only 2 topics from the logs in block 5 + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, forthTopic}}}, vm.Logs{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, + } + ) + + // create all subscriptions, this ensures all subscriptions are created before the events are posted. + // on slow machines this could otherwise lead to missing events when the subscription is created after + // (some) events are posted. + for i := range testCases { + testCases[i].c = make(chan []Log) + testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c) } - failTimer.Reset(dura) - select { - case <-removedLogDone: - case <-failTimer.C: - t.Error("removed log filter failed to trigger (timeout)") + for n, test := range testCases { + i := n + tt := test + go func() { + var fetched []Log + fetchLoop: + for { + logs := <-tt.c + fetched = append(fetched, logs...) + if len(fetched) >= len(tt.expected) { + break fetchLoop + } + } + + if len(fetched) != len(tt.expected) { + t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + } + + for l := range fetched { + if fetched[l].Removed { + t.Errorf("expected log not to be removed for log %d in case %d", l, i) + } + if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { + t.Errorf("invalid log on index %d for case %d", l, i) + } + } + }() } - failTimer.Reset(dura) - select { - case <-pendingLogDone: - case <-failTimer.C: - t.Error("pending log filter failed to trigger (timeout)") + // raise events + time.Sleep(1 * time.Second) + for _, l := range allLogs { + if err := mux.Post(l); err != nil { + t.Fatal(err) + } } } |