diff options
Diffstat (limited to 'whisper/whisperv6/peer.go')
-rw-r--r-- | whisper/whisperv6/peer.go | 54 |
1 files changed, 50 insertions, 4 deletions
diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index 65e0c77b0..08071c0f7 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -36,6 +36,7 @@ type Peer struct { trusted bool powRequirement float64 + bloomFilter []byte // may contain nil in case of full node known *set.Set // Messages already known by the peer to avoid wasting bandwidth @@ -74,8 +75,12 @@ func (p *Peer) handshake() error { // Send the handshake status message asynchronously errc := make(chan error, 1) go func() { - errc <- p2p.Send(p.ws, statusCode, ProtocolVersion) + pow := p.host.MinPow() + powConverted := math.Float64bits(pow) + bloom := p.host.BloomFilter() + errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, powConverted, bloom) }() + // Fetch the remote status packet and verify protocol match packet, err := p.ws.ReadMsg() if err != nil { @@ -85,14 +90,42 @@ func (p *Peer) handshake() error { return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code) } s := rlp.NewStream(packet.Payload, uint64(packet.Size)) - peerVersion, err := s.Uint() + _, err = s.List() if err != nil { return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err) } + peerVersion, err := s.Uint() + if err != nil { + return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", p.ID(), err) + } if peerVersion != ProtocolVersion { return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion) } - // Wait until out own status is consumed too + + // only version is mandatory, subsequent parameters are optional + powRaw, err := s.Uint() + if err == nil { + pow := math.Float64frombits(powRaw) + if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 { + return fmt.Errorf("peer [%x] sent bad status message: invalid pow", p.ID()) + } + p.powRequirement = pow + + var bloom []byte + err = s.Decode(&bloom) + if err == nil { + sz := len(bloom) + if sz != bloomFilterSize && sz != 0 { + return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", p.ID(), sz) + } + if isFullNode(bloom) { + p.bloomFilter = nil + } else { + p.bloomFilter = bloom + } + } + } + if err := <-errc; err != nil { return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err) } @@ -156,7 +189,7 @@ func (p *Peer) broadcast() error { envelopes := p.host.Envelopes() bundle := make([]*Envelope, 0, len(envelopes)) for _, envelope := range envelopes { - if !p.marked(envelope) && envelope.PoW() >= p.powRequirement { + if !p.marked(envelope) && envelope.PoW() >= p.powRequirement && p.bloomMatch(envelope) { bundle = append(bundle, envelope) } } @@ -186,3 +219,16 @@ func (p *Peer) notifyAboutPowRequirementChange(pow float64) error { i := math.Float64bits(pow) return p2p.Send(p.ws, powRequirementCode, i) } + +func (p *Peer) notifyAboutBloomFilterChange(bloom []byte) error { + return p2p.Send(p.ws, bloomFilterExCode, bloom) +} + +func (p *Peer) bloomMatch(env *Envelope) bool { + if p.bloomFilter == nil { + // no filter - full node, accepts all envelops + return true + } + + return bloomFilterMatch(p.bloomFilter, env.Bloom()) +} |