From 11539030cdf91f5a74bf205e8f52f9f08aace8e3 Mon Sep 17 00:00:00 2001 From: gluk256 Date: Thu, 23 Feb 2017 18:46:32 +0100 Subject: whisper: expiry refactoring (#3706) --- whisper/whisperv5/api.go | 8 ++++ whisper/whisperv5/filter.go | 4 +- whisper/whisperv5/peer.go | 10 +---- whisper/whisperv5/peer_test.go | 2 - whisper/whisperv5/whisper.go | 84 ++++++++++++++++++++++++++++++------------ 5 files changed, 72 insertions(+), 36 deletions(-) (limited to 'whisper/whisperv5') diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go index d34213e05..9b43f7b70 100644 --- a/whisper/whisperv5/api.go +++ b/whisper/whisperv5/api.go @@ -64,6 +64,14 @@ func (api *PublicWhisperAPI) Version() (hexutil.Uint, error) { return hexutil.Uint(api.whisper.Version()), nil } +// Stats returns the Whisper statistics for diagnostics. +func (api *PublicWhisperAPI) Stats() (string, error) { + if api.whisper == nil { + return "", whisperOffLineErr + } + return api.whisper.Stats(), nil +} + // MarkPeerTrusted marks specific peer trusted, which will allow it // to send historic (expired) messages. func (api *PublicWhisperAPI) MarkPeerTrusted(peerID hexutil.Bytes) error { diff --git a/whisper/whisperv5/filter.go b/whisper/whisperv5/filter.go index 8aa7b2429..ffa5ae946 100644 --- a/whisper/whisperv5/filter.go +++ b/whisper/whisperv5/filter.go @@ -18,7 +18,7 @@ package whisperv5 import ( "crypto/ecdsa" - "crypto/rand" + crand "crypto/rand" "fmt" "sync" @@ -55,7 +55,7 @@ func NewFilters(w *Whisper) *Filters { func (fs *Filters) generateRandomID() (id string, err error) { buf := make([]byte, 20) for i := 0; i < 3; i++ { - _, err = rand.Read(buf) + _, err = crand.Read(buf) if err != nil { continue } diff --git a/whisper/whisperv5/peer.go b/whisper/whisperv5/peer.go index e137613f5..315401aea 100644 --- a/whisper/whisperv5/peer.go +++ b/whisper/whisperv5/peer.go @@ -133,20 +133,14 @@ func (peer *Peer) marked(envelope *Envelope) bool { // expire iterates over all the known envelopes in the host and removes all // expired (unknown) ones from the known list. func (peer *Peer) expire() { - // Assemble the list of available envelopes - available := set.NewNonTS() - for _, envelope := range peer.host.Envelopes() { - available.Add(envelope.Hash()) - } - // Cross reference availability with known status unmark := make(map[common.Hash]struct{}) peer.known.Each(func(v interface{}) bool { - if !available.Has(v.(common.Hash)) { + if !peer.host.isEnvelopeCached(v.(common.Hash)) { unmark[v.(common.Hash)] = struct{}{} } return true }) - // Dump all known but unavailable + // Dump all known but no longer cached for hash := range unmark { peer.known.Remove(hash) } diff --git a/whisper/whisperv5/peer_test.go b/whisper/whisperv5/peer_test.go index 3dcf3bc70..e3073bc6c 100644 --- a/whisper/whisperv5/peer_test.go +++ b/whisper/whisperv5/peer_test.go @@ -107,8 +107,6 @@ func TestSimulation(t *testing.T) { } func initialize(t *testing.T) { - // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat()))) - var err error ip := net.IPv4(127, 0, 0, 1) port0 := 30303 diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go index 558e2909f..5062f7b6b 100644 --- a/whisper/whisperv5/whisper.go +++ b/whisper/whisperv5/whisper.go @@ -35,6 +35,12 @@ import ( set "gopkg.in/fatih/set.v0" ) +type Statistics struct { + messagesCleared int + memoryCleared int + totalMemoryUsed int +} + // Whisper represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Whisper struct { @@ -59,6 +65,8 @@ type Whisper struct { p2pMsgQueue chan *Envelope quit chan struct{} + stats Statistics + overflow bool test bool } @@ -287,7 +295,8 @@ func (w *Whisper) Unwatch(id string) { // Send injects a message into the whisper send queue, to be distributed in the // network in the coming cycles. func (w *Whisper) Send(envelope *Envelope) error { - return w.add(envelope) + _, err := w.add(envelope) + return err } // Start implements node.Service, starting the background data propagation thread @@ -360,11 +369,14 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } // inject all envelopes into the internal pool for _, envelope := range envelopes { - if err := wh.add(envelope); err != nil { + cached, err := wh.add(envelope) + if err != nil { log.Warn(fmt.Sprintf("%v: bad envelope received: [%v], peer will be disconnected", p.peer, err)) return fmt.Errorf("invalid envelope") } - p.mark(envelope) + if cached { + p.mark(envelope) + } } case p2pCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. @@ -401,13 +413,13 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. -func (wh *Whisper) add(envelope *Envelope) error { +func (wh *Whisper) add(envelope *Envelope) (bool, error) { now := uint32(time.Now().Unix()) sent := envelope.Expiry - envelope.TTL if sent > now { if sent-SynchAllowance > now { - return fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) + return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) } else { // recalculate PoW, adjusted for the time difference, plus one second for latency envelope.calculatePoW(sent - now + 1) @@ -416,34 +428,34 @@ func (wh *Whisper) add(envelope *Envelope) error { if envelope.Expiry < now { if envelope.Expiry+SynchAllowance*2 < now { - return fmt.Errorf("very old message") + return false, fmt.Errorf("very old message") } else { log.Debug(fmt.Sprintf("expired envelope dropped [%x]", envelope.Hash())) - return nil // drop envelope without error + return false, nil // drop envelope without error } } if len(envelope.Data) > MaxMessageLength { - return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) + return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } if len(envelope.Version) > 4 { - return fmt.Errorf("oversized version [%x]", envelope.Hash()) + return false, fmt.Errorf("oversized version [%x]", envelope.Hash()) } if len(envelope.AESNonce) > AESNonceMaxLength { // the standard AES GSM nonce size is 12, // but const gcmStandardNonceSize cannot be accessed directly - return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash()) + return false, fmt.Errorf("oversized AESNonce [%x]", envelope.Hash()) } if len(envelope.Salt) > saltLength { - return fmt.Errorf("oversized salt [%x]", envelope.Hash()) + return false, fmt.Errorf("oversized salt [%x]", envelope.Hash()) } if envelope.PoW() < MinimumPoW && !wh.test { log.Debug(fmt.Sprintf("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash())) - return nil // drop envelope without error + return false, nil // drop envelope without error } hash := envelope.Hash() @@ -465,12 +477,13 @@ func (wh *Whisper) add(envelope *Envelope) error { log.Trace(fmt.Sprintf("whisper envelope already cached [%x]\n", envelope.Hash())) } else { log.Trace(fmt.Sprintf("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope)) + wh.stats.totalMemoryUsed += envelope.size() wh.postEvent(envelope, false) // notify the local node about the new message if wh.mailServer != nil { wh.mailServer.Archive(envelope) } } - return nil + return true, nil } // postEvent queues the message for further processing. @@ -545,22 +558,32 @@ func (w *Whisper) expire() { w.poolMu.Lock() defer w.poolMu.Unlock() + w.stats.clear() now := uint32(time.Now().Unix()) - for then, hashSet := range w.expirations { - // Short circuit if a future time - if then > now { - continue + for expiry, hashSet := range w.expirations { + if expiry < now { + w.stats.messagesCleared++ + + // Dump all expired messages and remove timestamp + hashSet.Each(func(v interface{}) bool { + sz := w.envelopes[v.(common.Hash)].size() + w.stats.memoryCleared += sz + w.stats.totalMemoryUsed -= sz + delete(w.envelopes, v.(common.Hash)) + delete(w.messages, v.(common.Hash)) + return true + }) + w.expirations[expiry].Clear() + delete(w.expirations, expiry) } - // Dump all expired messages and remove timestamp - hashSet.Each(func(v interface{}) bool { - delete(w.envelopes, v.(common.Hash)) - delete(w.messages, v.(common.Hash)) - return true - }) - w.expirations[then].Clear() } } +func (w *Whisper) Stats() string { + return fmt.Sprintf("Latest expiry cycle cleared %d messages (%d bytes). Memory usage: %d bytes.", + w.stats.messagesCleared, w.stats.memoryCleared, w.stats.totalMemoryUsed) +} + // envelopes retrieves all the messages currently pooled by the node. func (w *Whisper) Envelopes() []*Envelope { w.poolMu.RLock() @@ -589,6 +612,14 @@ func (w *Whisper) Messages(id string) []*ReceivedMessage { return result } +func (w *Whisper) isEnvelopeCached(hash common.Hash) bool { + w.poolMu.Lock() + defer w.poolMu.Unlock() + + _, exist := w.envelopes[hash] + return exist +} + func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) { w.poolMu.Lock() defer w.poolMu.Unlock() @@ -596,6 +627,11 @@ func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) { w.messages[msg.EnvelopeHash] = msg } +func (s *Statistics) clear() { + s.memoryCleared = 0 + s.messagesCleared = 0 +} + func ValidatePublicKey(k *ecdsa.PublicKey) bool { return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0 } -- cgit v1.2.3