aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/kademlia.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/kademlia.go')
-rw-r--r--swarm/network/kademlia.go297
1 files changed, 203 insertions, 94 deletions
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index 7d52f26f7..146f39106 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/pot"
+ sv "github.com/ethereum/go-ethereum/swarm/version"
)
/*
@@ -168,82 +169,115 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
return nil
}
-// SuggestPeer returns a known peer for the lowest proximity bin for the
-// lowest bincount below depth
-// naturally if there is an empty row it returns a peer for that
-func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) {
+// SuggestPeer returns an unconnected peer address as a peer suggestion for connection
+func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
k.lock.Lock()
defer k.lock.Unlock()
- minsize := k.MinBinSize
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- // if there is a callable neighbour within the current proxBin, connect
- // this makes sure nearest neighbour set is fully connected
- var ppo int
- k.addrs.EachNeighbour(k.base, Pof, func(val pot.Val, po int) bool {
- if po < depth {
- return false
+ radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
+ // collect undersaturated bins in ascending order of number of connected peers
+ // and from shallow to deep (ascending order of PO)
+ // insert them in a map of bin arrays, keyed with the number of connected peers
+ saturation := make(map[int][]int)
+ var lastPO int // the last non-empty PO bin in the iteration
+ saturationDepth = -1 // the deepest PO such that all shallower bins have >= k.MinBinSize peers
+ var pastDepth bool // whether po of iteration >= depth
+ k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
+ // process skipped empty bins
+ for ; lastPO < po; lastPO++ {
+ // find the lowest unsaturated bin
+ if saturationDepth == -1 {
+ saturationDepth = lastPO
+ }
+ // if there is an empty bin, depth is surely passed
+ pastDepth = true
+ saturation[0] = append(saturation[0], lastPO)
}
- e := val.(*entry)
- c := k.callable(e)
- if c {
- a = e.BzzAddr
+ lastPO = po + 1
+ // past radius, depth is surely passed
+ if po >= radius {
+ pastDepth = true
}
- ppo = po
- return !c
- })
- if a != nil {
- log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo))
- return a, 0, false
- }
-
- var bpo []int
- prev := -1
- k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
- prev++
- for ; prev < po; prev++ {
- bpo = append(bpo, prev)
- minsize = 0
+ // beyond depth the bin is treated as unsaturated even if size >= k.MinBinSize
+ // in order to achieve full connectivity to all neighbours
+ if pastDepth && size >= k.MinBinSize {
+ size = k.MinBinSize - 1
}
- if size < minsize {
- bpo = append(bpo, po)
- minsize = size
+ // process non-empty unsaturated bins
+ if size < k.MinBinSize {
+ // find the lowest unsaturated bin
+ if saturationDepth == -1 {
+ saturationDepth = po
+ }
+ saturation[size] = append(saturation[size], po)
}
- return size > 0 && po < depth
+ return true
+ })
+ // to trigger peer requests for peers closer than closest connection, include
+ // all bins from nearest connection upto nearest address as unsaturated
+ var nearestAddrAt int
+ k.addrs.EachNeighbour(k.base, Pof, func(_ pot.Val, po int) bool {
+ nearestAddrAt = po
+ return false
})
- // all buckets are full, ie., minsize == k.MinBinSize
- if len(bpo) == 0 {
+ // including bins as size 0 has the effect that requesting connection
+ // is prioritised over non-empty shallower bins
+ for ; lastPO <= nearestAddrAt; lastPO++ {
+ saturation[0] = append(saturation[0], lastPO)
+ }
+ // all PO bins are saturated, ie., minsize >= k.MinBinSize, no peer suggested
+ if len(saturation) == 0 {
return nil, 0, false
}
- // as long as we got candidate peers to connect to
- // dont ask for new peers (want = false)
- // try to select a candidate peer
- // find the first callable peer
- nxt := bpo[0]
- k.addrs.EachBin(k.base, Pof, nxt, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
- // for each bin (up until depth) we find callable candidate peers
- if po >= depth {
- return false
+ // find the first callable peer in the address book
+ // starting from the bins with smallest size proceeding from shallow to deep
+ // for each bin (up until neighbourhood radius) we find callable candidate peers
+ for size := 0; size < k.MinBinSize && suggestedPeer == nil; size++ {
+ bins, ok := saturation[size]
+ if !ok {
+ // no bin with this size
+ continue
}
- return f(func(val pot.Val) bool {
- e := val.(*entry)
- c := k.callable(e)
- if c {
- a = e.BzzAddr
+ cur := 0
+ curPO := bins[0]
+ k.addrs.EachBin(k.base, Pof, curPO, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
+ curPO = bins[cur]
+ // find the next bin that has size size
+ if curPO == po {
+ cur++
+ } else {
+ // skip bins that have no addresses
+ for ; cur < len(bins) && curPO < po; cur++ {
+ curPO = bins[cur]
+ }
+ if po < curPO {
+ cur--
+ return true
+ }
+ // stop if there are no addresses
+ if curPO < po {
+ return false
+ }
}
- return !c
+ // curPO found
+ // find a callable peer out of the addresses in the unsaturated bin
+ // stop if found
+ f(func(val pot.Val) bool {
+ e := val.(*entry)
+ if k.callable(e) {
+ suggestedPeer = e.BzzAddr
+ return false
+ }
+ return true
+ })
+ return cur < len(bins) && suggestedPeer == nil
})
- })
- // found a candidate
- if a != nil {
- return a, 0, false
}
- // no candidate peer found, request for the short bin
- var changed bool
- if uint8(nxt) < k.depth {
- k.depth = uint8(nxt)
- changed = true
+
+ if uint8(saturationDepth) < k.depth {
+ k.depth = uint8(saturationDepth)
+ return suggestedPeer, saturationDepth, true
}
- return a, nxt, changed
+ return suggestedPeer, 0, false
}
// On inserts the peer as a kademlia peer into the live peers
@@ -319,6 +353,9 @@ func (k *Kademlia) sendNeighbourhoodDepthChange() {
// Not receiving from the returned channel will block Register function
// when address count value changes.
func (k *Kademlia) AddrCountC() <-chan int {
+ k.lock.Lock()
+ defer k.lock.Unlock()
+
if k.addrCountC == nil {
k.addrCountC = make(chan int)
}
@@ -398,29 +435,25 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
})
}
+// NeighbourhoodDepth returns the depth for the pot, see depthForPot
func (k *Kademlia) NeighbourhoodDepth() (depth int) {
k.lock.RLock()
defer k.lock.RUnlock()
return depthForPot(k.conns, k.NeighbourhoodSize, k.base)
}
-// depthForPot returns the proximity order that defines the distance of
-// the nearest neighbour set with cardinality >= NeighbourhoodSize
-// if there is altogether less than NeighbourhoodSize peers it returns 0
+// neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia
+// neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize
+// i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether
+// contain at least neighbourhoodSize connected peers
+// if there is altogether less than neighbourhoodSize peers connected, it returns 0
// caller must hold the lock
-func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
+func neighbourhoodRadiusForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
if p.Size() <= neighbourhoodSize {
return 0
}
-
// total number of peers in iteration
var size int
-
- // determining the depth is a two-step process
- // first we find the proximity bin of the shallowest of the NeighbourhoodSize peers
- // the numeric value of depth cannot be higher than this
- var maxDepth int
-
f := func(v pot.Val, i int) bool {
// po == 256 means that addr is the pivot address(self)
if i == 256 {
@@ -431,13 +464,30 @@ func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int
// this means we have all nn-peers.
// depth is by default set to the bin of the farthest nn-peer
if size == neighbourhoodSize {
- maxDepth = i
+ depth = i
return false
}
return true
}
p.EachNeighbour(pivotAddr, Pof, f)
+ return depth
+}
+
+// depthForPot returns the depth for the pot
+// depth is the radius of the minimal extension of nearest neighbourhood that
+// includes all empty PO bins. I.e., depth is the deepest PO such that
+// - it is not deeper than neighbourhood radius
+// - all bins shallower than depth are not empty
+// caller must hold the lock
+func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
+ if p.Size() <= neighbourhoodSize {
+ return 0
+ }
+ // determining the depth is a two-step process
+ // first we find the proximity bin of the shallowest of the neighbourhoodSize peers
+ // the numeric value of depth cannot be higher than this
+ maxDepth := neighbourhoodRadiusForPot(p, neighbourhoodSize, pivotAddr)
// the second step is to test for empty bins in order from shallowest to deepest
// if an empty bin is found, this will be the actual depth
@@ -506,6 +556,9 @@ func (k *Kademlia) string() string {
var rows []string
rows = append(rows, "=========================================================================")
+ if len(sv.GitCommit) > 0 {
+ rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit))
+ }
rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3]))
rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize))
@@ -575,7 +628,8 @@ func (k *Kademlia) string() string {
// used for testing only
// TODO move to separate testing tools file
type PeerPot struct {
- NNSet [][]byte
+ NNSet [][]byte
+ PeersPerBin []int
}
// NewPeerPotMap creates a map of pot record of *BzzAddr with keys
@@ -601,6 +655,7 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot {
// all nn-peers
var nns [][]byte
+ peersPerBin := make([]int, depth)
// iterate through the neighbours, going from the deepest to the shallowest
np.EachNeighbour(a, Pof, func(val pot.Val, po int) bool {
@@ -614,38 +669,74 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot {
// a neighbor is any peer in or deeper than the depth
if po >= depth {
nns = append(nns, addr)
- return true
+ } else {
+ // for peers < depth, we just count the number in each bin
+ // the bin is the index of the slice
+ peersPerBin[po]++
}
- return false
+ return true
})
- log.Trace(fmt.Sprintf("%x PeerPotMap NNS: %s", addrs[i][:4], LogAddrs(nns)))
+ log.Trace(fmt.Sprintf("%x PeerPotMap NNS: %s, peersPerBin", addrs[i][:4], LogAddrs(nns)))
ppmap[common.Bytes2Hex(a)] = &PeerPot{
- NNSet: nns,
+ NNSet: nns,
+ PeersPerBin: peersPerBin,
}
}
return ppmap
}
-// saturation iterates through all peers and
-// returns the smallest po value in which the node has less than n peers
-// if the iterator reaches depth, then value for depth is returned
-// TODO move to separate testing tools file
-// TODO this function will stop at the first bin with less than MinBinSize peers, even if there are empty bins between that bin and the depth. This may not be correct behavior
+// saturation returns the smallest po value in which the node has less than MinBinSize peers
+// if the iterator reaches neighbourhood radius, then the last bin + 1 is returned
func (k *Kademlia) saturation() int {
prev := -1
- k.addrs.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
+ radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
+ k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
prev++
+ if po >= radius {
+ return false
+ }
return prev == po && size >= k.MinBinSize
})
- // TODO evaluate whether this check cannot just as well be done within the eachbin
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- if depth < prev {
- return depth
+ if prev < 0 {
+ return 0
}
return prev
}
+// isSaturated returns true if the kademlia is considered saturated, or false if not.
+// It checks this by checking an array of ints called unsaturatedBins; each item in that array corresponds
+// to the bin which is unsaturated (number of connections < k.MinBinSize).
+// The bin is considered unsaturated only if there are actual peers in that PeerPot's bin (peersPerBin)
+// (if there is no peer for a given bin, then no connection could ever be established;
+// in a God's view this is relevant as no more peers will ever appear on that bin)
+func (k *Kademlia) isSaturated(peersPerBin []int, depth int) bool {
+ // depth could be calculated from k but as this is called from `GetHealthInfo()`,
+ // the depth has already been calculated so we can require it as a parameter
+
+ // early check for depth
+ if depth != len(peersPerBin) {
+ return false
+ }
+ unsaturatedBins := make([]int, 0)
+ k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
+
+ if po >= depth {
+ return false
+ }
+ log.Trace("peers per bin", "peersPerBin[po]", peersPerBin[po], "po", po)
+ // if there are actually peers in the PeerPot who can fulfill k.MinBinSize
+ if size < k.MinBinSize && size < peersPerBin[po] {
+ log.Trace("connections for po", "po", po, "size", size)
+ unsaturatedBins = append(unsaturatedBins, po)
+ }
+ return true
+ })
+
+ log.Trace("list of unsaturated bins", "unsaturatedBins", unsaturatedBins)
+ return len(unsaturatedBins) == 0
+}
+
// knowNeighbours tests if all neighbours in the peerpot
// are found among the peers known to the kademlia
// It is used in Healthy function for testing only
@@ -728,11 +819,13 @@ type Health struct {
ConnectNN bool // whether node is connected to all its neighbours
CountConnectNN int // amount of neighbours connected to
MissingConnectNN [][]byte // which neighbours we should have been connected to but we're not
- Saturated bool // whether we are connected to all the peers we would have liked to
- Hive string
+ // Saturated: if in all bins < depth number of connections >= MinBinsize or,
+ // if number of connections < MinBinSize, to the number of available peers in that bin
+ Saturated bool
+ Hive string
}
-// Healthy reports the health state of the kademlia connectivity
+// GetHealthInfo reports the health state of the kademlia connectivity
//
// The PeerPot argument provides an all-knowing view of the network
// The resulting Health object is a result of comparisons between
@@ -740,13 +833,19 @@ type Health struct {
// what SHOULD it have been when we take all we know about the network into consideration.
//
// used for testing only
-func (k *Kademlia) Healthy(pp *PeerPot) *Health {
+func (k *Kademlia) GetHealthInfo(pp *PeerPot) *Health {
k.lock.RLock()
defer k.lock.RUnlock()
+ if len(pp.NNSet) < k.NeighbourhoodSize {
+ log.Warn("peerpot NNSet < NeighbourhoodSize")
+ }
gotnn, countgotnn, culpritsgotnn := k.connectedNeighbours(pp.NNSet)
knownn, countknownn, culpritsknownn := k.knowNeighbours(pp.NNSet)
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- saturated := k.saturation() < depth
+
+ // check saturation
+ saturated := k.isSaturated(pp.PeersPerBin, depth)
+
log.Trace(fmt.Sprintf("%08x: healthy: knowNNs: %v, gotNNs: %v, saturated: %v\n", k.base, knownn, gotnn, saturated))
return &Health{
KnowNN: knownn,
@@ -759,3 +858,13 @@ func (k *Kademlia) Healthy(pp *PeerPot) *Health {
Hive: k.string(),
}
}
+
+// Healthy return the strict interpretation of `Healthy` given a `Health` struct
+// definition of strict health: all conditions must be true:
+// - we at least know one peer
+// - we know all neighbors
+// - we are connected to all known neighbors
+// - it is saturated
+func (h *Health) Healthy() bool {
+ return h.KnowNN && h.ConnectNN && h.CountKnowNN > 0 && h.Saturated
+}