aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv6/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisperv6/peer.go')
-rw-r--r--whisper/whisperv6/peer.go54
1 files changed, 50 insertions, 4 deletions
diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go
index 65e0c77b0..08071c0f7 100644
--- a/whisper/whisperv6/peer.go
+++ b/whisper/whisperv6/peer.go
@@ -36,6 +36,7 @@ type Peer struct {
trusted bool
powRequirement float64
+ bloomFilter []byte // may contain nil in case of full node
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
@@ -74,8 +75,12 @@ func (p *Peer) handshake() error {
// Send the handshake status message asynchronously
errc := make(chan error, 1)
go func() {
- errc <- p2p.Send(p.ws, statusCode, ProtocolVersion)
+ pow := p.host.MinPow()
+ powConverted := math.Float64bits(pow)
+ bloom := p.host.BloomFilter()
+ errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, powConverted, bloom)
}()
+
// Fetch the remote status packet and verify protocol match
packet, err := p.ws.ReadMsg()
if err != nil {
@@ -85,14 +90,42 @@ func (p *Peer) handshake() error {
return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code)
}
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
- peerVersion, err := s.Uint()
+ _, err = s.List()
if err != nil {
return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err)
}
+ peerVersion, err := s.Uint()
+ if err != nil {
+ return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", p.ID(), err)
+ }
if peerVersion != ProtocolVersion {
return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion)
}
- // Wait until out own status is consumed too
+
+ // only version is mandatory, subsequent parameters are optional
+ powRaw, err := s.Uint()
+ if err == nil {
+ pow := math.Float64frombits(powRaw)
+ if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
+ return fmt.Errorf("peer [%x] sent bad status message: invalid pow", p.ID())
+ }
+ p.powRequirement = pow
+
+ var bloom []byte
+ err = s.Decode(&bloom)
+ if err == nil {
+ sz := len(bloom)
+ if sz != bloomFilterSize && sz != 0 {
+ return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", p.ID(), sz)
+ }
+ if isFullNode(bloom) {
+ p.bloomFilter = nil
+ } else {
+ p.bloomFilter = bloom
+ }
+ }
+ }
+
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err)
}
@@ -156,7 +189,7 @@ func (p *Peer) broadcast() error {
envelopes := p.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
- if !p.marked(envelope) && envelope.PoW() >= p.powRequirement {
+ if !p.marked(envelope) && envelope.PoW() >= p.powRequirement && p.bloomMatch(envelope) {
bundle = append(bundle, envelope)
}
}
@@ -186,3 +219,16 @@ func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(p.ws, powRequirementCode, i)
}
+
+func (p *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
+ return p2p.Send(p.ws, bloomFilterExCode, bloom)
+}
+
+func (p *Peer) bloomMatch(env *Envelope) bool {
+ if p.bloomFilter == nil {
+ // no filter - full node, accepts all envelops
+ return true
+ }
+
+ return bloomFilterMatch(p.bloomFilter, env.Bloom())
+}