aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisper.go
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisper.go')
-rw-r--r--whisper/whisper.go57
1 files changed, 29 insertions, 28 deletions
diff --git a/whisper/whisper.go b/whisper/whisper.go
index 9317fad50..a48e1e380 100644
--- a/whisper/whisper.go
+++ b/whisper/whisper.go
@@ -58,6 +58,8 @@ type Whisper struct {
quit chan struct{}
}
+// New creates a Whisper client ready to communicate through the Ethereum P2P
+// network.
func New() *Whisper {
whisper := &Whisper{
filters: filter.New(),
@@ -116,11 +118,11 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
// Watch installs a new message handler to run in case a matching packet arrives
// from the whisper network.
func (self *Whisper) Watch(options Filter) int {
- filter := filter.Generic{
- Str1: string(crypto.FromECDSAPub(options.To)),
- Str2: string(crypto.FromECDSAPub(options.From)),
- Data: newTopicSet(options.Topics),
- Fn: func(data interface{}) {
+ filter := filterer{
+ to: string(crypto.FromECDSAPub(options.To)),
+ from: string(crypto.FromECDSAPub(options.From)),
+ matcher: newTopicMatcher(options.Topics...),
+ fn: func(data interface{}) {
options.Fn(data.(*Message))
},
}
@@ -148,7 +150,7 @@ func (self *Whisper) Stop() {
glog.V(logger.Info).Infoln("Whisper stopped")
}
-// Messages retrieves the currently pooled messages matching a filter id.
+// Messages retrieves all the currently pooled messages matching a filter id.
func (self *Whisper) Messages(id int) []*Message {
messages := make([]*Message, 0)
if filter := self.filters.Get(id); filter != nil {
@@ -163,27 +165,12 @@ func (self *Whisper) Messages(id int) []*Message {
return messages
}
-// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool {
-// k := string(crypto.FromECDSAPub(key))
-// if _, ok := self.keys[k]; ok {
-// delete(self.keys, k)
-// return true
-// }
-// return false
-// }
-
// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
// connection is negotiated.
func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
- // Create, initialize and start the whisper peer
- whisperPeer, err := newPeer(self, peer, rw)
- if err != nil {
- return err
- }
- whisperPeer.start()
- defer whisperPeer.stop()
+ // Create the new peer and start tracking it
+ whisperPeer := newPeer(self, peer, rw)
- // Start tracking the active peer
self.peerMu.Lock()
self.peers[whisperPeer] = struct{}{}
self.peerMu.Unlock()
@@ -193,6 +180,14 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
delete(self.peers, whisperPeer)
self.peerMu.Unlock()
}()
+
+ // Run the peer handshake and state updates
+ if err := whisperPeer.handshake(); err != nil {
+ return err
+ }
+ whisperPeer.start()
+ defer whisperPeer.stop()
+
// Read and process inbound messages directly to merge into client-global state
for {
// Fetch the next packet and decode the contained envelopes
@@ -267,9 +262,11 @@ func (self *Whisper) open(envelope *Envelope) *Message {
// Iterate over the keys and try to decrypt the message
for _, key := range self.keys {
message, err := envelope.Open(key)
- if err == nil || err == ecies.ErrInvalidPublicKey {
+ if err == nil {
message.To = &key.PublicKey
return message
+ } else if err == ecies.ErrInvalidPublicKey {
+ return message
}
}
// Failed to decrypt, don't return anything
@@ -278,10 +275,14 @@ func (self *Whisper) open(envelope *Envelope) *Message {
// createFilter creates a message filter to check against installed handlers.
func createFilter(message *Message, topics []Topic) filter.Filter {
- return filter.Generic{
- Str1: string(crypto.FromECDSAPub(message.To)),
- Str2: string(crypto.FromECDSAPub(message.Recover())),
- Data: newTopicSet(topics),
+ matcher := make([][]Topic, len(topics))
+ for i, topic := range topics {
+ matcher[i] = []Topic{topic}
+ }
+ return filterer{
+ to: string(crypto.FromECDSAPub(message.To)),
+ from: string(crypto.FromECDSAPub(message.Recover())),
+ matcher: newTopicMatcher(matcher...),
}
}