aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv5/whisper.go
diff options
context:
space:
mode:
authorgluk256 <gluk256@users.noreply.github.com>2016-12-20 07:58:01 +0800
committerFelix Lange <fjl@users.noreply.github.com>2016-12-20 07:58:01 +0800
commitba996f5e27572e853bcc5c815ae72082a15c9183 (patch)
tree16d6fd41d3d77208597683c71bdc9af603d43a77 /whisper/whisperv5/whisper.go
parent64bf5bafe9ced66bfb11f34fed9181aa89399473 (diff)
downloadgo-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar
go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.gz
go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.bz2
go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.lz
go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.xz
go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.zst
go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.zip
whisper: refactoring (#3411)
* whisper: refactored message processing * whisper: final polishing * whisper: logging updated * whisper: moved the check, changed the default PoW * whisper: refactoring of message queuing * whisper: refactored parameters
Diffstat (limited to 'whisper/whisperv5/whisper.go')
-rw-r--r--whisper/whisperv5/whisper.go105
1 files changed, 78 insertions, 27 deletions
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go
index dc9571f6e..789adbdb3 100644
--- a/whisper/whisperv5/whisper.go
+++ b/whisper/whisperv5/whisper.go
@@ -22,6 +22,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"fmt"
+ "runtime"
"sync"
"time"
@@ -45,7 +46,7 @@ type Whisper struct {
symKeys map[string][]byte
keyMu sync.RWMutex
- envelopes map[common.Hash]*Envelope // Pool of messages currently tracked by this node
+ envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
messages map[common.Hash]*ReceivedMessage // Pool of successfully decrypted messages, which are not expired yet
expirations map[uint32]*set.SetNonTS // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
@@ -55,22 +56,28 @@ type Whisper struct {
mailServer MailServer
- quit chan struct{}
- test bool
+ messageQueue chan *Envelope
+ p2pMsgQueue chan *Envelope
+ quit chan struct{}
+
+ overflow bool
+ test bool
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
// Param s should be passed if you want to implement mail server, otherwise nil.
func NewWhisper(server MailServer) *Whisper {
whisper := &Whisper{
- privateKeys: make(map[string]*ecdsa.PrivateKey),
- symKeys: make(map[string][]byte),
- envelopes: make(map[common.Hash]*Envelope),
- messages: make(map[common.Hash]*ReceivedMessage),
- expirations: make(map[uint32]*set.SetNonTS),
- peers: make(map[*Peer]struct{}),
- mailServer: server,
- quit: make(chan struct{}),
+ privateKeys: make(map[string]*ecdsa.PrivateKey),
+ symKeys: make(map[string][]byte),
+ envelopes: make(map[common.Hash]*Envelope),
+ messages: make(map[common.Hash]*ReceivedMessage),
+ expirations: make(map[uint32]*set.SetNonTS),
+ peers: make(map[*Peer]struct{}),
+ mailServer: server,
+ messageQueue: make(chan *Envelope, messageQueueLimit),
+ p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
+ quit: make(chan struct{}),
}
whisper.filters = NewFilters(whisper)
@@ -124,7 +131,7 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, data []byte) error {
return err
}
p.trusted = true
- return p2p.Send(p.ws, mailRequestCode, data)
+ return p2p.Send(p.ws, p2pRequestCode, data)
}
func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
@@ -270,6 +277,12 @@ func (w *Whisper) Send(envelope *Envelope) error {
func (w *Whisper) Start(*p2p.Server) error {
glog.V(logger.Info).Infoln("Whisper started")
go w.update()
+
+ numCPU := runtime.NumCPU()
+ for i := 0; i < numCPU; i++ {
+ go w.processQueue()
+ }
+
return nil
}
@@ -350,10 +363,10 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return fmt.Errorf("garbage received (directMessage)")
}
for _, envelope := range envelopes {
- wh.postEvent(envelope, p2pCode)
+ wh.postEvent(envelope, true)
}
}
- case mailRequestCode:
+ case p2pRequestCode:
// Must be processed if mail server is implemented. Otherwise ignore.
if wh.mailServer != nil {
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
@@ -382,7 +395,7 @@ func (wh *Whisper) add(envelope *Envelope) error {
if sent > now {
if sent-SynchAllowance > now {
- return fmt.Errorf("message created in the future")
+ return fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
} else {
// recalculate PoW, adjusted for the time difference, plus one second for latency
envelope.calculatePoW(sent - now + 1)
@@ -393,30 +406,31 @@ func (wh *Whisper) add(envelope *Envelope) error {
if envelope.Expiry+SynchAllowance*2 < now {
return fmt.Errorf("very old message")
} else {
+ glog.V(logger.Debug).Infof("expired envelope dropped [%x]", envelope.Hash())
return nil // drop envelope without error
}
}
if len(envelope.Data) > MaxMessageLength {
- return fmt.Errorf("huge messages are not allowed")
+ return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
if len(envelope.Version) > 4 {
- return fmt.Errorf("oversized Version")
+ return fmt.Errorf("oversized version [%x]", envelope.Hash())
}
if len(envelope.AESNonce) > AESNonceMaxLength {
// the standard AES GSM nonce size is 12,
// but const gcmStandardNonceSize cannot be accessed directly
- return fmt.Errorf("oversized AESNonce")
+ return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash())
}
if len(envelope.Salt) > saltLength {
- return fmt.Errorf("oversized Salt")
+ return fmt.Errorf("oversized salt [%x]", envelope.Hash())
}
if envelope.PoW() < MinimumPoW && !wh.test {
- glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f", envelope.PoW())
+ glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash())
return nil // drop envelope without error
}
@@ -436,22 +450,59 @@ func (wh *Whisper) add(envelope *Envelope) error {
wh.poolMu.Unlock()
if alreadyCached {
- glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
+ glog.V(logger.Detail).Infof("whisper envelope already cached [%x]\n", envelope.Hash())
} else {
- wh.postEvent(envelope, messagesCode) // notify the local node about the new message
- glog.V(logger.Detail).Infof("cached whisper envelope %v\n", envelope)
+ glog.V(logger.Detail).Infof("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope)
+ wh.postEvent(envelope, false) // notify the local node about the new message
}
return nil
}
-// postEvent delivers the message to the watchers.
-func (w *Whisper) postEvent(envelope *Envelope, messageCode uint64) {
+// postEvent queues the message for further processing.
+func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
// if the version of incoming message is higher than
// currently supported version, we can not decrypt it,
// and therefore just ignore this message
if envelope.Ver() <= EnvelopeVersion {
- // todo: review if you need an additional thread here
- go w.filters.NotifyWatchers(envelope, messageCode)
+ if isP2P {
+ w.p2pMsgQueue <- envelope
+ } else {
+ w.checkOverflow()
+ w.messageQueue <- envelope
+ }
+ }
+}
+
+// checkOverflow checks if message queue overflow occurs and reports it if necessary.
+func (w *Whisper) checkOverflow() {
+ queueSize := len(w.messageQueue)
+
+ if queueSize == messageQueueLimit {
+ if !w.overflow {
+ w.overflow = true
+ glog.V(logger.Warn).Infoln("message queue overflow")
+ }
+ } else if queueSize <= messageQueueLimit/2 {
+ if w.overflow {
+ w.overflow = false
+ }
+ }
+}
+
+// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
+func (w *Whisper) processQueue() {
+ var e *Envelope
+ for {
+ select {
+ case <-w.quit:
+ return
+
+ case e = <-w.messageQueue:
+ w.filters.NotifyWatchers(e, false)
+
+ case e = <-w.p2pMsgQueue:
+ w.filters.NotifyWatchers(e, true)
+ }
}
}