aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVlad <gluk256@gmail.com>2018-02-28 04:16:15 +0800
committerVlad <gluk256@gmail.com>2018-02-28 04:16:15 +0800
commit014d8d98370abf770b89f7987a92999133e3d4ea (patch)
tree1a1e0f43688d5bd908d112fde25577f66e1cb0fb
parent4c845bdc271c46706ea4af248a1efec8683dc53a (diff)
downloaddexon-014d8d98370abf770b89f7987a92999133e3d4ea.tar
dexon-014d8d98370abf770b89f7987a92999133e3d4ea.tar.gz
dexon-014d8d98370abf770b89f7987a92999133e3d4ea.tar.bz2
dexon-014d8d98370abf770b89f7987a92999133e3d4ea.tar.lz
dexon-014d8d98370abf770b89f7987a92999133e3d4ea.tar.xz
dexon-014d8d98370abf770b89f7987a92999133e3d4ea.tar.zst
dexon-014d8d98370abf770b89f7987a92999133e3d4ea.zip
whisper: message filtering optimized
-rw-r--r--whisper/whisperv6/filter.go66
1 files changed, 55 insertions, 11 deletions
diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go
index eb0c65fa3..75c31209c 100644
--- a/whisper/whisperv6/filter.go
+++ b/whisper/whisperv6/filter.go
@@ -35,6 +35,7 @@ type Filter struct {
PoW float64 // Proof of work as described in the Whisper spec
AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
+ id string // unique identifier
Messages map[common.Hash]*ReceivedMessage
mutex sync.RWMutex
@@ -42,16 +43,21 @@ type Filter struct {
// Filters represents a collection of filters
type Filters struct {
- watchers map[string]*Filter
- whisper *Whisper
- mutex sync.RWMutex
+ watchers map[string]*Filter
+ topicMatcher map[TopicType]map[*Filter]struct{}
+ allTopicsMatcher map[*Filter]struct{}
+
+ whisper *Whisper
+ mutex sync.RWMutex
}
// NewFilters returns a newly created filter collection
func NewFilters(w *Whisper) *Filters {
return &Filters{
- watchers: make(map[string]*Filter),
- whisper: w,
+ watchers: make(map[string]*Filter),
+ topicMatcher: make(map[TopicType]map[*Filter]struct{}),
+ allTopicsMatcher: make(map[*Filter]struct{}),
+ whisper: w,
}
}
@@ -81,7 +87,9 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
}
+ watcher.id = id
fs.watchers[id] = watcher
+ fs.addTopicMatcher(watcher)
return id, err
}
@@ -91,12 +99,49 @@ func (fs *Filters) Uninstall(id string) bool {
fs.mutex.Lock()
defer fs.mutex.Unlock()
if fs.watchers[id] != nil {
+ fs.removeFromTopicMatchers(fs.watchers[id])
delete(fs.watchers, id)
return true
}
return false
}
+// addTopicMatcher adds a filter to the topic matchers
+func (fs *Filters) addTopicMatcher(watcher *Filter) {
+ if len(watcher.Topics) == 0 {
+ fs.allTopicsMatcher[watcher] = struct{}{}
+ } else {
+ for _, t := range watcher.Topics {
+ topic := BytesToTopic(t)
+ if fs.topicMatcher[topic] == nil {
+ fs.topicMatcher[topic] = make(map[*Filter]struct{})
+ }
+ fs.topicMatcher[topic][watcher] = struct{}{}
+ }
+ }
+}
+
+// removeFromTopicMatchers removes a filter from the topic matchers
+func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
+ delete(fs.allTopicsMatcher, watcher)
+ for _, topic := range watcher.Topics {
+ delete(fs.topicMatcher[BytesToTopic(topic)], watcher)
+ }
+}
+
+// getWatchersByTopic returns a slice containing the filters that
+// match a specific topic
+func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
+ res := make([]*Filter, 0, len(fs.allTopicsMatcher))
+ for watcher, _ := range fs.allTopicsMatcher {
+ res = append(res, watcher)
+ }
+ for watcher, _ := range fs.topicMatcher[topic] {
+ res = append(res, watcher)
+ }
+ return res
+}
+
// Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter {
fs.mutex.RLock()
@@ -112,11 +157,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
- i := -1 // only used for logging info
- for _, watcher := range fs.watchers {
- i++
+ candidates := fs.getWatchersByTopic(env.Topic)
+ for _, watcher := range candidates {
if p2pMessage && !watcher.AllowP2P {
- log.Trace(fmt.Sprintf("msg [%x], filter [%d]: p2p messages are not allowed", env.Hash(), i))
+ log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
continue
}
@@ -128,10 +172,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match {
msg = env.Open(watcher)
if msg == nil {
- log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
+ log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id)
}
} else {
- log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
+ log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id)
}
}