From 3563c59b12b0b8b5fd15847bf97d71dfd8416207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 17 Apr 2015 16:45:44 +0300 Subject: rpc, whisper, xeth: polish whisper RPC interface --- xeth/whisper.go | 135 +++++++++++++++++++----------------------------- xeth/whisper_filter.go | 26 ++++++++++ xeth/whisper_message.go | 31 +++++++++++ xeth/xeth.go | 27 +++------- 4 files changed, 116 insertions(+), 103 deletions(-) create mode 100644 xeth/whisper_filter.go create mode 100644 xeth/whisper_message.go (limited to 'xeth') diff --git a/xeth/whisper.go b/xeth/whisper.go index 342910b5c..386897f39 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -1,7 +1,9 @@ +// Contains the external API to the whisper sub-protocol. + package xeth import ( - "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -12,109 +14,78 @@ import ( var qlogger = logger.NewLogger("XSHH") +// Whisper represents the API wrapper around the internal whisper implementation. type Whisper struct { *whisper.Whisper } +// NewWhisper wraps an internal whisper client into an external API version. func NewWhisper(w *whisper.Whisper) *Whisper { return &Whisper{w} } -func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { - if priority == 0 { - priority = 1000 - } - - if ttl == 0 { - ttl = 100 - } - - pk := crypto.ToECDSAPub(common.FromHex(from)) - if key := self.Whisper.GetIdentity(pk); key != nil || len(from) == 0 { - msg := whisper.NewMessage(common.FromHex(payload)) - envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{ - TTL: time.Duration(ttl) * time.Second, - To: crypto.ToECDSAPub(common.FromHex(to)), - From: key, - Topics: whisper.NewTopicsFromStrings(topics...), - }) - - if err != nil { - return err - } - - if err := self.Whisper.Send(envelope); err != nil { - return err - } - } else { - return errors.New("unmatched pub / priv for seal") - } - - return nil -} - +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() string { - key := self.Whisper.NewIdentity() - - return common.ToHex(crypto.FromECDSAPub(&key.PublicKey)) + identity := self.Whisper.NewIdentity() + return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)) } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key string) bool { return self.Whisper.HasIdentity(crypto.ToECDSAPub(common.FromHex(key))) } -// func (self *Whisper) RemoveIdentity(key string) bool { -// return self.Whisper.RemoveIdentity(crypto.ToECDSAPub(common.FromHex(key))) -// } - -func (self *Whisper) Watch(opts *Options) int { - filter := whisper.Filter{ - To: crypto.ToECDSAPub(common.FromHex(opts.To)), - From: crypto.ToECDSAPub(common.FromHex(opts.From)), - Topics: whisper.NewTopicsFromStrings(opts.Topics...), +// Post injects a message into the whisper network for distribution. +func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Construct the whisper message and transmission options + message := whisper.NewMessage(common.FromHex(payload)) + options := whisper.Options{ + To: crypto.ToECDSAPub(common.FromHex(to)), + TTL: time.Duration(ttl) * time.Second, + Topics: whisper.NewTopicsFromStrings(topics...), } - - var i int - filter.Fn = func(msg *whisper.Message) { - opts.Fn(NewWhisperMessage(msg)) + if len(from) != 0 { + if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { + options.From = key + } else { + return fmt.Errorf("unknown identity to send from: %s", from) + } } - - i = self.Whisper.Watch(filter) - - return i -} - -func (self *Whisper) Messages(id int) (messages []WhisperMessage) { - msgs := self.Whisper.Messages(id) - messages = make([]WhisperMessage, len(msgs)) - for i, message := range msgs { - messages[i] = NewWhisperMessage(message) + // Wrap and send the message + pow := time.Duration(priority) * time.Millisecond + envelope, err := message.Wrap(pow, options) + if err != nil { + return err } - - return + if err := self.Whisper.Send(envelope); err != nil { + return err + } + return nil } -type Options struct { - To string - From string - Topics []string - Fn func(msg WhisperMessage) +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(to, from string, topics []string, fn func(WhisperMessage)) int { + filter := whisper.Filter{ + To: crypto.ToECDSAPub(common.FromHex(to)), + From: crypto.ToECDSAPub(common.FromHex(from)), + Topics: whisper.NewTopicsFromStrings(topics...), + } + filter.Fn = func(message *whisper.Message) { + fn(NewWhisperMessage(message)) + } + return self.Whisper.Watch(filter) } -type WhisperMessage struct { - ref *whisper.Message - Payload string `json:"payload"` - To string `json:"to"` - From string `json:"from"` - Sent int64 `json:"sent"` -} +// Messages retrieves all the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []WhisperMessage { + pool := self.Whisper.Messages(id) -func NewWhisperMessage(msg *whisper.Message) WhisperMessage { - return WhisperMessage{ - ref: msg, - Payload: common.ToHex(msg.Payload), - From: common.ToHex(crypto.FromECDSAPub(msg.Recover())), - To: common.ToHex(crypto.FromECDSAPub(msg.To)), - Sent: msg.Sent, + messages := make([]WhisperMessage, len(pool)) + for i, message := range pool { + messages[i] = NewWhisperMessage(message) } + return messages } diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go new file mode 100644 index 000000000..9d8a739b7 --- /dev/null +++ b/xeth/whisper_filter.go @@ -0,0 +1,26 @@ +// Contains the external API side message filter for watching, pooling and polling +// matched whisper messages. + +package xeth + +import "time" + +// whisperFilter is the message cache matching a specific filter, accumulating +// inbound messages until the are requested by the client. +type whisperFilter struct { + id int // Filter identifier + cache []WhisperMessage // Cache of messages not yet polled + timeout time.Time // Time when the last message batch was queries +} + +// insert injects a new batch of messages into the filter cache. +func (w *whisperFilter) insert(msgs ...WhisperMessage) { + w.cache = append(w.cache, msgs...) +} + +// retrieve fetches all the cached messages from the filter. +func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + messages, w.cache = w.cache, nil + w.timeout = time.Now() + return +} diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go new file mode 100644 index 000000000..14796cfbc --- /dev/null +++ b/xeth/whisper_message.go @@ -0,0 +1,31 @@ +// Contains the external API representation of a whisper message. + +package xeth + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/whisper" +) + +// WhisperMessage is the external API representation of a whisper.Message. +type WhisperMessage struct { + ref *whisper.Message + + Payload string `json:"payload"` + To string `json:"to"` + From string `json:"from"` + Sent int64 `json:"sent"` +} + +// NewWhisperMessage converts an internal message into an API version. +func NewWhisperMessage(message *whisper.Message) WhisperMessage { + return WhisperMessage{ + ref: message, + + Payload: common.ToHex(message.Payload), + From: common.ToHex(crypto.FromECDSAPub(message.Recover())), + To: common.ToHex(crypto.FromECDSAPub(message.To)), + Sent: message.Sent, + } +} diff --git a/xeth/xeth.go b/xeth/xeth.go index 693acb910..e7e553036 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,14 +452,15 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(opts *Options) int { +func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { var id int - opts.Fn = func(msg WhisperMessage) { + callback := func(msg WhisperMessage) { p.messagesMut.Lock() defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) + + p.messages[id].insert(msg) } - id = p.Whisper().Watch(opts) + id = p.Whisper().Watch(to, from, topics, callback) p.messages[id] = &whisperFilter{timeout: time.Now()} return id } @@ -478,7 +479,7 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { defer self.messagesMut.Unlock() if self.messages[id] != nil { - return self.messages[id].get() + return self.messages[id].retrieve() } return nil @@ -731,22 +732,6 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type whisperFilter struct { - messages []WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - type logFilter struct { logs state.Logs timeout time.Time -- cgit v1.2.3 From 7948cc0029db76557d6540341bdfeb818ce32c65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 20 Apr 2015 14:56:38 +0300 Subject: rpc, whisper, xeth: fix RPC message retrieval data race --- xeth/whisper_filter.go | 74 ++++++++++++++++++++++++++++++++++++++++++++------ xeth/xeth.go | 13 +++++++-- 2 files changed, 77 insertions(+), 10 deletions(-) (limited to 'xeth') diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go index 9d8a739b7..52e70e041 100644 --- a/xeth/whisper_filter.go +++ b/xeth/whisper_filter.go @@ -1,26 +1,84 @@ // Contains the external API side message filter for watching, pooling and polling -// matched whisper messages. +// matched whisper messages, also serializing data access to avoid duplications. package xeth -import "time" +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) // whisperFilter is the message cache matching a specific filter, accumulating // inbound messages until the are requested by the client. type whisperFilter struct { - id int // Filter identifier - cache []WhisperMessage // Cache of messages not yet polled - timeout time.Time // Time when the last message batch was queries + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []WhisperMessage { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.ref.Hash] = struct{}{} + } + return messages } // insert injects a new batch of messages into the filter cache. -func (w *whisperFilter) insert(msgs ...WhisperMessage) { - w.cache = append(w.cache, msgs...) +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } } // retrieve fetches all the cached messages from the filter. func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + messages, w.cache = w.cache, nil - w.timeout = time.Now() + w.update = time.Now() + return } + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} diff --git a/xeth/xeth.go b/xeth/xeth.go index e7e553036..8cc32c958 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -97,7 +97,7 @@ done: } for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { + if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } @@ -461,7 +461,7 @@ func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { p.messages[id].insert(msg) } id = p.Whisper().Watch(to, from, topics, callback) - p.messages[id] = &whisperFilter{timeout: time.Now()} + p.messages[id] = newWhisperFilter(id, p.Whisper()) return id } @@ -481,7 +481,16 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { if self.messages[id] != nil { return self.messages[id].retrieve() } + return nil +} + +func (self *XEth) Messages(id int) []WhisperMessage { + self.messagesMut.Lock() + defer self.messagesMut.Unlock() + if self.messages[id] != nil { + return self.messages[id].messages() + } return nil } -- cgit v1.2.3 From 7f48eb8737878e352a65475382532db26f9fbc52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 21 Apr 2015 11:43:11 +0300 Subject: whisper, xeth/whisper: surface TTL and hash to the API --- xeth/whisper_message.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'xeth') diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go index 14796cfbc..c8195cec1 100644 --- a/xeth/whisper_message.go +++ b/xeth/whisper_message.go @@ -3,6 +3,8 @@ package xeth import ( + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/whisper" @@ -16,6 +18,8 @@ type WhisperMessage struct { To string `json:"to"` From string `json:"from"` Sent int64 `json:"sent"` + TTL int64 `json:"ttl"` + Hash string `json:"hash"` } // NewWhisperMessage converts an internal message into an API version. @@ -26,6 +30,8 @@ func NewWhisperMessage(message *whisper.Message) WhisperMessage { Payload: common.ToHex(message.Payload), From: common.ToHex(crypto.FromECDSAPub(message.Recover())), To: common.ToHex(crypto.FromECDSAPub(message.To)), - Sent: message.Sent, + Sent: message.Sent.Unix(), + TTL: int64(message.TTL / time.Second), + Hash: common.ToHex(message.Hash.Bytes()), } } -- cgit v1.2.3 From ae4bfc3cfb3f1debad9dd0211950ce09038ffa90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 21 Apr 2015 18:31:08 +0300 Subject: rpc, ui/qt/qwhisper, whisper, xeth: introduce complex topic filters --- xeth/whisper.go | 4 ++-- xeth/xeth.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'xeth') diff --git a/xeth/whisper.go b/xeth/whisper.go index 386897f39..25c4af3b1 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -67,11 +67,11 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio // Watch installs a new message handler to run in case a matching packet arrives // from the whisper network. -func (self *Whisper) Watch(to, from string, topics []string, fn func(WhisperMessage)) int { +func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMessage)) int { filter := whisper.Filter{ To: crypto.ToECDSAPub(common.FromHex(to)), From: crypto.ToECDSAPub(common.FromHex(from)), - Topics: whisper.NewTopicsFromStrings(topics...), + Topics: whisper.NewTopicFilterFromStrings(topics...), } filter.Fn = func(message *whisper.Message) { fn(NewWhisperMessage(message)) diff --git a/xeth/xeth.go b/xeth/xeth.go index 8cc32c958..ea6ae9950 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,7 +452,7 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { +func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { var id int callback := func(msg WhisperMessage) { p.messagesMut.Lock() -- cgit v1.2.3 From db615a85ec000dab7f73a6d4b1b46428ba4acdee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 12:50:48 +0300 Subject: ui/qt/qwhisper, whisper, xeth: polish topic filter, fix wildcards --- xeth/whisper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'xeth') diff --git a/xeth/whisper.go b/xeth/whisper.go index 25c4af3b1..36c6ca63f 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -71,7 +71,7 @@ func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMe filter := whisper.Filter{ To: crypto.ToECDSAPub(common.FromHex(to)), From: crypto.ToECDSAPub(common.FromHex(from)), - Topics: whisper.NewTopicFilterFromStrings(topics...), + Topics: whisper.NewFilterTopicsFromStrings(topics...), } filter.Fn = func(message *whisper.Message) { fn(NewWhisperMessage(message)) -- cgit v1.2.3 From 70ded4cbf06d19993d829d843a27002cf181c619 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 17:25:54 +0300 Subject: xeth: fix un-decoded whisper RPC topic string bug --- xeth/whisper.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'xeth') diff --git a/xeth/whisper.go b/xeth/whisper.go index 36c6ca63f..edb62c748 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -39,12 +39,17 @@ func (self *Whisper) HasIdentity(key string) bool { // Post injects a message into the whisper network for distribution. func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Decode the topic strings + topicsDecoded := make([][]byte, len(topics)) + for i, topic := range topics { + topicsDecoded[i] = common.FromHex(topic) + } // Construct the whisper message and transmission options message := whisper.NewMessage(common.FromHex(payload)) options := whisper.Options{ To: crypto.ToECDSAPub(common.FromHex(to)), TTL: time.Duration(ttl) * time.Second, - Topics: whisper.NewTopicsFromStrings(topics...), + Topics: whisper.NewTopics(topicsDecoded...), } if len(from) != 0 { if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { @@ -68,10 +73,19 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio // Watch installs a new message handler to run in case a matching packet arrives // from the whisper network. func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMessage)) int { + // Decode the topic strings + topicsDecoded := make([][][]byte, len(topics)) + for i, condition := range topics { + topicsDecoded[i] = make([][]byte, len(condition)) + for j, topic := range condition { + topicsDecoded[i][j] = common.FromHex(topic) + } + } + // Assemble and inject the filter into the whisper client filter := whisper.Filter{ To: crypto.ToECDSAPub(common.FromHex(to)), From: crypto.ToECDSAPub(common.FromHex(from)), - Topics: whisper.NewFilterTopicsFromStrings(topics...), + Topics: whisper.NewFilterTopics(topicsDecoded...), } filter.Fn = func(message *whisper.Message) { fn(NewWhisperMessage(message)) -- cgit v1.2.3 From 978ffd3097242a5faeb7b23b9c72590170004dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 18:35:50 +0300 Subject: rpc, xeth: finish cleaning up xeth --- xeth/xeth.go | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) (limited to 'xeth') diff --git a/xeth/xeth.go b/xeth/xeth.go index ea6ae9950..e4040d9d8 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,44 +452,60 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } +// NewWhisperFilter creates and registers a new message filter to watch for +// inbound whisper messages. All parameters at this point are assumed to be +// HEX encoded. func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { + // Pre-define the id to be filled later var id int - callback := func(msg WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() + // Callback to delegate core whisper messages to this xeth filter + callback := func(msg WhisperMessage) { + p.messagesMut.RLock() // Only read lock to the filter pool + defer p.messagesMut.RUnlock() p.messages[id].insert(msg) } + // Initialize the core whisper filter and wrap into xeth id = p.Whisper().Watch(to, from, topics, callback) + + p.messagesMut.Lock() p.messages[id] = newWhisperFilter(id, p.Whisper()) + p.messagesMut.Unlock() + return id } +// UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + if _, ok := p.messages[id]; ok { delete(p.messages, id) return true } - return false } -func (self *XEth) MessagesChanged(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessages retrieves all the known messages that match a specific filter. +func (self *XEth) WhisperMessages(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].retrieve() + return self.messages[id].messages() } return nil } -func (self *XEth) Messages(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessagesChanged retrieves all the new messages matched by a filter +// since the last retrieval +func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].messages() + return self.messages[id].retrieve() } return nil } -- cgit v1.2.3