diff options
author | gluk256 <gluk256@users.noreply.github.com> | 2016-12-20 07:58:01 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2016-12-20 07:58:01 +0800 |
commit | ba996f5e27572e853bcc5c815ae72082a15c9183 (patch) | |
tree | 16d6fd41d3d77208597683c71bdc9af603d43a77 /whisper/whisperv5/whisper.go | |
parent | 64bf5bafe9ced66bfb11f34fed9181aa89399473 (diff) | |
download | go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.gz go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.bz2 go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.lz go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.xz go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.zst go-tangerine-ba996f5e27572e853bcc5c815ae72082a15c9183.zip |
whisper: refactoring (#3411)
* whisper: refactored message processing
* whisper: final polishing
* whisper: logging updated
* whisper: moved the check, changed the default PoW
* whisper: refactoring of message queuing
* whisper: refactored parameters
Diffstat (limited to 'whisper/whisperv5/whisper.go')
-rw-r--r-- | whisper/whisperv5/whisper.go | 105 |
1 files changed, 78 insertions, 27 deletions
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go index dc9571f6e..789adbdb3 100644 --- a/whisper/whisperv5/whisper.go +++ b/whisper/whisperv5/whisper.go @@ -22,6 +22,7 @@ import ( crand "crypto/rand" "crypto/sha256" "fmt" + "runtime" "sync" "time" @@ -45,7 +46,7 @@ type Whisper struct { symKeys map[string][]byte keyMu sync.RWMutex - envelopes map[common.Hash]*Envelope // Pool of messages currently tracked by this node + envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node messages map[common.Hash]*ReceivedMessage // Pool of successfully decrypted messages, which are not expired yet expirations map[uint32]*set.SetNonTS // Message expiration pool poolMu sync.RWMutex // Mutex to sync the message and expiration pools @@ -55,22 +56,28 @@ type Whisper struct { mailServer MailServer - quit chan struct{} - test bool + messageQueue chan *Envelope + p2pMsgQueue chan *Envelope + quit chan struct{} + + overflow bool + test bool } // New creates a Whisper client ready to communicate through the Ethereum P2P network. // Param s should be passed if you want to implement mail server, otherwise nil. func NewWhisper(server MailServer) *Whisper { whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - messages: make(map[common.Hash]*ReceivedMessage), - expirations: make(map[uint32]*set.SetNonTS), - peers: make(map[*Peer]struct{}), - mailServer: server, - quit: make(chan struct{}), + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + messages: make(map[common.Hash]*ReceivedMessage), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + mailServer: server, + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), } whisper.filters = NewFilters(whisper) @@ -124,7 +131,7 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, data []byte) error { return err } p.trusted = true - return p2p.Send(p.ws, mailRequestCode, data) + return p2p.Send(p.ws, p2pRequestCode, data) } func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { @@ -270,6 +277,12 @@ func (w *Whisper) Send(envelope *Envelope) error { func (w *Whisper) Start(*p2p.Server) error { glog.V(logger.Info).Infoln("Whisper started") go w.update() + + numCPU := runtime.NumCPU() + for i := 0; i < numCPU; i++ { + go w.processQueue() + } + return nil } @@ -350,10 +363,10 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return fmt.Errorf("garbage received (directMessage)") } for _, envelope := range envelopes { - wh.postEvent(envelope, p2pCode) + wh.postEvent(envelope, true) } } - case mailRequestCode: + case p2pRequestCode: // Must be processed if mail server is implemented. Otherwise ignore. if wh.mailServer != nil { s := rlp.NewStream(packet.Payload, uint64(packet.Size)) @@ -382,7 +395,7 @@ func (wh *Whisper) add(envelope *Envelope) error { if sent > now { if sent-SynchAllowance > now { - return fmt.Errorf("message created in the future") + return fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) } else { // recalculate PoW, adjusted for the time difference, plus one second for latency envelope.calculatePoW(sent - now + 1) @@ -393,30 +406,31 @@ func (wh *Whisper) add(envelope *Envelope) error { if envelope.Expiry+SynchAllowance*2 < now { return fmt.Errorf("very old message") } else { + glog.V(logger.Debug).Infof("expired envelope dropped [%x]", envelope.Hash()) return nil // drop envelope without error } } if len(envelope.Data) > MaxMessageLength { - return fmt.Errorf("huge messages are not allowed") + return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } if len(envelope.Version) > 4 { - return fmt.Errorf("oversized Version") + return fmt.Errorf("oversized version [%x]", envelope.Hash()) } if len(envelope.AESNonce) > AESNonceMaxLength { // the standard AES GSM nonce size is 12, // but const gcmStandardNonceSize cannot be accessed directly - return fmt.Errorf("oversized AESNonce") + return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash()) } if len(envelope.Salt) > saltLength { - return fmt.Errorf("oversized Salt") + return fmt.Errorf("oversized salt [%x]", envelope.Hash()) } if envelope.PoW() < MinimumPoW && !wh.test { - glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f", envelope.PoW()) + glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash()) return nil // drop envelope without error } @@ -436,22 +450,59 @@ func (wh *Whisper) add(envelope *Envelope) error { wh.poolMu.Unlock() if alreadyCached { - glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope) + glog.V(logger.Detail).Infof("whisper envelope already cached [%x]\n", envelope.Hash()) } else { - wh.postEvent(envelope, messagesCode) // notify the local node about the new message - glog.V(logger.Detail).Infof("cached whisper envelope %v\n", envelope) + glog.V(logger.Detail).Infof("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope) + wh.postEvent(envelope, false) // notify the local node about the new message } return nil } -// postEvent delivers the message to the watchers. -func (w *Whisper) postEvent(envelope *Envelope, messageCode uint64) { +// postEvent queues the message for further processing. +func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { // if the version of incoming message is higher than // currently supported version, we can not decrypt it, // and therefore just ignore this message if envelope.Ver() <= EnvelopeVersion { - // todo: review if you need an additional thread here - go w.filters.NotifyWatchers(envelope, messageCode) + if isP2P { + w.p2pMsgQueue <- envelope + } else { + w.checkOverflow() + w.messageQueue <- envelope + } + } +} + +// checkOverflow checks if message queue overflow occurs and reports it if necessary. +func (w *Whisper) checkOverflow() { + queueSize := len(w.messageQueue) + + if queueSize == messageQueueLimit { + if !w.overflow { + w.overflow = true + glog.V(logger.Warn).Infoln("message queue overflow") + } + } else if queueSize <= messageQueueLimit/2 { + if w.overflow { + w.overflow = false + } + } +} + +// processQueue delivers the messages to the watchers during the lifetime of the whisper node. +func (w *Whisper) processQueue() { + var e *Envelope + for { + select { + case <-w.quit: + return + + case e = <-w.messageQueue: + w.filters.NotifyWatchers(e, false) + + case e = <-w.p2pMsgQueue: + w.filters.NotifyWatchers(e, true) + } } } |