diff options
Diffstat (limited to 'whisper/whisper.go')
-rw-r--r-- | whisper/whisper.go | 57 |
1 files changed, 29 insertions, 28 deletions
diff --git a/whisper/whisper.go b/whisper/whisper.go index 9317fad50..a48e1e380 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -58,6 +58,8 @@ type Whisper struct { quit chan struct{} } +// New creates a Whisper client ready to communicate through the Ethereum P2P +// network. func New() *Whisper { whisper := &Whisper{ filters: filter.New(), @@ -116,11 +118,11 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { // Watch installs a new message handler to run in case a matching packet arrives // from the whisper network. func (self *Whisper) Watch(options Filter) int { - filter := filter.Generic{ - Str1: string(crypto.FromECDSAPub(options.To)), - Str2: string(crypto.FromECDSAPub(options.From)), - Data: newTopicSet(options.Topics), - Fn: func(data interface{}) { + filter := filterer{ + to: string(crypto.FromECDSAPub(options.To)), + from: string(crypto.FromECDSAPub(options.From)), + matcher: newTopicMatcher(options.Topics...), + fn: func(data interface{}) { options.Fn(data.(*Message)) }, } @@ -148,7 +150,7 @@ func (self *Whisper) Stop() { glog.V(logger.Info).Infoln("Whisper stopped") } -// Messages retrieves the currently pooled messages matching a filter id. +// Messages retrieves all the currently pooled messages matching a filter id. func (self *Whisper) Messages(id int) []*Message { messages := make([]*Message, 0) if filter := self.filters.Get(id); filter != nil { @@ -163,27 +165,12 @@ func (self *Whisper) Messages(id int) []*Message { return messages } -// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { -// k := string(crypto.FromECDSAPub(key)) -// if _, ok := self.keys[k]; ok { -// delete(self.keys, k) -// return true -// } -// return false -// } - // handlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - // Create, initialize and start the whisper peer - whisperPeer, err := newPeer(self, peer, rw) - if err != nil { - return err - } - whisperPeer.start() - defer whisperPeer.stop() + // Create the new peer and start tracking it + whisperPeer := newPeer(self, peer, rw) - // Start tracking the active peer self.peerMu.Lock() self.peers[whisperPeer] = struct{}{} self.peerMu.Unlock() @@ -193,6 +180,14 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { delete(self.peers, whisperPeer) self.peerMu.Unlock() }() + + // Run the peer handshake and state updates + if err := whisperPeer.handshake(); err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + // Read and process inbound messages directly to merge into client-global state for { // Fetch the next packet and decode the contained envelopes @@ -267,9 +262,11 @@ func (self *Whisper) open(envelope *Envelope) *Message { // Iterate over the keys and try to decrypt the message for _, key := range self.keys { message, err := envelope.Open(key) - if err == nil || err == ecies.ErrInvalidPublicKey { + if err == nil { message.To = &key.PublicKey return message + } else if err == ecies.ErrInvalidPublicKey { + return message } } // Failed to decrypt, don't return anything @@ -278,10 +275,14 @@ func (self *Whisper) open(envelope *Envelope) *Message { // createFilter creates a message filter to check against installed handlers. func createFilter(message *Message, topics []Topic) filter.Filter { - return filter.Generic{ - Str1: string(crypto.FromECDSAPub(message.To)), - Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: newTopicSet(topics), + matcher := make([][]Topic, len(topics)) + for i, topic := range topics { + matcher[i] = []Topic{topic} + } + return filterer{ + to: string(crypto.FromECDSAPub(message.To)), + from: string(crypto.FromECDSAPub(message.Recover())), + matcher: newTopicMatcher(matcher...), } } |