aboutsummaryrefslogtreecommitdiffstats
path: root/core/bloombits/matcher_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/bloombits/matcher_test.go')
-rw-r--r--core/bloombits/matcher_test.go283
1 files changed, 163 insertions, 120 deletions
diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go
index bef1491b8..fc49b43b8 100644
--- a/core/bloombits/matcher_test.go
+++ b/core/bloombits/matcher_test.go
@@ -13,6 +13,7 @@
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
package bloombits
import (
@@ -20,177 +21,219 @@ import (
"sync/atomic"
"testing"
"time"
-
- "github.com/ethereum/go-ethereum/core/types"
)
const testSectionSize = 4096
-func matcherTestVector(b uint, s uint64) []byte {
- r := make([]byte, testSectionSize/8)
- for i, _ := range r {
- var bb byte
- for bit := 0; bit < 8; bit++ {
- blockIdx := s*testSectionSize + uint64(i*8+bit)
- bb += bb
- if (blockIdx % uint64(b)) == 0 {
- bb++
- }
- }
- r[i] = bb
- }
- return r
+// Tests the matcher pipeline on a single continuous workflow without interrupts.
+func TestMatcherContinuous(t *testing.T) {
+ testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75)
+ testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81)
+ testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36)
}
-func expMatch1(idxs types.BloomIndexList, i uint64) bool {
- for _, ii := range idxs {
- if (i % uint64(ii)) != 0 {
- return false
- }
- }
- return true
+// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
+// with the aim of ensuring data items are requested only once.
+func TestMatcherIntermittent(t *testing.T) {
+ testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, true, 75)
+ testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, true, 81)
+ testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36)
}
-func expMatch2(idxs []types.BloomIndexList, i uint64) bool {
- for _, ii := range idxs {
- if expMatch1(ii, i) {
- return true
- }
+// Tests the matcher pipeline on random input to hopefully catch anomalies.
+func TestMatcherRandom(t *testing.T) {
+ for i := 0; i < 10; i++ {
+ testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 10000, 0)
+ testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 10000, 0)
+ testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 10000, 0)
+ testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 10000, 0)
+ testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 10000, 0)
}
- return false
}
-func expMatch3(idxs [][]types.BloomIndexList, i uint64) bool {
- for _, ii := range idxs {
- if !expMatch2(ii, i) {
- return false
+// makeRandomIndexes generates a random filter system, composed on multiple filter
+// criteria, each having one bloom list component for the address and arbitrarilly
+// many topic bloom list components.
+func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
+ res := make([][]bloomIndexes, len(lengths))
+ for i, topics := range lengths {
+ res[i] = make([]bloomIndexes, topics)
+ for j := 0; j < topics; j++ {
+ for k := 0; k < len(res[i][j]); k++ {
+ res[i][j][k] = uint(rand.Intn(max-1) + 2)
+ }
}
}
- return true
+ return res
}
-func testServeMatcher(m *Matcher, stop chan struct{}, cnt *uint32, maxRequestLen int) {
- // serve matcher with test vectors
- for i := 0; i < 10; i++ {
- go func() {
- for {
- select {
- case <-stop:
- return
- default:
- }
- b, ok := m.AllocSectionQueue()
- if !ok {
- return
- }
- if m.SectionCount(b) < maxRequestLen {
- time.Sleep(time.Microsecond * 100)
- }
- s := m.FetchSections(b, maxRequestLen)
- res := make([][]byte, len(s))
- for i, ss := range s {
- res[i] = matcherTestVector(b, ss)
- atomic.AddUint32(cnt, 1)
- }
- m.Deliver(b, s, res)
- }
- }()
+// testMatcherDiffBatches runs the given matches test in single-delivery and also
+// in batches delivery mode, verifying that all kinds of deliveries are handled
+// correctly withn.
+func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32) {
+ singleton := testMatcher(t, filter, blocks, intermittent, retrievals, 1)
+ batched := testMatcher(t, filter, blocks, intermittent, retrievals, 16)
+
+ if singleton != batched {
+ t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
}
}
-func testMatcher(t *testing.T, idxs [][]types.BloomIndexList, cnt uint64, stopOnMatches bool, expCount uint32) uint32 {
- count1 := testMatcherWithReqCount(t, idxs, cnt, stopOnMatches, expCount, 1)
- count16 := testMatcherWithReqCount(t, idxs, cnt, stopOnMatches, expCount, 16)
- if count1 != count16 {
- t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: request count mismatch, %v with maxReqCount = 1 vs. %v with maxReqCount = 16", idxs, cnt, stopOnMatches, count1, count16)
+// testMatcherBothModes runs the given matcher test in both continuous as well as
+// in intermittent mode, verifying that the request counts match each other.
+func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64, retrievals uint32) {
+ continuous := testMatcher(t, filter, blocks, false, retrievals, 16)
+ intermittent := testMatcher(t, filter, blocks, true, retrievals, 16)
+
+ if continuous != intermittent {
+ t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
}
- return count1
}
-func testMatcherWithReqCount(t *testing.T, idxs [][]types.BloomIndexList, cnt uint64, stopOnMatches bool, expCount uint32, maxReqCount int) uint32 {
- m := NewMatcher(testSectionSize, nil, nil)
+// testMatcher is a generic tester to run the given matcher test and return the
+// number of requests made for cross validation between different modes.
+func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
+ // Create a new matcher an simulate our explicit random bitsets
+ matcher := NewMatcher(testSectionSize, nil, nil)
- for _, idxss := range idxs {
- for _, idxs := range idxss {
- for _, idx := range idxs {
- m.newFetcher(idx)
+ matcher.addresses = filter[0]
+ matcher.topics = filter[1:]
+
+ for _, rule := range filter {
+ for _, topic := range rule {
+ for _, bit := range topic {
+ matcher.addScheduler(bit)
}
}
}
+ // Track the number of retrieval requests made
+ var requested uint32
- m.addresses = idxs[0]
- m.topics = idxs[1:]
- var reqCount uint32
+ // Start the matching session for the filter and the retriver goroutines
+ quit := make(chan struct{})
+ matches := make(chan uint64, 16)
- stop := make(chan struct{})
- chn := m.Start(0, cnt-1)
- testServeMatcher(m, stop, &reqCount, maxReqCount)
+ session, err := matcher.Start(0, blocks-1, matches)
+ if err != nil {
+ t.Fatalf("failed to stat matcher session: %v", err)
+ }
+ startRetrievers(session, quit, &requested, maxReqCount)
- for i := uint64(0); i < cnt; i++ {
- if expMatch3(idxs, i) {
- match, ok := <-chn
+ // Iterate over all the blocks and verify that the pipeline produces the correct matches
+ for i := uint64(0); i < blocks; i++ {
+ if expMatch3(filter, i) {
+ match, ok := <-matches
if !ok {
- t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: expected #%v, results channel closed", idxs, cnt, stopOnMatches, i)
+ t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i)
return 0
}
if match != i {
- t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: expected #%v, got #%v", idxs, cnt, stopOnMatches, i, match)
+ t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match)
}
- if stopOnMatches {
- m.Stop()
- close(stop)
- stop = make(chan struct{})
- chn = m.Start(i+1, cnt-1)
- testServeMatcher(m, stop, &reqCount, maxReqCount)
+ // If we're testing intermittent mode, abort and restart the pipeline
+ if intermittent {
+ session.Close(time.Second)
+ close(quit)
+
+ quit = make(chan struct{})
+ matches = make(chan uint64, 16)
+
+ session, err = matcher.Start(i+1, blocks-1, matches)
+ if err != nil {
+ t.Fatalf("failed to stat matcher session: %v", err)
+ }
+ startRetrievers(session, quit, &requested, maxReqCount)
}
}
}
- match, ok := <-chn
+ // Ensure the result channel is torn down after the last block
+ match, ok := <-matches
if ok {
- t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: expected closed channel, got #%v", idxs, cnt, stopOnMatches, match)
+ t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
}
- m.Stop()
- close(stop)
+ // Clean up the session and ensure we match the expected retrieval count
+ session.Close(time.Second)
+ close(quit)
- if expCount != 0 && expCount != reqCount {
- t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: request count mismatch, expected #%v, got #%v", idxs, cnt, stopOnMatches, expCount, reqCount)
+ if retrievals != 0 && requested != retrievals {
+ t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals)
}
+ return requested
+}
+
+// startRetrievers starts a batch of goroutines listening for section requests
+// and serving them.
+func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) {
+ requests := make(chan chan *Retrieval)
+
+ for i := 0; i < 10; i++ {
+ // Start a multiplexer to test multiple threaded execution
+ go session.Multiplex(batch, 100*time.Microsecond, requests)
- return reqCount
+ // Start a services to match the above multiplexer
+ go func() {
+ for {
+ // Wait for a service request or a shutdown
+ select {
+ case <-quit:
+ return
+
+ case request := <-requests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ if rand.Int()%4 != 0 { // Handle occasional missing deliveries
+ task.Bitsets[i] = generateBitset(task.Bit, section)
+ atomic.AddUint32(retrievals, 1)
+ }
+ }
+ request <- task
+ }
+ }
+ }()
+ }
}
-func testRandomIdxs(l []int, max int) [][]types.BloomIndexList {
- res := make([][]types.BloomIndexList, len(l))
- for i, ll := range l {
- res[i] = make([]types.BloomIndexList, ll)
- for j, _ := range res[i] {
- for k, _ := range res[i][j] {
- res[i][j][k] = uint(rand.Intn(max-1) + 2)
+// generateBitset generates the rotated bitset for the given bloom bit and section
+// numbers.
+func generateBitset(bit uint, section uint64) []byte {
+ bitset := make([]byte, testSectionSize/8)
+ for i := 0; i < len(bitset); i++ {
+ for b := 0; b < 8; b++ {
+ blockIdx := section*testSectionSize + uint64(i*8+b)
+ bitset[i] += bitset[i]
+ if (blockIdx % uint64(bit)) == 0 {
+ bitset[i]++
}
}
}
- return res
+ return bitset
}
-func TestMatcher(t *testing.T) {
- testMatcher(t, [][]types.BloomIndexList{{{10, 20, 30}}}, 100000, false, 75)
- testMatcher(t, [][]types.BloomIndexList{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81)
- testMatcher(t, [][]types.BloomIndexList{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36)
+func expMatch1(filter bloomIndexes, i uint64) bool {
+ for _, ii := range filter {
+ if (i % uint64(ii)) != 0 {
+ return false
+ }
+ }
+ return true
}
-func TestMatcherStopOnMatches(t *testing.T) {
- testMatcher(t, [][]types.BloomIndexList{{{10, 20, 30}}}, 100000, true, 75)
- testMatcher(t, [][]types.BloomIndexList{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36)
+func expMatch2(filter []bloomIndexes, i uint64) bool {
+ for _, ii := range filter {
+ if expMatch1(ii, i) {
+ return true
+ }
+ }
+ return false
}
-func TestMatcherRandom(t *testing.T) {
- for i := 0; i < 20; i++ {
- testMatcher(t, testRandomIdxs([]int{1}, 50), 100000, false, 0)
- testMatcher(t, testRandomIdxs([]int{3}, 50), 100000, false, 0)
- testMatcher(t, testRandomIdxs([]int{2, 2, 2}, 20), 100000, false, 0)
- testMatcher(t, testRandomIdxs([]int{5, 5, 5}, 50), 100000, false, 0)
- idxs := testRandomIdxs([]int{2, 2, 2}, 20)
- reqCount := testMatcher(t, idxs, 10000, false, 0)
- testMatcher(t, idxs, 10000, true, reqCount)
+func expMatch3(filter [][]bloomIndexes, i uint64) bool {
+ for _, ii := range filter {
+ if !expMatch2(ii, i) {
+ return false
+ }
}
+ return true
}