From 7948cc0029db76557d6540341bdfeb818ce32c65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 20 Apr 2015 14:56:38 +0300 Subject: rpc, whisper, xeth: fix RPC message retrieval data race --- xeth/whisper_filter.go | 74 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 8 deletions(-) (limited to 'xeth/whisper_filter.go') diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go index 9d8a739b7..52e70e041 100644 --- a/xeth/whisper_filter.go +++ b/xeth/whisper_filter.go @@ -1,26 +1,84 @@ // Contains the external API side message filter for watching, pooling and polling -// matched whisper messages. +// matched whisper messages, also serializing data access to avoid duplications. package xeth -import "time" +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) // whisperFilter is the message cache matching a specific filter, accumulating // inbound messages until the are requested by the client. type whisperFilter struct { - id int // Filter identifier - cache []WhisperMessage // Cache of messages not yet polled - timeout time.Time // Time when the last message batch was queries + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []WhisperMessage { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.ref.Hash] = struct{}{} + } + return messages } // insert injects a new batch of messages into the filter cache. -func (w *whisperFilter) insert(msgs ...WhisperMessage) { - w.cache = append(w.cache, msgs...) +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } } // retrieve fetches all the cached messages from the filter. func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + messages, w.cache = w.cache, nil - w.timeout = time.Now() + w.update = time.Now() + return } + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} -- cgit v1.2.3