diff options
author | gluk256 <gluk256@users.noreply.github.com> | 2017-11-04 04:29:49 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-11-04 04:29:49 +0800 |
commit | 9f7cd7568275e2db45a3d90429f7c92bf7dfbf19 (patch) | |
tree | e3fd637e6cd7b9968b4bedb030959167e299c136 /whisper/whisperv6/whisper.go | |
parent | 0131bd6ff9b1850fdd307715c62174af4f05d2c7 (diff) | |
download | go-tangerine-9f7cd7568275e2db45a3d90429f7c92bf7dfbf19.tar go-tangerine-9f7cd7568275e2db45a3d90429f7c92bf7dfbf19.tar.gz go-tangerine-9f7cd7568275e2db45a3d90429f7c92bf7dfbf19.tar.bz2 go-tangerine-9f7cd7568275e2db45a3d90429f7c92bf7dfbf19.tar.lz go-tangerine-9f7cd7568275e2db45a3d90429f7c92bf7dfbf19.tar.xz go-tangerine-9f7cd7568275e2db45a3d90429f7c92bf7dfbf19.tar.zst go-tangerine-9f7cd7568275e2db45a3d90429f7c92bf7dfbf19.zip |
whisper/whisperv6: initial commit (clone of v5) (#15324)
Diffstat (limited to 'whisper/whisperv6/whisper.go')
-rw-r--r-- | whisper/whisperv6/whisper.go | 858 |
1 files changed, 858 insertions, 0 deletions
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go new file mode 100644 index 000000000..553ac3f00 --- /dev/null +++ b/whisper/whisperv6/whisper.go @@ -0,0 +1,858 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv6 + +import ( + "bytes" + "crypto/ecdsa" + crand "crypto/rand" + "crypto/sha256" + "fmt" + "runtime" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + "github.com/syndtr/goleveldb/leveldb/errors" + "golang.org/x/crypto/pbkdf2" + "golang.org/x/sync/syncmap" + set "gopkg.in/fatih/set.v0" +) + +type Statistics struct { + messagesCleared int + memoryCleared int + memoryUsed int + cycles int + totalMessagesCleared int +} + +const ( + minPowIdx = iota // Minimal PoW required by the whisper node + maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node + overflowIdx = iota // Indicator of message queue overflow +) + +// Whisper represents a dark communication interface through the Ethereum +// network, using its very own P2P communication layer. +type Whisper struct { + protocol p2p.Protocol // Protocol description and parameters + filters *Filters // Message filters installed with Subscribe function + + privateKeys map[string]*ecdsa.PrivateKey // Private key storage + symKeys map[string][]byte // Symmetric key storage + keyMu sync.RWMutex // Mutex associated with key storages + + poolMu sync.RWMutex // Mutex to sync the message and expiration pools + envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node + expirations map[uint32]*set.SetNonTS // Message expiration pool + + peerMu sync.RWMutex // Mutex to sync the active peer set + peers map[*Peer]struct{} // Set of currently active peers + + messageQueue chan *Envelope // Message queue for normal whisper messages + p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further) + quit chan struct{} // Channel used for graceful exit + + settings syncmap.Map // holds configuration settings that can be dynamically changed + + statsMu sync.Mutex // guard stats + stats Statistics // Statistics of whisper node + + mailServer MailServer // MailServer interface +} + +// New creates a Whisper client ready to communicate through the Ethereum P2P network. +func New(cfg *Config) *Whisper { + if cfg == nil { + cfg = &DefaultConfig + } + + whisper := &Whisper{ + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), + } + + whisper.filters = NewFilters(whisper) + + whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW) + whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize) + whisper.settings.Store(overflowIdx, false) + + // p2p whisper sub protocol handler + whisper.protocol = p2p.Protocol{ + Name: ProtocolName, + Version: uint(ProtocolVersion), + Length: NumberOfMessageCodes, + Run: whisper.HandlePeer, + NodeInfo: func() interface{} { + return map[string]interface{}{ + "version": ProtocolVersionStr, + "maxMessageSize": whisper.MaxMessageSize(), + "minimumPoW": whisper.MinPow(), + } + }, + } + + return whisper +} + +func (w *Whisper) MinPow() float64 { + val, _ := w.settings.Load(minPowIdx) + return val.(float64) +} + +// MaxMessageSize returns the maximum accepted message size. +func (w *Whisper) MaxMessageSize() uint32 { + val, _ := w.settings.Load(maxMsgSizeIdx) + return val.(uint32) +} + +// Overflow returns an indication if the message queue is full. +func (w *Whisper) Overflow() bool { + val, _ := w.settings.Load(overflowIdx) + return val.(bool) +} + +// APIs returns the RPC descriptors the Whisper implementation offers +func (w *Whisper) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: ProtocolName, + Version: ProtocolVersionStr, + Service: NewPublicWhisperAPI(w), + Public: true, + }, + } +} + +// RegisterServer registers MailServer interface. +// MailServer will process all the incoming messages with p2pRequestCode. +func (w *Whisper) RegisterServer(server MailServer) { + w.mailServer = server +} + +// Protocols returns the whisper sub-protocols ran by this particular client. +func (w *Whisper) Protocols() []p2p.Protocol { + return []p2p.Protocol{w.protocol} +} + +// Version returns the whisper sub-protocols version number. +func (w *Whisper) Version() uint { + return w.protocol.Version +} + +// SetMaxMessageSize sets the maximal message size allowed by this node +func (w *Whisper) SetMaxMessageSize(size uint32) error { + if size > MaxMessageSize { + return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize) + } + w.settings.Store(maxMsgSizeIdx, uint32(size)) + return nil +} + +// SetMinimumPoW sets the minimal PoW required by this node +func (w *Whisper) SetMinimumPoW(val float64) error { + if val <= 0.0 { + return fmt.Errorf("invalid PoW: %f", val) + } + w.settings.Store(minPowIdx, val) + return nil +} + +// getPeer retrieves peer by ID +func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { + w.peerMu.Lock() + defer w.peerMu.Unlock() + for p := range w.peers { + id := p.peer.ID() + if bytes.Equal(peerID, id[:]) { + return p, nil + } + } + return nil, fmt.Errorf("Could not find peer with ID: %x", peerID) +} + +// AllowP2PMessagesFromPeer marks specific peer trusted, +// which will allow it to send historic (expired) messages. +func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { + p, err := w.getPeer(peerID) + if err != nil { + return err + } + p.trusted = true + return nil +} + +// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer, +// which is known to implement MailServer interface, and is supposed to process this +// request and respond with a number of peer-to-peer messages (possibly expired), +// which are not supposed to be forwarded any further. +// The whisper protocol is agnostic of the format and contents of envelope. +func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { + p, err := w.getPeer(peerID) + if err != nil { + return err + } + p.trusted = true + return p2p.Send(p.ws, p2pRequestCode, envelope) +} + +// SendP2PMessage sends a peer-to-peer message to a specific peer. +func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { + p, err := w.getPeer(peerID) + if err != nil { + return err + } + return w.SendP2PDirect(p, envelope) +} + +// SendP2PDirect sends a peer-to-peer message to a specific peer. +func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error { + return p2p.Send(peer.ws, p2pCode, envelope) +} + +// NewKeyPair generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. Returns ID of the new key pair. +func (w *Whisper) NewKeyPair() (string, error) { + key, err := crypto.GenerateKey() + if err != nil || !validatePrivateKey(key) { + key, err = crypto.GenerateKey() // retry once + } + if err != nil { + return "", err + } + if !validatePrivateKey(key) { + return "", fmt.Errorf("failed to generate valid key") + } + + id, err := GenerateRandomID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + + w.keyMu.Lock() + defer w.keyMu.Unlock() + + if w.privateKeys[id] != nil { + return "", fmt.Errorf("failed to generate unique ID") + } + w.privateKeys[id] = key + return id, nil +} + +// DeleteKeyPair deletes the specified key if it exists. +func (w *Whisper) DeleteKeyPair(key string) bool { + w.keyMu.Lock() + defer w.keyMu.Unlock() + + if w.privateKeys[key] != nil { + delete(w.privateKeys, key) + return true + } + return false +} + +// AddKeyPair imports a asymmetric private key and returns it identifier. +func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { + id, err := GenerateRandomID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + + w.keyMu.Lock() + w.privateKeys[id] = key + w.keyMu.Unlock() + + return id, nil +} + +// HasKeyPair checks if the the whisper node is configured with the private key +// of the specified public pair. +func (w *Whisper) HasKeyPair(id string) bool { + w.keyMu.RLock() + defer w.keyMu.RUnlock() + return w.privateKeys[id] != nil +} + +// GetPrivateKey retrieves the private key of the specified identity. +func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { + w.keyMu.RLock() + defer w.keyMu.RUnlock() + key := w.privateKeys[id] + if key == nil { + return nil, fmt.Errorf("invalid id") + } + return key, nil +} + +// GenerateSymKey generates a random symmetric key and stores it under id, +// which is then returned. Will be used in the future for session key exchange. +func (w *Whisper) GenerateSymKey() (string, error) { + key := make([]byte, aesKeyLength) + _, err := crand.Read(key) + if err != nil { + return "", err + } else if !validateSymmetricKey(key) { + return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data") + } + + id, err := GenerateRandomID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + + w.keyMu.Lock() + defer w.keyMu.Unlock() + + if w.symKeys[id] != nil { + return "", fmt.Errorf("failed to generate unique ID") + } + w.symKeys[id] = key + return id, nil +} + +// AddSymKeyDirect stores the key, and returns its id. +func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) { + if len(key) != aesKeyLength { + return "", fmt.Errorf("wrong key size: %d", len(key)) + } + + id, err := GenerateRandomID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + + w.keyMu.Lock() + defer w.keyMu.Unlock() + + if w.symKeys[id] != nil { + return "", fmt.Errorf("failed to generate unique ID") + } + w.symKeys[id] = key + return id, nil +} + +// AddSymKeyFromPassword generates the key from password, stores it, and returns its id. +func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) { + id, err := GenerateRandomID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + if w.HasSymKey(id) { + return "", fmt.Errorf("failed to generate unique ID") + } + + derived, err := deriveKeyMaterial([]byte(password), EnvelopeVersion) + if err != nil { + return "", err + } + + w.keyMu.Lock() + defer w.keyMu.Unlock() + + // double check is necessary, because deriveKeyMaterial() is very slow + if w.symKeys[id] != nil { + return "", fmt.Errorf("critical error: failed to generate unique ID") + } + w.symKeys[id] = derived + return id, nil +} + +// HasSymKey returns true if there is a key associated with the given id. +// Otherwise returns false. +func (w *Whisper) HasSymKey(id string) bool { + w.keyMu.RLock() + defer w.keyMu.RUnlock() + return w.symKeys[id] != nil +} + +// DeleteSymKey deletes the key associated with the name string if it exists. +func (w *Whisper) DeleteSymKey(id string) bool { + w.keyMu.Lock() + defer w.keyMu.Unlock() + if w.symKeys[id] != nil { + delete(w.symKeys, id) + return true + } + return false +} + +// GetSymKey returns the symmetric key associated with the given id. +func (w *Whisper) GetSymKey(id string) ([]byte, error) { + w.keyMu.RLock() + defer w.keyMu.RUnlock() + if w.symKeys[id] != nil { + return w.symKeys[id], nil + } + return nil, fmt.Errorf("non-existent key ID") +} + +// Subscribe installs a new message handler used for filtering, decrypting +// and subsequent storing of incoming messages. +func (w *Whisper) Subscribe(f *Filter) (string, error) { + return w.filters.Install(f) +} + +// GetFilter returns the filter by id. +func (w *Whisper) GetFilter(id string) *Filter { + return w.filters.Get(id) +} + +// Unsubscribe removes an installed message handler. +func (w *Whisper) Unsubscribe(id string) error { + ok := w.filters.Uninstall(id) + if !ok { + return fmt.Errorf("Unsubscribe: Invalid ID") + } + return nil +} + +// Send injects a message into the whisper send queue, to be distributed in the +// network in the coming cycles. +func (w *Whisper) Send(envelope *Envelope) error { + ok, err := w.add(envelope) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("failed to add envelope") + } + return err +} + +// Start implements node.Service, starting the background data propagation thread +// of the Whisper protocol. +func (w *Whisper) Start(*p2p.Server) error { + log.Info("started whisper v." + ProtocolVersionStr) + go w.update() + + numCPU := runtime.NumCPU() + for i := 0; i < numCPU; i++ { + go w.processQueue() + } + + return nil +} + +// Stop implements node.Service, stopping the background data propagation thread +// of the Whisper protocol. +func (w *Whisper) Stop() error { + close(w.quit) + log.Info("whisper stopped") + return nil +} + +// HandlePeer is called by the underlying P2P layer when the whisper sub-protocol +// connection is negotiated. +func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + // Create the new peer and start tracking it + whisperPeer := newPeer(wh, peer, rw) + + wh.peerMu.Lock() + wh.peers[whisperPeer] = struct{}{} + wh.peerMu.Unlock() + + defer func() { + wh.peerMu.Lock() + delete(wh.peers, whisperPeer) + wh.peerMu.Unlock() + }() + + // Run the peer handshake and state updates + if err := whisperPeer.handshake(); err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + + return wh.runMessageLoop(whisperPeer, rw) +} + +// runMessageLoop reads and processes inbound messages directly to merge into client-global state. +func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { + for { + // fetch the next packet + packet, err := rw.ReadMsg() + if err != nil { + log.Warn("message loop", "peer", p.peer.ID(), "err", err) + return err + } + if packet.Size > wh.MaxMessageSize() { + log.Warn("oversized message received", "peer", p.peer.ID()) + return errors.New("oversized message received") + } + + switch packet.Code { + case statusCode: + // this should not happen, but no need to panic; just ignore this message. + log.Warn("unxepected status message received", "peer", p.peer.ID()) + case messagesCode: + // decode the contained envelopes + var envelope Envelope + if err := packet.Decode(&envelope); err != nil { + log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid envelope") + } + cached, err := wh.add(&envelope) + if err != nil { + log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid envelope") + } + if cached { + p.mark(&envelope) + } + case p2pCode: + // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. + // this message is not supposed to be forwarded to other peers, and + // therefore might not satisfy the PoW, expiry and other requirements. + // these messages are only accepted from the trusted peer. + if p.trusted { + var envelope Envelope + if err := packet.Decode(&envelope); err != nil { + log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid direct message") + } + wh.postEvent(&envelope, true) + } + case p2pRequestCode: + // Must be processed if mail server is implemented. Otherwise ignore. + if wh.mailServer != nil { + var request Envelope + if err := packet.Decode(&request); err != nil { + log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid p2p request") + } + wh.mailServer.DeliverMail(p, &request) + } + default: + // New message types might be implemented in the future versions of Whisper. + // For forward compatibility, just ignore. + } + + packet.Discard() + } +} + +// add inserts a new envelope into the message pool to be distributed within the +// whisper network. It also inserts the envelope into the expiration pool at the +// appropriate time-stamp. In case of error, connection should be dropped. +func (wh *Whisper) add(envelope *Envelope) (bool, error) { + now := uint32(time.Now().Unix()) + sent := envelope.Expiry - envelope.TTL + + if sent > now { + if sent-SynchAllowance > now { + return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) + } else { + // recalculate PoW, adjusted for the time difference, plus one second for latency + envelope.calculatePoW(sent - now + 1) + } + } + + if envelope.Expiry < now { + if envelope.Expiry+SynchAllowance*2 < now { + return false, fmt.Errorf("very old message") + } else { + log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) + return false, nil // drop envelope without error + } + } + + if uint32(envelope.size()) > wh.MaxMessageSize() { + return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) + } + + if len(envelope.Version) > 4 { + return false, fmt.Errorf("oversized version [%x]", envelope.Hash()) + } + + aesNonceSize := len(envelope.AESNonce) + if aesNonceSize != 0 && aesNonceSize != AESNonceLength { + // the standard AES GCM nonce size is 12 bytes, + // but constant gcmStandardNonceSize cannot be accessed (not exported) + return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash()) + } + + if envelope.PoW() < wh.MinPow() { + log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) + return false, nil // drop envelope without error + } + + hash := envelope.Hash() + + wh.poolMu.Lock() + _, alreadyCached := wh.envelopes[hash] + if !alreadyCached { + wh.envelopes[hash] = envelope + if wh.expirations[envelope.Expiry] == nil { + wh.expirations[envelope.Expiry] = set.NewNonTS() + } + if !wh.expirations[envelope.Expiry].Has(hash) { + wh.expirations[envelope.Expiry].Add(hash) + } + } + wh.poolMu.Unlock() + + if alreadyCached { + log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) + } else { + log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) + wh.statsMu.Lock() + wh.stats.memoryUsed += envelope.size() + wh.statsMu.Unlock() + wh.postEvent(envelope, false) // notify the local node about the new message + if wh.mailServer != nil { + wh.mailServer.Archive(envelope) + } + } + return true, nil +} + +// postEvent queues the message for further processing. +func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { + // if the version of incoming message is higher than + // currently supported version, we can not decrypt it, + // and therefore just ignore this message + if envelope.Ver() <= EnvelopeVersion { + if isP2P { + w.p2pMsgQueue <- envelope + } else { + w.checkOverflow() + w.messageQueue <- envelope + } + } +} + +// checkOverflow checks if message queue overflow occurs and reports it if necessary. +func (w *Whisper) checkOverflow() { + queueSize := len(w.messageQueue) + + if queueSize == messageQueueLimit { + if !w.Overflow() { + w.settings.Store(overflowIdx, true) + log.Warn("message queue overflow") + } + } else if queueSize <= messageQueueLimit/2 { + if w.Overflow() { + w.settings.Store(overflowIdx, false) + log.Warn("message queue overflow fixed (back to normal)") + } + } +} + +// processQueue delivers the messages to the watchers during the lifetime of the whisper node. +func (w *Whisper) processQueue() { + var e *Envelope + for { + select { + case <-w.quit: + return + + case e = <-w.messageQueue: + w.filters.NotifyWatchers(e, false) + + case e = <-w.p2pMsgQueue: + w.filters.NotifyWatchers(e, true) + } + } +} + +// update loops until the lifetime of the whisper node, updating its internal +// state by expiring stale messages from the pool. +func (w *Whisper) update() { + // Start a ticker to check for expirations + expire := time.NewTicker(expirationCycle) + + // Repeat updates until termination is requested + for { + select { + case <-expire.C: + w.expire() + + case <-w.quit: + return + } + } +} + +// expire iterates over all the expiration timestamps, removing all stale +// messages from the pools. +func (w *Whisper) expire() { + w.poolMu.Lock() + defer w.poolMu.Unlock() + + w.statsMu.Lock() + defer w.statsMu.Unlock() + w.stats.reset() + now := uint32(time.Now().Unix()) + for expiry, hashSet := range w.expirations { + if expiry < now { + // Dump all expired messages and remove timestamp + hashSet.Each(func(v interface{}) bool { + sz := w.envelopes[v.(common.Hash)].size() + delete(w.envelopes, v.(common.Hash)) + w.stats.messagesCleared++ + w.stats.memoryCleared += sz + w.stats.memoryUsed -= sz + return true + }) + w.expirations[expiry].Clear() + delete(w.expirations, expiry) + } + } +} + +// Stats returns the whisper node statistics. +func (w *Whisper) Stats() Statistics { + w.statsMu.Lock() + defer w.statsMu.Unlock() + + return w.stats +} + +// Envelopes retrieves all the messages currently pooled by the node. +func (w *Whisper) Envelopes() []*Envelope { + w.poolMu.RLock() + defer w.poolMu.RUnlock() + + all := make([]*Envelope, 0, len(w.envelopes)) + for _, envelope := range w.envelopes { + all = append(all, envelope) + } + return all +} + +// Messages iterates through all currently floating envelopes +// and retrieves all the messages, that this filter could decrypt. +func (w *Whisper) Messages(id string) []*ReceivedMessage { + result := make([]*ReceivedMessage, 0) + w.poolMu.RLock() + defer w.poolMu.RUnlock() + + if filter := w.filters.Get(id); filter != nil { + for _, env := range w.envelopes { + msg := filter.processEnvelope(env) + if msg != nil { + result = append(result, msg) + } + } + } + return result +} + +// isEnvelopeCached checks if envelope with specific hash has already been received and cached. +func (w *Whisper) isEnvelopeCached(hash common.Hash) bool { + w.poolMu.Lock() + defer w.poolMu.Unlock() + + _, exist := w.envelopes[hash] + return exist +} + +// reset resets the node's statistics after each expiry cycle. +func (s *Statistics) reset() { + s.cycles++ + s.totalMessagesCleared += s.messagesCleared + + s.memoryCleared = 0 + s.messagesCleared = 0 +} + +// ValidatePublicKey checks the format of the given public key. +func ValidatePublicKey(k *ecdsa.PublicKey) bool { + return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0 +} + +// validatePrivateKey checks the format of the given private key. +func validatePrivateKey(k *ecdsa.PrivateKey) bool { + if k == nil || k.D == nil || k.D.Sign() == 0 { + return false + } + return ValidatePublicKey(&k.PublicKey) +} + +// validateSymmetricKey returns false if the key contains all zeros +func validateSymmetricKey(k []byte) bool { + return len(k) > 0 && !containsOnlyZeros(k) +} + +// containsOnlyZeros checks if the data contain only zeros. +func containsOnlyZeros(data []byte) bool { + for _, b := range data { + if b != 0 { + return false + } + } + return true +} + +// bytesToUintLittleEndian converts the slice to 64-bit unsigned integer. +func bytesToUintLittleEndian(b []byte) (res uint64) { + mul := uint64(1) + for i := 0; i < len(b); i++ { + res += uint64(b[i]) * mul + mul *= 256 + } + return res +} + +// BytesToUintBigEndian converts the slice to 64-bit unsigned integer. +func BytesToUintBigEndian(b []byte) (res uint64) { + for i := 0; i < len(b); i++ { + res *= 256 + res += uint64(b[i]) + } + return res +} + +// deriveKeyMaterial derives symmetric key material from the key or password. +// pbkdf2 is used for security, in case people use password instead of randomly generated keys. +func deriveKeyMaterial(key []byte, version uint64) (derivedKey []byte, err error) { + if version == 0 { + // kdf should run no less than 0.1 seconds on average compute, + // because it's a once in a session experience + derivedKey := pbkdf2.Key(key, nil, 65356, aesKeyLength, sha256.New) + return derivedKey, nil + } else { + return nil, unknownVersionError(version) + } +} + +// GenerateRandomID generates a random string, which is then returned to be used as a key id +func GenerateRandomID() (id string, err error) { + buf := make([]byte, keyIdSize) + _, err = crand.Read(buf) + if err != nil { + return "", err + } + if !validateSymmetricKey(buf) { + return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data") + } + id = common.Bytes2Hex(buf) + return id, err +} |