diff options
Diffstat (limited to 'whisper/whisperv5/whisper.go')
-rw-r--r-- | whisper/whisperv5/whisper.go | 77 |
1 files changed, 37 insertions, 40 deletions
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go index 85849ccce..62bd1ce17 100644 --- a/whisper/whisperv5/whisper.go +++ b/whisper/whisperv5/whisper.go @@ -469,18 +469,18 @@ func (w *Whisper) Stop() error { // HandlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. -func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { +func (w *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { // Create the new peer and start tracking it - whisperPeer := newPeer(wh, peer, rw) + whisperPeer := newPeer(w, peer, rw) - wh.peerMu.Lock() - wh.peers[whisperPeer] = struct{}{} - wh.peerMu.Unlock() + w.peerMu.Lock() + w.peers[whisperPeer] = struct{}{} + w.peerMu.Unlock() defer func() { - wh.peerMu.Lock() - delete(wh.peers, whisperPeer) - wh.peerMu.Unlock() + w.peerMu.Lock() + delete(w.peers, whisperPeer) + w.peerMu.Unlock() }() // Run the peer handshake and state updates @@ -490,11 +490,11 @@ func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { whisperPeer.start() defer whisperPeer.stop() - return wh.runMessageLoop(whisperPeer, rw) + return w.runMessageLoop(whisperPeer, rw) } // runMessageLoop reads and processes inbound messages directly to merge into client-global state. -func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { +func (w *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { for { // fetch the next packet packet, err := rw.ReadMsg() @@ -502,7 +502,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("message loop", "peer", p.peer.ID(), "err", err) return err } - if packet.Size > wh.MaxMessageSize() { + if packet.Size > w.MaxMessageSize() { log.Warn("oversized message received", "peer", p.peer.ID()) return errors.New("oversized message received") } @@ -518,7 +518,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid envelope") } - cached, err := wh.add(&envelope) + cached, err := w.add(&envelope) if err != nil { log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid envelope") @@ -537,17 +537,17 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid direct message") } - wh.postEvent(&envelope, true) + w.postEvent(&envelope, true) } case p2pRequestCode: // Must be processed if mail server is implemented. Otherwise ignore. - if wh.mailServer != nil { + if w.mailServer != nil { var request Envelope if err := packet.Decode(&request); err != nil { log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid p2p request") } - wh.mailServer.DeliverMail(p, &request) + w.mailServer.DeliverMail(p, &request) } default: // New message types might be implemented in the future versions of Whisper. @@ -561,29 +561,27 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. -func (wh *Whisper) add(envelope *Envelope) (bool, error) { +func (w *Whisper) add(envelope *Envelope) (bool, error) { now := uint32(time.Now().Unix()) sent := envelope.Expiry - envelope.TTL if sent > now { if sent-SynchAllowance > now { return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) - } else { - // recalculate PoW, adjusted for the time difference, plus one second for latency - envelope.calculatePoW(sent - now + 1) } + // recalculate PoW, adjusted for the time difference, plus one second for latency + envelope.calculatePoW(sent - now + 1) } if envelope.Expiry < now { if envelope.Expiry+SynchAllowance*2 < now { return false, fmt.Errorf("very old message") - } else { - log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error } + log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) + return false, nil // drop envelope without error } - if uint32(envelope.size()) > wh.MaxMessageSize() { + if uint32(envelope.size()) > w.MaxMessageSize() { return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } @@ -598,36 +596,36 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash()) } - if envelope.PoW() < wh.MinPow() { + if envelope.PoW() < w.MinPow() { log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) return false, nil // drop envelope without error } hash := envelope.Hash() - wh.poolMu.Lock() - _, alreadyCached := wh.envelopes[hash] + w.poolMu.Lock() + _, alreadyCached := w.envelopes[hash] if !alreadyCached { - wh.envelopes[hash] = envelope - if wh.expirations[envelope.Expiry] == nil { - wh.expirations[envelope.Expiry] = set.NewNonTS() + w.envelopes[hash] = envelope + if w.expirations[envelope.Expiry] == nil { + w.expirations[envelope.Expiry] = set.NewNonTS() } - if !wh.expirations[envelope.Expiry].Has(hash) { - wh.expirations[envelope.Expiry].Add(hash) + if !w.expirations[envelope.Expiry].Has(hash) { + w.expirations[envelope.Expiry].Add(hash) } } - wh.poolMu.Unlock() + w.poolMu.Unlock() if alreadyCached { log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) } else { log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) - wh.statsMu.Lock() - wh.stats.memoryUsed += envelope.size() - wh.statsMu.Unlock() - wh.postEvent(envelope, false) // notify the local node about the new message - if wh.mailServer != nil { - wh.mailServer.Archive(envelope) + w.statsMu.Lock() + w.stats.memoryUsed += envelope.size() + w.statsMu.Unlock() + w.postEvent(envelope, false) // notify the local node about the new message + if w.mailServer != nil { + w.mailServer.Archive(envelope) } } return true, nil @@ -838,9 +836,8 @@ func deriveKeyMaterial(key []byte, version uint64) (derivedKey []byte, err error // because it's a once in a session experience derivedKey := pbkdf2.Key(key, nil, 65356, aesKeyLength, sha256.New) return derivedKey, nil - } else { - return nil, unknownVersionError(version) } + return nil, unknownVersionError(version) } // GenerateRandomID generates a random string, which is then returned to be used as a key id |