diff options
Diffstat (limited to 'whisper/whisper.go')
-rw-r--r-- | whisper/whisper.go | 248 |
1 files changed, 138 insertions, 110 deletions
diff --git a/whisper/whisper.go b/whisper/whisper.go index a4ec943e8..f51f14a9f 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -2,7 +2,6 @@ package whisper import ( "crypto/ecdsa" - "errors" "sync" "time" @@ -17,12 +16,16 @@ import ( ) const ( - statusMsg = 0x0 - envelopesMsg = 0x01 - whisperVersion = 0x02 + statusMsg = 0x00 + envelopesMsg = 0x01 + + protocolVersion uint64 = 0x02 + protocolName = "shh" signatureFlag = byte(1 << 7) signatureLength = 65 + + expirationTicks = 800 * time.Millisecond ) const ( @@ -42,9 +45,9 @@ type Whisper struct { protocol p2p.Protocol filters *filter.Filters - mmu sync.RWMutex - messages map[common.Hash]*Envelope - expiry map[uint32]*set.SetNonTS + mmu sync.RWMutex // Message mutex to sync the below pool + messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node + expiry map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter) quit chan struct{} @@ -63,8 +66,8 @@ func New() *Whisper { // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ - Name: "shh", - Version: uint(whisperVersion), + Name: protocolName, + Version: uint(protocolVersion), Length: 2, Run: whisper.msgHandler, } @@ -72,42 +75,74 @@ func New() *Whisper { return whisper } -func (self *Whisper) Version() uint { - return self.protocol.Version -} - -func (self *Whisper) Start() { - glog.V(logger.Info).Infoln("Whisper started") - go self.update() -} - -func (self *Whisper) Stop() { - close(self.quit) +// Protocol returns the whisper sub-protocol handler for this particular client. +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol } -func (self *Whisper) Send(envelope *Envelope) error { - return self.add(envelope) +// Version returns the whisper sub-protocols version number. +func (self *Whisper) Version() uint { + return self.protocol.Version } +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { key, err := crypto.GenerateKey() if err != nil { panic(err) } - self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key return key } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { return self.keys[string(crypto.FromECDSAPub(key))] != nil } +// GetIdentity retrieves the private key of the specified public identity. func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { return self.keys[string(crypto.FromECDSAPub(key))] } +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(options Filter) int { + filter := filter.Generic{ + Str1: string(crypto.FromECDSAPub(options.To)), + Str2: string(crypto.FromECDSAPub(options.From)), + Data: NewTopicSet(options.Topics), + Fn: func(data interface{}) { + options.Fn(data.(*Message)) + }, + } + return self.filters.Install(filter) +} + +// Unwatch removes an installed message handler. +func (self *Whisper) Unwatch(id int) { + self.filters.Uninstall(id) +} + +// Send injects a message into the whisper send queue, to be distributed in the +// network in the coming cycles. +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) +} + +func (self *Whisper) Start() { + glog.V(logger.Info).Infoln("Whisper started") + go self.update() +} + +func (self *Whisper) Stop() { + close(self.quit) + glog.V(logger.Info).Infoln("Whisper stopped") +} + // func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { // k := string(crypto.FromECDSAPub(key)) // if _, ok := self.keys[k]; ok { @@ -117,22 +152,7 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { // return false // } -func (self *Whisper) Watch(opts Filter) int { - return self.filters.Install(filter.Generic{ - Str1: string(crypto.FromECDSAPub(opts.To)), - Str2: string(crypto.FromECDSAPub(opts.From)), - Data: NewTopicSet(opts.Topics), - Fn: func(data interface{}) { - opts.Fn(data.(*Message)) - }, - }) -} - -func (self *Whisper) Unwatch(id int) { - self.filters.Uninstall(id) -} - -func (self *Whisper) Messages(id int) (messages []*Message) { +/*func (self *Whisper) Messages(id int) (messages []*Message) { filter := self.filters.Get(id) if filter != nil { for _, e := range self.messages { @@ -146,6 +166,36 @@ func (self *Whisper) Messages(id int) (messages []*Message) { } return +}*/ + +// add inserts a new envelope into the message pool to be distributed within the +// whisper network. It also inserts the envelope into the expiration pool at the +// appropriate time-stamp. +func (self *Whisper) add(envelope *Envelope) error { + self.mmu.Lock() + defer self.mmu.Unlock() + + // Insert the message into the tracked pool + hash := envelope.Hash() + if _, ok := self.messages[hash]; ok { + glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope) + return nil + } + self.messages[hash] = envelope + + // Insert the message into the expiration pool for later removal + if self.expiry[envelope.Expiry] == nil { + self.expiry[envelope.Expiry] = set.NewNonTS() + } + if !self.expiry[envelope.Expiry].Has(hash) { + self.expiry[envelope.Expiry].Add(hash) + + // Notify the local node of a message arrival + go self.postEvent(envelope) + } + glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope) + + return nil } // Main handler for passing whisper messages to whisper peer objects @@ -182,53 +232,76 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { } } -// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. -func (self *Whisper) add(envelope *Envelope) error { - if !envelope.valid() { - return errors.New("invalid pow provided for envelope") +// postEvent opens an envelope with the configured identities and delivers the +// message upstream from application processing. +func (self *Whisper) postEvent(envelope *Envelope) { + if message := self.open(envelope); message != nil { + self.filters.Notify(createFilter(message, envelope.Topics), message) } +} - self.mmu.Lock() - defer self.mmu.Unlock() - - hash := envelope.Hash() - self.messages[hash] = envelope - if self.expiry[envelope.Expiry] == nil { - self.expiry[envelope.Expiry] = set.NewNonTS() +// open tries to decrypt a whisper envelope with all the configured identities, +// returning the decrypted message and the key used to achieve it. If not keys +// are configured, open will return the payload as if non encrypted. +func (self *Whisper) open(envelope *Envelope) *Message { + // Short circuit if no identity is set, and assume clear-text + if len(self.keys) == 0 { + if message, err := envelope.Open(nil); err == nil { + return message + } } - - if !self.expiry[envelope.Expiry].Has(hash) { - self.expiry[envelope.Expiry].Add(hash) - go self.postEvent(envelope) + // Iterate over the keys and try to decrypt the message + for _, key := range self.keys { + message, err := envelope.Open(key) + if err == nil || err == ecies.ErrInvalidPublicKey { + message.To = &key.PublicKey + return message + } } - glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope) - + // Failed to decrypt, don't return anything return nil } +// createFilter creates a message filter to check against installed handlers. +func createFilter(message *Message, topics []Topic) filter.Filter { + return filter.Generic{ + Str1: string(crypto.FromECDSAPub(message.To)), + Str2: string(crypto.FromECDSAPub(message.Recover())), + Data: NewTopicSet(topics), + } +} + +// update loops until the lifetime of the whisper node, updating its internal +// state by expiring stale messages from the pool. func (self *Whisper) update() { - expire := time.NewTicker(800 * time.Millisecond) -out: + // Start a ticker to check for expirations + expire := time.NewTicker(expirationTicks) + + // Repeat updates until termination is requested for { select { case <-expire.C: self.expire() + case <-self.quit: - break out + return } } } +// expire iterates over all the expiration timestamps, removing all stale +// messages from the pools. func (self *Whisper) expire() { self.mmu.Lock() defer self.mmu.Unlock() now := uint32(time.Now().Unix()) for then, hashSet := range self.expiry { + // Short circuit if a future time if then > now { continue } - + // Dump all expired messages and remove timestamp hashSet.Each(func(v interface{}) bool { delete(self.messages, v.(common.Hash)) return true @@ -237,59 +310,14 @@ func (self *Whisper) expire() { } } -func (self *Whisper) envelopes() (envelopes []*Envelope) { +// envelopes retrieves all the messages currently pooled by the node. +func (self *Whisper) envelopes() []*Envelope { self.mmu.RLock() defer self.mmu.RUnlock() - envelopes = make([]*Envelope, len(self.messages)) - i := 0 + envelopes := make([]*Envelope, 0, len(self.messages)) for _, envelope := range self.messages { - envelopes[i] = envelope - i++ - } - - return -} - -func (self *Whisper) Protocol() p2p.Protocol { - return self.protocol -} - -// postEvent opens an envelope with the configured identities and delivers the -// message upstream from application processing. -func (self *Whisper) postEvent(envelope *Envelope) { - if message := self.open(envelope); message != nil { - self.filters.Notify(createFilter(message, envelope.Topics), message) - } -} - -// open tries to decrypt a whisper envelope with all the configured identities, -// returning the decrypted message and the key used to achieve it. If not keys -// are configured, open will return the payload as if non encrypted. -func (self *Whisper) open(envelope *Envelope) *Message { - // Short circuit if no identity is set, and assume clear-text - if len(self.keys) == 0 { - if message, err := envelope.Open(nil); err == nil { - return message - } - } - // Iterate over the keys and try to decrypt the message - for _, key := range self.keys { - message, err := envelope.Open(key) - if err == nil || err == ecies.ErrInvalidPublicKey { - message.To = &key.PublicKey - return message - } - } - // Failed to decrypt, don't return anything - return nil -} - -// createFilter creates a message filter to check against installed handlers. -func createFilter(message *Message, topics []Topic) filter.Filter { - return filter.Generic{ - Str1: string(crypto.FromECDSAPub(message.To)), - Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: NewTopicSet(topics), + envelopes = append(envelopes, envelope) } + return envelopes } |