diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-28 16:54:05 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-28 16:54:05 +0800 |
commit | a05c420371aa56657b86ba3dce6ebb087adb708d (patch) | |
tree | 3e0853dcff756c503764d0f73c1757c585be2aa9 /xeth | |
parent | 182d484aa70bcd5b22117f02333b1fd3b1535dcb (diff) | |
parent | 978ffd3097242a5faeb7b23b9c72590170004dc6 (diff) | |
download | go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.gz go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.bz2 go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.lz go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.xz go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.zst go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.zip |
Merge pull request #738 from karalabe/whisper-cleanup
Whisper cleanup, part 3
Diffstat (limited to 'xeth')
-rw-r--r-- | xeth/whisper.go | 149 | ||||
-rw-r--r-- | xeth/whisper_filter.go | 84 | ||||
-rw-r--r-- | xeth/whisper_message.go | 37 | ||||
-rw-r--r-- | xeth/xeth.go | 68 |
4 files changed, 227 insertions, 111 deletions
diff --git a/xeth/whisper.go b/xeth/whisper.go index 342910b5c..edb62c748 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -1,7 +1,9 @@ +// Contains the external API to the whisper sub-protocol. + package xeth import ( - "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -12,109 +14,92 @@ import ( var qlogger = logger.NewLogger("XSHH") +// Whisper represents the API wrapper around the internal whisper implementation. type Whisper struct { *whisper.Whisper } +// NewWhisper wraps an internal whisper client into an external API version. func NewWhisper(w *whisper.Whisper) *Whisper { return &Whisper{w} } -func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { - if priority == 0 { - priority = 1000 - } - - if ttl == 0 { - ttl = 100 - } - - pk := crypto.ToECDSAPub(common.FromHex(from)) - if key := self.Whisper.GetIdentity(pk); key != nil || len(from) == 0 { - msg := whisper.NewMessage(common.FromHex(payload)) - envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{ - TTL: time.Duration(ttl) * time.Second, - To: crypto.ToECDSAPub(common.FromHex(to)), - From: key, - Topics: whisper.NewTopicsFromStrings(topics...), - }) - - if err != nil { - return err - } - - if err := self.Whisper.Send(envelope); err != nil { - return err - } - } else { - return errors.New("unmatched pub / priv for seal") - } - - return nil -} - +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() string { - key := self.Whisper.NewIdentity() - - return common.ToHex(crypto.FromECDSAPub(&key.PublicKey)) + identity := self.Whisper.NewIdentity() + return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)) } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key string) bool { return self.Whisper.HasIdentity(crypto.ToECDSAPub(common.FromHex(key))) } -// func (self *Whisper) RemoveIdentity(key string) bool { -// return self.Whisper.RemoveIdentity(crypto.ToECDSAPub(common.FromHex(key))) -// } - -func (self *Whisper) Watch(opts *Options) int { - filter := whisper.Filter{ - To: crypto.ToECDSAPub(common.FromHex(opts.To)), - From: crypto.ToECDSAPub(common.FromHex(opts.From)), - Topics: whisper.NewTopicsFromStrings(opts.Topics...), +// Post injects a message into the whisper network for distribution. +func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Decode the topic strings + topicsDecoded := make([][]byte, len(topics)) + for i, topic := range topics { + topicsDecoded[i] = common.FromHex(topic) } - - var i int - filter.Fn = func(msg *whisper.Message) { - opts.Fn(NewWhisperMessage(msg)) + // Construct the whisper message and transmission options + message := whisper.NewMessage(common.FromHex(payload)) + options := whisper.Options{ + To: crypto.ToECDSAPub(common.FromHex(to)), + TTL: time.Duration(ttl) * time.Second, + Topics: whisper.NewTopics(topicsDecoded...), } - - i = self.Whisper.Watch(filter) - - return i -} - -func (self *Whisper) Messages(id int) (messages []WhisperMessage) { - msgs := self.Whisper.Messages(id) - messages = make([]WhisperMessage, len(msgs)) - for i, message := range msgs { - messages[i] = NewWhisperMessage(message) + if len(from) != 0 { + if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { + options.From = key + } else { + return fmt.Errorf("unknown identity to send from: %s", from) + } } - - return + // Wrap and send the message + pow := time.Duration(priority) * time.Millisecond + envelope, err := message.Wrap(pow, options) + if err != nil { + return err + } + if err := self.Whisper.Send(envelope); err != nil { + return err + } + return nil } -type Options struct { - To string - From string - Topics []string - Fn func(msg WhisperMessage) +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMessage)) int { + // Decode the topic strings + topicsDecoded := make([][][]byte, len(topics)) + for i, condition := range topics { + topicsDecoded[i] = make([][]byte, len(condition)) + for j, topic := range condition { + topicsDecoded[i][j] = common.FromHex(topic) + } + } + // Assemble and inject the filter into the whisper client + filter := whisper.Filter{ + To: crypto.ToECDSAPub(common.FromHex(to)), + From: crypto.ToECDSAPub(common.FromHex(from)), + Topics: whisper.NewFilterTopics(topicsDecoded...), + } + filter.Fn = func(message *whisper.Message) { + fn(NewWhisperMessage(message)) + } + return self.Whisper.Watch(filter) } -type WhisperMessage struct { - ref *whisper.Message - Payload string `json:"payload"` - To string `json:"to"` - From string `json:"from"` - Sent int64 `json:"sent"` -} +// Messages retrieves all the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []WhisperMessage { + pool := self.Whisper.Messages(id) -func NewWhisperMessage(msg *whisper.Message) WhisperMessage { - return WhisperMessage{ - ref: msg, - Payload: common.ToHex(msg.Payload), - From: common.ToHex(crypto.FromECDSAPub(msg.Recover())), - To: common.ToHex(crypto.FromECDSAPub(msg.To)), - Sent: msg.Sent, + messages := make([]WhisperMessage, len(pool)) + for i, message := range pool { + messages[i] = NewWhisperMessage(message) } + return messages } diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go new file mode 100644 index 000000000..52e70e041 --- /dev/null +++ b/xeth/whisper_filter.go @@ -0,0 +1,84 @@ +// Contains the external API side message filter for watching, pooling and polling +// matched whisper messages, also serializing data access to avoid duplications. + +package xeth + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// whisperFilter is the message cache matching a specific filter, accumulating +// inbound messages until the are requested by the client. +type whisperFilter struct { + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []WhisperMessage { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.ref.Hash] = struct{}{} + } + return messages +} + +// insert injects a new batch of messages into the filter cache. +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } +} + +// retrieve fetches all the cached messages from the filter. +func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + messages, w.cache = w.cache, nil + w.update = time.Now() + + return +} + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go new file mode 100644 index 000000000..c8195cec1 --- /dev/null +++ b/xeth/whisper_message.go @@ -0,0 +1,37 @@ +// Contains the external API representation of a whisper message. + +package xeth + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/whisper" +) + +// WhisperMessage is the external API representation of a whisper.Message. +type WhisperMessage struct { + ref *whisper.Message + + Payload string `json:"payload"` + To string `json:"to"` + From string `json:"from"` + Sent int64 `json:"sent"` + TTL int64 `json:"ttl"` + Hash string `json:"hash"` +} + +// NewWhisperMessage converts an internal message into an API version. +func NewWhisperMessage(message *whisper.Message) WhisperMessage { + return WhisperMessage{ + ref: message, + + Payload: common.ToHex(message.Payload), + From: common.ToHex(crypto.FromECDSAPub(message.Recover())), + To: common.ToHex(crypto.FromECDSAPub(message.To)), + Sent: message.Sent.Unix(), + TTL: int64(message.TTL / time.Second), + Hash: common.ToHex(message.Hash.Bytes()), + } +} diff --git a/xeth/xeth.go b/xeth/xeth.go index 693acb910..e4040d9d8 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -97,7 +97,7 @@ done: } for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { + if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } @@ -452,35 +452,61 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(opts *Options) int { +// NewWhisperFilter creates and registers a new message filter to watch for +// inbound whisper messages. All parameters at this point are assumed to be +// HEX encoded. +func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { + // Pre-define the id to be filled later var id int - opts.Fn = func(msg WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) + + // Callback to delegate core whisper messages to this xeth filter + callback := func(msg WhisperMessage) { + p.messagesMut.RLock() // Only read lock to the filter pool + defer p.messagesMut.RUnlock() + p.messages[id].insert(msg) } - id = p.Whisper().Watch(opts) - p.messages[id] = &whisperFilter{timeout: time.Now()} + // Initialize the core whisper filter and wrap into xeth + id = p.Whisper().Watch(to, from, topics, callback) + + p.messagesMut.Lock() + p.messages[id] = newWhisperFilter(id, p.Whisper()) + p.messagesMut.Unlock() + return id } +// UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + if _, ok := p.messages[id]; ok { delete(p.messages, id) return true } - return false } -func (self *XEth) MessagesChanged(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessages retrieves all the known messages that match a specific filter. +func (self *XEth) WhisperMessages(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].get() + return self.messages[id].messages() } + return nil +} + +// WhisperMessagesChanged retrieves all the new messages matched by a filter +// since the last retrieval +func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() + if self.messages[id] != nil { + return self.messages[id].retrieve() + } return nil } @@ -731,22 +757,6 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type whisperFilter struct { - messages []WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - type logFilter struct { logs state.Logs timeout time.Time |