aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-20 19:56:38 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-28 15:47:35 +0800
commit7948cc0029db76557d6540341bdfeb818ce32c65 (patch)
treed4547188632e4f53878cc0d06d1e4da840f35b07
parent5aa523e32bedd923c4075a21daefd1b4a512277c (diff)
downloaddexon-7948cc0029db76557d6540341bdfeb818ce32c65.tar
dexon-7948cc0029db76557d6540341bdfeb818ce32c65.tar.gz
dexon-7948cc0029db76557d6540341bdfeb818ce32c65.tar.bz2
dexon-7948cc0029db76557d6540341bdfeb818ce32c65.tar.lz
dexon-7948cc0029db76557d6540341bdfeb818ce32c65.tar.xz
dexon-7948cc0029db76557d6540341bdfeb818ce32c65.tar.zst
dexon-7948cc0029db76557d6540341bdfeb818ce32c65.zip
rpc, whisper, xeth: fix RPC message retrieval data race
-rw-r--r--rpc/api.go2
-rw-r--r--whisper/envelope.go1
-rw-r--r--whisper/envelope_test.go36
-rw-r--r--whisper/message.go6
-rw-r--r--xeth/whisper_filter.go74
-rw-r--r--xeth/xeth.go13
6 files changed, 119 insertions, 13 deletions
diff --git a/rpc/api.go b/rpc/api.go
index 614e08764..a73188f07 100644
--- a/rpc/api.go
+++ b/rpc/api.go
@@ -467,7 +467,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
if err := json.Unmarshal(req.Params, &args); err != nil {
return err
}
- *reply = api.xeth().Whisper().Messages(args.Id)
+ *reply = api.xeth().Messages(args.Id)
// case "eth_register":
// // Placeholder for actual type
diff --git a/whisper/envelope.go b/whisper/envelope.go
index ba3e4ccc9..c1d84df78 100644
--- a/whisper/envelope.go
+++ b/whisper/envelope.go
@@ -73,6 +73,7 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) {
message := &Message{
Flags: data[0],
Sent: int64(self.Expiry - self.TTL),
+ Hash: self.Hash(),
}
data = data[1:]
diff --git a/whisper/envelope_test.go b/whisper/envelope_test.go
new file mode 100644
index 000000000..ed1f08365
--- /dev/null
+++ b/whisper/envelope_test.go
@@ -0,0 +1,36 @@
+package whisper
+
+import (
+ "bytes"
+ "testing"
+)
+
+func TestEnvelopeOpen(t *testing.T) {
+ payload := []byte("hello world")
+ message := NewMessage(payload)
+
+ envelope, err := message.Wrap(DefaultPoW, Options{})
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ opened, err := envelope.Open(nil)
+ if err != nil {
+ t.Fatalf("failed to open envelope: %v.", err)
+ }
+ if opened.Flags != message.Flags {
+ t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags)
+ }
+ if bytes.Compare(opened.Signature, message.Signature) != 0 {
+ t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature)
+ }
+ if bytes.Compare(opened.Payload, message.Payload) != 0 {
+ t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload)
+ }
+ if opened.Sent != message.Sent {
+ t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent)
+ }
+
+ if opened.Hash != envelope.Hash() {
+ t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash())
+ }
+}
diff --git a/whisper/message.go b/whisper/message.go
index 07c673567..69d85b894 100644
--- a/whisper/message.go
+++ b/whisper/message.go
@@ -8,12 +8,13 @@ import (
"math/rand"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
-// Message represents an end-user data packet to trasmit through the Whisper
+// Message represents an end-user data packet to transmit through the Whisper
// protocol. These are wrapped into Envelopes that need not be understood by
// intermediate nodes, just forwarded.
type Message struct {
@@ -22,7 +23,8 @@ type Message struct {
Payload []byte
Sent int64
- To *ecdsa.PublicKey
+ To *ecdsa.PublicKey // Message recipient (identity used to decode the message)
+ Hash common.Hash // Message envelope hash to act as a unique id in de-duplication
}
// Options specifies the exact way a message should be wrapped into an Envelope.
diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go
index 9d8a739b7..52e70e041 100644
--- a/xeth/whisper_filter.go
+++ b/xeth/whisper_filter.go
@@ -1,26 +1,84 @@
// Contains the external API side message filter for watching, pooling and polling
-// matched whisper messages.
+// matched whisper messages, also serializing data access to avoid duplications.
package xeth
-import "time"
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+)
// whisperFilter is the message cache matching a specific filter, accumulating
// inbound messages until the are requested by the client.
type whisperFilter struct {
- id int // Filter identifier
- cache []WhisperMessage // Cache of messages not yet polled
- timeout time.Time // Time when the last message batch was queries
+ id int // Filter identifier for old message retrieval
+ ref *Whisper // Whisper reference for old message retrieval
+
+ cache []WhisperMessage // Cache of messages not yet polled
+ skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication
+ update time.Time // Time of the last message query
+
+ lock sync.RWMutex // Lock protecting the filter internals
+}
+
+// newWhisperFilter creates a new serialized, poll based whisper topic filter.
+func newWhisperFilter(id int, ref *Whisper) *whisperFilter {
+ return &whisperFilter{
+ id: id,
+ ref: ref,
+
+ update: time.Now(),
+ skip: make(map[common.Hash]struct{}),
+ }
+}
+
+// messages retrieves all the cached messages from the entire pool matching the
+// filter, resetting the filter's change buffer.
+func (w *whisperFilter) messages() []WhisperMessage {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ w.cache = nil
+ w.update = time.Now()
+
+ w.skip = make(map[common.Hash]struct{})
+ messages := w.ref.Messages(w.id)
+ for _, message := range messages {
+ w.skip[message.ref.Hash] = struct{}{}
+ }
+ return messages
}
// insert injects a new batch of messages into the filter cache.
-func (w *whisperFilter) insert(msgs ...WhisperMessage) {
- w.cache = append(w.cache, msgs...)
+func (w *whisperFilter) insert(messages ...WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ for _, message := range messages {
+ if _, ok := w.skip[message.ref.Hash]; !ok {
+ w.cache = append(w.cache, messages...)
+ }
+ }
}
// retrieve fetches all the cached messages from the filter.
func (w *whisperFilter) retrieve() (messages []WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
messages, w.cache = w.cache, nil
- w.timeout = time.Now()
+ w.update = time.Now()
+
return
}
+
+// activity returns the last time instance when client requests were executed on
+// the filter.
+func (w *whisperFilter) activity() time.Time {
+ w.lock.RLock()
+ defer w.lock.RUnlock()
+
+ return w.update
+}
diff --git a/xeth/xeth.go b/xeth/xeth.go
index e7e553036..8cc32c958 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -97,7 +97,7 @@ done:
}
for id, filter := range self.messages {
- if time.Since(filter.timeout) > filterTickerTime {
+ if time.Since(filter.activity()) > filterTickerTime {
self.Whisper().Unwatch(id)
delete(self.messages, id)
}
@@ -461,7 +461,7 @@ func (p *XEth) NewWhisperFilter(to, from string, topics []string) int {
p.messages[id].insert(msg)
}
id = p.Whisper().Watch(to, from, topics, callback)
- p.messages[id] = &whisperFilter{timeout: time.Now()}
+ p.messages[id] = newWhisperFilter(id, p.Whisper())
return id
}
@@ -481,7 +481,16 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage {
if self.messages[id] != nil {
return self.messages[id].retrieve()
}
+ return nil
+}
+
+func (self *XEth) Messages(id int) []WhisperMessage {
+ self.messagesMut.Lock()
+ defer self.messagesMut.Unlock()
+ if self.messages[id] != nil {
+ return self.messages[id].messages()
+ }
return nil
}