diff options
author | Péter Szilágyi <peterke@gmail.com> | 2019-05-10 19:09:01 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-10 19:09:01 +0800 |
commit | 494f5d448a1685d5de4cb1524b863cd1fc9a13b0 (patch) | |
tree | 4db9d1afe4910c888f3488cd93e8537501d88314 /swarm/network/kademlia.go | |
parent | c94d582aa781b26412ba7d570f6707d193303a02 (diff) | |
parent | 9b1543c282f39d452f611eeee0307bdf828e8bc2 (diff) | |
download | go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.gz go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.bz2 go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.lz go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.xz go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.zst go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.zip |
Merge pull request #19550 from ethersphere/swarm-rather-stable
swarm v0.4-rc1
Diffstat (limited to 'swarm/network/kademlia.go')
-rw-r--r-- | swarm/network/kademlia.go | 157 |
1 files changed, 71 insertions, 86 deletions
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index dd6de44fd..90491ab31 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/pot" sv "github.com/ethereum/go-ethereum/swarm/version" @@ -82,14 +83,14 @@ func NewKadParams() *KadParams { // Kademlia is a table of live peers and a db of known peers (node records) type Kademlia struct { lock sync.RWMutex - *KadParams // Kademlia configuration parameters - base []byte // immutable baseaddress of the table - addrs *pot.Pot // pots container for known peer addresses - conns *pot.Pot // pots container for live peer connections - depth uint8 // stores the last current depth of saturation - nDepth int // stores the last neighbourhood depth - nDepthC chan int // returned by DepthC function to signal neighbourhood depth change - addrCountC chan int // returned by AddrCountC function to signal peer count change + *KadParams // Kademlia configuration parameters + base []byte // immutable baseaddress of the table + addrs *pot.Pot // pots container for known peer addresses + conns *pot.Pot // pots container for live peer connections + depth uint8 // stores the last current depth of saturation + nDepth int // stores the last neighbourhood depth + nDepthMu sync.RWMutex // protects neighbourhood depth nDepth + nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed } // NewKademlia creates a Kademlia table for base address addr @@ -138,6 +139,9 @@ func (e *entry) Hex() string { func (k *Kademlia) Register(peers ...*BzzAddr) error { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.register", nil).Inc(1) + var known, size int for _, p := range peers { log.Trace("kademlia trying to register", "addr", p) @@ -164,8 +168,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { return newEntry(p) } - log.Trace("found among known peers, underlay addr is same, do nothing", "new", p, "old", e.BzzAddr) - return v }) if found { @@ -173,12 +175,8 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { } size++ } - // send new address count value only if there are new addresses - if k.addrCountC != nil && size-known > 0 { - k.addrCountC <- k.addrs.Size() - } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return nil } @@ -186,6 +184,9 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1) + radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base) // collect undersaturated bins in ascending order of number of connected peers // and from shallow to deep (ascending order of PO) @@ -297,6 +298,9 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.on", nil).Inc(1) + var ins bool k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val { // if not found live @@ -315,12 +319,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val { return a }) - // send new address count value only if the peer is inserted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } } - log.Trace(k.string()) // calculate if depth of saturation changed depth := uint8(k.saturation()) var changed bool @@ -328,75 +327,72 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { changed = true k.depth = depth } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return k.depth, changed } -// NeighbourhoodDepthC returns the channel that sends a new kademlia -// neighbourhood depth on each change. -// Not receiving from the returned channel will block On function -// when the neighbourhood depth is changed. -// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one? -func (k *Kademlia) NeighbourhoodDepthC() <-chan int { - k.lock.Lock() - defer k.lock.Unlock() - if k.nDepthC == nil { - k.nDepthC = make(chan int) +// setNeighbourhoodDepth calculates neighbourhood depth with depthForPot, +// sets it to the nDepth and sends a signal to every nDepthSig channel. +func (k *Kademlia) setNeighbourhoodDepth() { + nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) + var changed bool + k.nDepthMu.Lock() + if nDepth != k.nDepth { + k.nDepth = nDepth + changed = true } - return k.nDepthC -} + k.nDepthMu.Unlock() -// CloseNeighbourhoodDepthC closes the channel returned by -// NeighbourhoodDepthC and stops sending neighbourhood change. -func (k *Kademlia) CloseNeighbourhoodDepthC() { - k.lock.Lock() - defer k.lock.Unlock() - - if k.nDepthC != nil { - close(k.nDepthC) - k.nDepthC = nil + if len(k.nDepthSig) > 0 && changed { + for _, c := range k.nDepthSig { + // Every nDepthSig channel has a buffer capacity of 1, + // so every receiver will get the signal even if the + // select statement has the default case to avoid blocking. + select { + case c <- struct{}{}: + default: + } + } } } -// sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel -// if it is initialized. -func (k *Kademlia) sendNeighbourhoodDepthChange() { - // nDepthC is initialized when NeighbourhoodDepthC is called and returned by it. - // It provides signaling of neighbourhood depth change. - // This part of the code is sending new neighbourhood depth to nDepthC if that condition is met. - if k.nDepthC != nil { - nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) - if nDepth != k.nDepth { - k.nDepth = nDepth - k.nDepthC <- nDepth - } - } +// NeighbourhoodDepth returns the value calculated by depthForPot function +// in setNeighbourhoodDepth method. +func (k *Kademlia) NeighbourhoodDepth() int { + k.nDepthMu.RLock() + defer k.nDepthMu.RUnlock() + return k.nDepth } -// AddrCountC returns the channel that sends a new -// address count value on each change. -// Not receiving from the returned channel will block Register function -// when address count value changes. -func (k *Kademlia) AddrCountC() <-chan int { +// SubscribeToNeighbourhoodDepthChange returns the channel that signals +// when neighbourhood depth value is changed. The current neighbourhood depth +// is returned by NeighbourhoodDepth method. Returned function unsubscribes +// the channel from signaling and releases the resources. Returned function is safe +// to be called multiple times. +func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, unsubscribe func()) { + channel := make(chan struct{}, 1) + var closeOnce sync.Once + k.lock.Lock() defer k.lock.Unlock() - if k.addrCountC == nil { - k.addrCountC = make(chan int) - } - return k.addrCountC -} + k.nDepthSig = append(k.nDepthSig, channel) -// CloseAddrCountC closes the channel returned by -// AddrCountC and stops sending address count change. -func (k *Kademlia) CloseAddrCountC() { - k.lock.Lock() - defer k.lock.Unlock() + unsubscribe = func() { + k.lock.Lock() + defer k.lock.Unlock() - if k.addrCountC != nil { - close(k.addrCountC) - k.addrCountC = nil + for i, c := range k.nDepthSig { + if c == channel { + k.nDepthSig = append(k.nDepthSig[:i], k.nDepthSig[i+1:]...) + break + } + } + + closeOnce.Do(func() { close(channel) }) } + + return channel, unsubscribe } // Off removes a peer from among live peers @@ -422,11 +418,7 @@ func (k *Kademlia) Off(p *Peer) { // v cannot be nil, but no need to check return nil }) - // send new address count value only if the peer is deleted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() } } @@ -484,13 +476,6 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) { }) } -// NeighbourhoodDepth returns the depth for the pot, see depthForPot -func (k *Kademlia) NeighbourhoodDepth() (depth int) { - k.lock.RLock() - defer k.lock.RUnlock() - return depthForPot(k.conns, k.NeighbourhoodSize, k.base) -} - // neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia // neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize // i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether @@ -608,7 +593,7 @@ func (k *Kademlia) string() string { if len(sv.GitCommit) > 0 { rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit)) } - rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3])) + rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr())) rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize)) liverows := make([]string, k.MaxProxDisplay) |