aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv6
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisperv6')
-rw-r--r--whisper/whisperv6/api.go36
-rw-r--r--whisper/whisperv6/doc.go2
-rw-r--r--whisper/whisperv6/envelope.go6
-rw-r--r--whisper/whisperv6/filter.go110
-rw-r--r--whisper/whisperv6/filter_test.go29
-rw-r--r--whisper/whisperv6/peer.go20
-rw-r--r--whisper/whisperv6/peer_test.go24
-rw-r--r--whisper/whisperv6/whisper.go56
-rw-r--r--whisper/whisperv6/whisper_test.go29
9 files changed, 150 insertions, 162 deletions
diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go
index a2c75a41c..96e2b17e7 100644
--- a/whisper/whisperv6/api.go
+++ b/whisper/whisperv6/api.go
@@ -61,32 +61,9 @@ func NewPublicWhisperAPI(w *Whisper) *PublicWhisperAPI {
w: w,
lastUsed: make(map[string]time.Time),
}
-
- go api.run()
return api
}
-// run the api event loop.
-// this loop deletes filter that have not been used within filterTimeout
-func (api *PublicWhisperAPI) run() {
- timeout := time.NewTicker(2 * time.Minute)
- for {
- <-timeout.C
-
- api.mu.Lock()
- for id, lastUsed := range api.lastUsed {
- if time.Since(lastUsed).Seconds() >= filterTimeout {
- delete(api.lastUsed, id)
- if err := api.w.Unsubscribe(id); err != nil {
- log.Error("could not unsubscribe whisper filter", "error", err)
- }
- log.Debug("delete whisper filter (timeout)", "id", id)
- }
- }
- api.mu.Unlock()
- }
-}
-
// Version returns the Whisper sub-protocol version.
func (api *PublicWhisperAPI) Version(ctx context.Context) string {
return ProtocolVersionStr
@@ -219,6 +196,19 @@ func (api *PublicWhisperAPI) DeleteSymKey(ctx context.Context, id string) bool {
return api.w.DeleteSymKey(id)
}
+// MakeLightClient turns the node into light client, which does not forward
+// any incoming messages, and sends only messages originated in this node.
+func (api *PublicWhisperAPI) MakeLightClient(ctx context.Context) bool {
+ api.w.lightClient = true
+ return api.w.lightClient
+}
+
+// CancelLightClient cancels light client mode.
+func (api *PublicWhisperAPI) CancelLightClient(ctx context.Context) bool {
+ api.w.lightClient = false
+ return !api.w.lightClient
+}
+
//go:generate gencodec -type NewMessage -field-override newMessageOverride -out gen_newmessage_json.go
// NewMessage represents a new whisper message that is posted through the RPC.
diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go
index d5d7fed60..066a9766d 100644
--- a/whisper/whisperv6/doc.go
+++ b/whisper/whisperv6/doc.go
@@ -60,7 +60,7 @@ const (
aesKeyLength = 32 // in bytes
aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize()
keyIDSize = 32 // in bytes
- bloomFilterSize = 64 // in bytes
+ BloomFilterSize = 64 // in bytes
flagsLength = 1
EnvelopeHeaderLength = 20
diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go
index c7bea2bb9..2f947f1a4 100644
--- a/whisper/whisperv6/envelope.go
+++ b/whisper/whisperv6/envelope.go
@@ -208,6 +208,10 @@ func (e *Envelope) OpenSymmetric(key []byte) (msg *ReceivedMessage, err error) {
// Open tries to decrypt an envelope, and populates the message fields in case of success.
func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
+ if watcher == nil {
+ return nil
+ }
+
// The API interface forbids filters doing both symmetric and asymmetric encryption.
if watcher.expectsAsymmetricEncryption() && watcher.expectsSymmetricEncryption() {
return nil
@@ -249,7 +253,7 @@ func (e *Envelope) Bloom() []byte {
// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes)
func TopicToBloom(topic TopicType) []byte {
- b := make([]byte, bloomFilterSize)
+ b := make([]byte, BloomFilterSize)
var index [3]int
for j := 0; j < 3; j++ {
index[j] = int(topic[j])
diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go
index eb0c65fa3..2f170ddeb 100644
--- a/whisper/whisperv6/filter.go
+++ b/whisper/whisperv6/filter.go
@@ -35,6 +35,7 @@ type Filter struct {
PoW float64 // Proof of work as described in the Whisper spec
AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
+ id string // unique identifier
Messages map[common.Hash]*ReceivedMessage
mutex sync.RWMutex
@@ -43,15 +44,21 @@ type Filter struct {
// Filters represents a collection of filters
type Filters struct {
watchers map[string]*Filter
- whisper *Whisper
- mutex sync.RWMutex
+
+ topicMatcher map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic
+ allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is
+
+ whisper *Whisper
+ mutex sync.RWMutex
}
// NewFilters returns a newly created filter collection
func NewFilters(w *Whisper) *Filters {
return &Filters{
- watchers: make(map[string]*Filter),
- whisper: w,
+ watchers: make(map[string]*Filter),
+ topicMatcher: make(map[TopicType]map[*Filter]struct{}),
+ allTopicsMatcher: make(map[*Filter]struct{}),
+ whisper: w,
}
}
@@ -81,7 +88,9 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
}
+ watcher.id = id
fs.watchers[id] = watcher
+ fs.addTopicMatcher(watcher)
return id, err
}
@@ -91,12 +100,51 @@ func (fs *Filters) Uninstall(id string) bool {
fs.mutex.Lock()
defer fs.mutex.Unlock()
if fs.watchers[id] != nil {
+ fs.removeFromTopicMatchers(fs.watchers[id])
delete(fs.watchers, id)
return true
}
return false
}
+// addTopicMatcher adds a filter to the topic matchers.
+// If the filter's Topics array is empty, it will be tried on every topic.
+// Otherwise, it will be tried on the topics specified.
+func (fs *Filters) addTopicMatcher(watcher *Filter) {
+ if len(watcher.Topics) == 0 {
+ fs.allTopicsMatcher[watcher] = struct{}{}
+ } else {
+ for _, t := range watcher.Topics {
+ topic := BytesToTopic(t)
+ if fs.topicMatcher[topic] == nil {
+ fs.topicMatcher[topic] = make(map[*Filter]struct{})
+ }
+ fs.topicMatcher[topic][watcher] = struct{}{}
+ }
+ }
+}
+
+// removeFromTopicMatchers removes a filter from the topic matchers
+func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
+ delete(fs.allTopicsMatcher, watcher)
+ for _, topic := range watcher.Topics {
+ delete(fs.topicMatcher[BytesToTopic(topic)], watcher)
+ }
+}
+
+// getWatchersByTopic returns a slice containing the filters that
+// match a specific topic
+func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
+ res := make([]*Filter, 0, len(fs.allTopicsMatcher))
+ for watcher := range fs.allTopicsMatcher {
+ res = append(res, watcher)
+ }
+ for watcher := range fs.topicMatcher[topic] {
+ res = append(res, watcher)
+ }
+ return res
+}
+
// Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter {
fs.mutex.RLock()
@@ -112,11 +160,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
- i := -1 // only used for logging info
- for _, watcher := range fs.watchers {
- i++
+ candidates := fs.getWatchersByTopic(env.Topic)
+ for _, watcher := range candidates {
if p2pMessage && !watcher.AllowP2P {
- log.Trace(fmt.Sprintf("msg [%x], filter [%d]: p2p messages are not allowed", env.Hash(), i))
+ log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
continue
}
@@ -128,10 +175,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match {
msg = env.Open(watcher)
if msg == nil {
- log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
+ log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id)
}
} else {
- log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
+ log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id)
}
}
@@ -144,20 +191,6 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
}
}
-func (f *Filter) processEnvelope(env *Envelope) *ReceivedMessage {
- if f.MatchEnvelope(env) {
- msg := env.Open(f)
- if msg != nil {
- return msg
- }
-
- log.Trace("processing envelope: failed to open", "hash", env.Hash().Hex())
- } else {
- log.Trace("processing envelope: does not match", "hash", env.Hash().Hex())
- }
- return nil
-}
-
func (f *Filter) expectsAsymmetricEncryption() bool {
return f.KeyAsym != nil
}
@@ -194,16 +227,17 @@ func (f *Filter) Retrieve() (all []*ReceivedMessage) {
// MatchMessage checks if the filter matches an already decrypted
// message (i.e. a Message that has already been handled by
-// MatchEnvelope when checked by a previous filter)
+// MatchEnvelope when checked by a previous filter).
+// Topics are not checked here, since this is done by topic matchers.
func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
if f.PoW > 0 && msg.PoW < f.PoW {
return false
}
if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() {
- return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic)
+ return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst)
} else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() {
- return f.SymKeyHash == msg.SymKeyHash && f.MatchTopic(msg.Topic)
+ return f.SymKeyHash == msg.SymKeyHash
}
return false
}
@@ -211,27 +245,9 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
// MatchEnvelope checks if it's worth decrypting the message. If
// it returns `true`, client code is expected to attempt decrypting
// the message and subsequently call MatchMessage.
+// Topics are not checked here, since this is done by topic matchers.
func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
- if f.PoW > 0 && envelope.pow < f.PoW {
- return false
- }
-
- return f.MatchTopic(envelope.Topic)
-}
-
-// MatchTopic checks that the filter captures a given topic.
-func (f *Filter) MatchTopic(topic TopicType) bool {
- if len(f.Topics) == 0 {
- // any topic matches
- return true
- }
-
- for _, bt := range f.Topics {
- if matchSingleTopic(topic, bt) {
- return true
- }
- }
- return false
+ return f.PoW <= 0 || envelope.pow >= f.PoW
}
func matchSingleTopic(topic TopicType, bt []byte) bool {
diff --git a/whisper/whisperv6/filter_test.go b/whisper/whisperv6/filter_test.go
index e7230ef38..0bb7986c3 100644
--- a/whisper/whisperv6/filter_test.go
+++ b/whisper/whisperv6/filter_test.go
@@ -303,9 +303,8 @@ func TestMatchEnvelope(t *testing.T) {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
}
- params.Topic[0] = 0xFF // ensure mismatch
+ params.Topic[0] = 0xFF // topic mismatch
- // mismatch with pseudo-random data
msg, err := NewSentMessage(params)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
@@ -314,14 +313,6 @@ func TestMatchEnvelope(t *testing.T) {
if err != nil {
t.Fatalf("failed Wrap with seed %d: %s.", seed, err)
}
- match := fsym.MatchEnvelope(env)
- if match {
- t.Fatalf("failed MatchEnvelope symmetric with seed %d.", seed)
- }
- match = fasym.MatchEnvelope(env)
- if match {
- t.Fatalf("failed MatchEnvelope asymmetric with seed %d.", seed)
- }
// encrypt symmetrically
i := mrand.Int() % 4
@@ -337,7 +328,7 @@ func TestMatchEnvelope(t *testing.T) {
}
// symmetric + matching topic: match
- match = fsym.MatchEnvelope(env)
+ match := fsym.MatchEnvelope(env)
if !match {
t.Fatalf("failed MatchEnvelope() symmetric with seed %d.", seed)
}
@@ -396,7 +387,7 @@ func TestMatchEnvelope(t *testing.T) {
// asymmetric + matching topic: match
fasym.Topics[i] = fasym.Topics[i+1]
match = fasym.MatchEnvelope(env)
- if match {
+ if !match {
t.Fatalf("failed MatchEnvelope(asymmetric + matching topic) with seed %d.", seed)
}
@@ -431,7 +422,8 @@ func TestMatchEnvelope(t *testing.T) {
// filter with topic + envelope without topic: mismatch
fasym.Topics = fsym.Topics
match = fasym.MatchEnvelope(env)
- if match {
+ if !match {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
t.Fatalf("failed MatchEnvelope(filter without topic + envelope without topic) with seed %d.", seed)
}
}
@@ -487,7 +479,8 @@ func TestMatchMessageSym(t *testing.T) {
// topic mismatch
f.Topics[index][0]++
- if f.MatchMessage(msg) {
+ if !f.MatchMessage(msg) {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed)
}
f.Topics[index][0]--
@@ -580,7 +573,8 @@ func TestMatchMessageAsym(t *testing.T) {
// topic mismatch
f.Topics[index][0]++
- if f.MatchMessage(msg) {
+ if !f.MatchMessage(msg) {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed)
}
f.Topics[index][0]--
@@ -829,8 +823,9 @@ func TestVariableTopics(t *testing.T) {
f.Topics[i][lastTopicByte]++
match = f.MatchEnvelope(env)
- if match {
- t.Fatalf("MatchEnvelope symmetric with seed %d, step %d: false positive.", seed, i)
+ if !match {
+ // topic mismatch should have no affect, as topics are handled by topic matchers
+ t.Fatalf("MatchEnvelope symmetric with seed %d, step %d.", seed, i)
}
}
}
diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go
index 4ef0f3c43..2bf1c905b 100644
--- a/whisper/whisperv6/peer.go
+++ b/whisper/whisperv6/peer.go
@@ -19,6 +19,7 @@ package whisperv6
import (
"fmt"
"math"
+ "sync"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -36,6 +37,7 @@ type Peer struct {
trusted bool
powRequirement float64
+ bloomMu sync.Mutex
bloomFilter []byte
fullNode bool
@@ -54,7 +56,7 @@ func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
powRequirement: 0.0,
known: set.New(),
quit: make(chan struct{}),
- bloomFilter: makeFullNodeBloom(),
+ bloomFilter: MakeFullNodeBloom(),
fullNode: true,
}
}
@@ -118,7 +120,7 @@ func (peer *Peer) handshake() error {
err = s.Decode(&bloom)
if err == nil {
sz := len(bloom)
- if sz != bloomFilterSize && sz != 0 {
+ if sz != BloomFilterSize && sz != 0 {
return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
}
peer.setBloomFilter(bloom)
@@ -225,20 +227,24 @@ func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
}
func (peer *Peer) bloomMatch(env *Envelope) bool {
- return peer.fullNode || bloomFilterMatch(peer.bloomFilter, env.Bloom())
+ peer.bloomMu.Lock()
+ defer peer.bloomMu.Unlock()
+ return peer.fullNode || BloomFilterMatch(peer.bloomFilter, env.Bloom())
}
func (peer *Peer) setBloomFilter(bloom []byte) {
+ peer.bloomMu.Lock()
+ defer peer.bloomMu.Unlock()
peer.bloomFilter = bloom
peer.fullNode = isFullNode(bloom)
if peer.fullNode && peer.bloomFilter == nil {
- peer.bloomFilter = makeFullNodeBloom()
+ peer.bloomFilter = MakeFullNodeBloom()
}
}
-func makeFullNodeBloom() []byte {
- bloom := make([]byte, bloomFilterSize)
- for i := 0; i < bloomFilterSize; i++ {
+func MakeFullNodeBloom() []byte {
+ bloom := make([]byte, BloomFilterSize)
+ for i := 0; i < BloomFilterSize; i++ {
bloom[i] = 0xFF
}
return bloom
diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go
index 9ce5eed8b..ec985ae65 100644
--- a/whisper/whisperv6/peer_test.go
+++ b/whisper/whisperv6/peer_test.go
@@ -23,6 +23,7 @@ import (
mrand "math/rand"
"net"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -71,7 +72,7 @@ var keys = []string{
}
type TestData struct {
- started int
+ started int64
counter [NumNodes]int
mutex sync.RWMutex
}
@@ -151,7 +152,7 @@ func resetParams(t *testing.T) {
}
func initBloom(t *testing.T) {
- masterBloomFilter = make([]byte, bloomFilterSize)
+ masterBloomFilter = make([]byte, BloomFilterSize)
_, err := mrand.Read(masterBloomFilter)
if err != nil {
t.Fatalf("rand failed: %s.", err)
@@ -163,7 +164,7 @@ func initBloom(t *testing.T) {
masterBloomFilter[i] = 0xFF
}
- if !bloomFilterMatch(masterBloomFilter, msgBloom) {
+ if !BloomFilterMatch(masterBloomFilter, msgBloom) {
t.Fatalf("bloom mismatch on initBloom.")
}
}
@@ -177,7 +178,7 @@ func initialize(t *testing.T) {
for i := 0; i < NumNodes; i++ {
var node TestNode
- b := make([]byte, bloomFilterSize)
+ b := make([]byte, BloomFilterSize)
copy(b, masterBloomFilter)
node.shh = New(&DefaultConfig)
node.shh.SetMinimumPoW(masterPow)
@@ -240,9 +241,7 @@ func startServer(t *testing.T, s *p2p.Server) {
t.Fatalf("failed to start the fisrt server.")
}
- result.mutex.Lock()
- defer result.mutex.Unlock()
- result.started++
+ atomic.AddInt64(&result.started, 1)
}
func stopServers() {
@@ -472,7 +471,10 @@ func checkPowExchange(t *testing.T) {
func checkBloomFilterExchangeOnce(t *testing.T, mustPass bool) bool {
for i, node := range nodes {
for peer := range node.shh.peers {
- if !bytes.Equal(peer.bloomFilter, masterBloomFilter) {
+ peer.bloomMu.Lock()
+ equals := bytes.Equal(peer.bloomFilter, masterBloomFilter)
+ peer.bloomMu.Unlock()
+ if !equals {
if mustPass {
t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got",
i, round, masterBloomFilter, peer.bloomFilter)
@@ -500,11 +502,13 @@ func checkBloomFilterExchange(t *testing.T) {
func waitForServersToStart(t *testing.T) {
const iterations = 200
+ var started int64
for j := 0; j < iterations; j++ {
time.Sleep(50 * time.Millisecond)
- if result.started == NumNodes {
+ started = atomic.LoadInt64(&result.started)
+ if started == NumNodes {
return
}
}
- t.Fatalf("Failed to start all the servers, running: %d", result.started)
+ t.Fatalf("Failed to start all the servers, running: %d", started)
}
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index 600f9cb28..880cced09 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -82,6 +82,8 @@ type Whisper struct {
syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
+ lightClient bool // indicates is this node is pure light client (does not forward any messages)
+
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
@@ -230,11 +232,11 @@ func (whisper *Whisper) SetMaxMessageSize(size uint32) error {
// SetBloomFilter sets the new bloom filter
func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
- if len(bloom) != bloomFilterSize {
+ if len(bloom) != BloomFilterSize {
return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
}
- b := make([]byte, bloomFilterSize)
+ b := make([]byte, BloomFilterSize)
copy(b, bloom)
whisper.settings.Store(bloomFilterIdx, b)
@@ -556,14 +558,14 @@ func (whisper *Whisper) Subscribe(f *Filter) (string, error) {
// updateBloomFilter recalculates the new value of bloom filter,
// and informs the peers if necessary.
func (whisper *Whisper) updateBloomFilter(f *Filter) {
- aggregate := make([]byte, bloomFilterSize)
+ aggregate := make([]byte, BloomFilterSize)
for _, t := range f.Topics {
top := BytesToTopic(t)
b := TopicToBloom(top)
aggregate = addBloom(aggregate, b)
}
- if !bloomFilterMatch(whisper.BloomFilter(), aggregate) {
+ if !BloomFilterMatch(whisper.BloomFilter(), aggregate) {
// existing bloom filter must be updated
aggregate = addBloom(whisper.BloomFilter(), aggregate)
whisper.SetBloomFilter(aggregate)
@@ -587,11 +589,8 @@ func (whisper *Whisper) Unsubscribe(id string) error {
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
func (whisper *Whisper) Send(envelope *Envelope) error {
- ok, err := whisper.add(envelope)
- if err != nil {
- return err
- }
- if !ok {
+ ok, err := whisper.add(envelope, false)
+ if err == nil && !ok {
return fmt.Errorf("failed to add envelope")
}
return err
@@ -673,7 +672,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
trouble := false
for _, env := range envelopes {
- cached, err := whisper.add(env)
+ cached, err := whisper.add(env, whisper.lightClient)
if err != nil {
trouble = true
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
@@ -702,7 +701,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
case bloomFilterExCode:
var bloom []byte
err := packet.Decode(&bloom)
- if err == nil && len(bloom) != bloomFilterSize {
+ if err == nil && len(bloom) != BloomFilterSize {
err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
}
@@ -746,7 +745,8 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
-func (whisper *Whisper) add(envelope *Envelope) (bool, error) {
+// param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
+func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
now := uint32(time.Now().Unix())
sent := envelope.Expiry - envelope.TTL
@@ -779,11 +779,11 @@ func (whisper *Whisper) add(envelope *Envelope) (bool, error) {
}
}
- if !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
+ if !BloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by BloomFilterTolerance()
// for a short period of peer synchronization.
- if !bloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
+ if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
}
@@ -811,7 +811,7 @@ func (whisper *Whisper) add(envelope *Envelope) (bool, error) {
whisper.statsMu.Lock()
whisper.stats.memoryUsed += envelope.size()
whisper.statsMu.Unlock()
- whisper.postEvent(envelope, false) // notify the local node about the new message
+ whisper.postEvent(envelope, isP2P) // notify the local node about the new message
if whisper.mailServer != nil {
whisper.mailServer.Archive(envelope)
}
@@ -928,24 +928,6 @@ func (whisper *Whisper) Envelopes() []*Envelope {
return all
}
-// Messages iterates through all currently floating envelopes
-// and retrieves all the messages, that this filter could decrypt.
-func (whisper *Whisper) Messages(id string) []*ReceivedMessage {
- result := make([]*ReceivedMessage, 0)
- whisper.poolMu.RLock()
- defer whisper.poolMu.RUnlock()
-
- if filter := whisper.filters.Get(id); filter != nil {
- for _, env := range whisper.envelopes {
- msg := filter.processEnvelope(env)
- if msg != nil {
- result = append(result, msg)
- }
- }
- }
- return result
-}
-
// isEnvelopeCached checks if envelope with specific hash has already been received and cached.
func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool {
whisper.poolMu.Lock()
@@ -1043,12 +1025,12 @@ func isFullNode(bloom []byte) bool {
return true
}
-func bloomFilterMatch(filter, sample []byte) bool {
+func BloomFilterMatch(filter, sample []byte) bool {
if filter == nil {
return true
}
- for i := 0; i < bloomFilterSize; i++ {
+ for i := 0; i < BloomFilterSize; i++ {
f := filter[i]
s := sample[i]
if (f | s) != f {
@@ -1060,8 +1042,8 @@ func bloomFilterMatch(filter, sample []byte) bool {
}
func addBloom(a, b []byte) []byte {
- c := make([]byte, bloomFilterSize)
- for i := 0; i < bloomFilterSize; i++ {
+ c := make([]byte, BloomFilterSize)
+ for i := 0; i < BloomFilterSize; i++ {
c[i] = a[i] | b[i]
}
return c
diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go
index 99e5f0bbb..7fe256309 100644
--- a/whisper/whisperv6/whisper_test.go
+++ b/whisper/whisperv6/whisper_test.go
@@ -75,10 +75,6 @@ func TestWhisperBasic(t *testing.T) {
if len(mail) != 0 {
t.Fatalf("failed w.Envelopes().")
}
- m := w.Messages("non-existent")
- if len(m) != 0 {
- t.Fatalf("failed w.Messages.")
- }
derived := pbkdf2.Key([]byte(peerID), nil, 65356, aesKeyLength, sha256.New)
if !validateDataIntegrity(derived, aesKeyLength) {
@@ -593,7 +589,7 @@ func TestCustomization(t *testing.T) {
}
// check w.messages()
- id, err := w.Subscribe(f)
+ _, err = w.Subscribe(f)
if err != nil {
t.Fatalf("failed subscribe with seed %d: %s.", seed, err)
}
@@ -602,11 +598,6 @@ func TestCustomization(t *testing.T) {
if len(mail) > 0 {
t.Fatalf("received premature mail")
}
-
- mail = w.Messages(id)
- if len(mail) != 2 {
- t.Fatalf("failed to get whisper messages")
- }
}
func TestSymmetricSendCycle(t *testing.T) {
@@ -835,11 +826,11 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
func TestBloom(t *testing.T) {
topic := TopicType{0, 0, 255, 6}
b := TopicToBloom(topic)
- x := make([]byte, bloomFilterSize)
+ x := make([]byte, BloomFilterSize)
x[0] = byte(1)
x[32] = byte(1)
- x[bloomFilterSize-1] = byte(128)
- if !bloomFilterMatch(x, b) || !bloomFilterMatch(b, x) {
+ x[BloomFilterSize-1] = byte(128)
+ if !BloomFilterMatch(x, b) || !BloomFilterMatch(b, x) {
t.Fatalf("bloom filter does not match the mask")
}
@@ -851,11 +842,11 @@ func TestBloom(t *testing.T) {
if err != nil {
t.Fatalf("math rand error")
}
- if !bloomFilterMatch(b, b) {
+ if !BloomFilterMatch(b, b) {
t.Fatalf("bloom filter does not match self")
}
x = addBloom(x, b)
- if !bloomFilterMatch(x, b) {
+ if !BloomFilterMatch(x, b) {
t.Fatalf("bloom filter does not match combined bloom")
}
if !isFullNode(nil) {
@@ -865,16 +856,16 @@ func TestBloom(t *testing.T) {
if isFullNode(x) {
t.Fatalf("isFullNode false positive")
}
- for i := 0; i < bloomFilterSize; i++ {
+ for i := 0; i < BloomFilterSize; i++ {
b[i] = byte(255)
}
if !isFullNode(b) {
t.Fatalf("isFullNode false negative")
}
- if bloomFilterMatch(x, b) {
+ if BloomFilterMatch(x, b) {
t.Fatalf("bloomFilterMatch false positive")
}
- if !bloomFilterMatch(b, x) {
+ if !BloomFilterMatch(b, x) {
t.Fatalf("bloomFilterMatch false negative")
}
@@ -888,7 +879,7 @@ func TestBloom(t *testing.T) {
t.Fatalf("failed to set bloom filter: %s", err)
}
f = w.BloomFilter()
- if !bloomFilterMatch(f, x) || !bloomFilterMatch(x, f) {
+ if !BloomFilterMatch(f, x) || !BloomFilterMatch(x, f) {
t.Fatalf("retireved wrong bloom filter")
}
}