diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-04-17 21:45:44 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-04-28 15:47:35 +0800 |
commit | 3563c59b12b0b8b5fd15847bf97d71dfd8416207 (patch) | |
tree | f059a2bcbd7edb25c595bbad30514828fc1a5282 /xeth | |
parent | 182d484aa70bcd5b22117f02333b1fd3b1535dcb (diff) | |
download | dexon-3563c59b12b0b8b5fd15847bf97d71dfd8416207.tar dexon-3563c59b12b0b8b5fd15847bf97d71dfd8416207.tar.gz dexon-3563c59b12b0b8b5fd15847bf97d71dfd8416207.tar.bz2 dexon-3563c59b12b0b8b5fd15847bf97d71dfd8416207.tar.lz dexon-3563c59b12b0b8b5fd15847bf97d71dfd8416207.tar.xz dexon-3563c59b12b0b8b5fd15847bf97d71dfd8416207.tar.zst dexon-3563c59b12b0b8b5fd15847bf97d71dfd8416207.zip |
rpc, whisper, xeth: polish whisper RPC interface
Diffstat (limited to 'xeth')
-rw-r--r-- | xeth/whisper.go | 135 | ||||
-rw-r--r-- | xeth/whisper_filter.go | 26 | ||||
-rw-r--r-- | xeth/whisper_message.go | 31 | ||||
-rw-r--r-- | xeth/xeth.go | 27 |
4 files changed, 116 insertions, 103 deletions
diff --git a/xeth/whisper.go b/xeth/whisper.go index 342910b5c..386897f39 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -1,7 +1,9 @@ +// Contains the external API to the whisper sub-protocol. + package xeth import ( - "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -12,109 +14,78 @@ import ( var qlogger = logger.NewLogger("XSHH") +// Whisper represents the API wrapper around the internal whisper implementation. type Whisper struct { *whisper.Whisper } +// NewWhisper wraps an internal whisper client into an external API version. func NewWhisper(w *whisper.Whisper) *Whisper { return &Whisper{w} } -func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { - if priority == 0 { - priority = 1000 - } - - if ttl == 0 { - ttl = 100 - } - - pk := crypto.ToECDSAPub(common.FromHex(from)) - if key := self.Whisper.GetIdentity(pk); key != nil || len(from) == 0 { - msg := whisper.NewMessage(common.FromHex(payload)) - envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{ - TTL: time.Duration(ttl) * time.Second, - To: crypto.ToECDSAPub(common.FromHex(to)), - From: key, - Topics: whisper.NewTopicsFromStrings(topics...), - }) - - if err != nil { - return err - } - - if err := self.Whisper.Send(envelope); err != nil { - return err - } - } else { - return errors.New("unmatched pub / priv for seal") - } - - return nil -} - +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() string { - key := self.Whisper.NewIdentity() - - return common.ToHex(crypto.FromECDSAPub(&key.PublicKey)) + identity := self.Whisper.NewIdentity() + return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)) } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key string) bool { return self.Whisper.HasIdentity(crypto.ToECDSAPub(common.FromHex(key))) } -// func (self *Whisper) RemoveIdentity(key string) bool { -// return self.Whisper.RemoveIdentity(crypto.ToECDSAPub(common.FromHex(key))) -// } - -func (self *Whisper) Watch(opts *Options) int { - filter := whisper.Filter{ - To: crypto.ToECDSAPub(common.FromHex(opts.To)), - From: crypto.ToECDSAPub(common.FromHex(opts.From)), - Topics: whisper.NewTopicsFromStrings(opts.Topics...), +// Post injects a message into the whisper network for distribution. +func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Construct the whisper message and transmission options + message := whisper.NewMessage(common.FromHex(payload)) + options := whisper.Options{ + To: crypto.ToECDSAPub(common.FromHex(to)), + TTL: time.Duration(ttl) * time.Second, + Topics: whisper.NewTopicsFromStrings(topics...), } - - var i int - filter.Fn = func(msg *whisper.Message) { - opts.Fn(NewWhisperMessage(msg)) + if len(from) != 0 { + if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { + options.From = key + } else { + return fmt.Errorf("unknown identity to send from: %s", from) + } } - - i = self.Whisper.Watch(filter) - - return i -} - -func (self *Whisper) Messages(id int) (messages []WhisperMessage) { - msgs := self.Whisper.Messages(id) - messages = make([]WhisperMessage, len(msgs)) - for i, message := range msgs { - messages[i] = NewWhisperMessage(message) + // Wrap and send the message + pow := time.Duration(priority) * time.Millisecond + envelope, err := message.Wrap(pow, options) + if err != nil { + return err } - - return + if err := self.Whisper.Send(envelope); err != nil { + return err + } + return nil } -type Options struct { - To string - From string - Topics []string - Fn func(msg WhisperMessage) +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(to, from string, topics []string, fn func(WhisperMessage)) int { + filter := whisper.Filter{ + To: crypto.ToECDSAPub(common.FromHex(to)), + From: crypto.ToECDSAPub(common.FromHex(from)), + Topics: whisper.NewTopicsFromStrings(topics...), + } + filter.Fn = func(message *whisper.Message) { + fn(NewWhisperMessage(message)) + } + return self.Whisper.Watch(filter) } -type WhisperMessage struct { - ref *whisper.Message - Payload string `json:"payload"` - To string `json:"to"` - From string `json:"from"` - Sent int64 `json:"sent"` -} +// Messages retrieves all the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []WhisperMessage { + pool := self.Whisper.Messages(id) -func NewWhisperMessage(msg *whisper.Message) WhisperMessage { - return WhisperMessage{ - ref: msg, - Payload: common.ToHex(msg.Payload), - From: common.ToHex(crypto.FromECDSAPub(msg.Recover())), - To: common.ToHex(crypto.FromECDSAPub(msg.To)), - Sent: msg.Sent, + messages := make([]WhisperMessage, len(pool)) + for i, message := range pool { + messages[i] = NewWhisperMessage(message) } + return messages } diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go new file mode 100644 index 000000000..9d8a739b7 --- /dev/null +++ b/xeth/whisper_filter.go @@ -0,0 +1,26 @@ +// Contains the external API side message filter for watching, pooling and polling +// matched whisper messages. + +package xeth + +import "time" + +// whisperFilter is the message cache matching a specific filter, accumulating +// inbound messages until the are requested by the client. +type whisperFilter struct { + id int // Filter identifier + cache []WhisperMessage // Cache of messages not yet polled + timeout time.Time // Time when the last message batch was queries +} + +// insert injects a new batch of messages into the filter cache. +func (w *whisperFilter) insert(msgs ...WhisperMessage) { + w.cache = append(w.cache, msgs...) +} + +// retrieve fetches all the cached messages from the filter. +func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + messages, w.cache = w.cache, nil + w.timeout = time.Now() + return +} diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go new file mode 100644 index 000000000..14796cfbc --- /dev/null +++ b/xeth/whisper_message.go @@ -0,0 +1,31 @@ +// Contains the external API representation of a whisper message. + +package xeth + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/whisper" +) + +// WhisperMessage is the external API representation of a whisper.Message. +type WhisperMessage struct { + ref *whisper.Message + + Payload string `json:"payload"` + To string `json:"to"` + From string `json:"from"` + Sent int64 `json:"sent"` +} + +// NewWhisperMessage converts an internal message into an API version. +func NewWhisperMessage(message *whisper.Message) WhisperMessage { + return WhisperMessage{ + ref: message, + + Payload: common.ToHex(message.Payload), + From: common.ToHex(crypto.FromECDSAPub(message.Recover())), + To: common.ToHex(crypto.FromECDSAPub(message.To)), + Sent: message.Sent, + } +} diff --git a/xeth/xeth.go b/xeth/xeth.go index 693acb910..e7e553036 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,14 +452,15 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(opts *Options) int { +func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { var id int - opts.Fn = func(msg WhisperMessage) { + callback := func(msg WhisperMessage) { p.messagesMut.Lock() defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) + + p.messages[id].insert(msg) } - id = p.Whisper().Watch(opts) + id = p.Whisper().Watch(to, from, topics, callback) p.messages[id] = &whisperFilter{timeout: time.Now()} return id } @@ -478,7 +479,7 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { defer self.messagesMut.Unlock() if self.messages[id] != nil { - return self.messages[id].get() + return self.messages[id].retrieve() } return nil @@ -731,22 +732,6 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type whisperFilter struct { - messages []WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - type logFilter struct { logs state.Logs timeout time.Time |