diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-04-20 19:56:38 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-04-28 15:47:35 +0800 |
commit | 7948cc0029db76557d6540341bdfeb818ce32c65 (patch) | |
tree | d4547188632e4f53878cc0d06d1e4da840f35b07 /xeth | |
parent | 5aa523e32bedd923c4075a21daefd1b4a512277c (diff) | |
download | go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.gz go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.bz2 go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.lz go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.xz go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.tar.zst go-tangerine-7948cc0029db76557d6540341bdfeb818ce32c65.zip |
rpc, whisper, xeth: fix RPC message retrieval data race
Diffstat (limited to 'xeth')
-rw-r--r-- | xeth/whisper_filter.go | 74 | ||||
-rw-r--r-- | xeth/xeth.go | 13 |
2 files changed, 77 insertions, 10 deletions
diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go index 9d8a739b7..52e70e041 100644 --- a/xeth/whisper_filter.go +++ b/xeth/whisper_filter.go @@ -1,26 +1,84 @@ // Contains the external API side message filter for watching, pooling and polling -// matched whisper messages. +// matched whisper messages, also serializing data access to avoid duplications. package xeth -import "time" +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) // whisperFilter is the message cache matching a specific filter, accumulating // inbound messages until the are requested by the client. type whisperFilter struct { - id int // Filter identifier - cache []WhisperMessage // Cache of messages not yet polled - timeout time.Time // Time when the last message batch was queries + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []WhisperMessage { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.ref.Hash] = struct{}{} + } + return messages } // insert injects a new batch of messages into the filter cache. -func (w *whisperFilter) insert(msgs ...WhisperMessage) { - w.cache = append(w.cache, msgs...) +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } } // retrieve fetches all the cached messages from the filter. func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + messages, w.cache = w.cache, nil - w.timeout = time.Now() + w.update = time.Now() + return } + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} diff --git a/xeth/xeth.go b/xeth/xeth.go index e7e553036..8cc32c958 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -97,7 +97,7 @@ done: } for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { + if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } @@ -461,7 +461,7 @@ func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { p.messages[id].insert(msg) } id = p.Whisper().Watch(to, from, topics, callback) - p.messages[id] = &whisperFilter{timeout: time.Now()} + p.messages[id] = newWhisperFilter(id, p.Whisper()) return id } @@ -481,7 +481,16 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { if self.messages[id] != nil { return self.messages[id].retrieve() } + return nil +} + +func (self *XEth) Messages(id int) []WhisperMessage { + self.messagesMut.Lock() + defer self.messagesMut.Unlock() + if self.messages[id] != nil { + return self.messages[id].messages() + } return nil } |