aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv5/whisper.go
diff options
context:
space:
mode:
authorgluk256 <gluk256@users.noreply.github.com>2017-02-24 01:46:32 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-02-24 01:46:32 +0800
commit11539030cdf91f5a74bf205e8f52f9f08aace8e3 (patch)
tree11ad7d946473f8a59cc512bdafcf81627b7993b5 /whisper/whisperv5/whisper.go
parent357732a8404c9fe4d02f041d20a0d05381a3e6d1 (diff)
downloadgo-tangerine-11539030cdf91f5a74bf205e8f52f9f08aace8e3.tar
go-tangerine-11539030cdf91f5a74bf205e8f52f9f08aace8e3.tar.gz
go-tangerine-11539030cdf91f5a74bf205e8f52f9f08aace8e3.tar.bz2
go-tangerine-11539030cdf91f5a74bf205e8f52f9f08aace8e3.tar.lz
go-tangerine-11539030cdf91f5a74bf205e8f52f9f08aace8e3.tar.xz
go-tangerine-11539030cdf91f5a74bf205e8f52f9f08aace8e3.tar.zst
go-tangerine-11539030cdf91f5a74bf205e8f52f9f08aace8e3.zip
whisper: expiry refactoring (#3706)
Diffstat (limited to 'whisper/whisperv5/whisper.go')
-rw-r--r--whisper/whisperv5/whisper.go84
1 files changed, 60 insertions, 24 deletions
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go
index 558e2909f..5062f7b6b 100644
--- a/whisper/whisperv5/whisper.go
+++ b/whisper/whisperv5/whisper.go
@@ -35,6 +35,12 @@ import (
set "gopkg.in/fatih/set.v0"
)
+type Statistics struct {
+ messagesCleared int
+ memoryCleared int
+ totalMemoryUsed int
+}
+
// Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Whisper struct {
@@ -59,6 +65,8 @@ type Whisper struct {
p2pMsgQueue chan *Envelope
quit chan struct{}
+ stats Statistics
+
overflow bool
test bool
}
@@ -287,7 +295,8 @@ func (w *Whisper) Unwatch(id string) {
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
func (w *Whisper) Send(envelope *Envelope) error {
- return w.add(envelope)
+ _, err := w.add(envelope)
+ return err
}
// Start implements node.Service, starting the background data propagation thread
@@ -360,11 +369,14 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
// inject all envelopes into the internal pool
for _, envelope := range envelopes {
- if err := wh.add(envelope); err != nil {
+ cached, err := wh.add(envelope)
+ if err != nil {
log.Warn(fmt.Sprintf("%v: bad envelope received: [%v], peer will be disconnected", p.peer, err))
return fmt.Errorf("invalid envelope")
}
- p.mark(envelope)
+ if cached {
+ p.mark(envelope)
+ }
}
case p2pCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
@@ -401,13 +413,13 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
-func (wh *Whisper) add(envelope *Envelope) error {
+func (wh *Whisper) add(envelope *Envelope) (bool, error) {
now := uint32(time.Now().Unix())
sent := envelope.Expiry - envelope.TTL
if sent > now {
if sent-SynchAllowance > now {
- return fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
+ return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
} else {
// recalculate PoW, adjusted for the time difference, plus one second for latency
envelope.calculatePoW(sent - now + 1)
@@ -416,34 +428,34 @@ func (wh *Whisper) add(envelope *Envelope) error {
if envelope.Expiry < now {
if envelope.Expiry+SynchAllowance*2 < now {
- return fmt.Errorf("very old message")
+ return false, fmt.Errorf("very old message")
} else {
log.Debug(fmt.Sprintf("expired envelope dropped [%x]", envelope.Hash()))
- return nil // drop envelope without error
+ return false, nil // drop envelope without error
}
}
if len(envelope.Data) > MaxMessageLength {
- return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
+ return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
if len(envelope.Version) > 4 {
- return fmt.Errorf("oversized version [%x]", envelope.Hash())
+ return false, fmt.Errorf("oversized version [%x]", envelope.Hash())
}
if len(envelope.AESNonce) > AESNonceMaxLength {
// the standard AES GSM nonce size is 12,
// but const gcmStandardNonceSize cannot be accessed directly
- return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash())
+ return false, fmt.Errorf("oversized AESNonce [%x]", envelope.Hash())
}
if len(envelope.Salt) > saltLength {
- return fmt.Errorf("oversized salt [%x]", envelope.Hash())
+ return false, fmt.Errorf("oversized salt [%x]", envelope.Hash())
}
if envelope.PoW() < MinimumPoW && !wh.test {
log.Debug(fmt.Sprintf("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash()))
- return nil // drop envelope without error
+ return false, nil // drop envelope without error
}
hash := envelope.Hash()
@@ -465,12 +477,13 @@ func (wh *Whisper) add(envelope *Envelope) error {
log.Trace(fmt.Sprintf("whisper envelope already cached [%x]\n", envelope.Hash()))
} else {
log.Trace(fmt.Sprintf("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope))
+ wh.stats.totalMemoryUsed += envelope.size()
wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil {
wh.mailServer.Archive(envelope)
}
}
- return nil
+ return true, nil
}
// postEvent queues the message for further processing.
@@ -545,22 +558,32 @@ func (w *Whisper) expire() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
+ w.stats.clear()
now := uint32(time.Now().Unix())
- for then, hashSet := range w.expirations {
- // Short circuit if a future time
- if then > now {
- continue
+ for expiry, hashSet := range w.expirations {
+ if expiry < now {
+ w.stats.messagesCleared++
+
+ // Dump all expired messages and remove timestamp
+ hashSet.Each(func(v interface{}) bool {
+ sz := w.envelopes[v.(common.Hash)].size()
+ w.stats.memoryCleared += sz
+ w.stats.totalMemoryUsed -= sz
+ delete(w.envelopes, v.(common.Hash))
+ delete(w.messages, v.(common.Hash))
+ return true
+ })
+ w.expirations[expiry].Clear()
+ delete(w.expirations, expiry)
}
- // Dump all expired messages and remove timestamp
- hashSet.Each(func(v interface{}) bool {
- delete(w.envelopes, v.(common.Hash))
- delete(w.messages, v.(common.Hash))
- return true
- })
- w.expirations[then].Clear()
}
}
+func (w *Whisper) Stats() string {
+ return fmt.Sprintf("Latest expiry cycle cleared %d messages (%d bytes). Memory usage: %d bytes.",
+ w.stats.messagesCleared, w.stats.memoryCleared, w.stats.totalMemoryUsed)
+}
+
// envelopes retrieves all the messages currently pooled by the node.
func (w *Whisper) Envelopes() []*Envelope {
w.poolMu.RLock()
@@ -589,6 +612,14 @@ func (w *Whisper) Messages(id string) []*ReceivedMessage {
return result
}
+func (w *Whisper) isEnvelopeCached(hash common.Hash) bool {
+ w.poolMu.Lock()
+ defer w.poolMu.Unlock()
+
+ _, exist := w.envelopes[hash]
+ return exist
+}
+
func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) {
w.poolMu.Lock()
defer w.poolMu.Unlock()
@@ -596,6 +627,11 @@ func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) {
w.messages[msg.EnvelopeHash] = msg
}
+func (s *Statistics) clear() {
+ s.memoryCleared = 0
+ s.messagesCleared = 0
+}
+
func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0
}