aboutsummaryrefslogtreecommitdiffstats
path: root/xeth/whisper_filter.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-20 19:56:38 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-28 15:47:35 +0800
commit7948cc0029db76557d6540341bdfeb818ce32c65 (patch)
treed4547188632e4f53878cc0d06d1e4da840f35b07 /xeth/whisper_filter.go
parent5aa523e32bedd923c4075a21daefd1b4a512277c (diff)
downloadgo-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.gz
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.bz2
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.lz
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.xz
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.zst
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.zip
rpc, whisper, xeth: fix RPC message retrieval data race
Diffstat (limited to 'xeth/whisper_filter.go')
-rw-r--r--xeth/whisper_filter.go74
1 files changed, 66 insertions, 8 deletions
diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go
index 9d8a739b7..52e70e041 100644
--- a/xeth/whisper_filter.go
+++ b/xeth/whisper_filter.go
@@ -1,26 +1,84 @@
// Contains the external API side message filter for watching, pooling and polling
-// matched whisper messages.
+// matched whisper messages, also serializing data access to avoid duplications.
package xeth
-import "time"
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+)
// whisperFilter is the message cache matching a specific filter, accumulating
// inbound messages until the are requested by the client.
type whisperFilter struct {
- id int // Filter identifier
- cache []WhisperMessage // Cache of messages not yet polled
- timeout time.Time // Time when the last message batch was queries
+ id int // Filter identifier for old message retrieval
+ ref *Whisper // Whisper reference for old message retrieval
+
+ cache []WhisperMessage // Cache of messages not yet polled
+ skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication
+ update time.Time // Time of the last message query
+
+ lock sync.RWMutex // Lock protecting the filter internals
+}
+
+// newWhisperFilter creates a new serialized, poll based whisper topic filter.
+func newWhisperFilter(id int, ref *Whisper) *whisperFilter {
+ return &whisperFilter{
+ id: id,
+ ref: ref,
+
+ update: time.Now(),
+ skip: make(map[common.Hash]struct{}),
+ }
+}
+
+// messages retrieves all the cached messages from the entire pool matching the
+// filter, resetting the filter's change buffer.
+func (w *whisperFilter) messages() []WhisperMessage {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ w.cache = nil
+ w.update = time.Now()
+
+ w.skip = make(map[common.Hash]struct{})
+ messages := w.ref.Messages(w.id)
+ for _, message := range messages {
+ w.skip[message.ref.Hash] = struct{}{}
+ }
+ return messages
}
// insert injects a new batch of messages into the filter cache.
-func (w *whisperFilter) insert(msgs ...WhisperMessage) {
- w.cache = append(w.cache, msgs...)
+func (w *whisperFilter) insert(messages ...WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ for _, message := range messages {
+ if _, ok := w.skip[message.ref.Hash]; !ok {
+ w.cache = append(w.cache, messages...)
+ }
+ }
}
// retrieve fetches all the cached messages from the filter.
func (w *whisperFilter) retrieve() (messages []WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
messages, w.cache = w.cache, nil
- w.timeout = time.Now()
+ w.update = time.Now()
+
return
}
+
+// activity returns the last time instance when client requests were executed on
+// the filter.
+func (w *whisperFilter) activity() time.Time {
+ w.lock.RLock()
+ defer w.lock.RUnlock()
+
+ return w.update
+}