diff options
Diffstat (limited to 'whisper/whisperv6/whisper.go')
-rw-r--r-- | whisper/whisperv6/whisper.go | 505 |
1 files changed, 243 insertions, 262 deletions
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index bc89aadcc..880cced09 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -19,7 +19,6 @@ package whisperv6 import ( "bytes" "crypto/ecdsa" - crand "crypto/rand" "crypto/sha256" "fmt" "math" @@ -39,6 +38,8 @@ import ( set "gopkg.in/fatih/set.v0" ) +// Statistics holds several message-related counter for analytics +// purposes. type Statistics struct { messagesCleared int memoryCleared int @@ -81,6 +82,8 @@ type Whisper struct { syncAllowance int // maximum time in seconds allowed to process the whisper-related messages + lightClient bool // indicates is this node is pure light client (does not forward any messages) + statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node @@ -130,8 +133,8 @@ func New(cfg *Config) *Whisper { } // MinPow returns the PoW value required by this node. -func (w *Whisper) MinPow() float64 { - val, exist := w.settings.Load(minPowIdx) +func (whisper *Whisper) MinPow() float64 { + val, exist := whisper.settings.Load(minPowIdx) if !exist || val == nil { return DefaultMinimumPoW } @@ -146,8 +149,8 @@ func (w *Whisper) MinPow() float64 { // MinPowTolerance returns the value of minimum PoW which is tolerated for a limited // time after PoW was changed. If sufficient time have elapsed or no change of PoW // have ever occurred, the return value will be the same as return value of MinPow(). -func (w *Whisper) MinPowTolerance() float64 { - val, exist := w.settings.Load(minPowToleranceIdx) +func (whisper *Whisper) MinPowTolerance() float64 { + val, exist := whisper.settings.Load(minPowToleranceIdx) if !exist || val == nil { return DefaultMinimumPoW } @@ -158,8 +161,8 @@ func (w *Whisper) MinPowTolerance() float64 { // The nodes are required to send only messages that match the advertised bloom filter. // If a message does not match the bloom, it will tantamount to spam, and the peer will // be disconnected. -func (w *Whisper) BloomFilter() []byte { - val, exist := w.settings.Load(bloomFilterIdx) +func (whisper *Whisper) BloomFilter() []byte { + val, exist := whisper.settings.Load(bloomFilterIdx) if !exist || val == nil { return nil } @@ -170,8 +173,8 @@ func (w *Whisper) BloomFilter() []byte { // time after new bloom was advertised to the peers. If sufficient time have elapsed // or no change of bloom filter have ever occurred, the return value will be the same // as return value of BloomFilter(). -func (w *Whisper) BloomFilterTolerance() []byte { - val, exist := w.settings.Load(bloomFilterToleranceIdx) +func (whisper *Whisper) BloomFilterTolerance() []byte { + val, exist := whisper.settings.Load(bloomFilterToleranceIdx) if !exist || val == nil { return nil } @@ -179,24 +182,24 @@ func (w *Whisper) BloomFilterTolerance() []byte { } // MaxMessageSize returns the maximum accepted message size. -func (w *Whisper) MaxMessageSize() uint32 { - val, _ := w.settings.Load(maxMsgSizeIdx) +func (whisper *Whisper) MaxMessageSize() uint32 { + val, _ := whisper.settings.Load(maxMsgSizeIdx) return val.(uint32) } // Overflow returns an indication if the message queue is full. -func (w *Whisper) Overflow() bool { - val, _ := w.settings.Load(overflowIdx) +func (whisper *Whisper) Overflow() bool { + val, _ := whisper.settings.Load(overflowIdx) return val.(bool) } // APIs returns the RPC descriptors the Whisper implementation offers -func (w *Whisper) APIs() []rpc.API { +func (whisper *Whisper) APIs() []rpc.API { return []rpc.API{ { Namespace: ProtocolName, Version: ProtocolVersionStr, - Service: NewPublicWhisperAPI(w), + Service: NewPublicWhisperAPI(whisper), Public: true, }, } @@ -204,77 +207,77 @@ func (w *Whisper) APIs() []rpc.API { // RegisterServer registers MailServer interface. // MailServer will process all the incoming messages with p2pRequestCode. -func (w *Whisper) RegisterServer(server MailServer) { - w.mailServer = server +func (whisper *Whisper) RegisterServer(server MailServer) { + whisper.mailServer = server } // Protocols returns the whisper sub-protocols ran by this particular client. -func (w *Whisper) Protocols() []p2p.Protocol { - return []p2p.Protocol{w.protocol} +func (whisper *Whisper) Protocols() []p2p.Protocol { + return []p2p.Protocol{whisper.protocol} } // Version returns the whisper sub-protocols version number. -func (w *Whisper) Version() uint { - return w.protocol.Version +func (whisper *Whisper) Version() uint { + return whisper.protocol.Version } // SetMaxMessageSize sets the maximal message size allowed by this node -func (w *Whisper) SetMaxMessageSize(size uint32) error { +func (whisper *Whisper) SetMaxMessageSize(size uint32) error { if size > MaxMessageSize { return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize) } - w.settings.Store(maxMsgSizeIdx, size) + whisper.settings.Store(maxMsgSizeIdx, size) return nil } // SetBloomFilter sets the new bloom filter -func (w *Whisper) SetBloomFilter(bloom []byte) error { - if len(bloom) != bloomFilterSize { +func (whisper *Whisper) SetBloomFilter(bloom []byte) error { + if len(bloom) != BloomFilterSize { return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) } - b := make([]byte, bloomFilterSize) + b := make([]byte, BloomFilterSize) copy(b, bloom) - w.settings.Store(bloomFilterIdx, b) - w.notifyPeersAboutBloomFilterChange(b) + whisper.settings.Store(bloomFilterIdx, b) + whisper.notifyPeersAboutBloomFilterChange(b) go func() { // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(w.syncAllowance) * time.Second) - w.settings.Store(bloomFilterToleranceIdx, b) + time.Sleep(time.Duration(whisper.syncAllowance) * time.Second) + whisper.settings.Store(bloomFilterToleranceIdx, b) }() return nil } // SetMinimumPoW sets the minimal PoW required by this node -func (w *Whisper) SetMinimumPoW(val float64) error { +func (whisper *Whisper) SetMinimumPoW(val float64) error { if val < 0.0 { return fmt.Errorf("invalid PoW: %f", val) } - w.settings.Store(minPowIdx, val) - w.notifyPeersAboutPowRequirementChange(val) + whisper.settings.Store(minPowIdx, val) + whisper.notifyPeersAboutPowRequirementChange(val) go func() { // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(w.syncAllowance) * time.Second) - w.settings.Store(minPowToleranceIdx, val) + time.Sleep(time.Duration(whisper.syncAllowance) * time.Second) + whisper.settings.Store(minPowToleranceIdx, val) }() return nil } -// SetMinimumPoW sets the minimal PoW in test environment -func (w *Whisper) SetMinimumPowTest(val float64) { - w.settings.Store(minPowIdx, val) - w.notifyPeersAboutPowRequirementChange(val) - w.settings.Store(minPowToleranceIdx, val) +// SetMinimumPowTest sets the minimal PoW in test environment +func (whisper *Whisper) SetMinimumPowTest(val float64) { + whisper.settings.Store(minPowIdx, val) + whisper.notifyPeersAboutPowRequirementChange(val) + whisper.settings.Store(minPowToleranceIdx, val) } -func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { - arr := w.getPeers() +func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { + arr := whisper.getPeers() for _, p := range arr { err := p.notifyAboutPowRequirementChange(pow) if err != nil { @@ -287,8 +290,8 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { } } -func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) { - arr := w.getPeers() +func (whisper *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) { + arr := whisper.getPeers() for _, p := range arr { err := p.notifyAboutBloomFilterChange(bloom) if err != nil { @@ -301,23 +304,23 @@ func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) { } } -func (w *Whisper) getPeers() []*Peer { - arr := make([]*Peer, len(w.peers)) +func (whisper *Whisper) getPeers() []*Peer { + arr := make([]*Peer, len(whisper.peers)) i := 0 - w.peerMu.Lock() - for p := range w.peers { + whisper.peerMu.Lock() + for p := range whisper.peers { arr[i] = p i++ } - w.peerMu.Unlock() + whisper.peerMu.Unlock() return arr } // getPeer retrieves peer by ID -func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { - w.peerMu.Lock() - defer w.peerMu.Unlock() - for p := range w.peers { +func (whisper *Whisper) getPeer(peerID []byte) (*Peer, error) { + whisper.peerMu.Lock() + defer whisper.peerMu.Unlock() + for p := range whisper.peers { id := p.peer.ID() if bytes.Equal(peerID, id[:]) { return p, nil @@ -328,8 +331,8 @@ func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { // AllowP2PMessagesFromPeer marks specific peer trusted, // which will allow it to send historic (expired) messages. -func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { - p, err := w.getPeer(peerID) +func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { + p, err := whisper.getPeer(peerID) if err != nil { return err } @@ -342,8 +345,8 @@ func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { // request and respond with a number of peer-to-peer messages (possibly expired), // which are not supposed to be forwarded any further. // The whisper protocol is agnostic of the format and contents of envelope. -func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { - p, err := w.getPeer(peerID) +func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { + p, err := whisper.getPeer(peerID) if err != nil { return err } @@ -352,22 +355,22 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) err } // SendP2PMessage sends a peer-to-peer message to a specific peer. -func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { - p, err := w.getPeer(peerID) +func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { + p, err := whisper.getPeer(peerID) if err != nil { return err } - return w.SendP2PDirect(p, envelope) + return whisper.SendP2PDirect(p, envelope) } // SendP2PDirect sends a peer-to-peer message to a specific peer. -func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error { +func (whisper *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error { return p2p.Send(peer.ws, p2pMessageCode, envelope) } // NewKeyPair generates a new cryptographic identity for the client, and injects // it into the known identities for message decryption. Returns ID of the new key pair. -func (w *Whisper) NewKeyPair() (string, error) { +func (whisper *Whisper) NewKeyPair() (string, error) { key, err := crypto.GenerateKey() if err != nil || !validatePrivateKey(key) { key, err = crypto.GenerateKey() // retry once @@ -384,55 +387,55 @@ func (w *Whisper) NewKeyPair() (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.privateKeys[id] != nil { + if whisper.privateKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - w.privateKeys[id] = key + whisper.privateKeys[id] = key return id, nil } // DeleteKeyPair deletes the specified key if it exists. -func (w *Whisper) DeleteKeyPair(key string) bool { - w.keyMu.Lock() - defer w.keyMu.Unlock() +func (whisper *Whisper) DeleteKeyPair(key string) bool { + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.privateKeys[key] != nil { - delete(w.privateKeys, key) + if whisper.privateKeys[key] != nil { + delete(whisper.privateKeys, key) return true } return false } // AddKeyPair imports a asymmetric private key and returns it identifier. -func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { +func (whisper *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { id, err := GenerateRandomID() if err != nil { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - w.privateKeys[id] = key - w.keyMu.Unlock() + whisper.keyMu.Lock() + whisper.privateKeys[id] = key + whisper.keyMu.Unlock() return id, nil } // HasKeyPair checks if the the whisper node is configured with the private key // of the specified public pair. -func (w *Whisper) HasKeyPair(id string) bool { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - return w.privateKeys[id] != nil +func (whisper *Whisper) HasKeyPair(id string) bool { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + return whisper.privateKeys[id] != nil } // GetPrivateKey retrieves the private key of the specified identity. -func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - key := w.privateKeys[id] +func (whisper *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + key := whisper.privateKeys[id] if key == nil { return nil, fmt.Errorf("invalid id") } @@ -441,12 +444,11 @@ func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { // GenerateSymKey generates a random symmetric key and stores it under id, // which is then returned. Will be used in the future for session key exchange. -func (w *Whisper) GenerateSymKey() (string, error) { - key := make([]byte, aesKeyLength) - _, err := crand.Read(key) +func (whisper *Whisper) GenerateSymKey() (string, error) { + key, err := generateSecureRandomData(aesKeyLength) if err != nil { return "", err - } else if !validateSymmetricKey(key) { + } else if !validateDataIntegrity(key, aesKeyLength) { return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data") } @@ -455,18 +457,18 @@ func (w *Whisper) GenerateSymKey() (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.symKeys[id] != nil { + if whisper.symKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - w.symKeys[id] = key + whisper.symKeys[id] = key return id, nil } // AddSymKeyDirect stores the key, and returns its id. -func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) { +func (whisper *Whisper) AddSymKeyDirect(key []byte) (string, error) { if len(key) != aesKeyLength { return "", fmt.Errorf("wrong key size: %d", len(key)) } @@ -476,23 +478,23 @@ func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.symKeys[id] != nil { + if whisper.symKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - w.symKeys[id] = key + whisper.symKeys[id] = key return id, nil } // AddSymKeyFromPassword generates the key from password, stores it, and returns its id. -func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) { +func (whisper *Whisper) AddSymKeyFromPassword(password string) (string, error) { id, err := GenerateRandomID() if err != nil { return "", fmt.Errorf("failed to generate ID: %s", err) } - if w.HasSymKey(id) { + if whisper.HasSymKey(id) { return "", fmt.Errorf("failed to generate unique ID") } @@ -503,81 +505,81 @@ func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) { return "", err } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() // double check is necessary, because deriveKeyMaterial() is very slow - if w.symKeys[id] != nil { + if whisper.symKeys[id] != nil { return "", fmt.Errorf("critical error: failed to generate unique ID") } - w.symKeys[id] = derived + whisper.symKeys[id] = derived return id, nil } // HasSymKey returns true if there is a key associated with the given id. // Otherwise returns false. -func (w *Whisper) HasSymKey(id string) bool { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - return w.symKeys[id] != nil +func (whisper *Whisper) HasSymKey(id string) bool { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + return whisper.symKeys[id] != nil } // DeleteSymKey deletes the key associated with the name string if it exists. -func (w *Whisper) DeleteSymKey(id string) bool { - w.keyMu.Lock() - defer w.keyMu.Unlock() - if w.symKeys[id] != nil { - delete(w.symKeys, id) +func (whisper *Whisper) DeleteSymKey(id string) bool { + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() + if whisper.symKeys[id] != nil { + delete(whisper.symKeys, id) return true } return false } // GetSymKey returns the symmetric key associated with the given id. -func (w *Whisper) GetSymKey(id string) ([]byte, error) { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - if w.symKeys[id] != nil { - return w.symKeys[id], nil +func (whisper *Whisper) GetSymKey(id string) ([]byte, error) { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + if whisper.symKeys[id] != nil { + return whisper.symKeys[id], nil } return nil, fmt.Errorf("non-existent key ID") } // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -func (w *Whisper) Subscribe(f *Filter) (string, error) { - s, err := w.filters.Install(f) +func (whisper *Whisper) Subscribe(f *Filter) (string, error) { + s, err := whisper.filters.Install(f) if err == nil { - w.updateBloomFilter(f) + whisper.updateBloomFilter(f) } return s, err } // updateBloomFilter recalculates the new value of bloom filter, // and informs the peers if necessary. -func (w *Whisper) updateBloomFilter(f *Filter) { - aggregate := make([]byte, bloomFilterSize) +func (whisper *Whisper) updateBloomFilter(f *Filter) { + aggregate := make([]byte, BloomFilterSize) for _, t := range f.Topics { top := BytesToTopic(t) b := TopicToBloom(top) aggregate = addBloom(aggregate, b) } - if !bloomFilterMatch(w.BloomFilter(), aggregate) { + if !BloomFilterMatch(whisper.BloomFilter(), aggregate) { // existing bloom filter must be updated - aggregate = addBloom(w.BloomFilter(), aggregate) - w.SetBloomFilter(aggregate) + aggregate = addBloom(whisper.BloomFilter(), aggregate) + whisper.SetBloomFilter(aggregate) } } // GetFilter returns the filter by id. -func (w *Whisper) GetFilter(id string) *Filter { - return w.filters.Get(id) +func (whisper *Whisper) GetFilter(id string) *Filter { + return whisper.filters.Get(id) } // Unsubscribe removes an installed message handler. -func (w *Whisper) Unsubscribe(id string) error { - ok := w.filters.Uninstall(id) +func (whisper *Whisper) Unsubscribe(id string) error { + ok := whisper.filters.Uninstall(id) if !ok { return fmt.Errorf("Unsubscribe: Invalid ID") } @@ -586,12 +588,9 @@ func (w *Whisper) Unsubscribe(id string) error { // Send injects a message into the whisper send queue, to be distributed in the // network in the coming cycles. -func (w *Whisper) Send(envelope *Envelope) error { - ok, err := w.add(envelope) - if err != nil { - return err - } - if !ok { +func (whisper *Whisper) Send(envelope *Envelope) error { + ok, err := whisper.add(envelope, false) + if err == nil && !ok { return fmt.Errorf("failed to add envelope") } return err @@ -599,13 +598,13 @@ func (w *Whisper) Send(envelope *Envelope) error { // Start implements node.Service, starting the background data propagation thread // of the Whisper protocol. -func (w *Whisper) Start(*p2p.Server) error { +func (whisper *Whisper) Start(*p2p.Server) error { log.Info("started whisper v." + ProtocolVersionStr) - go w.update() + go whisper.update() numCPU := runtime.NumCPU() for i := 0; i < numCPU; i++ { - go w.processQueue() + go whisper.processQueue() } return nil @@ -613,26 +612,26 @@ func (w *Whisper) Start(*p2p.Server) error { // Stop implements node.Service, stopping the background data propagation thread // of the Whisper protocol. -func (w *Whisper) Stop() error { - close(w.quit) +func (whisper *Whisper) Stop() error { + close(whisper.quit) log.Info("whisper stopped") return nil } // HandlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. -func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { +func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { // Create the new peer and start tracking it - whisperPeer := newPeer(wh, peer, rw) + whisperPeer := newPeer(whisper, peer, rw) - wh.peerMu.Lock() - wh.peers[whisperPeer] = struct{}{} - wh.peerMu.Unlock() + whisper.peerMu.Lock() + whisper.peers[whisperPeer] = struct{}{} + whisper.peerMu.Unlock() defer func() { - wh.peerMu.Lock() - delete(wh.peers, whisperPeer) - wh.peerMu.Unlock() + whisper.peerMu.Lock() + delete(whisper.peers, whisperPeer) + whisper.peerMu.Unlock() }() // Run the peer handshake and state updates @@ -642,11 +641,11 @@ func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { whisperPeer.start() defer whisperPeer.stop() - return wh.runMessageLoop(whisperPeer, rw) + return whisper.runMessageLoop(whisperPeer, rw) } // runMessageLoop reads and processes inbound messages directly to merge into client-global state. -func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { +func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { for { // fetch the next packet packet, err := rw.ReadMsg() @@ -654,7 +653,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("message loop", "peer", p.peer.ID(), "err", err) return err } - if packet.Size > wh.MaxMessageSize() { + if packet.Size > whisper.MaxMessageSize() { log.Warn("oversized message received", "peer", p.peer.ID()) return errors.New("oversized message received") } @@ -673,7 +672,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { trouble := false for _, env := range envelopes { - cached, err := wh.add(env) + cached, err := whisper.add(env, whisper.lightClient) if err != nil { trouble = true log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) @@ -702,7 +701,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { case bloomFilterExCode: var bloom []byte err := packet.Decode(&bloom) - if err == nil && len(bloom) != bloomFilterSize { + if err == nil && len(bloom) != BloomFilterSize { err = fmt.Errorf("wrong bloom filter size %d", len(bloom)) } @@ -710,11 +709,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid bloom filter exchange message") } - if isFullNode(bloom) { - p.bloomFilter = nil - } else { - p.bloomFilter = bloom - } + p.setBloomFilter(bloom) case p2pMessageCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and @@ -726,17 +721,17 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid direct message") } - wh.postEvent(&envelope, true) + whisper.postEvent(&envelope, true) } case p2pRequestCode: // Must be processed if mail server is implemented. Otherwise ignore. - if wh.mailServer != nil { + if whisper.mailServer != nil { var request Envelope if err := packet.Decode(&request); err != nil { log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid p2p request") } - wh.mailServer.DeliverMail(p, &request) + whisper.mailServer.DeliverMail(p, &request) } default: // New message types might be implemented in the future versions of Whisper. @@ -750,128 +745,127 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. -func (wh *Whisper) add(envelope *Envelope) (bool, error) { +// param isP2P indicates whether the message is peer-to-peer (should not be forwarded). +func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { now := uint32(time.Now().Unix()) sent := envelope.Expiry - envelope.TTL if sent > now { if sent-DefaultSyncAllowance > now { return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) - } else { - // recalculate PoW, adjusted for the time difference, plus one second for latency - envelope.calculatePoW(sent - now + 1) } + // recalculate PoW, adjusted for the time difference, plus one second for latency + envelope.calculatePoW(sent - now + 1) } if envelope.Expiry < now { if envelope.Expiry+DefaultSyncAllowance*2 < now { return false, fmt.Errorf("very old message") - } else { - log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error } + log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) + return false, nil // drop envelope without error } - if uint32(envelope.size()) > wh.MaxMessageSize() { + if uint32(envelope.size()) > whisper.MaxMessageSize() { return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } - if envelope.PoW() < wh.MinPow() { + if envelope.PoW() < whisper.MinPow() { // maybe the value was recently changed, and the peers did not adjust yet. // in this case the previous value is retrieved by MinPowTolerance() // for a short period of peer synchronization. - if envelope.PoW() < wh.MinPowTolerance() { + if envelope.PoW() < whisper.MinPowTolerance() { return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) } } - if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) { + if !BloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) { // maybe the value was recently changed, and the peers did not adjust yet. // in this case the previous value is retrieved by BloomFilterTolerance() // for a short period of peer synchronization. - if !bloomFilterMatch(wh.BloomFilterTolerance(), envelope.Bloom()) { + if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) { return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x", - envelope.Hash().Hex(), wh.BloomFilter(), envelope.Bloom(), envelope.Topic) + envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic) } } hash := envelope.Hash() - wh.poolMu.Lock() - _, alreadyCached := wh.envelopes[hash] + whisper.poolMu.Lock() + _, alreadyCached := whisper.envelopes[hash] if !alreadyCached { - wh.envelopes[hash] = envelope - if wh.expirations[envelope.Expiry] == nil { - wh.expirations[envelope.Expiry] = set.NewNonTS() + whisper.envelopes[hash] = envelope + if whisper.expirations[envelope.Expiry] == nil { + whisper.expirations[envelope.Expiry] = set.NewNonTS() } - if !wh.expirations[envelope.Expiry].Has(hash) { - wh.expirations[envelope.Expiry].Add(hash) + if !whisper.expirations[envelope.Expiry].Has(hash) { + whisper.expirations[envelope.Expiry].Add(hash) } } - wh.poolMu.Unlock() + whisper.poolMu.Unlock() if alreadyCached { log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) } else { log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) - wh.statsMu.Lock() - wh.stats.memoryUsed += envelope.size() - wh.statsMu.Unlock() - wh.postEvent(envelope, false) // notify the local node about the new message - if wh.mailServer != nil { - wh.mailServer.Archive(envelope) + whisper.statsMu.Lock() + whisper.stats.memoryUsed += envelope.size() + whisper.statsMu.Unlock() + whisper.postEvent(envelope, isP2P) // notify the local node about the new message + if whisper.mailServer != nil { + whisper.mailServer.Archive(envelope) } } return true, nil } // postEvent queues the message for further processing. -func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { +func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) { if isP2P { - w.p2pMsgQueue <- envelope + whisper.p2pMsgQueue <- envelope } else { - w.checkOverflow() - w.messageQueue <- envelope + whisper.checkOverflow() + whisper.messageQueue <- envelope } } // checkOverflow checks if message queue overflow occurs and reports it if necessary. -func (w *Whisper) checkOverflow() { - queueSize := len(w.messageQueue) +func (whisper *Whisper) checkOverflow() { + queueSize := len(whisper.messageQueue) if queueSize == messageQueueLimit { - if !w.Overflow() { - w.settings.Store(overflowIdx, true) + if !whisper.Overflow() { + whisper.settings.Store(overflowIdx, true) log.Warn("message queue overflow") } } else if queueSize <= messageQueueLimit/2 { - if w.Overflow() { - w.settings.Store(overflowIdx, false) + if whisper.Overflow() { + whisper.settings.Store(overflowIdx, false) log.Warn("message queue overflow fixed (back to normal)") } } } // processQueue delivers the messages to the watchers during the lifetime of the whisper node. -func (w *Whisper) processQueue() { +func (whisper *Whisper) processQueue() { var e *Envelope for { select { - case <-w.quit: + case <-whisper.quit: return - case e = <-w.messageQueue: - w.filters.NotifyWatchers(e, false) + case e = <-whisper.messageQueue: + whisper.filters.NotifyWatchers(e, false) - case e = <-w.p2pMsgQueue: - w.filters.NotifyWatchers(e, true) + case e = <-whisper.p2pMsgQueue: + whisper.filters.NotifyWatchers(e, true) } } } // update loops until the lifetime of the whisper node, updating its internal // state by expiring stale messages from the pool. -func (w *Whisper) update() { +func (whisper *Whisper) update() { // Start a ticker to check for expirations expire := time.NewTicker(expirationCycle) @@ -879,9 +873,9 @@ func (w *Whisper) update() { for { select { case <-expire.C: - w.expire() + whisper.expire() - case <-w.quit: + case <-whisper.quit: return } } @@ -889,75 +883,57 @@ func (w *Whisper) update() { // expire iterates over all the expiration timestamps, removing all stale // messages from the pools. -func (w *Whisper) expire() { - w.poolMu.Lock() - defer w.poolMu.Unlock() +func (whisper *Whisper) expire() { + whisper.poolMu.Lock() + defer whisper.poolMu.Unlock() - w.statsMu.Lock() - defer w.statsMu.Unlock() - w.stats.reset() + whisper.statsMu.Lock() + defer whisper.statsMu.Unlock() + whisper.stats.reset() now := uint32(time.Now().Unix()) - for expiry, hashSet := range w.expirations { + for expiry, hashSet := range whisper.expirations { if expiry < now { // Dump all expired messages and remove timestamp hashSet.Each(func(v interface{}) bool { - sz := w.envelopes[v.(common.Hash)].size() - delete(w.envelopes, v.(common.Hash)) - w.stats.messagesCleared++ - w.stats.memoryCleared += sz - w.stats.memoryUsed -= sz + sz := whisper.envelopes[v.(common.Hash)].size() + delete(whisper.envelopes, v.(common.Hash)) + whisper.stats.messagesCleared++ + whisper.stats.memoryCleared += sz + whisper.stats.memoryUsed -= sz return true }) - w.expirations[expiry].Clear() - delete(w.expirations, expiry) + whisper.expirations[expiry].Clear() + delete(whisper.expirations, expiry) } } } // Stats returns the whisper node statistics. -func (w *Whisper) Stats() Statistics { - w.statsMu.Lock() - defer w.statsMu.Unlock() +func (whisper *Whisper) Stats() Statistics { + whisper.statsMu.Lock() + defer whisper.statsMu.Unlock() - return w.stats + return whisper.stats } // Envelopes retrieves all the messages currently pooled by the node. -func (w *Whisper) Envelopes() []*Envelope { - w.poolMu.RLock() - defer w.poolMu.RUnlock() +func (whisper *Whisper) Envelopes() []*Envelope { + whisper.poolMu.RLock() + defer whisper.poolMu.RUnlock() - all := make([]*Envelope, 0, len(w.envelopes)) - for _, envelope := range w.envelopes { + all := make([]*Envelope, 0, len(whisper.envelopes)) + for _, envelope := range whisper.envelopes { all = append(all, envelope) } return all } -// Messages iterates through all currently floating envelopes -// and retrieves all the messages, that this filter could decrypt. -func (w *Whisper) Messages(id string) []*ReceivedMessage { - result := make([]*ReceivedMessage, 0) - w.poolMu.RLock() - defer w.poolMu.RUnlock() - - if filter := w.filters.Get(id); filter != nil { - for _, env := range w.envelopes { - msg := filter.processEnvelope(env) - if msg != nil { - result = append(result, msg) - } - } - } - return result -} - // isEnvelopeCached checks if envelope with specific hash has already been received and cached. -func (w *Whisper) isEnvelopeCached(hash common.Hash) bool { - w.poolMu.Lock() - defer w.poolMu.Unlock() +func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool { + whisper.poolMu.Lock() + defer whisper.poolMu.Unlock() - _, exist := w.envelopes[hash] + _, exist := whisper.envelopes[hash] return exist } @@ -983,9 +959,16 @@ func validatePrivateKey(k *ecdsa.PrivateKey) bool { return ValidatePublicKey(&k.PublicKey) } -// validateSymmetricKey returns false if the key contains all zeros -func validateSymmetricKey(k []byte) bool { - return len(k) > 0 && !containsOnlyZeros(k) +// validateDataIntegrity returns false if the data have the wrong or contains all zeros, +// which is the simplest and the most common bug. +func validateDataIntegrity(k []byte, expectedSize int) bool { + if len(k) != expectedSize { + return false + } + if expectedSize > 3 && containsOnlyZeros(k) { + return false + } + return true } // containsOnlyZeros checks if the data contain only zeros. @@ -1019,12 +1002,11 @@ func BytesToUintBigEndian(b []byte) (res uint64) { // GenerateRandomID generates a random string, which is then returned to be used as a key id func GenerateRandomID() (id string, err error) { - buf := make([]byte, keyIdSize) - _, err = crand.Read(buf) + buf, err := generateSecureRandomData(keyIDSize) if err != nil { return "", err } - if !validateSymmetricKey(buf) { + if !validateDataIntegrity(buf, keyIDSize) { return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data") } id = common.Bytes2Hex(buf) @@ -1043,13 +1025,12 @@ func isFullNode(bloom []byte) bool { return true } -func bloomFilterMatch(filter, sample []byte) bool { +func BloomFilterMatch(filter, sample []byte) bool { if filter == nil { - // full node, accepts all messages return true } - for i := 0; i < bloomFilterSize; i++ { + for i := 0; i < BloomFilterSize; i++ { f := filter[i] s := sample[i] if (f | s) != f { @@ -1061,8 +1042,8 @@ func bloomFilterMatch(filter, sample []byte) bool { } func addBloom(a, b []byte) []byte { - c := make([]byte, bloomFilterSize) - for i := 0; i < bloomFilterSize; i++ { + c := make([]byte, BloomFilterSize) + for i := 0; i < BloomFilterSize; i++ { c[i] = a[i] | b[i] } return c |