aboutsummaryrefslogtreecommitdiffstats
path: root/xeth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-20 19:56:38 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-28 15:47:35 +0800
commit7948cc0029db76557d6540341bdfeb818ce32c65 (patch)
treed4547188632e4f53878cc0d06d1e4da840f35b07 /xeth
parent5aa523e32bedd923c4075a21daefd1b4a512277c (diff)
downloadgo-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.gz
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.bz2
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.lz
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.xz
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.zst
go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.zip
rpc, whisper, xeth: fix RPC message retrieval data race
Diffstat (limited to 'xeth')
-rw-r--r--xeth/whisper_filter.go74
-rw-r--r--xeth/xeth.go13
2 files changed, 77 insertions, 10 deletions
diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go
index 9d8a739b7..52e70e041 100644
--- a/xeth/whisper_filter.go
+++ b/xeth/whisper_filter.go
@@ -1,26 +1,84 @@
// Contains the external API side message filter for watching, pooling and polling
-// matched whisper messages.
+// matched whisper messages, also serializing data access to avoid duplications.
package xeth
-import "time"
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+)
// whisperFilter is the message cache matching a specific filter, accumulating
// inbound messages until the are requested by the client.
type whisperFilter struct {
- id int // Filter identifier
- cache []WhisperMessage // Cache of messages not yet polled
- timeout time.Time // Time when the last message batch was queries
+ id int // Filter identifier for old message retrieval
+ ref *Whisper // Whisper reference for old message retrieval
+
+ cache []WhisperMessage // Cache of messages not yet polled
+ skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication
+ update time.Time // Time of the last message query
+
+ lock sync.RWMutex // Lock protecting the filter internals
+}
+
+// newWhisperFilter creates a new serialized, poll based whisper topic filter.
+func newWhisperFilter(id int, ref *Whisper) *whisperFilter {
+ return &whisperFilter{
+ id: id,
+ ref: ref,
+
+ update: time.Now(),
+ skip: make(map[common.Hash]struct{}),
+ }
+}
+
+// messages retrieves all the cached messages from the entire pool matching the
+// filter, resetting the filter's change buffer.
+func (w *whisperFilter) messages() []WhisperMessage {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ w.cache = nil
+ w.update = time.Now()
+
+ w.skip = make(map[common.Hash]struct{})
+ messages := w.ref.Messages(w.id)
+ for _, message := range messages {
+ w.skip[message.ref.Hash] = struct{}{}
+ }
+ return messages
}
// insert injects a new batch of messages into the filter cache.
-func (w *whisperFilter) insert(msgs ...WhisperMessage) {
- w.cache = append(w.cache, msgs...)
+func (w *whisperFilter) insert(messages ...WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ for _, message := range messages {
+ if _, ok := w.skip[message.ref.Hash]; !ok {
+ w.cache = append(w.cache, messages...)
+ }
+ }
}
// retrieve fetches all the cached messages from the filter.
func (w *whisperFilter) retrieve() (messages []WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
messages, w.cache = w.cache, nil
- w.timeout = time.Now()
+ w.update = time.Now()
+
return
}
+
+// activity returns the last time instance when client requests were executed on
+// the filter.
+func (w *whisperFilter) activity() time.Time {
+ w.lock.RLock()
+ defer w.lock.RUnlock()
+
+ return w.update
+}
diff --git a/xeth/xeth.go b/xeth/xeth.go
index e7e553036..8cc32c958 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -97,7 +97,7 @@ done:
}
for id, filter := range self.messages {
- if time.Since(filter.timeout) > filterTickerTime {
+ if time.Since(filter.activity()) > filterTickerTime {
self.Whisper().Unwatch(id)
delete(self.messages, id)
}
@@ -461,7 +461,7 @@ func (p *XEth) NewWhisperFilter(to, from string, topics []string) int {
p.messages[id].insert(msg)
}
id = p.Whisper().Watch(to, from, topics, callback)
- p.messages[id] = &whisperFilter{timeout: time.Now()}
+ p.messages[id] = newWhisperFilter(id, p.Whisper())
return id
}
@@ -481,7 +481,16 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage {
if self.messages[id] != nil {
return self.messages[id].retrieve()
}
+ return nil
+}
+
+func (self *XEth) Messages(id int) []WhisperMessage {
+ self.messagesMut.Lock()
+ defer self.messagesMut.Unlock()
+ if self.messages[id] != nil {
+ return self.messages[id].messages()
+ }
return nil
}