aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuillaume Ballet <gballet@gmail.com>2018-03-01 00:28:09 +0800
committerGitHub <noreply@github.com>2018-03-01 00:28:09 +0800
commit9b4e182ce5c0a82799cc3c455444c18c664fa996 (patch)
treeccafc915e6f3721b771ebed6d8bf3f7f5ef847bf
parent52bb0a1ec7f7b6c64a21291c1ca784763e09608d (diff)
parentd24d10a764457937f1bda72053c82d98ac95dd7a (diff)
downloaddexon-9b4e182ce5c0a82799cc3c455444c18c664fa996.tar
dexon-9b4e182ce5c0a82799cc3c455444c18c664fa996.tar.gz
dexon-9b4e182ce5c0a82799cc3c455444c18c664fa996.tar.bz2
dexon-9b4e182ce5c0a82799cc3c455444c18c664fa996.tar.lz
dexon-9b4e182ce5c0a82799cc3c455444c18c664fa996.tar.xz
dexon-9b4e182ce5c0a82799cc3c455444c18c664fa996.tar.zst
dexon-9b4e182ce5c0a82799cc3c455444c18c664fa996.zip
Merge pull request #16210 from gluk256/288-filter-optimization
whisper: message filtering optimization Only run the message through filters who registered their interest.
-rw-r--r--whisper/whisperv6/filter.go96
-rw-r--r--whisper/whisperv6/filter_test.go29
-rw-r--r--whisper/whisperv6/whisper.go18
-rw-r--r--whisper/whisperv6/whisper_test.go11
4 files changed, 76 insertions, 78 deletions
diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go
index eb0c65fa3..e4171f85c 100644
--- a/whisper/whisperv6/filter.go
+++ b/whisper/whisperv6/filter.go
@@ -35,6 +35,7 @@ type Filter struct {
PoW float64 // Proof of work as described in the Whisper spec
AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
+ id string // unique identifier
Messages map[common.Hash]*ReceivedMessage
mutex sync.RWMutex
@@ -43,15 +44,21 @@ type Filter struct {
// Filters represents a collection of filters
type Filters struct {
watchers map[string]*Filter
- whisper *Whisper
- mutex sync.RWMutex
+
+ topicMatcher map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic
+ allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is
+
+ whisper *Whisper
+ mutex sync.RWMutex
}
// NewFilters returns a newly created filter collection
func NewFilters(w *Whisper) *Filters {
return &Filters{
- watchers: make(map[string]*Filter),
- whisper: w,
+ watchers: make(map[string]*Filter),
+ topicMatcher: make(map[TopicType]map[*Filter]struct{}),
+ allTopicsMatcher: make(map[*Filter]struct{}),
+ whisper: w,
}
}
@@ -81,7 +88,9 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
}
+ watcher.id = id
fs.watchers[id] = watcher
+ fs.addTopicMatcher(watcher)
return id, err
}
@@ -91,12 +100,51 @@ func (fs *Filters) Uninstall(id string) bool {
fs.mutex.Lock()
defer fs.mutex.Unlock()
if fs.watchers[id] != nil {
+ fs.removeFromTopicMatchers(fs.watchers[id])
delete(fs.watchers, id)
return true
}
return false
}
+// addTopicMatcher adds a filter to the topic matchers.
+// If the filter's Topics array is empty, it will be tried on every topic.
+// Otherwise, it will be tried on the topics specified.
+func (fs *Filters) addTopicMatcher(watcher *Filter) {
+ if len(watcher.Topics) == 0 {
+ fs.allTopicsMatcher[watcher] = struct{}{}
+ } else {
+ for _, t := range watcher.Topics {
+ topic := BytesToTopic(t)
+ if fs.topicMatcher[topic] == nil {
+ fs.topicMatcher[topic] = make(map[*Filter]struct{})
+ }
+ fs.topicMatcher[topic][watcher] = struct{}{}
+ }
+ }
+}
+
+// removeFromTopicMatchers removes a filter from the topic matchers
+func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
+ delete(fs.allTopicsMatcher, watcher)
+ for _, topic := range watcher.Topics {
+ delete(fs.topicMatcher[BytesToTopic(topic)], watcher)
+ }
+}
+
+// getWatchersByTopic returns a slice containing the filters that
+// match a specific topic
+func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
+ res := make([]*Filter, 0, len(fs.allTopicsMatcher))
+ for watcher := range fs.allTopicsMatcher {
+ res = append(res, watcher)
+ }
+ for watcher := range fs.topicMatcher[topic] {
+ res = append(res, watcher)
+ }
+ return res
+}
+
// Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter {
fs.mutex.RLock()
@@ -112,11 +160,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
- i := -1 // only used for logging info
- for _, watcher := range fs.watchers {
- i++
+ candidates := fs.getWatchersByTopic(env.Topic)
+ for _, watcher := range candidates {
if p2pMessage && !watcher.AllowP2P {
- log.Trace(fmt.Sprintf("msg [%x], filter [%d]: p2p messages are not allowed", env.Hash(), i))
+ log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
continue
}
@@ -128,10 +175,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match {
msg = env.Open(watcher)
if msg == nil {
- log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
+ log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id)
}
} else {
- log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
+ log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id)
}
}
@@ -194,16 +241,17 @@ func (f *Filter) Retrieve() (all []*ReceivedMessage) {
// MatchMessage checks if the filter matches an already decrypted
// message (i.e. a Message that has already been handled by
-// MatchEnvelope when checked by a previous filter)
+// MatchEnvelope when checked by a previous filter).
+// Topics are not checked here, since this is done by topic matchers.
func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
if f.PoW > 0 && msg.PoW < f.PoW {
return false
}
if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() {
- return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic)
+ return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst)
} else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() {
- return f.SymKeyHash == msg.SymKeyHash && f.MatchTopic(msg.Topic)
+ return f.SymKeyHash == msg.SymKeyHash
}
return false
}
@@ -211,27 +259,9 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
// MatchEnvelope checks if it's worth decrypting the message. If
// it returns `true`, client code is expected to attempt decrypting
// the message and subsequently call MatchMessage.
+// Topics are not checked here, since this is done by topic matchers.
func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
- if f.PoW > 0 && envelope.pow < f.PoW {
- return false
- }
-
- return f.MatchTopic(envelope.Topic)
-}
-
-// MatchTopic checks that the filter captures a given topic.
-func (f *Filter) MatchTopic(topic TopicType) bool {
- if len(f.Topics) == 0 {
- // any topic matches
- return true
- }
-
- for _, bt := range f.Topics {
- if matchSingleTopic(topic, bt) {
- return true
- }
- }
- return false
+ return f.PoW <= 0 || envelope.pow >= f.PoW
}
func matchSingleTopic(topic TopicType, bt []byte) bool {
diff --git a/whisper/whisperv6/filter_test.go b/whisper/whisperv6/filter_test.go
index e7230ef38..0bb7986c3 100644
--- a/whisper/whisperv6/filter_test.go
+++ b/whisper/whisperv6/filter_test.go
@@ -303,9 +303,8 @@ func TestMatchEnvelope(t *testing.T) {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
}
- params.Topic[0] = 0xFF // ensure mismatch
+ params.Topic[0] = 0xFF // topic mismatch
- // mismatch with pseudo-random data
msg, err := NewSentMessage(params)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
@@ -314,14 +313,6 @@ func TestMatchEnvelope(t *testing.T) {
if err != nil {
t.Fatalf("failed Wrap with seed %d: %s.", seed, err)
}
- match := fsym.MatchEnvelope(env)
- if match {
- t.Fatalf("failed MatchEnvelope symmetric with seed %d.", seed)
- }
- match = fasym.MatchEnvelope(env)
- if match {
- t.Fatalf("failed MatchEnvelope asymmetric with seed %d.", seed)
- }
// encrypt symmetrically
i := mrand.Int() % 4
@@ -337,7 +328,7 @@ func TestMatchEnvelope(t *testing.T) {
}
// symmetric + matching topic: match
- match = fsym.MatchEnvelope(env)
+ match := fsym.MatchEnvelope(env)
if !match {
t.Fatalf("failed MatchEnvelope() symmetric with seed %d.", seed)
}
@@ -396,7 +387,7 @@ func TestMatchEnvelope(t *testing.T) {
// asymmetric + matching topic: match
fasym.Topics[i] = fasym.Topics[i+1]
match = fasym.MatchEnvelope(env)
- if match {
+ if !match {
t.Fatalf("failed MatchEnvelope(asymmetric + matching topic) with seed %d.", seed)
}
@@ -431,7 +422,8 @@ func TestMatchEnvelope(t *testing.T) {
// filter with topic + envelope without topic: mismatch
fasym.Topics = fsym.Topics
match = fasym.MatchEnvelope(env)
- if match {
+ if !match {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
t.Fatalf("failed MatchEnvelope(filter without topic + envelope without topic) with seed %d.", seed)
}
}
@@ -487,7 +479,8 @@ func TestMatchMessageSym(t *testing.T) {
// topic mismatch
f.Topics[index][0]++
- if f.MatchMessage(msg) {
+ if !f.MatchMessage(msg) {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed)
}
f.Topics[index][0]--
@@ -580,7 +573,8 @@ func TestMatchMessageAsym(t *testing.T) {
// topic mismatch
f.Topics[index][0]++
- if f.MatchMessage(msg) {
+ if !f.MatchMessage(msg) {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed)
}
f.Topics[index][0]--
@@ -829,8 +823,9 @@ func TestVariableTopics(t *testing.T) {
f.Topics[i][lastTopicByte]++
match = f.MatchEnvelope(env)
- if match {
- t.Fatalf("MatchEnvelope symmetric with seed %d, step %d: false positive.", seed, i)
+ if !match {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
+ t.Fatalf("MatchEnvelope symmetric with seed %d, step %d.", seed, i)
}
}
}
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index b0b601f03..81f59b3cc 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -928,24 +928,6 @@ func (whisper *Whisper) Envelopes() []*Envelope {
return all
}
-// Messages iterates through all currently floating envelopes
-// and retrieves all the messages, that this filter could decrypt.
-func (whisper *Whisper) Messages(id string) []*ReceivedMessage {
- result := make([]*ReceivedMessage, 0)
- whisper.poolMu.RLock()
- defer whisper.poolMu.RUnlock()
-
- if filter := whisper.filters.Get(id); filter != nil {
- for _, env := range whisper.envelopes {
- msg := filter.processEnvelope(env)
- if msg != nil {
- result = append(result, msg)
- }
- }
- }
- return result
-}
-
// isEnvelopeCached checks if envelope with specific hash has already been received and cached.
func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool {
whisper.poolMu.Lock()
diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go
index 99e5f0bbb..3a219cd13 100644
--- a/whisper/whisperv6/whisper_test.go
+++ b/whisper/whisperv6/whisper_test.go
@@ -75,10 +75,6 @@ func TestWhisperBasic(t *testing.T) {
if len(mail) != 0 {
t.Fatalf("failed w.Envelopes().")
}
- m := w.Messages("non-existent")
- if len(m) != 0 {
- t.Fatalf("failed w.Messages.")
- }
derived := pbkdf2.Key([]byte(peerID), nil, 65356, aesKeyLength, sha256.New)
if !validateDataIntegrity(derived, aesKeyLength) {
@@ -593,7 +589,7 @@ func TestCustomization(t *testing.T) {
}
// check w.messages()
- id, err := w.Subscribe(f)
+ _, err = w.Subscribe(f)
if err != nil {
t.Fatalf("failed subscribe with seed %d: %s.", seed, err)
}
@@ -602,11 +598,6 @@ func TestCustomization(t *testing.T) {
if len(mail) > 0 {
t.Fatalf("received premature mail")
}
-
- mail = w.Messages(id)
- if len(mail) != 2 {
- t.Fatalf("failed to get whisper messages")
- }
}
func TestSymmetricSendCycle(t *testing.T) {