From 014d8d98370abf770b89f7987a92999133e3d4ea Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 27 Feb 2018 21:16:15 +0100 Subject: whisper: message filtering optimized --- whisper/whisperv6/filter.go | 66 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 11 deletions(-) (limited to 'whisper') diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go index eb0c65fa3..75c31209c 100644 --- a/whisper/whisperv6/filter.go +++ b/whisper/whisperv6/filter.go @@ -35,6 +35,7 @@ type Filter struct { PoW float64 // Proof of work as described in the Whisper spec AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization + id string // unique identifier Messages map[common.Hash]*ReceivedMessage mutex sync.RWMutex @@ -42,16 +43,21 @@ type Filter struct { // Filters represents a collection of filters type Filters struct { - watchers map[string]*Filter - whisper *Whisper - mutex sync.RWMutex + watchers map[string]*Filter + topicMatcher map[TopicType]map[*Filter]struct{} + allTopicsMatcher map[*Filter]struct{} + + whisper *Whisper + mutex sync.RWMutex } // NewFilters returns a newly created filter collection func NewFilters(w *Whisper) *Filters { return &Filters{ - watchers: make(map[string]*Filter), - whisper: w, + watchers: make(map[string]*Filter), + topicMatcher: make(map[TopicType]map[*Filter]struct{}), + allTopicsMatcher: make(map[*Filter]struct{}), + whisper: w, } } @@ -81,7 +87,9 @@ func (fs *Filters) Install(watcher *Filter) (string, error) { watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym) } + watcher.id = id fs.watchers[id] = watcher + fs.addTopicMatcher(watcher) return id, err } @@ -91,12 +99,49 @@ func (fs *Filters) Uninstall(id string) bool { fs.mutex.Lock() defer fs.mutex.Unlock() if fs.watchers[id] != nil { + fs.removeFromTopicMatchers(fs.watchers[id]) delete(fs.watchers, id) return true } return false } +// addTopicMatcher adds a filter to the topic matchers +func (fs *Filters) addTopicMatcher(watcher *Filter) { + if len(watcher.Topics) == 0 { + fs.allTopicsMatcher[watcher] = struct{}{} + } else { + for _, t := range watcher.Topics { + topic := BytesToTopic(t) + if fs.topicMatcher[topic] == nil { + fs.topicMatcher[topic] = make(map[*Filter]struct{}) + } + fs.topicMatcher[topic][watcher] = struct{}{} + } + } +} + +// removeFromTopicMatchers removes a filter from the topic matchers +func (fs *Filters) removeFromTopicMatchers(watcher *Filter) { + delete(fs.allTopicsMatcher, watcher) + for _, topic := range watcher.Topics { + delete(fs.topicMatcher[BytesToTopic(topic)], watcher) + } +} + +// getWatchersByTopic returns a slice containing the filters that +// match a specific topic +func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter { + res := make([]*Filter, 0, len(fs.allTopicsMatcher)) + for watcher, _ := range fs.allTopicsMatcher { + res = append(res, watcher) + } + for watcher, _ := range fs.topicMatcher[topic] { + res = append(res, watcher) + } + return res +} + // Get returns a filter from the collection with a specific ID func (fs *Filters) Get(id string) *Filter { fs.mutex.RLock() @@ -112,11 +157,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { fs.mutex.RLock() defer fs.mutex.RUnlock() - i := -1 // only used for logging info - for _, watcher := range fs.watchers { - i++ + candidates := fs.getWatchersByTopic(env.Topic) + for _, watcher := range candidates { if p2pMessage && !watcher.AllowP2P { - log.Trace(fmt.Sprintf("msg [%x], filter [%d]: p2p messages are not allowed", env.Hash(), i)) + log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id)) continue } @@ -128,10 +172,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { if match { msg = env.Open(watcher) if msg == nil { - log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i) + log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id) } } else { - log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i) + log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id) } } -- cgit v1.2.3 From c733792be4b49ec47c5307b1a3b8fa116ea16933 Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 27 Feb 2018 23:38:20 +0100 Subject: whsiper: refactoring --- whisper/whisperv6/filter.go | 29 ++++++----------------------- whisper/whisperv6/filter_test.go | 25 +++++++++++++++---------- whisper/whisperv6/whisper.go | 18 ------------------ whisper/whisperv6/whisper_test.go | 11 +---------- 4 files changed, 22 insertions(+), 61 deletions(-) (limited to 'whisper') diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go index 75c31209c..d7b71795d 100644 --- a/whisper/whisperv6/filter.go +++ b/whisper/whisperv6/filter.go @@ -238,16 +238,17 @@ func (f *Filter) Retrieve() (all []*ReceivedMessage) { // MatchMessage checks if the filter matches an already decrypted // message (i.e. a Message that has already been handled by -// MatchEnvelope when checked by a previous filter) +// MatchEnvelope when checked by a previous filter). +// Topics are not checked here, since this is done by topic matchers. func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { if f.PoW > 0 && msg.PoW < f.PoW { return false } if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() { - return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic) + return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) } else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() { - return f.SymKeyHash == msg.SymKeyHash && f.MatchTopic(msg.Topic) + return f.SymKeyHash == msg.SymKeyHash } return false } @@ -255,27 +256,9 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { // MatchEnvelope checks if it's worth decrypting the message. If // it returns `true`, client code is expected to attempt decrypting // the message and subsequently call MatchMessage. +// Topics are not checked here, since this is done by topic matchers. func (f *Filter) MatchEnvelope(envelope *Envelope) bool { - if f.PoW > 0 && envelope.pow < f.PoW { - return false - } - - return f.MatchTopic(envelope.Topic) -} - -// MatchTopic checks that the filter captures a given topic. -func (f *Filter) MatchTopic(topic TopicType) bool { - if len(f.Topics) == 0 { - // any topic matches - return true - } - - for _, bt := range f.Topics { - if matchSingleTopic(topic, bt) { - return true - } - } - return false + return f.PoW <= 0 || envelope.pow >= f.PoW } func matchSingleTopic(topic TopicType, bt []byte) bool { diff --git a/whisper/whisperv6/filter_test.go b/whisper/whisperv6/filter_test.go index e7230ef38..491e137bd 100644 --- a/whisper/whisperv6/filter_test.go +++ b/whisper/whisperv6/filter_test.go @@ -303,9 +303,8 @@ func TestMatchEnvelope(t *testing.T) { t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err) } - params.Topic[0] = 0xFF // ensure mismatch + params.Topic[0] = 0xFF // topic mismatch - // mismatch with pseudo-random data msg, err := NewSentMessage(params) if err != nil { t.Fatalf("failed to create new message with seed %d: %s.", seed, err) @@ -315,11 +314,13 @@ func TestMatchEnvelope(t *testing.T) { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } match := fsym.MatchEnvelope(env) - if match { + if !match { + // topic mismatch should have no affect, as topics are handled by topic matchers t.Fatalf("failed MatchEnvelope symmetric with seed %d.", seed) } match = fasym.MatchEnvelope(env) - if match { + if !match { + // topic mismatch should have no affect, as topics are handled by topic matchers t.Fatalf("failed MatchEnvelope asymmetric with seed %d.", seed) } @@ -396,7 +397,7 @@ func TestMatchEnvelope(t *testing.T) { // asymmetric + matching topic: match fasym.Topics[i] = fasym.Topics[i+1] match = fasym.MatchEnvelope(env) - if match { + if !match { t.Fatalf("failed MatchEnvelope(asymmetric + matching topic) with seed %d.", seed) } @@ -431,7 +432,8 @@ func TestMatchEnvelope(t *testing.T) { // filter with topic + envelope without topic: mismatch fasym.Topics = fsym.Topics match = fasym.MatchEnvelope(env) - if match { + if !match { + // topic mismatch should have no affect, as topics are handled by topic matchers t.Fatalf("failed MatchEnvelope(filter without topic + envelope without topic) with seed %d.", seed) } } @@ -487,7 +489,8 @@ func TestMatchMessageSym(t *testing.T) { // topic mismatch f.Topics[index][0]++ - if f.MatchMessage(msg) { + if !f.MatchMessage(msg) { + // topic mismatch should have no affect, as topics are handled by topic matchers t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed) } f.Topics[index][0]-- @@ -580,7 +583,8 @@ func TestMatchMessageAsym(t *testing.T) { // topic mismatch f.Topics[index][0]++ - if f.MatchMessage(msg) { + if !f.MatchMessage(msg) { + // topic mismatch should have no affect, as topics are handled by topic matchers t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed) } f.Topics[index][0]-- @@ -829,8 +833,9 @@ func TestVariableTopics(t *testing.T) { f.Topics[i][lastTopicByte]++ match = f.MatchEnvelope(env) - if match { - t.Fatalf("MatchEnvelope symmetric with seed %d, step %d: false positive.", seed, i) + if !match { + // topic mismatch should have no affect, as topics are handled by topic matchers + t.Fatalf("MatchEnvelope symmetric with seed %d, step %d.", seed, i) } } } diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index b0b601f03..81f59b3cc 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -928,24 +928,6 @@ func (whisper *Whisper) Envelopes() []*Envelope { return all } -// Messages iterates through all currently floating envelopes -// and retrieves all the messages, that this filter could decrypt. -func (whisper *Whisper) Messages(id string) []*ReceivedMessage { - result := make([]*ReceivedMessage, 0) - whisper.poolMu.RLock() - defer whisper.poolMu.RUnlock() - - if filter := whisper.filters.Get(id); filter != nil { - for _, env := range whisper.envelopes { - msg := filter.processEnvelope(env) - if msg != nil { - result = append(result, msg) - } - } - } - return result -} - // isEnvelopeCached checks if envelope with specific hash has already been received and cached. func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool { whisper.poolMu.Lock() diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go index 99e5f0bbb..3a219cd13 100644 --- a/whisper/whisperv6/whisper_test.go +++ b/whisper/whisperv6/whisper_test.go @@ -75,10 +75,6 @@ func TestWhisperBasic(t *testing.T) { if len(mail) != 0 { t.Fatalf("failed w.Envelopes().") } - m := w.Messages("non-existent") - if len(m) != 0 { - t.Fatalf("failed w.Messages.") - } derived := pbkdf2.Key([]byte(peerID), nil, 65356, aesKeyLength, sha256.New) if !validateDataIntegrity(derived, aesKeyLength) { @@ -593,7 +589,7 @@ func TestCustomization(t *testing.T) { } // check w.messages() - id, err := w.Subscribe(f) + _, err = w.Subscribe(f) if err != nil { t.Fatalf("failed subscribe with seed %d: %s.", seed, err) } @@ -602,11 +598,6 @@ func TestCustomization(t *testing.T) { if len(mail) > 0 { t.Fatalf("received premature mail") } - - mail = w.Messages(id) - if len(mail) != 2 { - t.Fatalf("failed to get whisper messages") - } } func TestSymmetricSendCycle(t *testing.T) { -- cgit v1.2.3 From d24d10a764457937f1bda72053c82d98ac95dd7a Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 28 Feb 2018 15:05:35 +0100 Subject: whisper: style fixes --- whisper/whisperv6/filter.go | 15 +++++++++------ whisper/whisperv6/filter_test.go | 12 +----------- 2 files changed, 10 insertions(+), 17 deletions(-) (limited to 'whisper') diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go index d7b71795d..e4171f85c 100644 --- a/whisper/whisperv6/filter.go +++ b/whisper/whisperv6/filter.go @@ -43,9 +43,10 @@ type Filter struct { // Filters represents a collection of filters type Filters struct { - watchers map[string]*Filter - topicMatcher map[TopicType]map[*Filter]struct{} - allTopicsMatcher map[*Filter]struct{} + watchers map[string]*Filter + + topicMatcher map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic + allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is whisper *Whisper mutex sync.RWMutex @@ -106,7 +107,9 @@ func (fs *Filters) Uninstall(id string) bool { return false } -// addTopicMatcher adds a filter to the topic matchers +// addTopicMatcher adds a filter to the topic matchers. +// If the filter's Topics array is empty, it will be tried on every topic. +// Otherwise, it will be tried on the topics specified. func (fs *Filters) addTopicMatcher(watcher *Filter) { if len(watcher.Topics) == 0 { fs.allTopicsMatcher[watcher] = struct{}{} @@ -133,10 +136,10 @@ func (fs *Filters) removeFromTopicMatchers(watcher *Filter) { // match a specific topic func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter { res := make([]*Filter, 0, len(fs.allTopicsMatcher)) - for watcher, _ := range fs.allTopicsMatcher { + for watcher := range fs.allTopicsMatcher { res = append(res, watcher) } - for watcher, _ := range fs.topicMatcher[topic] { + for watcher := range fs.topicMatcher[topic] { res = append(res, watcher) } return res diff --git a/whisper/whisperv6/filter_test.go b/whisper/whisperv6/filter_test.go index 491e137bd..0bb7986c3 100644 --- a/whisper/whisperv6/filter_test.go +++ b/whisper/whisperv6/filter_test.go @@ -313,16 +313,6 @@ func TestMatchEnvelope(t *testing.T) { if err != nil { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } - match := fsym.MatchEnvelope(env) - if !match { - // topic mismatch should have no affect, as topics are handled by topic matchers - t.Fatalf("failed MatchEnvelope symmetric with seed %d.", seed) - } - match = fasym.MatchEnvelope(env) - if !match { - // topic mismatch should have no affect, as topics are handled by topic matchers - t.Fatalf("failed MatchEnvelope asymmetric with seed %d.", seed) - } // encrypt symmetrically i := mrand.Int() % 4 @@ -338,7 +328,7 @@ func TestMatchEnvelope(t *testing.T) { } // symmetric + matching topic: match - match = fsym.MatchEnvelope(env) + match := fsym.MatchEnvelope(env) if !match { t.Fatalf("failed MatchEnvelope() symmetric with seed %d.", seed) } -- cgit v1.2.3