aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/kademlia.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/kademlia.go')
-rw-r--r--swarm/network/kademlia.go157
1 files changed, 71 insertions, 86 deletions
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index dd6de44fd..90491ab31 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/pot"
sv "github.com/ethereum/go-ethereum/swarm/version"
@@ -82,14 +83,14 @@ func NewKadParams() *KadParams {
// Kademlia is a table of live peers and a db of known peers (node records)
type Kademlia struct {
lock sync.RWMutex
- *KadParams // Kademlia configuration parameters
- base []byte // immutable baseaddress of the table
- addrs *pot.Pot // pots container for known peer addresses
- conns *pot.Pot // pots container for live peer connections
- depth uint8 // stores the last current depth of saturation
- nDepth int // stores the last neighbourhood depth
- nDepthC chan int // returned by DepthC function to signal neighbourhood depth change
- addrCountC chan int // returned by AddrCountC function to signal peer count change
+ *KadParams // Kademlia configuration parameters
+ base []byte // immutable baseaddress of the table
+ addrs *pot.Pot // pots container for known peer addresses
+ conns *pot.Pot // pots container for live peer connections
+ depth uint8 // stores the last current depth of saturation
+ nDepth int // stores the last neighbourhood depth
+ nDepthMu sync.RWMutex // protects neighbourhood depth nDepth
+ nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed
}
// NewKademlia creates a Kademlia table for base address addr
@@ -138,6 +139,9 @@ func (e *entry) Hex() string {
func (k *Kademlia) Register(peers ...*BzzAddr) error {
k.lock.Lock()
defer k.lock.Unlock()
+
+ metrics.GetOrRegisterCounter("kad.register", nil).Inc(1)
+
var known, size int
for _, p := range peers {
log.Trace("kademlia trying to register", "addr", p)
@@ -164,8 +168,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
return newEntry(p)
}
- log.Trace("found among known peers, underlay addr is same, do nothing", "new", p, "old", e.BzzAddr)
-
return v
})
if found {
@@ -173,12 +175,8 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
}
size++
}
- // send new address count value only if there are new addresses
- if k.addrCountC != nil && size-known > 0 {
- k.addrCountC <- k.addrs.Size()
- }
- k.sendNeighbourhoodDepthChange()
+ k.setNeighbourhoodDepth()
return nil
}
@@ -186,6 +184,9 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
k.lock.Lock()
defer k.lock.Unlock()
+
+ metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1)
+
radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
// collect undersaturated bins in ascending order of number of connected peers
// and from shallow to deep (ascending order of PO)
@@ -297,6 +298,9 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.lock.Lock()
defer k.lock.Unlock()
+
+ metrics.GetOrRegisterCounter("kad.on", nil).Inc(1)
+
var ins bool
k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val {
// if not found live
@@ -315,12 +319,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val {
return a
})
- // send new address count value only if the peer is inserted
- if k.addrCountC != nil {
- k.addrCountC <- k.addrs.Size()
- }
}
- log.Trace(k.string())
// calculate if depth of saturation changed
depth := uint8(k.saturation())
var changed bool
@@ -328,75 +327,72 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
changed = true
k.depth = depth
}
- k.sendNeighbourhoodDepthChange()
+ k.setNeighbourhoodDepth()
return k.depth, changed
}
-// NeighbourhoodDepthC returns the channel that sends a new kademlia
-// neighbourhood depth on each change.
-// Not receiving from the returned channel will block On function
-// when the neighbourhood depth is changed.
-// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one?
-func (k *Kademlia) NeighbourhoodDepthC() <-chan int {
- k.lock.Lock()
- defer k.lock.Unlock()
- if k.nDepthC == nil {
- k.nDepthC = make(chan int)
+// setNeighbourhoodDepth calculates neighbourhood depth with depthForPot,
+// sets it to the nDepth and sends a signal to every nDepthSig channel.
+func (k *Kademlia) setNeighbourhoodDepth() {
+ nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
+ var changed bool
+ k.nDepthMu.Lock()
+ if nDepth != k.nDepth {
+ k.nDepth = nDepth
+ changed = true
}
- return k.nDepthC
-}
+ k.nDepthMu.Unlock()
-// CloseNeighbourhoodDepthC closes the channel returned by
-// NeighbourhoodDepthC and stops sending neighbourhood change.
-func (k *Kademlia) CloseNeighbourhoodDepthC() {
- k.lock.Lock()
- defer k.lock.Unlock()
-
- if k.nDepthC != nil {
- close(k.nDepthC)
- k.nDepthC = nil
+ if len(k.nDepthSig) > 0 && changed {
+ for _, c := range k.nDepthSig {
+ // Every nDepthSig channel has a buffer capacity of 1,
+ // so every receiver will get the signal even if the
+ // select statement has the default case to avoid blocking.
+ select {
+ case c <- struct{}{}:
+ default:
+ }
+ }
}
}
-// sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel
-// if it is initialized.
-func (k *Kademlia) sendNeighbourhoodDepthChange() {
- // nDepthC is initialized when NeighbourhoodDepthC is called and returned by it.
- // It provides signaling of neighbourhood depth change.
- // This part of the code is sending new neighbourhood depth to nDepthC if that condition is met.
- if k.nDepthC != nil {
- nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- if nDepth != k.nDepth {
- k.nDepth = nDepth
- k.nDepthC <- nDepth
- }
- }
+// NeighbourhoodDepth returns the value calculated by depthForPot function
+// in setNeighbourhoodDepth method.
+func (k *Kademlia) NeighbourhoodDepth() int {
+ k.nDepthMu.RLock()
+ defer k.nDepthMu.RUnlock()
+ return k.nDepth
}
-// AddrCountC returns the channel that sends a new
-// address count value on each change.
-// Not receiving from the returned channel will block Register function
-// when address count value changes.
-func (k *Kademlia) AddrCountC() <-chan int {
+// SubscribeToNeighbourhoodDepthChange returns the channel that signals
+// when neighbourhood depth value is changed. The current neighbourhood depth
+// is returned by NeighbourhoodDepth method. Returned function unsubscribes
+// the channel from signaling and releases the resources. Returned function is safe
+// to be called multiple times.
+func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, unsubscribe func()) {
+ channel := make(chan struct{}, 1)
+ var closeOnce sync.Once
+
k.lock.Lock()
defer k.lock.Unlock()
- if k.addrCountC == nil {
- k.addrCountC = make(chan int)
- }
- return k.addrCountC
-}
+ k.nDepthSig = append(k.nDepthSig, channel)
-// CloseAddrCountC closes the channel returned by
-// AddrCountC and stops sending address count change.
-func (k *Kademlia) CloseAddrCountC() {
- k.lock.Lock()
- defer k.lock.Unlock()
+ unsubscribe = func() {
+ k.lock.Lock()
+ defer k.lock.Unlock()
- if k.addrCountC != nil {
- close(k.addrCountC)
- k.addrCountC = nil
+ for i, c := range k.nDepthSig {
+ if c == channel {
+ k.nDepthSig = append(k.nDepthSig[:i], k.nDepthSig[i+1:]...)
+ break
+ }
+ }
+
+ closeOnce.Do(func() { close(channel) })
}
+
+ return channel, unsubscribe
}
// Off removes a peer from among live peers
@@ -422,11 +418,7 @@ func (k *Kademlia) Off(p *Peer) {
// v cannot be nil, but no need to check
return nil
})
- // send new address count value only if the peer is deleted
- if k.addrCountC != nil {
- k.addrCountC <- k.addrs.Size()
- }
- k.sendNeighbourhoodDepthChange()
+ k.setNeighbourhoodDepth()
}
}
@@ -484,13 +476,6 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
})
}
-// NeighbourhoodDepth returns the depth for the pot, see depthForPot
-func (k *Kademlia) NeighbourhoodDepth() (depth int) {
- k.lock.RLock()
- defer k.lock.RUnlock()
- return depthForPot(k.conns, k.NeighbourhoodSize, k.base)
-}
-
// neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia
// neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize
// i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether
@@ -608,7 +593,7 @@ func (k *Kademlia) string() string {
if len(sv.GitCommit) > 0 {
rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit))
}
- rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3]))
+ rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()))
rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize))
liverows := make([]string, k.MaxProxDisplay)