aboutsummaryrefslogtreecommitdiffstats
path: root/core/bloombits/matcher_test.go
blob: 4a31854c581a67c4be19a6e1e3dd808095d0dbef (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package bloombits

import (
    "context"
    "math/rand"
    "sync/atomic"
    "testing"
    "time"

    "github.com/ethereum/go-ethereum/common"
)

const testSectionSize = 4096

// Tests that wildcard filter rules (nil) can be specified and are handled well.
func TestMatcherWildcards(t *testing.T) {
    matcher := NewMatcher(testSectionSize, [][][]byte{
        {common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard
        {common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()},       // Default hash is not a wildcard
        {common.Hash{0x01}.Bytes()},                              // Plain rule, sanity check
        {common.Hash{0x01}.Bytes(), nil},                         // Wildcard suffix, drop rule
        {nil, common.Hash{0x01}.Bytes()},                         // Wildcard prefix, drop rule
        {nil, nil},                                               // Wildcard combo, drop rule
        {},                                                       // Inited wildcard rule, drop rule
        nil,                                                      // Proper wildcard rule, drop rule
    })
    if len(matcher.filters) != 3 {
        t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3)
    }
    if len(matcher.filters[0]) != 2 {
        t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2)
    }
    if len(matcher.filters[1]) != 2 {
        t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2)
    }
    if len(matcher.filters[2]) != 1 {
        t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1)
    }
}

// Tests the matcher pipeline on a single continuous workflow without interrupts.
func TestMatcherContinuous(t *testing.T) {
    testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75)
    testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81)
    testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36)
}

// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
// with the aim of ensuring data items are requested only once.
func TestMatcherIntermittent(t *testing.T) {
    testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, true, 75)
    testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, true, 81)
    testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36)
}

// Tests the matcher pipeline on random input to hopefully catch anomalies.
func TestMatcherRandom(t *testing.T) {
    for i := 0; i < 10; i++ {
        testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 10000, 0)
        testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 10000, 0)
        testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 10000, 0)
        testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 10000, 0)
        testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 10000, 0)
    }
}

// Tests that matching on everything doesn't crash (special case internally).
func TestWildcardMatcher(t *testing.T) {
    testMatcherBothModes(t, nil, 10000, 0)
}

// makeRandomIndexes generates a random filter system, composed on multiple filter
// criteria, each having one bloom list component for the address and arbitrarily
// many topic bloom list components.
func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
    res := make([][]bloomIndexes, len(lengths))
    for i, topics := range lengths {
        res[i] = make([]bloomIndexes, topics)
        for j := 0; j < topics; j++ {
            for k := 0; k < len(res[i][j]); k++ {
                res[i][j][k] = uint(rand.Intn(max-1) + 2)
            }
        }
    }
    return res
}

// testMatcherDiffBatches runs the given matches test in single-delivery and also
// in batches delivery mode, verifying that all kinds of deliveries are handled
// correctly withn.
func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32) {
    singleton := testMatcher(t, filter, blocks, intermittent, retrievals, 1)
    batched := testMatcher(t, filter, blocks, intermittent, retrievals, 16)

    if singleton != batched {
        t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
    }
}

// testMatcherBothModes runs the given matcher test in both continuous as well as
// in intermittent mode, verifying that the request counts match each other.
func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64, retrievals uint32) {
    continuous := testMatcher(t, filter, blocks, false, retrievals, 16)
    intermittent := testMatcher(t, filter, blocks, true, retrievals, 16)

    if continuous != intermittent {
        t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
    }
}

// testMatcher is a generic tester to run the given matcher test and return the
// number of requests made for cross validation between different modes.
func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
    // Create a new matcher an simulate our explicit random bitsets
    matcher := NewMatcher(testSectionSize, nil)
    matcher.filters = filter

    for _, rule := range filter {
        for _, topic := range rule {
            for _, bit := range topic {
                matcher.addScheduler(bit)
            }
        }
    }
    // Track the number of retrieval requests made
    var requested uint32

    // Start the matching session for the filter and the retriver goroutines
    quit := make(chan struct{})
    matches := make(chan uint64, 16)

    session, err := matcher.Start(context.Background(), 0, blocks-1, matches)
    if err != nil {
        t.Fatalf("failed to stat matcher session: %v", err)
    }
    startRetrievers(session, quit, &requested, maxReqCount)

    // Iterate over all the blocks and verify that the pipeline produces the correct matches
    for i := uint64(0); i < blocks; i++ {
        if expMatch3(filter, i) {
            match, ok := <-matches
            if !ok {
                t.Errorf("filter = %v  blocks = %v  intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i)
                return 0
            }
            if match != i {
                t.Errorf("filter = %v  blocks = %v  intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match)
            }
            // If we're testing intermittent mode, abort and restart the pipeline
            if intermittent {
                session.Close()
                close(quit)

                quit = make(chan struct{})
                matches = make(chan uint64, 16)

                session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
                if err != nil {
                    t.Fatalf("failed to stat matcher session: %v", err)
                }
                startRetrievers(session, quit, &requested, maxReqCount)
            }
        }
    }
    // Ensure the result channel is torn down after the last block
    match, ok := <-matches
    if ok {
        t.Errorf("filter = %v  blocks = %v  intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
    }
    // Clean up the session and ensure we match the expected retrieval count
    session.Close()
    close(quit)

    if retrievals != 0 && requested != retrievals {
        t.Errorf("filter = %v  blocks = %v  intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals)
    }
    return requested
}

// startRetrievers starts a batch of goroutines listening for section requests
// and serving them.
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) {
    requests := make(chan chan *Retrieval)

    for i := 0; i < 10; i++ {
        // Start a multiplexer to test multiple threaded execution
        go session.Multiplex(batch, 100*time.Microsecond, requests)

        // Start a services to match the above multiplexer
        go func() {
            for {
                // Wait for a service request or a shutdown
                select {
                case <-quit:
                    return

                case request := <-requests:
                    task := <-request

                    task.Bitsets = make([][]byte, len(task.Sections))
                    for i, section := range task.Sections {
                        if rand.Int()%4 != 0 { // Handle occasional missing deliveries
                            task.Bitsets[i] = generateBitset(task.Bit, section)
                            atomic.AddUint32(retrievals, 1)
                        }
                    }
                    request <- task
                }
            }
        }()
    }
}

// generateBitset generates the rotated bitset for the given bloom bit and section
// numbers.
func generateBitset(bit uint, section uint64) []byte {
    bitset := make([]byte, testSectionSize/8)
    for i := 0; i < len(bitset); i++ {
        for b := 0; b < 8; b++ {
            blockIdx := section*testSectionSize + uint64(i*8+b)
            bitset[i] += bitset[i]
            if (blockIdx % uint64(bit)) == 0 {
                bitset[i]++
            }
        }
    }
    return bitset
}

func expMatch1(filter bloomIndexes, i uint64) bool {
    for _, ii := range filter {
        if (i % uint64(ii)) != 0 {
            return false
        }
    }
    return true
}

func expMatch2(filter []bloomIndexes, i uint64) bool {
    for _, ii := range filter {
        if expMatch1(ii, i) {
            return true
        }
    }
    return false
}

func expMatch3(filter [][]bloomIndexes, i uint64) bool {
    for _, ii := range filter {
        if !expMatch2(ii, i) {
            return false
        }
    }
    return true
}