aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/kademlia
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/kademlia')
-rw-r--r--swarm/network/kademlia/address.go173
-rw-r--r--swarm/network/kademlia/address_test.go96
-rw-r--r--swarm/network/kademlia/kaddb.go350
-rw-r--r--swarm/network/kademlia/kademlia.go454
-rw-r--r--swarm/network/kademlia/kademlia_test.go392
5 files changed, 0 insertions, 1465 deletions
diff --git a/swarm/network/kademlia/address.go b/swarm/network/kademlia/address.go
deleted file mode 100644
index ef82d2e8b..000000000
--- a/swarm/network/kademlia/address.go
+++ /dev/null
@@ -1,173 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package kademlia
-
-import (
- "fmt"
- "math/rand"
- "strings"
-
- "github.com/ethereum/go-ethereum/common"
-)
-
-type Address common.Hash
-
-func (a Address) String() string {
- return fmt.Sprintf("%x", a[:])
-}
-
-func (a *Address) MarshalJSON() (out []byte, err error) {
- return []byte(`"` + a.String() + `"`), nil
-}
-
-func (a *Address) UnmarshalJSON(value []byte) error {
- *a = Address(common.HexToHash(string(value[1 : len(value)-1])))
- return nil
-}
-
-// the string form of the binary representation of an address (only first 8 bits)
-func (a Address) Bin() string {
- var bs []string
- for _, b := range a[:] {
- bs = append(bs, fmt.Sprintf("%08b", b))
- }
- return strings.Join(bs, "")
-}
-
-/*
-Proximity(x, y) returns the proximity order of the MSB distance between x and y
-
-The distance metric MSB(x, y) of two equal length byte sequences x and y is the
-value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed.
-the binary cast is big endian: most significant bit first (=MSB).
-
-Proximity(x, y) is a discrete logarithmic scaling of the MSB distance.
-It is defined as the reverse rank of the integer part of the base 2
-logarithm of the distance.
-It is calculated by counting the number of common leading zeros in the (MSB)
-binary representation of the x^y.
-
-(0 farthest, 255 closest, 256 self)
-*/
-func proximity(one, other Address) (ret int) {
- for i := 0; i < len(one); i++ {
- oxo := one[i] ^ other[i]
- for j := 0; j < 8; j++ {
- if (oxo>>uint8(7-j))&0x01 != 0 {
- return i*8 + j
- }
- }
- }
- return len(one) * 8
-}
-
-// Address.ProxCmp compares the distances a->target and b->target.
-// Returns -1 if a is closer to target, 1 if b is closer to target
-// and 0 if they are equal.
-func (target Address) ProxCmp(a, b Address) int {
- for i := range target {
- da := a[i] ^ target[i]
- db := b[i] ^ target[i]
- if da > db {
- return 1
- } else if da < db {
- return -1
- }
- }
- return 0
-}
-
-// randomAddressAt(address, prox) generates a random address
-// at proximity order prox relative to address
-// if prox is negative a random address is generated
-func RandomAddressAt(self Address, prox int) (addr Address) {
- addr = self
- var pos int
- if prox >= 0 {
- pos = prox / 8
- trans := prox % 8
- transbytea := byte(0)
- for j := 0; j <= trans; j++ {
- transbytea |= 1 << uint8(7-j)
- }
- flipbyte := byte(1 << uint8(7-trans))
- transbyteb := transbytea ^ byte(255)
- randbyte := byte(rand.Intn(255))
- addr[pos] = ((addr[pos] & transbytea) ^ flipbyte) | randbyte&transbyteb
- }
- for i := pos + 1; i < len(addr); i++ {
- addr[i] = byte(rand.Intn(255))
- }
-
- return
-}
-
-// KeyRange(a0, a1, proxLimit) returns the address inclusive address
-// range that contain addresses closer to one than other
-func KeyRange(one, other Address, proxLimit int) (start, stop Address) {
- prox := proximity(one, other)
- if prox >= proxLimit {
- prox = proxLimit
- }
- start = CommonBitsAddrByte(one, other, byte(0x00), prox)
- stop = CommonBitsAddrByte(one, other, byte(0xff), prox)
- return
-}
-
-func CommonBitsAddrF(self, other Address, f func() byte, p int) (addr Address) {
- prox := proximity(self, other)
- var pos int
- if p <= prox {
- prox = p
- }
- pos = prox / 8
- addr = self
- trans := byte(prox % 8)
- var transbytea byte
- if p > prox {
- transbytea = byte(0x7f)
- } else {
- transbytea = byte(0xff)
- }
- transbytea >>= trans
- transbyteb := transbytea ^ byte(0xff)
- addrpos := addr[pos]
- addrpos &= transbyteb
- if p > prox {
- addrpos ^= byte(0x80 >> trans)
- }
- addrpos |= transbytea & f()
- addr[pos] = addrpos
- for i := pos + 1; i < len(addr); i++ {
- addr[i] = f()
- }
-
- return
-}
-
-func CommonBitsAddr(self, other Address, prox int) (addr Address) {
- return CommonBitsAddrF(self, other, func() byte { return byte(rand.Intn(255)) }, prox)
-}
-
-func CommonBitsAddrByte(self, other Address, b byte, prox int) (addr Address) {
- return CommonBitsAddrF(self, other, func() byte { return b }, prox)
-}
-
-// randomAddressAt() generates a random address
-func RandomAddress() Address {
- return RandomAddressAt(Address{}, -1)
-}
diff --git a/swarm/network/kademlia/address_test.go b/swarm/network/kademlia/address_test.go
deleted file mode 100644
index c062c8eaf..000000000
--- a/swarm/network/kademlia/address_test.go
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package kademlia
-
-import (
- "math/rand"
- "reflect"
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
-)
-
-func (Address) Generate(rand *rand.Rand, size int) reflect.Value {
- var id Address
- for i := 0; i < len(id); i++ {
- id[i] = byte(uint8(rand.Intn(255)))
- }
- return reflect.ValueOf(id)
-}
-
-func TestCommonBitsAddrF(t *testing.T) {
- a := Address(common.HexToHash("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
- b := Address(common.HexToHash("0x8123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
- c := Address(common.HexToHash("0x4123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
- d := Address(common.HexToHash("0x0023456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
- e := Address(common.HexToHash("0x01A3456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
- ab := CommonBitsAddrF(a, b, func() byte { return byte(0x00) }, 10)
- expab := Address(common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000"))
-
- if ab != expab {
- t.Fatalf("%v != %v", ab, expab)
- }
- ac := CommonBitsAddrF(a, c, func() byte { return byte(0x00) }, 10)
- expac := Address(common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000"))
-
- if ac != expac {
- t.Fatalf("%v != %v", ac, expac)
- }
- ad := CommonBitsAddrF(a, d, func() byte { return byte(0x00) }, 10)
- expad := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
-
- if ad != expad {
- t.Fatalf("%v != %v", ad, expad)
- }
- ae := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 10)
- expae := Address(common.HexToHash("0x0180000000000000000000000000000000000000000000000000000000000000"))
-
- if ae != expae {
- t.Fatalf("%v != %v", ae, expae)
- }
- acf := CommonBitsAddrF(a, c, func() byte { return byte(0xff) }, 10)
- expacf := Address(common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
-
- if acf != expacf {
- t.Fatalf("%v != %v", acf, expacf)
- }
- aeo := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 2)
- expaeo := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
-
- if aeo != expaeo {
- t.Fatalf("%v != %v", aeo, expaeo)
- }
- aep := CommonBitsAddrF(a, e, func() byte { return byte(0xff) }, 2)
- expaep := Address(common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
-
- if aep != expaep {
- t.Fatalf("%v != %v", aep, expaep)
- }
-
-}
-
-func TestRandomAddressAt(t *testing.T) {
- var a Address
- for i := 0; i < 100; i++ {
- a = RandomAddress()
- prox := rand.Intn(255)
- b := RandomAddressAt(a, prox)
- if proximity(a, b) != prox {
- t.Fatalf("incorrect address prox(%v, %v) == %v (expected %v)", a, b, proximity(a, b), prox)
- }
- }
-}
diff --git a/swarm/network/kademlia/kaddb.go b/swarm/network/kademlia/kaddb.go
deleted file mode 100644
index b37ced5ba..000000000
--- a/swarm/network/kademlia/kaddb.go
+++ /dev/null
@@ -1,350 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package kademlia
-
-import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/log"
-)
-
-type NodeData interface {
- json.Marshaler
- json.Unmarshaler
-}
-
-// allow inactive peers under
-type NodeRecord struct {
- Addr Address // address of node
- Url string // Url, used to connect to node
- After time.Time // next call after time
- Seen time.Time // last connected at time
- Meta *json.RawMessage // arbitrary metadata saved for a peer
-
- node Node
-}
-
-func (self *NodeRecord) setSeen() {
- t := time.Now()
- self.Seen = t
- self.After = t
-}
-
-func (self *NodeRecord) String() string {
- return fmt.Sprintf("<%v>", self.Addr)
-}
-
-// persisted node record database ()
-type KadDb struct {
- Address Address
- Nodes [][]*NodeRecord
- index map[Address]*NodeRecord
- cursors []int
- lock sync.RWMutex
- purgeInterval time.Duration
- initialRetryInterval time.Duration
- connRetryExp int
-}
-
-func newKadDb(addr Address, params *KadParams) *KadDb {
- return &KadDb{
- Address: addr,
- Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load
- cursors: make([]int, params.MaxProx+1),
- index: make(map[Address]*NodeRecord),
- purgeInterval: params.PurgeInterval,
- initialRetryInterval: params.InitialRetryInterval,
- connRetryExp: params.ConnRetryExp,
- }
-}
-
-func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord {
- defer self.lock.Unlock()
- self.lock.Lock()
-
- record, found := self.index[a]
- if !found {
- record = &NodeRecord{
- Addr: a,
- Url: url,
- }
- log.Info(fmt.Sprintf("add new record %v to kaddb", record))
- // insert in kaddb
- self.index[a] = record
- self.Nodes[index] = append(self.Nodes[index], record)
- } else {
- log.Info(fmt.Sprintf("found record %v in kaddb", record))
- }
- // update last seen time
- record.setSeen()
- // update with url in case IP/port changes
- record.Url = url
- return record
-}
-
-// add adds node records to kaddb (persisted node record db)
-func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) {
- defer self.lock.Unlock()
- self.lock.Lock()
- var n int
- var nodes []*NodeRecord
- for _, node := range nrs {
- _, found := self.index[node.Addr]
- if !found && node.Addr != self.Address {
- node.setSeen()
- self.index[node.Addr] = node
- index := proximityBin(node.Addr)
- dbcursor := self.cursors[index]
- nodes = self.Nodes[index]
- // this is inefficient for allocation, need to just append then shift
- newnodes := make([]*NodeRecord, len(nodes)+1)
- copy(newnodes[:], nodes[:dbcursor])
- newnodes[dbcursor] = node
- copy(newnodes[dbcursor+1:], nodes[dbcursor:])
- log.Trace(fmt.Sprintf("new nodes: %v, nodes: %v", newnodes, nodes))
- self.Nodes[index] = newnodes
- n++
- }
- }
- if n > 0 {
- log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs)))
- }
-}
-
-/*
-next return one node record with the highest priority for desired
-connection.
-This is used to pick candidates for live nodes that are most wanted for
-a higly connected low centrality network structure for Swarm which best suits
-for a Kademlia-style routing.
-
-* Starting as naive node with empty db, this implements Kademlia bootstrapping
-* As a mature node, it fills short lines. All on demand.
-
-The candidate is chosen using the following strategy:
-We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds.
-On each round we proceed from the low to high proximity order buckets.
-If the number of active nodes (=connected peers) is < rounds, then start looking
-for a known candidate. To determine if there is a candidate to recommend the
-kaddb node record database row corresponding to the bucket is checked.
-
-If the row cursor is on position i, the ith element in the row is chosen.
-If the record is scheduled not to be retried before NOW, the next element is taken.
-If the record is scheduled to be retried, it is set as checked, scheduled for
-checking and is returned. The time of the next check is in X (duration) such that
-X = ConnRetryExp * delta where delta is the time past since the last check and
-ConnRetryExp is constant obsoletion factor. (Note that when node records are added
-from peer messages, they are marked as checked and placed at the cursor, ie.
-given priority over older entries). Entries which were checked more than
-purgeInterval ago are deleted from the kaddb row. If no candidate is found after
-a full round of checking the next bucket up is considered. If no candidate is
-found when we reach the maximum-proximity bucket, the next round starts.
-
-node record a is more favoured to b a > b iff a is a passive node (record of
-offline past peer)
-|proxBin(a)| < |proxBin(b)|
-|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|)
-|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b))
-
-
-The second argument returned names the first missing slot found
-*/
-func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) {
- // return nil, proxLimit indicates that all buckets are filled
- defer self.lock.Unlock()
- self.lock.Lock()
-
- var interval time.Duration
- var found bool
- var purge []bool
- var delta time.Duration
- var cursor int
- var count int
- var after time.Time
-
- // iterate over columns maximum bucketsize times
- for rounds := 1; rounds <= maxBinSize; rounds++ {
- ROUND:
- // iterate over rows from PO 0 upto MaxProx
- for po, dbrow := range self.Nodes {
- // if row has rounds connected peers, then take the next
- if binSize(po) >= rounds {
- continue ROUND
- }
- if !need {
- // set proxlimit to the PO where the first missing slot is found
- proxLimit = po
- need = true
- }
- purge = make([]bool, len(dbrow))
-
- // there is a missing slot - finding a node to connect to
- // select a node record from the relavant kaddb row (of identical prox order)
- ROW:
- for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) {
- count++
- node = dbrow[cursor]
-
- // skip already connected nodes
- if node.node != nil {
- log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow)))
- continue ROW
- }
-
- // if node is scheduled to connect
- if node.After.After(time.Now()) {
- log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))
- continue ROW
- }
-
- delta = time.Since(node.Seen)
- if delta < self.initialRetryInterval {
- delta = self.initialRetryInterval
- }
- if delta > self.purgeInterval {
- // remove node
- purge[cursor] = true
- log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen))
- continue ROW
- }
-
- log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))
-
- // scheduling next check
- interval = delta * time.Duration(self.connRetryExp)
- after = time.Now().Add(interval)
-
- log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval))
- node.After = after
- found = true
- } // ROW
- self.cursors[po] = cursor
- self.delete(po, purge)
- if found {
- return node, need, proxLimit
- }
- } // ROUND
- } // ROUNDS
-
- return nil, need, proxLimit
-}
-
-// deletes the noderecords of a kaddb row corresponding to the indexes
-// caller must hold the dblock
-// the call is unsafe, no index checks
-func (self *KadDb) delete(row int, purge []bool) {
- var nodes []*NodeRecord
- dbrow := self.Nodes[row]
- for i, del := range purge {
- if i == self.cursors[row] {
- //reset cursor
- self.cursors[row] = len(nodes)
- }
- // delete the entry to be purged
- if del {
- delete(self.index, dbrow[i].Addr)
- continue
- }
- // otherwise append to new list
- nodes = append(nodes, dbrow[i])
- }
- self.Nodes[row] = nodes
-}
-
-// save persists kaddb on disk (written to file on path in json format.
-func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error {
- defer self.lock.Unlock()
- self.lock.Lock()
-
- var n int
-
- for _, b := range self.Nodes {
- for _, node := range b {
- n++
- node.After = time.Now()
- node.Seen = time.Now()
- if cb != nil {
- cb(node, node.node)
- }
- }
- }
-
- data, err := json.MarshalIndent(self, "", " ")
- if err != nil {
- return err
- }
- err = ioutil.WriteFile(path, data, os.ModePerm)
- if err != nil {
- log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: %v", n, path, err))
- } else {
- log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path))
- }
- return err
-}
-
-// Load(path) loads the node record database (kaddb) from file on path.
-func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) {
- defer self.lock.Unlock()
- self.lock.Lock()
-
- var data []byte
- data, err = ioutil.ReadFile(path)
- if err != nil {
- return
- }
-
- err = json.Unmarshal(data, self)
- if err != nil {
- return
- }
- var n int
- var purge []bool
- for po, b := range self.Nodes {
- purge = make([]bool, len(b))
- ROW:
- for i, node := range b {
- if cb != nil {
- err = cb(node, node.node)
- if err != nil {
- purge[i] = true
- continue ROW
- }
- }
- n++
- if node.After.IsZero() {
- node.After = time.Now()
- }
- self.index[node.Addr] = node
- }
- self.delete(po, purge)
- }
- log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path))
-
- return
-}
-
-// accessor for KAD offline db count
-func (self *KadDb) count() int {
- defer self.lock.Unlock()
- self.lock.Lock()
- return len(self.index)
-}
diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go
deleted file mode 100644
index b5999b52d..000000000
--- a/swarm/network/kademlia/kademlia.go
+++ /dev/null
@@ -1,454 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package kademlia
-
-import (
- "fmt"
- "sort"
- "strings"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
-)
-
-//metrics variables
-//For metrics, we want to count how many times peers are added/removed
-//at a certain index. Thus we do that with an array of counters with
-//entry for each index
-var (
- bucketAddIndexCount []metrics.Counter
- bucketRmIndexCount []metrics.Counter
-)
-
-const (
- bucketSize = 4
- proxBinSize = 2
- maxProx = 8
- connRetryExp = 2
- maxPeers = 100
-)
-
-var (
- purgeInterval = 42 * time.Hour
- initialRetryInterval = 42 * time.Millisecond
- maxIdleInterval = 42 * 1000 * time.Millisecond
- // maxIdleInterval = 42 * 10 0 * time.Millisecond
-)
-
-type KadParams struct {
- // adjustable parameters
- MaxProx int
- ProxBinSize int
- BucketSize int
- PurgeInterval time.Duration
- InitialRetryInterval time.Duration
- MaxIdleInterval time.Duration
- ConnRetryExp int
-}
-
-func NewDefaultKadParams() *KadParams {
- return &KadParams{
- MaxProx: maxProx,
- ProxBinSize: proxBinSize,
- BucketSize: bucketSize,
- PurgeInterval: purgeInterval,
- InitialRetryInterval: initialRetryInterval,
- MaxIdleInterval: maxIdleInterval,
- ConnRetryExp: connRetryExp,
- }
-}
-
-// Kademlia is a table of active nodes
-type Kademlia struct {
- addr Address // immutable baseaddress of the table
- *KadParams // Kademlia configuration parameters
- proxLimit int // state, the PO of the first row of the most proximate bin
- proxSize int // state, the number of peers in the most proximate bin
- count int // number of active peers (w live connection)
- buckets [][]Node // the actual bins
- db *KadDb // kaddb, node record database
- lock sync.RWMutex // mutex to access buckets
-}
-
-type Node interface {
- Addr() Address
- Url() string
- LastActive() time.Time
- Drop()
-}
-
-// public constructor
-// add is the base address of the table
-// params is KadParams configuration
-func New(addr Address, params *KadParams) *Kademlia {
- buckets := make([][]Node, params.MaxProx+1)
- kad := &Kademlia{
- addr: addr,
- KadParams: params,
- buckets: buckets,
- db: newKadDb(addr, params),
- }
- kad.initMetricsVariables()
- return kad
-}
-
-// accessor for KAD base address
-func (self *Kademlia) Addr() Address {
- return self.addr
-}
-
-// accessor for KAD active node count
-func (self *Kademlia) Count() int {
- defer self.lock.Unlock()
- self.lock.Lock()
- return self.count
-}
-
-// accessor for KAD active node count
-func (self *Kademlia) DBCount() int {
- return self.db.count()
-}
-
-// On is the entry point called when a new nodes is added
-// unsafe in that node is not checked to be already active node (to be called once)
-func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) {
- log.Debug(fmt.Sprintf("%v", self))
- defer self.lock.Unlock()
- self.lock.Lock()
-
- index := self.proximityBin(node.Addr())
- record := self.db.findOrCreate(index, node.Addr(), node.Url())
-
- if cb != nil {
- err = cb(record, node)
- log.Trace(fmt.Sprintf("cb(%v, %v) ->%v", record, node, err))
- if err != nil {
- return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err)
- }
- log.Debug(fmt.Sprintf("add node record %v with node %v", record, node))
- }
-
- // insert in kademlia table of active nodes
- bucket := self.buckets[index]
- // if bucket is full insertion replaces the worst node
- // TODO: give priority to peers with active traffic
- if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation
- self.buckets[index] = append(bucket, node)
- bucketAddIndexCount[index].Inc(1)
- log.Debug(fmt.Sprintf("add node %v to table", node))
- self.setProxLimit(index, true)
- record.node = node
- self.count++
- return nil
- }
-
- // always rotate peers
- idle := self.MaxIdleInterval
- var pos int
- var replaced Node
- for i, p := range bucket {
- idleInt := time.Since(p.LastActive())
- if idleInt > idle {
- idle = idleInt
- pos = i
- replaced = p
- }
- }
- if replaced == nil {
- log.Debug(fmt.Sprintf("all peers wanted, PO%03d bucket full", index))
- return fmt.Errorf("bucket full")
- }
- log.Debug(fmt.Sprintf("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval))
- replaced.Drop()
- // actually replace in the row. When off(node) is called, the peer is no longer in the row
- bucket[pos] = node
- // there is no change in bucket cardinalities so no prox limit adjustment is needed
- record.node = node
- self.count++
- return nil
-
-}
-
-// Off is the called when a node is taken offline (from the protocol main loop exit)
-func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) {
- self.lock.Lock()
- defer self.lock.Unlock()
-
- index := self.proximityBin(node.Addr())
- bucketRmIndexCount[index].Inc(1)
- bucket := self.buckets[index]
- for i := 0; i < len(bucket); i++ {
- if node.Addr() == bucket[i].Addr() {
- self.buckets[index] = append(bucket[:i], bucket[(i+1):]...)
- self.setProxLimit(index, false)
- break
- }
- }
-
- record := self.db.index[node.Addr()]
- // callback on remove
- if cb != nil {
- cb(record, record.node)
- }
- record.node = nil
- self.count--
- log.Debug(fmt.Sprintf("remove node %v from table, population now is %v", node, self.count))
-
- return
-}
-
-// proxLimit is dynamically adjusted so that
-// 1) there is no empty buckets in bin < proxLimit and
-// 2) the sum of all items are the minimum possible but higher than ProxBinSize
-// adjust Prox (proxLimit and proxSize after an insertion/removal of nodes)
-// caller holds the lock
-func (self *Kademlia) setProxLimit(r int, on bool) {
- // if the change is outside the core (PO lower)
- // and the change does not leave a bucket empty then
- // no adjustment needed
- if r < self.proxLimit && len(self.buckets[r]) > 0 {
- return
- }
- // if on=a node was added, then r must be within prox limit so increment cardinality
- if on {
- self.proxSize++
- curr := len(self.buckets[self.proxLimit])
- // if now core is big enough without the furthest bucket, then contract
- // this can result in more than one bucket change
- for self.proxSize >= self.ProxBinSize+curr && curr > 0 {
- self.proxSize -= curr
- self.proxLimit++
- curr = len(self.buckets[self.proxLimit])
-
- log.Trace(fmt.Sprintf("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r))
- }
- return
- }
- // otherwise
- if r >= self.proxLimit {
- self.proxSize--
- }
- // expand core by lowering prox limit until hit zero or cover the empty bucket or reached target cardinality
- for (self.proxSize < self.ProxBinSize || r < self.proxLimit) &&
- self.proxLimit > 0 {
- //
- self.proxLimit--
- self.proxSize += len(self.buckets[self.proxLimit])
- log.Trace(fmt.Sprintf("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r))
- }
-}
-
-/*
-returns the list of nodes belonging to the same proximity bin
-as the target. The most proximate bin will be the union of the bins between
-proxLimit and MaxProx.
-*/
-func (self *Kademlia) FindClosest(target Address, max int) []Node {
- self.lock.Lock()
- defer self.lock.Unlock()
-
- r := nodesByDistance{
- target: target,
- }
-
- po := self.proximityBin(target)
- index := po
- step := 1
- log.Trace(fmt.Sprintf("serving %v nodes at %v (PO%02d)", max, index, po))
-
- // if max is set to 0, just want a full bucket, dynamic number
- min := max
- // set limit to max
- limit := max
- if max == 0 {
- min = 1
- limit = maxPeers
- }
-
- var n int
- for index >= 0 {
- // add entire bucket
- for _, p := range self.buckets[index] {
- r.push(p, limit)
- n++
- }
- // terminate if index reached the bottom or enough peers > min
- log.Trace(fmt.Sprintf("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po))
- if n >= min && (step < 0 || max == 0) {
- break
- }
- // reach top most non-empty PO bucket, turn around
- if index == self.MaxProx {
- index = po
- step = -1
- }
- index += step
- }
- log.Trace(fmt.Sprintf("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po))
- return r.nodes
-}
-
-func (self *Kademlia) Suggest() (*NodeRecord, bool, int) {
- defer self.lock.RUnlock()
- self.lock.RLock()
- return self.db.findBest(self.BucketSize, func(i int) int { return len(self.buckets[i]) })
-}
-
-// adds node records to kaddb (persisted node record db)
-func (self *Kademlia) Add(nrs []*NodeRecord) {
- self.db.add(nrs, self.proximityBin)
-}
-
-// nodesByDistance is a list of nodes, ordered by distance to target.
-type nodesByDistance struct {
- nodes []Node
- target Address
-}
-
-func sortedByDistanceTo(target Address, slice []Node) bool {
- var last Address
- for i, node := range slice {
- if i > 0 {
- if target.ProxCmp(node.Addr(), last) < 0 {
- return false
- }
- }
- last = node.Addr()
- }
- return true
-}
-
-// push(node, max) adds the given node to the list, keeping the total size
-// below max elements.
-func (h *nodesByDistance) push(node Node, max int) {
- // returns the firt index ix such that func(i) returns true
- ix := sort.Search(len(h.nodes), func(i int) bool {
- return h.target.ProxCmp(h.nodes[i].Addr(), node.Addr()) >= 0
- })
-
- if len(h.nodes) < max {
- h.nodes = append(h.nodes, node)
- }
- if ix < len(h.nodes) {
- copy(h.nodes[ix+1:], h.nodes[ix:])
- h.nodes[ix] = node
- }
-}
-
-/*
-Taking the proximity order relative to a fix point x classifies the points in
-the space (n byte long byte sequences) into bins. Items in each are at
-most half as distant from x as items in the previous bin. Given a sample of
-uniformly distributed items (a hash function over arbitrary sequence) the
-proximity scale maps onto series of subsets with cardinalities on a negative
-exponential scale.
-
-It also has the property that any two item belonging to the same bin are at
-most half as distant from each other as they are from x.
-
-If we think of random sample of items in the bins as connections in a network of interconnected nodes than relative proximity can serve as the basis for local
-decisions for graph traversal where the task is to find a route between two
-points. Since in every hop, the finite distance halves, there is
-a guaranteed constant maximum limit on the number of hops needed to reach one
-node from the other.
-*/
-
-func (self *Kademlia) proximityBin(other Address) (ret int) {
- ret = proximity(self.addr, other)
- if ret > self.MaxProx {
- ret = self.MaxProx
- }
- return
-}
-
-// provides keyrange for chunk db iteration
-func (self *Kademlia) KeyRange(other Address) (start, stop Address) {
- defer self.lock.RUnlock()
- self.lock.RLock()
- return KeyRange(self.addr, other, self.proxLimit)
-}
-
-// save persists kaddb on disk (written to file on path in json format.
-func (self *Kademlia) Save(path string, cb func(*NodeRecord, Node)) error {
- return self.db.save(path, cb)
-}
-
-// Load(path) loads the node record database (kaddb) from file on path.
-func (self *Kademlia) Load(path string, cb func(*NodeRecord, Node) error) (err error) {
- return self.db.load(path, cb)
-}
-
-// kademlia table + kaddb table displayed with ascii
-func (self *Kademlia) String() string {
- defer self.lock.RUnlock()
- self.lock.RLock()
- defer self.db.lock.RUnlock()
- self.db.lock.RLock()
-
- var rows []string
- rows = append(rows, "=========================================================================")
- rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %v", time.Now().UTC().Format(time.UnixDate), self.addr.String()[:6]))
- rows = append(rows, fmt.Sprintf("population: %d (%d), proxLimit: %d, proxSize: %d", self.count, len(self.db.index), self.proxLimit, self.proxSize))
- rows = append(rows, fmt.Sprintf("MaxProx: %d, ProxBinSize: %d, BucketSize: %d", self.MaxProx, self.ProxBinSize, self.BucketSize))
-
- for i, bucket := range self.buckets {
-
- if i == self.proxLimit {
- rows = append(rows, fmt.Sprintf("============ PROX LIMIT: %d ==========================================", i))
- }
- row := []string{fmt.Sprintf("%03d", i), fmt.Sprintf("%2d", len(bucket))}
- var k int
- c := self.db.cursors[i]
- for ; k < len(bucket); k++ {
- p := bucket[(c+k)%len(bucket)]
- row = append(row, p.Addr().String()[:6])
- if k == 4 {
- break
- }
- }
- for ; k < 4; k++ {
- row = append(row, " ")
- }
- row = append(row, fmt.Sprintf("| %2d %2d", len(self.db.Nodes[i]), self.db.cursors[i]))
-
- for j, p := range self.db.Nodes[i] {
- row = append(row, p.Addr.String()[:6])
- if j == 3 {
- break
- }
- }
- rows = append(rows, strings.Join(row, " "))
- if i == self.MaxProx {
- }
- }
- rows = append(rows, "=========================================================================")
- return strings.Join(rows, "\n")
-}
-
-//We have to build up the array of counters for each index
-func (self *Kademlia) initMetricsVariables() {
- //create the arrays
- bucketAddIndexCount = make([]metrics.Counter, self.MaxProx+1)
- bucketRmIndexCount = make([]metrics.Counter, self.MaxProx+1)
- //at each index create a metrics counter
- for i := 0; i < (self.KadParams.MaxProx + 1); i++ {
- bucketAddIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.add.%d.index", i), nil)
- bucketRmIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.rm.%d.index", i), nil)
- }
-}
diff --git a/swarm/network/kademlia/kademlia_test.go b/swarm/network/kademlia/kademlia_test.go
deleted file mode 100644
index 88858908a..000000000
--- a/swarm/network/kademlia/kademlia_test.go
+++ /dev/null
@@ -1,392 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package kademlia
-
-import (
- "fmt"
- "math"
- "math/rand"
- "os"
- "path/filepath"
- "reflect"
- "testing"
- "testing/quick"
- "time"
-)
-
-var (
- quickrand = rand.New(rand.NewSource(time.Now().Unix()))
- quickcfgFindClosest = &quick.Config{MaxCount: 50, Rand: quickrand}
- quickcfgBootStrap = &quick.Config{MaxCount: 100, Rand: quickrand}
-)
-
-type testNode struct {
- addr Address
-}
-
-func (n *testNode) String() string {
- return fmt.Sprintf("%x", n.addr[:])
-}
-
-func (n *testNode) Addr() Address {
- return n.addr
-}
-
-func (n *testNode) Drop() {
-}
-
-func (n *testNode) Url() string {
- return ""
-}
-
-func (n *testNode) LastActive() time.Time {
- return time.Now()
-}
-
-func TestOn(t *testing.T) {
- addr, ok1 := gen(Address{}, quickrand).(Address)
- other, ok2 := gen(Address{}, quickrand).(Address)
- if !ok1 || !ok2 {
- t.Errorf("oops")
- }
- kad := New(addr, NewDefaultKadParams())
- err := kad.On(&testNode{addr: other}, nil)
- _ = err
-}
-
-func TestBootstrap(t *testing.T) {
-
- test := func(test *bootstrapTest) bool {
- // for any node kad.le, Target and N
- params := NewDefaultKadParams()
- params.MaxProx = test.MaxProx
- params.BucketSize = test.BucketSize
- params.ProxBinSize = test.BucketSize
- kad := New(test.Self, params)
- var err error
-
- for p := 0; p < 9; p++ {
- var nrs []*NodeRecord
- n := math.Pow(float64(2), float64(7-p))
- for i := 0; i < int(n); i++ {
- addr := RandomAddressAt(test.Self, p)
- nrs = append(nrs, &NodeRecord{
- Addr: addr,
- })
- }
- kad.Add(nrs)
- }
-
- node := &testNode{test.Self}
-
- n := 0
- for n < 100 {
- err = kad.On(node, nil)
- if err != nil {
- t.Fatalf("backend not accepting node: %v", err)
- }
-
- record, need, _ := kad.Suggest()
- if !need {
- break
- }
- n++
- if record == nil {
- continue
- }
- node = &testNode{record.Addr}
- }
- exp := test.BucketSize * (test.MaxProx + 1)
- if kad.Count() != exp {
- t.Errorf("incorrect number of peers, expected %d, got %d\n%v", exp, kad.Count(), kad)
- return false
- }
- return true
- }
- if err := quick.Check(test, quickcfgBootStrap); err != nil {
- t.Error(err)
- }
-
-}
-
-func TestFindClosest(t *testing.T) {
-
- test := func(test *FindClosestTest) bool {
- // for any node kad.le, Target and N
- params := NewDefaultKadParams()
- params.MaxProx = 7
- kad := New(test.Self, params)
- var err error
- for _, node := range test.All {
- err = kad.On(node, nil)
- if err != nil && err.Error() != "bucket full" {
- t.Fatalf("backend not accepting node: %v", err)
- }
- }
-
- if len(test.All) == 0 || test.N == 0 {
- return true
- }
- nodes := kad.FindClosest(test.Target, test.N)
-
- // check that the number of results is min(N, kad.len)
- wantN := test.N
- if tlen := kad.Count(); tlen < test.N {
- wantN = tlen
- }
-
- if len(nodes) != wantN {
- t.Errorf("wrong number of nodes: got %d, want %d", len(nodes), wantN)
- return false
- }
-
- if hasDuplicates(nodes) {
- t.Errorf("result contains duplicates")
- return false
- }
-
- if !sortedByDistanceTo(test.Target, nodes) {
- t.Errorf("result is not sorted by distance to target")
- return false
- }
-
- // check that the result nodes have minimum distance to target.
- farthestResult := nodes[len(nodes)-1].Addr()
- for i, b := range kad.buckets {
- for j, n := range b {
- if contains(nodes, n.Addr()) {
- continue // don't run the check below for nodes in result
- }
- if test.Target.ProxCmp(n.Addr(), farthestResult) < 0 {
- _ = i * j
- t.Errorf("kad.le contains node that is closer to target but it's not in result")
- return false
- }
- }
- }
- return true
- }
- if err := quick.Check(test, quickcfgFindClosest); err != nil {
- t.Error(err)
- }
-}
-
-type proxTest struct {
- add bool
- index int
- addr Address
-}
-
-var (
- addresses []Address
-)
-
-func TestProxAdjust(t *testing.T) {
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
- self := gen(Address{}, r).(Address)
- params := NewDefaultKadParams()
- params.MaxProx = 7
- kad := New(self, params)
-
- var err error
- for i := 0; i < 100; i++ {
- a := gen(Address{}, r).(Address)
- addresses = append(addresses, a)
- err = kad.On(&testNode{addr: a}, nil)
- if err != nil && err.Error() != "bucket full" {
- t.Fatalf("backend not accepting node: %v", err)
- }
- if !kad.proxCheck(t) {
- return
- }
- }
- test := func(test *proxTest) bool {
- node := &testNode{test.addr}
- if test.add {
- kad.On(node, nil)
- } else {
- kad.Off(node, nil)
- }
- return kad.proxCheck(t)
- }
- if err := quick.Check(test, quickcfgFindClosest); err != nil {
- t.Error(err)
- }
-}
-
-func TestSaveLoad(t *testing.T) {
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
- addresses := gen([]Address{}, r).([]Address)
- self := RandomAddress()
- params := NewDefaultKadParams()
- params.MaxProx = 7
- kad := New(self, params)
-
- var err error
-
- for _, a := range addresses {
- err = kad.On(&testNode{addr: a}, nil)
- if err != nil && err.Error() != "bucket full" {
- t.Fatalf("backend not accepting node: %v", err)
- }
- }
- nodes := kad.FindClosest(self, 100)
-
- path := filepath.Join(os.TempDir(), "bzz-kad-test-save-load.peers")
- err = kad.Save(path, nil)
- if err != nil && err.Error() != "bucket full" {
- t.Fatalf("unepected error saving kaddb: %v", err)
- }
- kad = New(self, params)
- err = kad.Load(path, nil)
- if err != nil && err.Error() != "bucket full" {
- t.Fatalf("unepected error loading kaddb: %v", err)
- }
- for _, b := range kad.db.Nodes {
- for _, node := range b {
- err = kad.On(&testNode{node.Addr}, nil)
- if err != nil && err.Error() != "bucket full" {
- t.Fatalf("backend not accepting node: %v", err)
- }
- }
- }
- loadednodes := kad.FindClosest(self, 100)
- for i, node := range loadednodes {
- if nodes[i].Addr() != node.Addr() {
- t.Errorf("node mismatch at %d/%d: %v != %v", i, len(nodes), nodes[i].Addr(), node.Addr())
- }
- }
-}
-
-func (self *Kademlia) proxCheck(t *testing.T) bool {
- var sum int
- for i, b := range self.buckets {
- l := len(b)
- // if we are in the high prox multibucket
- if i >= self.proxLimit {
- sum += l
- } else if l == 0 {
- t.Errorf("bucket %d empty, yet proxLimit is %d\n%v", len(b), self.proxLimit, self)
- return false
- }
- }
- // check if merged high prox bucket does not exceed size
- if sum > 0 {
- if sum != self.proxSize {
- t.Errorf("proxSize incorrect, expected %v, got %v", sum, self.proxSize)
- return false
- }
- last := len(self.buckets[self.proxLimit])
- if last > 0 && sum >= self.ProxBinSize+last {
- t.Errorf("proxLimit %v incorrect, redundant non-empty bucket %d added to proxBin with %v (target %v)\n%v", self.proxLimit, last, sum-last, self.ProxBinSize, self)
- return false
- }
- if self.proxLimit > 0 && sum < self.ProxBinSize {
- t.Errorf("proxLimit %v incorrect. proxSize %v is less than target %v, yet there is more peers", self.proxLimit, sum, self.ProxBinSize)
- return false
- }
- }
- return true
-}
-
-type bootstrapTest struct {
- MaxProx int
- BucketSize int
- Self Address
-}
-
-func (*bootstrapTest) Generate(rand *rand.Rand, size int) reflect.Value {
- t := &bootstrapTest{
- Self: gen(Address{}, rand).(Address),
- MaxProx: 5 + rand.Intn(2),
- BucketSize: rand.Intn(3) + 1,
- }
- return reflect.ValueOf(t)
-}
-
-type FindClosestTest struct {
- Self Address
- Target Address
- All []Node
- N int
-}
-
-func (c FindClosestTest) String() string {
- return fmt.Sprintf("A: %064x\nT: %064x\n(%d)\n", c.Self[:], c.Target[:], c.N)
-}
-
-func (*FindClosestTest) Generate(rand *rand.Rand, size int) reflect.Value {
- t := &FindClosestTest{
- Self: gen(Address{}, rand).(Address),
- Target: gen(Address{}, rand).(Address),
- N: rand.Intn(bucketSize),
- }
- for _, a := range gen([]Address{}, rand).([]Address) {
- t.All = append(t.All, &testNode{addr: a})
- }
- return reflect.ValueOf(t)
-}
-
-func (*proxTest) Generate(rand *rand.Rand, size int) reflect.Value {
- var add bool
- if rand.Intn(1) == 0 {
- add = true
- }
- var t *proxTest
- if add {
- t = &proxTest{
- addr: gen(Address{}, rand).(Address),
- add: add,
- }
- } else {
- t = &proxTest{
- index: rand.Intn(len(addresses)),
- add: add,
- }
- }
- return reflect.ValueOf(t)
-}
-
-func hasDuplicates(slice []Node) bool {
- seen := make(map[Address]bool)
- for _, node := range slice {
- if seen[node.Addr()] {
- return true
- }
- seen[node.Addr()] = true
- }
- return false
-}
-
-func contains(nodes []Node, addr Address) bool {
- for _, n := range nodes {
- if n.Addr() == addr {
- return true
- }
- }
- return false
-}
-
-// gen wraps quick.Value so it's easier to use.
-// it generates a random value of the given value's type.
-func gen(typ interface{}, rand *rand.Rand) interface{} {
- v, ok := quick.Value(reflect.TypeOf(typ), rand)
- if !ok {
- panic(fmt.Sprintf("couldn't generate random value of type %T", typ))
- }
- return v.Interface()
-}