aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv5
diff options
context:
space:
mode:
authorgluk256 <gluk256@users.noreply.github.com>2016-12-20 07:58:01 +0800
committerFelix Lange <fjl@users.noreply.github.com>2016-12-20 07:58:01 +0800
commitba996f5e27572e853bcc5c815ae72082a15c9183 (patch)
tree16d6fd41d3d77208597683c71bdc9af603d43a77 /whisper/whisperv5
parent64bf5bafe9ced66bfb11f34fed9181aa89399473 (diff)
downloaddexon-ba996f5e27572e853bcc5c815ae72082a15c9183.tar
dexon-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.gz
dexon-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.bz2
dexon-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.lz
dexon-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.xz
dexon-ba996f5e27572e853bcc5c815ae72082a15c9183.tar.zst
dexon-ba996f5e27572e853bcc5c815ae72082a15c9183.zip
whisper: refactoring (#3411)
* whisper: refactored message processing * whisper: final polishing * whisper: logging updated * whisper: moved the check, changed the default PoW * whisper: refactoring of message queuing * whisper: refactored parameters
Diffstat (limited to 'whisper/whisperv5')
-rw-r--r--whisper/whisperv5/doc.go19
-rw-r--r--whisper/whisperv5/envelope.go18
-rw-r--r--whisper/whisperv5/filter.go20
-rw-r--r--whisper/whisperv5/filter_test.go12
-rw-r--r--whisper/whisperv5/message.go10
-rw-r--r--whisper/whisperv5/message_test.go51
-rw-r--r--whisper/whisperv5/peer_test.go2
-rw-r--r--whisper/whisperv5/topic.go3
-rw-r--r--whisper/whisperv5/whisper.go105
-rw-r--r--whisper/whisperv5/whisper_test.go54
10 files changed, 228 insertions, 66 deletions
diff --git a/whisper/whisperv5/doc.go b/whisper/whisperv5/doc.go
index 223d8246e..e2e255e9e 100644
--- a/whisper/whisperv5/doc.go
+++ b/whisper/whisperv5/doc.go
@@ -15,9 +15,7 @@
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
-Package whisper implements the Whisper PoC-1.
-
-(https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec)
+Package whisper implements the Whisper protocol (version 5).
Whisper combines aspects of both DHTs and datagram messaging systems (e.g. UDP).
As such it may be likened and compared to both, not dissimilar to the
@@ -42,11 +40,11 @@ const (
ProtocolVersionStr = "5.0"
ProtocolName = "shh"
- statusCode = 0
- messagesCode = 1
- p2pCode = 2
- mailRequestCode = 3
- NumberOfMessageCodes = 32
+ statusCode = 0 // used by whisper protocol
+ messagesCode = 1 // normal whisper message
+ p2pCode = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any futher)
+ p2pRequestCode = 3 // peer-to-peer message, used by Dapp protocol
+ NumberOfMessageCodes = 64
paddingMask = byte(3)
signatureFlag = byte(4)
@@ -57,11 +55,12 @@ const (
saltLength = 12
AESNonceMaxLength = 12
- MaxMessageLength = 0xFFFF // todo: remove this restriction after testing in morden and analizing stats. this should be regulated by MinimumPoW.
- MinimumPoW = 10.0 // todo: review
+ MaxMessageLength = 0xFFFF // todo: remove this restriction after testing. this should be regulated by PoW.
+ MinimumPoW = 1.0 // todo: review after testing.
padSizeLimitLower = 128 // it can not be less - we don't want to reveal the absence of signature
padSizeLimitUpper = 256 // just an arbitrary number, could be changed without losing compatibility
+ messageQueueLimit = 1024
expirationCycle = time.Second
transmissionCycle = 300 * time.Millisecond
diff --git a/whisper/whisperv5/envelope.go b/whisper/whisperv5/envelope.go
index 3d048bb44..1b976705d 100644
--- a/whisper/whisperv5/envelope.go
+++ b/whisper/whisperv5/envelope.go
@@ -14,14 +14,14 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-// Contains the Whisper protocol Envelope element. For formal details please see
-// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#envelopes.
+// Contains the Whisper protocol Envelope element.
package whisperv5
import (
"crypto/ecdsa"
"encoding/binary"
+ "errors"
"fmt"
"math"
"time"
@@ -86,8 +86,8 @@ func (e *Envelope) Ver() uint64 {
// Seal closes the envelope by spending the requested amount of time as a proof
// of work on hashing the data.
-func (e *Envelope) Seal(options *MessageParams) {
- var target int
+func (e *Envelope) Seal(options *MessageParams) error {
+ var target, bestBit int
if options.PoW == 0 {
// adjust for the duration of Seal() execution only if execution time is predefined unconditionally
e.Expiry += options.WorkTime
@@ -99,7 +99,7 @@ func (e *Envelope) Seal(options *MessageParams) {
h := crypto.Keccak256(e.rlpWithoutNonce())
copy(buf[:32], h)
- finish, bestBit := time.Now().Add(time.Duration(options.WorkTime)*time.Second).UnixNano(), 0
+ finish := time.Now().Add(time.Duration(options.WorkTime) * time.Second).UnixNano()
for nonce := uint64(0); time.Now().UnixNano() < finish; {
for i := 0; i < 1024; i++ {
binary.BigEndian.PutUint64(buf[56:], nonce)
@@ -108,12 +108,18 @@ func (e *Envelope) Seal(options *MessageParams) {
if firstBit > bestBit {
e.EnvNonce, bestBit = nonce, firstBit
if target > 0 && bestBit >= target {
- return
+ return nil
}
}
nonce++
}
}
+
+ if target > 0 && bestBit < target {
+ return errors.New("Failed to reach the PoW target")
+ }
+
+ return nil
}
func (e *Envelope) PoW() float64 {
diff --git a/whisper/whisperv5/filter.go b/whisper/whisperv5/filter.go
index fd5f5083f..3845d0c20 100644
--- a/whisper/whisperv5/filter.go
+++ b/whisper/whisperv5/filter.go
@@ -21,6 +21,8 @@ import (
"sync"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
)
type Filter struct {
@@ -75,11 +77,12 @@ func (fs *Filters) Get(i uint32) *Filter {
return fs.watchers[i]
}
-func (fs *Filters) NotifyWatchers(env *Envelope, messageCode uint64) {
+func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
fs.mutex.RLock()
var msg *ReceivedMessage
- for _, watcher := range fs.watchers {
- if messageCode == p2pCode && !watcher.AcceptP2P {
+ for j, watcher := range fs.watchers {
+ if p2pMessage && !watcher.AcceptP2P {
+ glog.V(logger.Detail).Infof("msg [%x], filter [%d]: p2p messages are not allowed \n", env.Hash(), j)
continue
}
@@ -90,6 +93,11 @@ func (fs *Filters) NotifyWatchers(env *Envelope, messageCode uint64) {
match = watcher.MatchEnvelope(env)
if match {
msg = env.Open(watcher)
+ if msg == nil {
+ glog.V(logger.Detail).Infof("msg [%x], filter [%d]: failed to open \n", env.Hash(), j)
+ }
+ } else {
+ glog.V(logger.Detail).Infof("msg [%x], filter [%d]: does not match \n", env.Hash(), j)
}
}
@@ -137,12 +145,12 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
if f.PoW > 0 && msg.PoW < f.PoW {
return false
}
- if f.Src != nil && !isPubKeyEqual(msg.Src, f.Src) {
+ if f.Src != nil && !IsPubKeyEqual(msg.Src, f.Src) {
return false
}
if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() {
- return isPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic)
+ return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic)
} else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() {
return f.SymKeyHash == msg.SymKeyHash && f.MatchTopic(msg.Topic)
}
@@ -176,7 +184,7 @@ func (f *Filter) MatchTopic(topic TopicType) bool {
return false
}
-func isPubKeyEqual(a, b *ecdsa.PublicKey) bool {
+func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
if !ValidatePublicKey(a) {
return false
} else if !ValidatePublicKey(b) {
diff --git a/whisper/whisperv5/filter_test.go b/whisper/whisperv5/filter_test.go
index 561bb8f7d..9f2a58f28 100644
--- a/whisper/whisperv5/filter_test.go
+++ b/whisper/whisperv5/filter_test.go
@@ -139,7 +139,7 @@ func TestComparePubKey(t *testing.T) {
if err != nil {
t.Fatalf("failed to generate second key with seed %d: %s.", seed, err)
}
- if isPubKeyEqual(&key1.PublicKey, &key2.PublicKey) {
+ if IsPubKeyEqual(&key1.PublicKey, &key2.PublicKey) {
t.Fatalf("public keys are equal, seed %d.", seed)
}
@@ -149,7 +149,7 @@ func TestComparePubKey(t *testing.T) {
if err != nil {
t.Fatalf("failed to generate third key with seed %d: %s.", seed, err)
}
- if isPubKeyEqual(&key1.PublicKey, &key3.PublicKey) {
+ if IsPubKeyEqual(&key1.PublicKey, &key3.PublicKey) {
t.Fatalf("key1 == key3, seed %d.", seed)
}
}
@@ -540,7 +540,7 @@ func TestWatchers(t *testing.T) {
}
for i = 0; i < NumMessages; i++ {
- filters.NotifyWatchers(envelopes[i], messagesCode)
+ filters.NotifyWatchers(envelopes[i], false)
}
var total int
@@ -593,7 +593,7 @@ func TestWatchers(t *testing.T) {
}
for i = 0; i < NumMessages; i++ {
- filters.NotifyWatchers(envelopes[i], messagesCode)
+ filters.NotifyWatchers(envelopes[i], false)
}
for i = 0; i < NumFilters; i++ {
@@ -629,7 +629,7 @@ func TestWatchers(t *testing.T) {
// test AcceptP2P
total = 0
- filters.NotifyWatchers(envelopes[0], p2pCode)
+ filters.NotifyWatchers(envelopes[0], true)
for i = 0; i < NumFilters; i++ {
mail = tst[i].f.Retrieve()
@@ -646,7 +646,7 @@ func TestWatchers(t *testing.T) {
}
f.AcceptP2P = true
total = 0
- filters.NotifyWatchers(envelopes[0], p2pCode)
+ filters.NotifyWatchers(envelopes[0], true)
for i = 0; i < NumFilters; i++ {
mail = tst[i].f.Retrieve()
diff --git a/whisper/whisperv5/message.go b/whisper/whisperv5/message.go
index f3812b1d8..a095f7c0c 100644
--- a/whisper/whisperv5/message.go
+++ b/whisper/whisperv5/message.go
@@ -14,9 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-// Contains the Whisper protocol Message element. For formal details please see
-// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#messages.
-// todo: fix the spec link, and move it to doc.go
+// Contains the Whisper protocol Message element.
package whisperv5
@@ -256,7 +254,11 @@ func (msg *SentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er
}
envelope = NewEnvelope(options.TTL, options.Topic, salt, nonce, msg)
- envelope.Seal(options)
+ err = envelope.Seal(options)
+ if err != nil {
+ return nil, err
+ }
+
return envelope, nil
}
diff --git a/whisper/whisperv5/message_test.go b/whisper/whisperv5/message_test.go
index cd327d9d9..5cbc9182f 100644
--- a/whisper/whisperv5/message_test.go
+++ b/whisper/whisperv5/message_test.go
@@ -30,11 +30,15 @@ func copyFromBuf(dst []byte, src []byte, beg int) int {
}
func generateMessageParams() (*MessageParams, error) {
+ // set all the parameters except p.Dst
+
buf := make([]byte, 1024)
randomize(buf)
sz := rand.Intn(400)
var p MessageParams
+ p.PoW = 0.01
+ p.WorkTime = 1
p.TTL = uint32(rand.Intn(1024))
p.Payload = make([]byte, sz)
p.Padding = make([]byte, padSizeLimitUpper)
@@ -52,8 +56,6 @@ func generateMessageParams() (*MessageParams, error) {
return nil, err
}
- // p.Dst, p.PoW, p.WorkTime are not set
- p.PoW = 0.01
return &p, nil
}
@@ -114,7 +116,7 @@ func singleMessageTest(t *testing.T, symmetric bool) {
if len(decrypted.Signature) != signatureLength {
t.Fatalf("failed with seed %d: signature len %d.", seed, len(decrypted.Signature))
}
- if !isPubKeyEqual(decrypted.Src, &params.Src.PublicKey) {
+ if !IsPubKeyEqual(decrypted.Src, &params.Src.PublicKey) {
t.Fatalf("failed with seed %d: signature mismatch.", seed)
}
}
@@ -152,6 +154,16 @@ func TestMessageWrap(t *testing.T) {
if pow < target {
t.Fatalf("failed Wrap with seed %d: pow < target (%f vs. %f).", seed, pow, target)
}
+
+ // set PoW target too high, expect error
+ msg2 := NewSentMessage(params)
+ params.TTL = 1000000
+ params.WorkTime = 1
+ params.PoW = 10000000.0
+ env, err = msg2.Wrap(params)
+ if err == nil {
+ t.Fatalf("unexpectedly reached the PoW target with seed %d.", seed)
+ }
}
func TestMessageSeal(t *testing.T) {
@@ -256,7 +268,7 @@ func singleEnvelopeOpenTest(t *testing.T, symmetric bool) {
if len(decrypted.Signature) != signatureLength {
t.Fatalf("failed with seed %d: signature len %d.", seed, len(decrypted.Signature))
}
- if !isPubKeyEqual(decrypted.Src, &params.Src.PublicKey) {
+ if !IsPubKeyEqual(decrypted.Src, &params.Src.PublicKey) {
t.Fatalf("failed with seed %d: signature mismatch.", seed)
}
if decrypted.isAsymmetricEncryption() == symmetric {
@@ -269,8 +281,37 @@ func singleEnvelopeOpenTest(t *testing.T, symmetric bool) {
if decrypted.Dst == nil {
t.Fatalf("failed with seed %d: dst is nil.", seed)
}
- if !isPubKeyEqual(decrypted.Dst, &key.PublicKey) {
+ if !IsPubKeyEqual(decrypted.Dst, &key.PublicKey) {
t.Fatalf("failed with seed %d: Dst.", seed)
}
}
}
+
+func TestEncryptWithZeroKey(t *testing.T) {
+ InitSingleTest()
+
+ params, err := generateMessageParams()
+ if err != nil {
+ t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
+ }
+
+ msg := NewSentMessage(params)
+
+ params.KeySym = make([]byte, aesKeyLength)
+ _, err = msg.Wrap(params)
+ if err == nil {
+ t.Fatalf("wrapped with zero key, seed: %d.", seed)
+ }
+
+ params.KeySym = make([]byte, 0)
+ _, err = msg.Wrap(params)
+ if err == nil {
+ t.Fatalf("wrapped with empty key, seed: %d.", seed)
+ }
+
+ params.KeySym = nil
+ _, err = msg.Wrap(params)
+ if err == nil {
+ t.Fatalf("wrapped with nil key, seed: %d.", seed)
+ }
+}
diff --git a/whisper/whisperv5/peer_test.go b/whisper/whisperv5/peer_test.go
index 082e7f446..88da59bff 100644
--- a/whisper/whisperv5/peer_test.go
+++ b/whisper/whisperv5/peer_test.go
@@ -118,6 +118,7 @@ func initialize(t *testing.T) {
var node TestNode
node.shh = NewWhisper(nil)
node.shh.test = true
+ node.shh.Start(nil)
topics := make([]TopicType, 0)
topics = append(topics, sharedTopic)
f := Filter{KeySym: sharedKey, Topics: topics}
@@ -166,6 +167,7 @@ func stopServers() {
n := nodes[i]
if n != nil {
n.shh.Unwatch(n.filerId)
+ n.shh.Stop()
n.server.Stop()
}
}
diff --git a/whisper/whisperv5/topic.go b/whisper/whisperv5/topic.go
index c29c344be..54d7422d1 100644
--- a/whisper/whisperv5/topic.go
+++ b/whisper/whisperv5/topic.go
@@ -14,8 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-// Contains the Whisper protocol Topic element. For formal details please see
-// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics.
+// Contains the Whisper protocol Topic element.
package whisperv5
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go
index dc9571f6e..789adbdb3 100644
--- a/whisper/whisperv5/whisper.go
+++ b/whisper/whisperv5/whisper.go
@@ -22,6 +22,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"fmt"
+ "runtime"
"sync"
"time"
@@ -45,7 +46,7 @@ type Whisper struct {
symKeys map[string][]byte
keyMu sync.RWMutex
- envelopes map[common.Hash]*Envelope // Pool of messages currently tracked by this node
+ envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
messages map[common.Hash]*ReceivedMessage // Pool of successfully decrypted messages, which are not expired yet
expirations map[uint32]*set.SetNonTS // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
@@ -55,22 +56,28 @@ type Whisper struct {
mailServer MailServer
- quit chan struct{}
- test bool
+ messageQueue chan *Envelope
+ p2pMsgQueue chan *Envelope
+ quit chan struct{}
+
+ overflow bool
+ test bool
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
// Param s should be passed if you want to implement mail server, otherwise nil.
func NewWhisper(server MailServer) *Whisper {
whisper := &Whisper{
- privateKeys: make(map[string]*ecdsa.PrivateKey),
- symKeys: make(map[string][]byte),
- envelopes: make(map[common.Hash]*Envelope),
- messages: make(map[common.Hash]*ReceivedMessage),
- expirations: make(map[uint32]*set.SetNonTS),
- peers: make(map[*Peer]struct{}),
- mailServer: server,
- quit: make(chan struct{}),
+ privateKeys: make(map[string]*ecdsa.PrivateKey),
+ symKeys: make(map[string][]byte),
+ envelopes: make(map[common.Hash]*Envelope),
+ messages: make(map[common.Hash]*ReceivedMessage),
+ expirations: make(map[uint32]*set.SetNonTS),
+ peers: make(map[*Peer]struct{}),
+ mailServer: server,
+ messageQueue: make(chan *Envelope, messageQueueLimit),
+ p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
+ quit: make(chan struct{}),
}
whisper.filters = NewFilters(whisper)
@@ -124,7 +131,7 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, data []byte) error {
return err
}
p.trusted = true
- return p2p.Send(p.ws, mailRequestCode, data)
+ return p2p.Send(p.ws, p2pRequestCode, data)
}
func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
@@ -270,6 +277,12 @@ func (w *Whisper) Send(envelope *Envelope) error {
func (w *Whisper) Start(*p2p.Server) error {
glog.V(logger.Info).Infoln("Whisper started")
go w.update()
+
+ numCPU := runtime.NumCPU()
+ for i := 0; i < numCPU; i++ {
+ go w.processQueue()
+ }
+
return nil
}
@@ -350,10 +363,10 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return fmt.Errorf("garbage received (directMessage)")
}
for _, envelope := range envelopes {
- wh.postEvent(envelope, p2pCode)
+ wh.postEvent(envelope, true)
}
}
- case mailRequestCode:
+ case p2pRequestCode:
// Must be processed if mail server is implemented. Otherwise ignore.
if wh.mailServer != nil {
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
@@ -382,7 +395,7 @@ func (wh *Whisper) add(envelope *Envelope) error {
if sent > now {
if sent-SynchAllowance > now {
- return fmt.Errorf("message created in the future")
+ return fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
} else {
// recalculate PoW, adjusted for the time difference, plus one second for latency
envelope.calculatePoW(sent - now + 1)
@@ -393,30 +406,31 @@ func (wh *Whisper) add(envelope *Envelope) error {
if envelope.Expiry+SynchAllowance*2 < now {
return fmt.Errorf("very old message")
} else {
+ glog.V(logger.Debug).Infof("expired envelope dropped [%x]", envelope.Hash())
return nil // drop envelope without error
}
}
if len(envelope.Data) > MaxMessageLength {
- return fmt.Errorf("huge messages are not allowed")
+ return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
if len(envelope.Version) > 4 {
- return fmt.Errorf("oversized Version")
+ return fmt.Errorf("oversized version [%x]", envelope.Hash())
}
if len(envelope.AESNonce) > AESNonceMaxLength {
// the standard AES GSM nonce size is 12,
// but const gcmStandardNonceSize cannot be accessed directly
- return fmt.Errorf("oversized AESNonce")
+ return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash())
}
if len(envelope.Salt) > saltLength {
- return fmt.Errorf("oversized Salt")
+ return fmt.Errorf("oversized salt [%x]", envelope.Hash())
}
if envelope.PoW() < MinimumPoW && !wh.test {
- glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f", envelope.PoW())
+ glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash())
return nil // drop envelope without error
}
@@ -436,22 +450,59 @@ func (wh *Whisper) add(envelope *Envelope) error {
wh.poolMu.Unlock()
if alreadyCached {
- glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
+ glog.V(logger.Detail).Infof("whisper envelope already cached [%x]\n", envelope.Hash())
} else {
- wh.postEvent(envelope, messagesCode) // notify the local node about the new message
- glog.V(logger.Detail).Infof("cached whisper envelope %v\n", envelope)
+ glog.V(logger.Detail).Infof("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope)
+ wh.postEvent(envelope, false) // notify the local node about the new message
}
return nil
}
-// postEvent delivers the message to the watchers.
-func (w *Whisper) postEvent(envelope *Envelope, messageCode uint64) {
+// postEvent queues the message for further processing.
+func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
// if the version of incoming message is higher than
// currently supported version, we can not decrypt it,
// and therefore just ignore this message
if envelope.Ver() <= EnvelopeVersion {
- // todo: review if you need an additional thread here
- go w.filters.NotifyWatchers(envelope, messageCode)
+ if isP2P {
+ w.p2pMsgQueue <- envelope
+ } else {
+ w.checkOverflow()
+ w.messageQueue <- envelope
+ }
+ }
+}
+
+// checkOverflow checks if message queue overflow occurs and reports it if necessary.
+func (w *Whisper) checkOverflow() {
+ queueSize := len(w.messageQueue)
+
+ if queueSize == messageQueueLimit {
+ if !w.overflow {
+ w.overflow = true
+ glog.V(logger.Warn).Infoln("message queue overflow")
+ }
+ } else if queueSize <= messageQueueLimit/2 {
+ if w.overflow {
+ w.overflow = false
+ }
+ }
+}
+
+// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
+func (w *Whisper) processQueue() {
+ var e *Envelope
+ for {
+ select {
+ case <-w.quit:
+ return
+
+ case e = <-w.messageQueue:
+ w.filters.NotifyWatchers(e, false)
+
+ case e = <-w.p2pMsgQueue:
+ w.filters.NotifyWatchers(e, true)
+ }
}
}
diff --git a/whisper/whisperv5/whisper_test.go b/whisper/whisperv5/whisper_test.go
index 3f79a72c8..9af95f445 100644
--- a/whisper/whisperv5/whisper_test.go
+++ b/whisper/whisperv5/whisper_test.go
@@ -19,6 +19,7 @@ package whisperv5
import (
"bytes"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
@@ -309,3 +310,56 @@ func TestWhisperSymKeyManagement(t *testing.T) {
t.Fatalf("failed to delete second key: second key is not nil.")
}
}
+
+func TestExpiry(t *testing.T) {
+ InitSingleTest()
+
+ w := NewWhisper(nil)
+ w.test = true
+ w.Start(nil)
+ defer w.Stop()
+
+ params, err := generateMessageParams()
+ if err != nil {
+ t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
+ }
+
+ params.TTL = 1
+ msg := NewSentMessage(params)
+ env, err := msg.Wrap(params)
+ if err != nil {
+ t.Fatalf("failed Wrap with seed %d: %s.", seed, err)
+ }
+
+ err = w.Send(env)
+ if err != nil {
+ t.Fatalf("failed to send envelope with seed %d: %s.", seed, err)
+ }
+
+ // wait till received or timeout
+ var received, expired bool
+ for j := 0; j < 20; j++ {
+ time.Sleep(100 * time.Millisecond)
+ if len(w.Envelopes()) > 0 {
+ received = true
+ break
+ }
+ }
+
+ if !received {
+ t.Fatalf("did not receive the sent envelope, seed: %d.", seed)
+ }
+
+ // wait till expired or timeout
+ for j := 0; j < 20; j++ {
+ time.Sleep(100 * time.Millisecond)
+ if len(w.Envelopes()) == 0 {
+ expired = true
+ break
+ }
+ }
+
+ if !expired {
+ t.Fatalf("expire failed, seed: %d.", seed)
+ }
+}