diff options
-rw-r--r-- | rpc/api.go | 2 | ||||
-rw-r--r-- | whisper/envelope.go | 1 | ||||
-rw-r--r-- | whisper/envelope_test.go | 36 | ||||
-rw-r--r-- | whisper/message.go | 6 | ||||
-rw-r--r-- | xeth/whisper_filter.go | 74 | ||||
-rw-r--r-- | xeth/xeth.go | 13 |
6 files changed, 119 insertions, 13 deletions
diff --git a/rpc/api.go b/rpc/api.go index 614e08764..a73188f07 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -467,7 +467,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().Whisper().Messages(args.Id) + *reply = api.xeth().Messages(args.Id) // case "eth_register": // // Placeholder for actual type diff --git a/whisper/envelope.go b/whisper/envelope.go index ba3e4ccc9..c1d84df78 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -73,6 +73,7 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { message := &Message{ Flags: data[0], Sent: int64(self.Expiry - self.TTL), + Hash: self.Hash(), } data = data[1:] diff --git a/whisper/envelope_test.go b/whisper/envelope_test.go new file mode 100644 index 000000000..ed1f08365 --- /dev/null +++ b/whisper/envelope_test.go @@ -0,0 +1,36 @@ +package whisper + +import ( + "bytes" + "testing" +) + +func TestEnvelopeOpen(t *testing.T) { + payload := []byte("hello world") + message := NewMessage(payload) + + envelope, err := message.Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v.", err) + } + if opened.Flags != message.Flags { + t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags) + } + if bytes.Compare(opened.Signature, message.Signature) != 0 { + t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature) + } + if bytes.Compare(opened.Payload, message.Payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload) + } + if opened.Sent != message.Sent { + t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent) + } + + if opened.Hash != envelope.Hash() { + t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) + } +} diff --git a/whisper/message.go b/whisper/message.go index 07c673567..69d85b894 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -8,12 +8,13 @@ import ( "math/rand" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) -// Message represents an end-user data packet to trasmit through the Whisper +// Message represents an end-user data packet to transmit through the Whisper // protocol. These are wrapped into Envelopes that need not be understood by // intermediate nodes, just forwarded. type Message struct { @@ -22,7 +23,8 @@ type Message struct { Payload []byte Sent int64 - To *ecdsa.PublicKey + To *ecdsa.PublicKey // Message recipient (identity used to decode the message) + Hash common.Hash // Message envelope hash to act as a unique id in de-duplication } // Options specifies the exact way a message should be wrapped into an Envelope. diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go index 9d8a739b7..52e70e041 100644 --- a/xeth/whisper_filter.go +++ b/xeth/whisper_filter.go @@ -1,26 +1,84 @@ // Contains the external API side message filter for watching, pooling and polling -// matched whisper messages. +// matched whisper messages, also serializing data access to avoid duplications. package xeth -import "time" +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) // whisperFilter is the message cache matching a specific filter, accumulating // inbound messages until the are requested by the client. type whisperFilter struct { - id int // Filter identifier - cache []WhisperMessage // Cache of messages not yet polled - timeout time.Time // Time when the last message batch was queries + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []WhisperMessage { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.ref.Hash] = struct{}{} + } + return messages } // insert injects a new batch of messages into the filter cache. -func (w *whisperFilter) insert(msgs ...WhisperMessage) { - w.cache = append(w.cache, msgs...) +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } } // retrieve fetches all the cached messages from the filter. func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + messages, w.cache = w.cache, nil - w.timeout = time.Now() + w.update = time.Now() + return } + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} diff --git a/xeth/xeth.go b/xeth/xeth.go index e7e553036..8cc32c958 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -97,7 +97,7 @@ done: } for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { + if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } @@ -461,7 +461,7 @@ func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { p.messages[id].insert(msg) } id = p.Whisper().Watch(to, from, topics, callback) - p.messages[id] = &whisperFilter{timeout: time.Now()} + p.messages[id] = newWhisperFilter(id, p.Whisper()) return id } @@ -481,7 +481,16 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { if self.messages[id] != nil { return self.messages[id].retrieve() } + return nil +} + +func (self *XEth) Messages(id int) []WhisperMessage { + self.messagesMut.Lock() + defer self.messagesMut.Unlock() + if self.messages[id] != nil { + return self.messages[id].messages() + } return nil } |