aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/pss/pss.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/pss/pss.go')
-rw-r--r--swarm/pss/pss.go131
1 files changed, 80 insertions, 51 deletions
diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go
index 1bc28890f..d15401e51 100644
--- a/swarm/pss/pss.go
+++ b/swarm/pss/pss.go
@@ -891,68 +891,97 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
return nil
}
-// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
-// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
-// of the peer address of the equivalent length.
+// sendFunc is a helper function that tries to send a message and returns true on success.
+// It is set here for usage in production, and optionally overridden in tests.
+var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg
+
+// tries to send a message, returns true if successful
+func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
+ var isPssEnabled bool
+ info := sp.Info()
+ for _, capability := range info.Caps {
+ if capability == p.capstring {
+ isPssEnabled = true
+ break
+ }
+ }
+ if !isPssEnabled {
+ log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
+ return false
+ }
+
+ // get the protocol peer from the forwarding peer cache
+ p.fwdPoolMu.RLock()
+ pp := p.fwdPool[sp.Info().ID]
+ p.fwdPoolMu.RUnlock()
+
+ err := pp.Send(context.TODO(), msg)
+ if err != nil {
+ metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
+ log.Error(err.Error())
+ }
+
+ return err == nil
+}
+
+// Forwards a pss message to the peer(s) based on recipient address according to the algorithm
+// described below. The recipient address can be of any length, and the byte slice will be matched
+// to the MSB slice of the peer address of the equivalent length.
+//
+// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
+// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
+// partial address, it should be forwarded to all the peers matching the partial address, if there
+// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
+// forwarding fails, the node should try to forward it to the next best peer, until the message is
+// successfully forwarded to at least one peer.
func (p *Pss) forward(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
-
+ sent := 0 // number of successful sends
to := make([]byte, addressLength)
copy(to[:len(msg.To)], msg.To)
+ neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
- // send with kademlia
- // find the closest peer to the recipient and attempt to send
- sent := 0
- p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
- info := sp.Info()
-
- // check if the peer is running pss
- var ispss bool
- for _, cap := range info.Caps {
- if cap == p.capstring {
- ispss = true
- break
- }
- }
- if !ispss {
- log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
- return true
- }
+ // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
+ // but the luminosity is less. here luminosity equals the number of bits given in the destination address.
+ luminosityRadius := len(msg.To) * 8
- // get the protocol peer from the forwarding peer cache
- sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
- p.fwdPoolMu.RLock()
- pp := p.fwdPool[sp.Info().ID]
- p.fwdPoolMu.RUnlock()
+ // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
+ pof := pot.DefaultPof(neighbourhoodDepth)
- // attempt to send the message
- err := pp.Send(context.TODO(), msg)
- if err != nil {
- metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
- log.Error(err.Error())
- return true
+ // soft threshold for msg broadcast
+ broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
+ if broadcastThreshold > luminosityRadius {
+ broadcastThreshold = luminosityRadius
+ }
+
+ var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address
+
+ // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
+ // call below), then peers that fall in the same proximity bin as recipient address will appear
+ // [at least] one bit closer, but only if these additional bits are given in the recipient address.
+ if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
+ broadcastThreshold++
+ onlySendOnce = true
+ }
+
+ p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
+ if po < broadcastThreshold && sent > 0 {
+ return false // stop iterating
}
- sent++
- log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))
-
- // continue forwarding if:
- // - if the peer is end recipient but the full address has not been disclosed
- // - if the peer address matches the partial address fully
- // - if the peer is in proxbin
- if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
- log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
- return true
- } else if isproxbin {
- log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
- return true
+ if sendFunc(p, sp, msg) {
+ sent++
+ if onlySendOnce {
+ return false
+ }
+ if po == addressLength*8 {
+ // stop iterating if successfully sent to the exact recipient (perfect match of full address)
+ return false
+ }
}
- // at this point we stop forwarding, and the state is as follows:
- // - the peer is end recipient and we have full address
- // - we are not in proxbin (directed routing)
- // - partial addresses don't fully match
- return false
+ return true
})
+ // if we failed to send to anyone, re-insert message in the send-queue
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {