From 3563c59b12b0b8b5fd15847bf97d71dfd8416207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 17 Apr 2015 16:45:44 +0300 Subject: rpc, whisper, xeth: polish whisper RPC interface --- rpc/api.go | 17 +++--- whisper/whisper.go | 13 ++--- xeth/whisper.go | 135 +++++++++++++++++++----------------------------- xeth/whisper_filter.go | 26 ++++++++++ xeth/whisper_message.go | 31 +++++++++++ xeth/xeth.go | 27 +++------- 6 files changed, 129 insertions(+), 120 deletions(-) create mode 100644 xeth/whisper_filter.go create mode 100644 xeth/whisper_message.go diff --git a/rpc/api.go b/rpc/api.go index 5930a4c7b..614e08764 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -408,18 +408,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = newHexData(res) case "shh_version": *reply = api.xeth().WhisperVersion() + case "shh_post": args := new(WhisperMessageArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - err := api.xeth().Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl) if err != nil { return err } - *reply = true + case "shh_newIdentity": *reply = api.xeth().Whisper().NewIdentity() // case "shh_removeIdentity": @@ -434,32 +434,35 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err return err } *reply = api.xeth().Whisper().HasIdentity(args.Identity) + case "shh_newGroup", "shh_addToGroup": return NewNotImplementedError(req.Method) + case "shh_newFilter": args := new(WhisperFilterArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - opts := new(xeth.Options) - // opts.From = args.From - opts.To = args.To - opts.Topics = args.Topics - id := api.xeth().NewWhisperFilter(opts) + id := api.xeth().NewWhisperFilter(args.To, args.From, args.Topics) *reply = newHexNum(big.NewInt(int64(id)).Bytes()) + case "shh_uninstallFilter": args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().UninstallWhisperFilter(args.Id) + case "shh_getFilterChanges": + // Retrieve all the new messages arrived since the last request args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().MessagesChanged(args.Id) + case "shh_getMessages": + // Retrieve all the cached messages matching a specific, existing filter args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err diff --git a/whisper/whisper.go b/whisper/whisper.go index 9317fad50..59a1a63c4 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -58,6 +58,8 @@ type Whisper struct { quit chan struct{} } +// New creates a Whisper client ready to communicate through the Ethereum P2P +// network. func New() *Whisper { whisper := &Whisper{ filters: filter.New(), @@ -148,7 +150,7 @@ func (self *Whisper) Stop() { glog.V(logger.Info).Infoln("Whisper stopped") } -// Messages retrieves the currently pooled messages matching a filter id. +// Messages retrieves all the currently pooled messages matching a filter id. func (self *Whisper) Messages(id int) []*Message { messages := make([]*Message, 0) if filter := self.filters.Get(id); filter != nil { @@ -163,15 +165,6 @@ func (self *Whisper) Messages(id int) []*Message { return messages } -// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { -// k := string(crypto.FromECDSAPub(key)) -// if _, ok := self.keys[k]; ok { -// delete(self.keys, k) -// return true -// } -// return false -// } - // handlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { diff --git a/xeth/whisper.go b/xeth/whisper.go index 342910b5c..386897f39 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -1,7 +1,9 @@ +// Contains the external API to the whisper sub-protocol. + package xeth import ( - "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -12,109 +14,78 @@ import ( var qlogger = logger.NewLogger("XSHH") +// Whisper represents the API wrapper around the internal whisper implementation. type Whisper struct { *whisper.Whisper } +// NewWhisper wraps an internal whisper client into an external API version. func NewWhisper(w *whisper.Whisper) *Whisper { return &Whisper{w} } -func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { - if priority == 0 { - priority = 1000 - } - - if ttl == 0 { - ttl = 100 - } - - pk := crypto.ToECDSAPub(common.FromHex(from)) - if key := self.Whisper.GetIdentity(pk); key != nil || len(from) == 0 { - msg := whisper.NewMessage(common.FromHex(payload)) - envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{ - TTL: time.Duration(ttl) * time.Second, - To: crypto.ToECDSAPub(common.FromHex(to)), - From: key, - Topics: whisper.NewTopicsFromStrings(topics...), - }) - - if err != nil { - return err - } - - if err := self.Whisper.Send(envelope); err != nil { - return err - } - } else { - return errors.New("unmatched pub / priv for seal") - } - - return nil -} - +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() string { - key := self.Whisper.NewIdentity() - - return common.ToHex(crypto.FromECDSAPub(&key.PublicKey)) + identity := self.Whisper.NewIdentity() + return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)) } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key string) bool { return self.Whisper.HasIdentity(crypto.ToECDSAPub(common.FromHex(key))) } -// func (self *Whisper) RemoveIdentity(key string) bool { -// return self.Whisper.RemoveIdentity(crypto.ToECDSAPub(common.FromHex(key))) -// } - -func (self *Whisper) Watch(opts *Options) int { - filter := whisper.Filter{ - To: crypto.ToECDSAPub(common.FromHex(opts.To)), - From: crypto.ToECDSAPub(common.FromHex(opts.From)), - Topics: whisper.NewTopicsFromStrings(opts.Topics...), +// Post injects a message into the whisper network for distribution. +func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Construct the whisper message and transmission options + message := whisper.NewMessage(common.FromHex(payload)) + options := whisper.Options{ + To: crypto.ToECDSAPub(common.FromHex(to)), + TTL: time.Duration(ttl) * time.Second, + Topics: whisper.NewTopicsFromStrings(topics...), } - - var i int - filter.Fn = func(msg *whisper.Message) { - opts.Fn(NewWhisperMessage(msg)) + if len(from) != 0 { + if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { + options.From = key + } else { + return fmt.Errorf("unknown identity to send from: %s", from) + } } - - i = self.Whisper.Watch(filter) - - return i -} - -func (self *Whisper) Messages(id int) (messages []WhisperMessage) { - msgs := self.Whisper.Messages(id) - messages = make([]WhisperMessage, len(msgs)) - for i, message := range msgs { - messages[i] = NewWhisperMessage(message) + // Wrap and send the message + pow := time.Duration(priority) * time.Millisecond + envelope, err := message.Wrap(pow, options) + if err != nil { + return err } - - return + if err := self.Whisper.Send(envelope); err != nil { + return err + } + return nil } -type Options struct { - To string - From string - Topics []string - Fn func(msg WhisperMessage) +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(to, from string, topics []string, fn func(WhisperMessage)) int { + filter := whisper.Filter{ + To: crypto.ToECDSAPub(common.FromHex(to)), + From: crypto.ToECDSAPub(common.FromHex(from)), + Topics: whisper.NewTopicsFromStrings(topics...), + } + filter.Fn = func(message *whisper.Message) { + fn(NewWhisperMessage(message)) + } + return self.Whisper.Watch(filter) } -type WhisperMessage struct { - ref *whisper.Message - Payload string `json:"payload"` - To string `json:"to"` - From string `json:"from"` - Sent int64 `json:"sent"` -} +// Messages retrieves all the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []WhisperMessage { + pool := self.Whisper.Messages(id) -func NewWhisperMessage(msg *whisper.Message) WhisperMessage { - return WhisperMessage{ - ref: msg, - Payload: common.ToHex(msg.Payload), - From: common.ToHex(crypto.FromECDSAPub(msg.Recover())), - To: common.ToHex(crypto.FromECDSAPub(msg.To)), - Sent: msg.Sent, + messages := make([]WhisperMessage, len(pool)) + for i, message := range pool { + messages[i] = NewWhisperMessage(message) } + return messages } diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go new file mode 100644 index 000000000..9d8a739b7 --- /dev/null +++ b/xeth/whisper_filter.go @@ -0,0 +1,26 @@ +// Contains the external API side message filter for watching, pooling and polling +// matched whisper messages. + +package xeth + +import "time" + +// whisperFilter is the message cache matching a specific filter, accumulating +// inbound messages until the are requested by the client. +type whisperFilter struct { + id int // Filter identifier + cache []WhisperMessage // Cache of messages not yet polled + timeout time.Time // Time when the last message batch was queries +} + +// insert injects a new batch of messages into the filter cache. +func (w *whisperFilter) insert(msgs ...WhisperMessage) { + w.cache = append(w.cache, msgs...) +} + +// retrieve fetches all the cached messages from the filter. +func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + messages, w.cache = w.cache, nil + w.timeout = time.Now() + return +} diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go new file mode 100644 index 000000000..14796cfbc --- /dev/null +++ b/xeth/whisper_message.go @@ -0,0 +1,31 @@ +// Contains the external API representation of a whisper message. + +package xeth + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/whisper" +) + +// WhisperMessage is the external API representation of a whisper.Message. +type WhisperMessage struct { + ref *whisper.Message + + Payload string `json:"payload"` + To string `json:"to"` + From string `json:"from"` + Sent int64 `json:"sent"` +} + +// NewWhisperMessage converts an internal message into an API version. +func NewWhisperMessage(message *whisper.Message) WhisperMessage { + return WhisperMessage{ + ref: message, + + Payload: common.ToHex(message.Payload), + From: common.ToHex(crypto.FromECDSAPub(message.Recover())), + To: common.ToHex(crypto.FromECDSAPub(message.To)), + Sent: message.Sent, + } +} diff --git a/xeth/xeth.go b/xeth/xeth.go index 693acb910..e7e553036 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,14 +452,15 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(opts *Options) int { +func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { var id int - opts.Fn = func(msg WhisperMessage) { + callback := func(msg WhisperMessage) { p.messagesMut.Lock() defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) + + p.messages[id].insert(msg) } - id = p.Whisper().Watch(opts) + id = p.Whisper().Watch(to, from, topics, callback) p.messages[id] = &whisperFilter{timeout: time.Now()} return id } @@ -478,7 +479,7 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { defer self.messagesMut.Unlock() if self.messages[id] != nil { - return self.messages[id].get() + return self.messages[id].retrieve() } return nil @@ -731,22 +732,6 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type whisperFilter struct { - messages []WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - type logFilter struct { logs state.Logs timeout time.Time -- cgit v1.2.3