path: root/swarm/network/kademlia
diff options
Diffstat (limited to 'swarm/network/kademlia')
5 files changed, 1441 insertions, 0 deletions
diff --git a/swarm/network/kademlia/address.go b/swarm/network/kademlia/address.go
new file mode 100644
index 000000000..16c5ce532
--- /dev/null
+++ b/swarm/network/kademlia/address.go
@@ -0,0 +1,173 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package kademlia
+import (
+ "fmt"
+ "math/rand"
+ "strings"
+ "github.com/ethereum/go-ethereum/common"
+type Address common.Hash
+func (a Address) String() string {
+ return fmt.Sprintf("%x", a[:])
+func (a *Address) MarshalJSON() (out []byte, err error) {
+ return []byte(`"` + a.String() + `"`), nil
+func (a *Address) UnmarshalJSON(value []byte) error {
+ *a = Address(common.HexToHash(string(value[1 : len(value)-1])))
+ return nil
+// the string form of the binary representation of an address (only first 8 bits)
+func (a Address) Bin() string {
+ var bs []string
+ for _, b := range a[:] {
+ bs = append(bs, fmt.Sprintf("%08b", b))
+ }
+ return strings.Join(bs, "")
+Proximity(x, y) returns the proximity order of the MSB distance between x and y
+The distance metric MSB(x, y) of two equal length byte sequences x an y is the
+value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed.
+the binary cast is big endian: most significant bit first (=MSB).
+Proximity(x, y) is a discrete logarithmic scaling of the MSB distance.
+It is defined as the reverse rank of the integer part of the base 2
+logarithm of the distance.
+It is calculated by counting the number of common leading zeros in the (MSB)
+binary representation of the x^y.
+(0 farthest, 255 closest, 256 self)
+func proximity(one, other Address) (ret int) {
+ for i := 0; i < len(one); i++ {
+ oxo := one[i] ^ other[i]
+ for j := 0; j < 8; j++ {
+ if (uint8(oxo)>>uint8(7-j))&0x01 != 0 {
+ return i*8 + j
+ }
+ }
+ }
+ return len(one) * 8
+// Address.ProxCmp compares the distances a->target and b->target.
+// Returns -1 if a is closer to target, 1 if b is closer to target
+// and 0 if they are equal.
+func (target Address) ProxCmp(a, b Address) int {
+ for i := range target {
+ da := a[i] ^ target[i]
+ db := b[i] ^ target[i]
+ if da > db {
+ return 1
+ } else if da < db {
+ return -1
+ }
+ }
+ return 0
+// randomAddressAt(address, prox) generates a random address
+// at proximity order prox relative to address
+// if prox is negative a random address is generated
+func RandomAddressAt(self Address, prox int) (addr Address) {
+ addr = self
+ var pos int
+ if prox >= 0 {
+ pos = prox / 8
+ trans := prox % 8
+ transbytea := byte(0)
+ for j := 0; j <= trans; j++ {
+ transbytea |= 1 << uint8(7-j)
+ }
+ flipbyte := byte(1 << uint8(7-trans))
+ transbyteb := transbytea ^ byte(255)
+ randbyte := byte(rand.Intn(255))
+ addr[pos] = ((addr[pos] & transbytea) ^ flipbyte) | randbyte&transbyteb
+ }
+ for i := pos + 1; i < len(addr); i++ {
+ addr[i] = byte(rand.Intn(255))
+ }
+ return
+// KeyRange(a0, a1, proxLimit) returns the address inclusive address
+// range that contain addresses closer to one than other
+func KeyRange(one, other Address, proxLimit int) (start, stop Address) {
+ prox := proximity(one, other)
+ if prox >= proxLimit {
+ prox = proxLimit
+ }
+ start = CommonBitsAddrByte(one, other, byte(0x00), prox)
+ stop = CommonBitsAddrByte(one, other, byte(0xff), prox)
+ return
+func CommonBitsAddrF(self, other Address, f func() byte, p int) (addr Address) {
+ prox := proximity(self, other)
+ var pos int
+ if p <= prox {
+ prox = p
+ }
+ pos = prox / 8
+ addr = self
+ trans := byte(prox % 8)
+ var transbytea byte
+ if p > prox {
+ transbytea = byte(0x7f)
+ } else {
+ transbytea = byte(0xff)
+ }
+ transbytea >>= trans
+ transbyteb := transbytea ^ byte(0xff)
+ addrpos := addr[pos]
+ addrpos &= transbyteb
+ if p > prox {
+ addrpos ^= byte(0x80 >> trans)
+ }
+ addrpos |= transbytea & f()
+ addr[pos] = addrpos
+ for i := pos + 1; i < len(addr); i++ {
+ addr[i] = f()
+ }
+ return
+func CommonBitsAddr(self, other Address, prox int) (addr Address) {
+ return CommonBitsAddrF(self, other, func() byte { return byte(rand.Intn(255)) }, prox)
+func CommonBitsAddrByte(self, other Address, b byte, prox int) (addr Address) {
+ return CommonBitsAddrF(self, other, func() byte { return b }, prox)
+// randomAddressAt() generates a random address
+func RandomAddress() Address {
+ return RandomAddressAt(Address{}, -1)
diff --git a/swarm/network/kademlia/address_test.go b/swarm/network/kademlia/address_test.go
new file mode 100644
index 000000000..c062c8eaf
--- /dev/null
+++ b/swarm/network/kademlia/address_test.go
@@ -0,0 +1,96 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package kademlia
+import (
+ "math/rand"
+ "reflect"
+ "testing"
+ "github.com/ethereum/go-ethereum/common"
+func (Address) Generate(rand *rand.Rand, size int) reflect.Value {
+ var id Address
+ for i := 0; i < len(id); i++ {
+ id[i] = byte(uint8(rand.Intn(255)))
+ }
+ return reflect.ValueOf(id)
+func TestCommonBitsAddrF(t *testing.T) {
+ a := Address(common.HexToHash("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ b := Address(common.HexToHash("0x8123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ c := Address(common.HexToHash("0x4123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ d := Address(common.HexToHash("0x0023456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ e := Address(common.HexToHash("0x01A3456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ ab := CommonBitsAddrF(a, b, func() byte { return byte(0x00) }, 10)
+ expab := Address(common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000"))
+ if ab != expab {
+ t.Fatalf("%v != %v", ab, expab)
+ }
+ ac := CommonBitsAddrF(a, c, func() byte { return byte(0x00) }, 10)
+ expac := Address(common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000"))
+ if ac != expac {
+ t.Fatalf("%v != %v", ac, expac)
+ }
+ ad := CommonBitsAddrF(a, d, func() byte { return byte(0x00) }, 10)
+ expad := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
+ if ad != expad {
+ t.Fatalf("%v != %v", ad, expad)
+ }
+ ae := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 10)
+ expae := Address(common.HexToHash("0x0180000000000000000000000000000000000000000000000000000000000000"))
+ if ae != expae {
+ t.Fatalf("%v != %v", ae, expae)
+ }
+ acf := CommonBitsAddrF(a, c, func() byte { return byte(0xff) }, 10)
+ expacf := Address(common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
+ if acf != expacf {
+ t.Fatalf("%v != %v", acf, expacf)
+ }
+ aeo := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 2)
+ expaeo := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
+ if aeo != expaeo {
+ t.Fatalf("%v != %v", aeo, expaeo)
+ }
+ aep := CommonBitsAddrF(a, e, func() byte { return byte(0xff) }, 2)
+ expaep := Address(common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
+ if aep != expaep {
+ t.Fatalf("%v != %v", aep, expaep)
+ }
+func TestRandomAddressAt(t *testing.T) {
+ var a Address
+ for i := 0; i < 100; i++ {
+ a = RandomAddress()
+ prox := rand.Intn(255)
+ b := RandomAddressAt(a, prox)
+ if proximity(a, b) != prox {
+ t.Fatalf("incorrect address prox(%v, %v) == %v (expected %v)", a, b, proximity(a, b), prox)
+ }
+ }
diff --git a/swarm/network/kademlia/kaddb.go b/swarm/network/kademlia/kaddb.go
new file mode 100644
index 000000000..53a7db451
--- /dev/null
+++ b/swarm/network/kademlia/kaddb.go
@@ -0,0 +1,351 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package kademlia
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sync"
+ "time"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+type NodeData interface {
+ json.Marshaler
+ json.Unmarshaler
+// allow inactive peers under
+type NodeRecord struct {
+ Addr Address // address of node
+ Url string // Url, used to connect to node
+ After time.Time // next call after time
+ Seen time.Time // last connected at time
+ Meta *json.RawMessage // arbitrary metadata saved for a peer
+ node Node
+func (self *NodeRecord) setSeen() {
+ t := time.Now()
+ self.Seen = t
+ self.After = t
+func (self *NodeRecord) String() string {
+ return fmt.Sprintf("<%v>", self.Addr)
+// persisted node record database ()
+type KadDb struct {
+ Address Address
+ Nodes [][]*NodeRecord
+ index map[Address]*NodeRecord
+ cursors []int
+ lock sync.RWMutex
+ purgeInterval time.Duration
+ initialRetryInterval time.Duration
+ connRetryExp int
+func newKadDb(addr Address, params *KadParams) *KadDb {
+ return &KadDb{
+ Address: addr,
+ Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load
+ cursors: make([]int, params.MaxProx+1),
+ index: make(map[Address]*NodeRecord),
+ purgeInterval: params.PurgeInterval,
+ initialRetryInterval: params.InitialRetryInterval,
+ connRetryExp: params.ConnRetryExp,
+ }
+func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ record, found := self.index[a]
+ if !found {
+ record = &NodeRecord{
+ Addr: a,
+ Url: url,
+ }
+ glog.V(logger.Info).Infof("add new record %v to kaddb", record)
+ // insert in kaddb
+ self.index[a] = record
+ self.Nodes[index] = append(self.Nodes[index], record)
+ } else {
+ glog.V(logger.Info).Infof("found record %v in kaddb", record)
+ }
+ // update last seen time
+ record.setSeen()
+ // update with url in case IP/port changes
+ record.Url = url
+ return record
+// add adds node records to kaddb (persisted node record db)
+func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ var n int
+ var nodes []*NodeRecord
+ for _, node := range nrs {
+ _, found := self.index[node.Addr]
+ if !found && node.Addr != self.Address {
+ node.setSeen()
+ self.index[node.Addr] = node
+ index := proximityBin(node.Addr)
+ dbcursor := self.cursors[index]
+ nodes = self.Nodes[index]
+ // this is inefficient for allocation, need to just append then shift
+ newnodes := make([]*NodeRecord, len(nodes)+1)
+ copy(newnodes[:], nodes[:dbcursor])
+ newnodes[dbcursor] = node
+ copy(newnodes[dbcursor+1:], nodes[dbcursor:])
+ glog.V(logger.Detail).Infof("new nodes: %v (keys: %v)\nnodes: %v", newnodes, nodes)
+ self.Nodes[index] = newnodes
+ n++
+ }
+ }
+ if n > 0 {
+ glog.V(logger.Debug).Infof("%d/%d node records (new/known)", n, len(nrs))
+ }
+next return one node record with the highest priority for desired
+This is used to pick candidates for live nodes that are most wanted for
+a higly connected low centrality network structure for Swarm which best suits
+for a Kademlia-style routing.
+* Starting as naive node with empty db, this implements Kademlia bootstrapping
+* As a mature node, it fills short lines. All on demand.
+The candidate is chosen using the following strategy:
+We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds.
+On each round we proceed from the low to high proximity order buckets.
+If the number of active nodes (=connected peers) is < rounds, then start looking
+for a known candidate. To determine if there is a candidate to recommend the
+kaddb node record database row corresponding to the bucket is checked.
+If the row cursor is on position i, the ith element in the row is chosen.
+If the record is scheduled not to be retried before NOW, the next element is taken.
+If the record is scheduled to be retried, it is set as checked, scheduled for
+checking and is returned. The time of the next check is in X (duration) such that
+X = ConnRetryExp * delta where delta is the time past since the last check and
+ConnRetryExp is constant obsoletion factor. (Note that when node records are added
+from peer messages, they are marked as checked and placed at the cursor, ie.
+given priority over older entries). Entries which were checked more than
+purgeInterval ago are deleted from the kaddb row. If no candidate is found after
+a full round of checking the next bucket up is considered. If no candidate is
+found when we reach the maximum-proximity bucket, the next round starts.
+node record a is more favoured to b a > b iff a is a passive node (record of
+offline past peer)
+|proxBin(a)| < |proxBin(b)|
+|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|)
+|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b))
+The second argument returned names the first missing slot found
+func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) {
+ // return nil, proxLimit indicates that all buckets are filled
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ var interval time.Duration
+ var found bool
+ var purge []bool
+ var delta time.Duration
+ var cursor int
+ var count int
+ var after time.Time
+ // iterate over columns maximum bucketsize times
+ for rounds := 1; rounds <= maxBinSize; rounds++ {
+ // iterate over rows from PO 0 upto MaxProx
+ for po, dbrow := range self.Nodes {
+ // if row has rounds connected peers, then take the next
+ if binSize(po) >= rounds {
+ continue ROUND
+ }
+ if !need {
+ // set proxlimit to the PO where the first missing slot is found
+ proxLimit = po
+ need = true
+ }
+ purge = make([]bool, len(dbrow))
+ // there is a missing slot - finding a node to connect to
+ // select a node record from the relavant kaddb row (of identical prox order)
+ ROW:
+ for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) {
+ count++
+ node = dbrow[cursor]
+ // skip already connected nodes
+ if node.node != nil {
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow))
+ continue ROW
+ }
+ // if node is scheduled to connect
+ if time.Time(node.After).After(time.Now()) {
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)
+ continue ROW
+ }
+ delta = time.Since(time.Time(node.Seen))
+ if delta < self.initialRetryInterval {
+ delta = self.initialRetryInterval
+ }
+ if delta > self.purgeInterval {
+ // remove node
+ purge[cursor] = true
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen)
+ continue ROW
+ }
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)
+ // scheduling next check
+ interval = time.Duration(delta * time.Duration(self.connRetryExp))
+ after = time.Now().Add(interval)
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval)
+ node.After = after
+ found = true
+ } // ROW
+ self.cursors[po] = cursor
+ self.delete(po, purge)
+ if found {
+ return node, need, proxLimit
+ }
+ } // ROUND
+ } // ROUNDS
+ return nil, need, proxLimit
+// deletes the noderecords of a kaddb row corresponding to the indexes
+// caller must hold the dblock
+// the call is unsafe, no index checks
+func (self *KadDb) delete(row int, purge []bool) {
+ var nodes []*NodeRecord
+ dbrow := self.Nodes[row]
+ for i, del := range purge {
+ if i == self.cursors[row] {
+ //reset cursor
+ self.cursors[row] = len(nodes)
+ }
+ // delete the entry to be purged
+ if del {
+ delete(self.index, dbrow[i].Addr)
+ continue
+ }
+ // otherwise append to new list
+ nodes = append(nodes, dbrow[i])
+ }
+ self.Nodes[row] = nodes
+// save persists kaddb on disk (written to file on path in json format.
+func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ var n int
+ for _, b := range self.Nodes {
+ for _, node := range b {
+ n++
+ node.After = time.Now()
+ node.Seen = time.Now()
+ if cb != nil {
+ cb(node, node.node)
+ }
+ }
+ }
+ data, err := json.MarshalIndent(self, "", " ")
+ if err != nil {
+ return err
+ }
+ err = ioutil.WriteFile(path, data, os.ModePerm)
+ if err != nil {
+ glog.V(logger.Warn).Infof("unable to save kaddb with %v nodes to %v: err", n, path, err)
+ } else {
+ glog.V(logger.Info).Infof("saved kaddb with %v nodes to %v", n, path)
+ }
+ return err
+// Load(path) loads the node record database (kaddb) from file on path.
+func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ var data []byte
+ data, err = ioutil.ReadFile(path)
+ if err != nil {
+ return
+ }
+ err = json.Unmarshal(data, self)
+ if err != nil {
+ return
+ }
+ var n int
+ var purge []bool
+ for po, b := range self.Nodes {
+ purge = make([]bool, len(b))
+ ROW:
+ for i, node := range b {
+ if cb != nil {
+ err = cb(node, node.node)
+ if err != nil {
+ purge[i] = true
+ continue ROW
+ }
+ }
+ n++
+ if (node.After == time.Time{}) {
+ node.After = time.Now()
+ }
+ self.index[node.Addr] = node
+ }
+ self.delete(po, purge)
+ }
+ glog.V(logger.Info).Infof("loaded kaddb with %v nodes from %v", n, path)
+ return
+// accessor for KAD offline db count
+func (self *KadDb) count() int {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ return len(self.index)
diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go
new file mode 100644
index 000000000..87c57cefe
--- /dev/null
+++ b/swarm/network/kademlia/kademlia.go
@@ -0,0 +1,429 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package kademlia
+import (
+ "fmt"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+const (
+ bucketSize = 4
+ proxBinSize = 2
+ maxProx = 8
+ connRetryExp = 2
+ maxPeers = 100
+var (
+ purgeInterval = 42 * time.Hour
+ initialRetryInterval = 42 * time.Millisecond
+ maxIdleInterval = 42 * 1000 * time.Millisecond
+ // maxIdleInterval = 42 * 10 0 * time.Millisecond
+type KadParams struct {
+ // adjustable parameters
+ MaxProx int
+ ProxBinSize int
+ BucketSize int
+ PurgeInterval time.Duration
+ InitialRetryInterval time.Duration
+ MaxIdleInterval time.Duration
+ ConnRetryExp int
+func NewKadParams() *KadParams {
+ return &KadParams{
+ MaxProx: maxProx,
+ ProxBinSize: proxBinSize,
+ BucketSize: bucketSize,
+ PurgeInterval: purgeInterval,
+ InitialRetryInterval: initialRetryInterval,
+ MaxIdleInterval: maxIdleInterval,
+ ConnRetryExp: connRetryExp,
+ }
+// Kademlia is a table of active nodes
+type Kademlia struct {
+ addr Address // immutable baseaddress of the table
+ *KadParams // Kademlia configuration parameters
+ proxLimit int // state, the PO of the first row of the most proximate bin
+ proxSize int // state, the number of peers in the most proximate bin
+ count int // number of active peers (w live connection)
+ buckets [][]Node // the actual bins
+ db *KadDb // kaddb, node record database
+ lock sync.RWMutex // mutex to access buckets
+type Node interface {
+ Addr() Address
+ Url() string
+ LastActive() time.Time
+ Drop()
+// public constructor
+// add is the base address of the table
+// params is KadParams configuration
+func New(addr Address, params *KadParams) *Kademlia {
+ buckets := make([][]Node, params.MaxProx+1)
+ return &Kademlia{
+ addr: addr,
+ KadParams: params,
+ buckets: buckets,
+ db: newKadDb(addr, params),
+ }
+// accessor for KAD base address
+func (self *Kademlia) Addr() Address {
+ return self.addr
+// accessor for KAD active node count
+func (self *Kademlia) Count() int {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ return self.count
+// accessor for KAD active node count
+func (self *Kademlia) DBCount() int {
+ return self.db.count()
+// On is the entry point called when a new nodes is added
+// unsafe in that node is not checked to be already active node (to be called once)
+func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) {
+ glog.V(logger.Warn).Infof("%v", self)
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ index := self.proximityBin(node.Addr())
+ record := self.db.findOrCreate(index, node.Addr(), node.Url())
+ if cb != nil {
+ err = cb(record, node)
+ glog.V(logger.Detail).Infof("cb(%v, %v) ->%v", record, node, err)
+ if err != nil {
+ return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err)
+ }
+ glog.V(logger.Debug).Infof("add node record %v with node %v", record, node)
+ }
+ // insert in kademlia table of active nodes
+ bucket := self.buckets[index]
+ // if bucket is full insertion replaces the worst node
+ // TODO: give priority to peers with active traffic
+ if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation
+ self.buckets[index] = append(bucket, node)
+ glog.V(logger.Debug).Infof("add node %v to table", node)
+ self.setProxLimit(index, true)
+ record.node = node
+ self.count++
+ return nil
+ }
+ // always rotate peers
+ idle := self.MaxIdleInterval
+ var pos int
+ var replaced Node
+ for i, p := range bucket {
+ idleInt := time.Since(p.LastActive())
+ if idleInt > idle {
+ idle = idleInt
+ pos = i
+ replaced = p
+ }
+ }
+ if replaced == nil {
+ glog.V(logger.Debug).Infof("all peers wanted, PO%03d bucket full", index)
+ return fmt.Errorf("bucket full")
+ }
+ glog.V(logger.Debug).Infof("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval)
+ replaced.Drop()
+ // actually replace in the row. When off(node) is called, the peer is no longer in the row
+ bucket[pos] = node
+ // there is no change in bucket cardinalities so no prox limit adjustment is needed
+ record.node = node
+ self.count++
+ return nil
+// Off is the called when a node is taken offline (from the protocol main loop exit)
+func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ index := self.proximityBin(node.Addr())
+ bucket := self.buckets[index]
+ for i := 0; i < len(bucket); i++ {
+ if node.Addr() == bucket[i].Addr() {
+ self.buckets[index] = append(bucket[:i], bucket[(i+1):]...)
+ self.setProxLimit(index, false)
+ break
+ }
+ }
+ record := self.db.index[node.Addr()]
+ // callback on remove
+ if cb != nil {
+ cb(record, record.node)
+ }
+ record.node = nil
+ self.count--
+ glog.V(logger.Debug).Infof("remove node %v from table, population now is %v", node, self.count)
+ return
+// proxLimit is dynamically adjusted so that
+// 1) there is no empty buckets in bin < proxLimit and
+// 2) the sum of all items are the minimum possible but higher than ProxBinSize
+// adjust Prox (proxLimit and proxSize after an insertion/removal of nodes)
+// caller holds the lock
+func (self *Kademlia) setProxLimit(r int, on bool) {
+ // if the change is outside the core (PO lower)
+ // and the change does not leave a bucket empty then
+ // no adjustment needed
+ if r < self.proxLimit && len(self.buckets[r]) > 0 {
+ return
+ }
+ // if on=a node was added, then r must be within prox limit so increment cardinality
+ if on {
+ self.proxSize++
+ curr := len(self.buckets[self.proxLimit])
+ // if now core is big enough without the furthest bucket, then contract
+ // this can result in more than one bucket change
+ for self.proxSize >= self.ProxBinSize+curr && curr > 0 {
+ self.proxSize -= curr
+ self.proxLimit++
+ curr = len(self.buckets[self.proxLimit])
+ glog.V(logger.Detail).Infof("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)
+ }
+ return
+ }
+ // otherwise
+ if r >= self.proxLimit {
+ self.proxSize--
+ }
+ // expand core by lowering prox limit until hit zero or cover the empty bucket or reached target cardinality
+ for (self.proxSize < self.ProxBinSize || r < self.proxLimit) &&
+ self.proxLimit > 0 {
+ //
+ self.proxLimit--
+ self.proxSize += len(self.buckets[self.proxLimit])
+ glog.V(logger.Detail).Infof("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)
+ }
+returns the list of nodes belonging to the same proximity bin
+as the target. The most proximate bin will be the union of the bins between
+proxLimit and MaxProx.
+func (self *Kademlia) FindClosest(target Address, max int) []Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ r := nodesByDistance{
+ target: target,
+ }
+ po := self.proximityBin(target)
+ index := po
+ step := 1
+ glog.V(logger.Detail).Infof("serving %v nodes at %v (PO%02d)", max, index, po)
+ // if max is set to 0, just want a full bucket, dynamic number
+ min := max
+ // set limit to max
+ limit := max
+ if max == 0 {
+ min = 1
+ limit = maxPeers
+ }
+ var n int
+ for index >= 0 {
+ // add entire bucket
+ for _, p := range self.buckets[index] {
+ r.push(p, limit)
+ n++
+ }
+ // terminate if index reached the bottom or enough peers > min
+ glog.V(logger.Detail).Infof("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po)
+ if n >= min && (step < 0 || max == 0) {
+ break
+ }
+ // reach top most non-empty PO bucket, turn around
+ if index == self.MaxProx {
+ index = po
+ step = -1
+ }
+ index += step
+ }
+ glog.V(logger.Detail).Infof("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po)
+ return r.nodes
+func (self *Kademlia) Suggest() (*NodeRecord, bool, int) {
+ defer self.lock.RUnlock()
+ self.lock.RLock()
+ return self.db.findBest(self.BucketSize, func(i int) int { return len(self.buckets[i]) })
+// adds node records to kaddb (persisted node record db)
+func (self *Kademlia) Add(nrs []*NodeRecord) {
+ self.db.add(nrs, self.proximityBin)
+// nodesByDistance is a list of nodes, ordered by distance to target.
+type nodesByDistance struct {
+ nodes []Node
+ target Address
+func sortedByDistanceTo(target Address, slice []Node) bool {
+ var last Address
+ for i, node := range slice {
+ if i > 0 {
+ if target.ProxCmp(node.Addr(), last) < 0 {
+ return false
+ }
+ }
+ last = node.Addr()
+ }
+ return true
+// push(node, max) adds the given node to the list, keeping the total size
+// below max elements.
+func (h *nodesByDistance) push(node Node, max int) {
+ // returns the firt index ix such that func(i) returns true
+ ix := sort.Search(len(h.nodes), func(i int) bool {
+ return h.target.ProxCmp(h.nodes[i].Addr(), node.Addr()) >= 0
+ })
+ if len(h.nodes) < max {
+ h.nodes = append(h.nodes, node)
+ }
+ if ix < len(h.nodes) {
+ copy(h.nodes[ix+1:], h.nodes[ix:])
+ h.nodes[ix] = node
+ }
+Taking the proximity order relative to a fix point x classifies the points in
+the space (n byte long byte sequences) into bins. Items in each are at
+most half as distant from x as items in the previous bin. Given a sample of
+uniformly distributed items (a hash function over arbitrary sequence) the
+proximity scale maps onto series of subsets with cardinalities on a negative
+exponential scale.
+It also has the property that any two item belonging to the same bin are at
+most half as distant from each other as they are from x.
+If we think of random sample of items in the bins as connections in a network of interconnected nodes than relative proximity can serve as the basis for local
+decisions for graph traversal where the task is to find a route between two
+points. Since in every hop, the finite distance halves, there is
+a guaranteed constant maximum limit on the number of hops needed to reach one
+node from the other.
+func (self *Kademlia) proximityBin(other Address) (ret int) {
+ ret = proximity(self.addr, other)
+ if ret > self.MaxProx {
+ ret = self.MaxProx
+ }
+ return
+// provides keyrange for chunk db iteration
+func (self *Kademlia) KeyRange(other Address) (start, stop Address) {
+ defer self.lock.RUnlock()
+ self.lock.RLock()
+ return KeyRange(self.addr, other, self.proxLimit)
+// save persists kaddb on disk (written to file on path in json format.
+func (self *Kademlia) Save(path string, cb func(*NodeRecord, Node)) error {
+ return self.db.save(path, cb)
+// Load(path) loads the node record database (kaddb) from file on path.
+func (self *Kademlia) Load(path string, cb func(*NodeRecord, Node) error) (err error) {
+ return self.db.load(path, cb)
+// kademlia table + kaddb table displayed with ascii
+func (self *Kademlia) String() string {
+ defer self.lock.RUnlock()
+ self.lock.RLock()
+ defer self.db.lock.RUnlock()
+ self.db.lock.RLock()
+ var rows []string
+ rows = append(rows, "=========================================================================")
+ rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %v", time.Now().UTC().Format(time.UnixDate), self.addr.String()[:6]))
+ rows = append(rows, fmt.Sprintf("population: %d (%d), proxLimit: %d, proxSize: %d", self.count, len(self.db.index), self.proxLimit, self.proxSize))
+ rows = append(rows, fmt.Sprintf("MaxProx: %d, ProxBinSize: %d, BucketSize: %d", self.MaxProx, self.ProxBinSize, self.BucketSize))
+ for i, bucket := range self.buckets {
+ if i == self.proxLimit {
+ rows = append(rows, fmt.Sprintf("============ PROX LIMIT: %d ==========================================", i))
+ }
+ row := []string{fmt.Sprintf("%03d", i), fmt.Sprintf("%2d", len(bucket))}
+ var k int
+ c := self.db.cursors[i]
+ for ; k < len(bucket); k++ {
+ p := bucket[(c+k)%len(bucket)]
+ row = append(row, p.Addr().String()[:6])
+ if k == 4 {
+ break
+ }
+ }
+ for ; k < 4; k++ {
+ row = append(row, " ")
+ }
+ row = append(row, fmt.Sprintf("| %2d %2d", len(self.db.Nodes[i]), self.db.cursors[i]))
+ for j, p := range self.db.Nodes[i] {
+ row = append(row, p.Addr.String()[:6])
+ if j == 3 {
+ break
+ }
+ }
+ rows = append(rows, strings.Join(row, " "))
+ if i == self.MaxProx {
+ }
+ }
+ rows = append(rows, "=========================================================================")
+ return strings.Join(rows, "\n")
diff --git a/swarm/network/kademlia/kademlia_test.go b/swarm/network/kademlia/kademlia_test.go
new file mode 100644
index 000000000..66edfe654
--- /dev/null
+++ b/swarm/network/kademlia/kademlia_test.go
@@ -0,0 +1,392 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package kademlia
+import (
+ "fmt"
+ "math"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "reflect"
+ "testing"
+ "testing/quick"
+ "time"
+var (
+ quickrand = rand.New(rand.NewSource(time.Now().Unix()))
+ quickcfgFindClosest = &quick.Config{MaxCount: 50, Rand: quickrand}
+ quickcfgBootStrap = &quick.Config{MaxCount: 100, Rand: quickrand}
+type testNode struct {
+ addr Address
+func (n *testNode) String() string {
+ return fmt.Sprintf("%x", n.addr[:])
+func (n *testNode) Addr() Address {
+ return n.addr
+func (n *testNode) Drop() {
+func (n *testNode) Url() string {
+ return ""
+func (n *testNode) LastActive() time.Time {
+ return time.Now()
+func TestOn(t *testing.T) {
+ addr, ok := gen(Address{}, quickrand).(Address)
+ other, ok := gen(Address{}, quickrand).(Address)
+ if !ok {
+ t.Errorf("oops")
+ }
+ kad := New(addr, NewKadParams())
+ err := kad.On(&testNode{addr: other}, nil)
+ _ = err
+func TestBootstrap(t *testing.T) {
+ test := func(test *bootstrapTest) bool {
+ // for any node kad.le, Target and N
+ params := NewKadParams()
+ params.MaxProx = test.MaxProx
+ params.BucketSize = test.BucketSize
+ params.ProxBinSize = test.BucketSize
+ kad := New(test.Self, params)
+ var err error
+ for p := 0; p < 9; p++ {
+ var nrs []*NodeRecord
+ n := math.Pow(float64(2), float64(7-p))
+ for i := 0; i < int(n); i++ {
+ addr := RandomAddressAt(test.Self, p)
+ nrs = append(nrs, &NodeRecord{
+ Addr: addr,
+ })
+ }
+ kad.Add(nrs)
+ }
+ node := &testNode{test.Self}
+ n := 0
+ for n < 100 {
+ err = kad.On(node, nil)
+ if err != nil {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ record, need, _ := kad.Suggest()
+ if !need {
+ break
+ }
+ n++
+ if record == nil {
+ continue
+ }
+ node = &testNode{record.Addr}
+ }
+ exp := test.BucketSize * (test.MaxProx + 1)
+ if kad.Count() != exp {
+ t.Errorf("incorrect number of peers, expected %d, got %d\n%v", exp, kad.Count(), kad)
+ return false
+ }
+ return true
+ }
+ if err := quick.Check(test, quickcfgBootStrap); err != nil {
+ t.Error(err)
+ }
+func TestFindClosest(t *testing.T) {
+ test := func(test *FindClosestTest) bool {
+ // for any node kad.le, Target and N
+ params := NewKadParams()
+ params.MaxProx = 7
+ kad := New(test.Self, params)
+ var err error
+ for _, node := range test.All {
+ err = kad.On(node, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ }
+ if len(test.All) == 0 || test.N == 0 {
+ return true
+ }
+ nodes := kad.FindClosest(test.Target, test.N)
+ // check that the number of results is min(N, kad.len)
+ wantN := test.N
+ if tlen := kad.Count(); tlen < test.N {
+ wantN = tlen
+ }
+ if len(nodes) != wantN {
+ t.Errorf("wrong number of nodes: got %d, want %d", len(nodes), wantN)
+ return false
+ }
+ if hasDuplicates(nodes) {
+ t.Errorf("result contains duplicates")
+ return false
+ }
+ if !sortedByDistanceTo(test.Target, nodes) {
+ t.Errorf("result is not sorted by distance to target")
+ return false
+ }
+ // check that the result nodes have minimum distance to target.
+ farthestResult := nodes[len(nodes)-1].Addr()
+ for i, b := range kad.buckets {
+ for j, n := range b {
+ if contains(nodes, n.Addr()) {
+ continue // don't run the check below for nodes in result
+ }
+ if test.Target.ProxCmp(n.Addr(), farthestResult) < 0 {
+ _ = i * j
+ t.Errorf("kad.le contains node that is closer to target but it's not in result")
+ return false
+ }
+ }
+ }
+ return true
+ }
+ if err := quick.Check(test, quickcfgFindClosest); err != nil {
+ t.Error(err)
+ }
+type proxTest struct {
+ add bool
+ index int
+ addr Address
+var (
+ addresses []Address
+func TestProxAdjust(t *testing.T) {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ self := gen(Address{}, r).(Address)
+ params := NewKadParams()
+ params.MaxProx = 7
+ kad := New(self, params)
+ var err error
+ for i := 0; i < 100; i++ {
+ a := gen(Address{}, r).(Address)
+ addresses = append(addresses, a)
+ err = kad.On(&testNode{addr: a}, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ if !kad.proxCheck(t) {
+ return
+ }
+ }
+ test := func(test *proxTest) bool {
+ node := &testNode{test.addr}
+ if test.add {
+ kad.On(node, nil)
+ } else {
+ kad.Off(node, nil)
+ }
+ return kad.proxCheck(t)
+ }
+ if err := quick.Check(test, quickcfgFindClosest); err != nil {
+ t.Error(err)
+ }
+func TestSaveLoad(t *testing.T) {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ addresses := gen([]Address{}, r).([]Address)
+ self := RandomAddress()
+ params := NewKadParams()
+ params.MaxProx = 7
+ kad := New(self, params)
+ var err error
+ for _, a := range addresses {
+ err = kad.On(&testNode{addr: a}, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ }
+ nodes := kad.FindClosest(self, 100)
+ path := filepath.Join(os.TempDir(), "bzz-kad-test-save-load.peers")
+ err = kad.Save(path, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("unepected error saving kaddb: %v", err)
+ }
+ kad = New(self, params)
+ err = kad.Load(path, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("unepected error loading kaddb: %v", err)
+ }
+ for _, b := range kad.db.Nodes {
+ for _, node := range b {
+ err = kad.On(&testNode{node.Addr}, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ }
+ }
+ loadednodes := kad.FindClosest(self, 100)
+ for i, node := range loadednodes {
+ if nodes[i].Addr() != node.Addr() {
+ t.Errorf("node mismatch at %d/%d: %v != %v", i, len(nodes), nodes[i].Addr(), node.Addr())
+ }
+ }
+func (self *Kademlia) proxCheck(t *testing.T) bool {
+ var sum int
+ for i, b := range self.buckets {
+ l := len(b)
+ // if we are in the high prox multibucket
+ if i >= self.proxLimit {
+ sum += l
+ } else if l == 0 {
+ t.Errorf("bucket %d empty, yet proxLimit is %d\n%v", len(b), self.proxLimit, self)
+ return false
+ }
+ }
+ // check if merged high prox bucket does not exceed size
+ if sum > 0 {
+ if sum != self.proxSize {
+ t.Errorf("proxSize incorrect, expected %v, got %v", sum, self.proxSize)
+ return false
+ }
+ last := len(self.buckets[self.proxLimit])
+ if last > 0 && sum >= self.ProxBinSize+last {
+ t.Errorf("proxLimit %v incorrect, redundant non-empty bucket %d added to proxBin with %v (target %v)\n%v", self.proxLimit, last, sum-last, self.ProxBinSize, self)
+ return false
+ }
+ if self.proxLimit > 0 && sum < self.ProxBinSize {
+ t.Errorf("proxLimit %v incorrect. proxSize %v is less than target %v, yet there is more peers", self.proxLimit, sum, self.ProxBinSize)
+ return false
+ }
+ }
+ return true
+type bootstrapTest struct {
+ MaxProx int
+ BucketSize int
+ Self Address
+func (*bootstrapTest) Generate(rand *rand.Rand, size int) reflect.Value {
+ t := &bootstrapTest{
+ Self: gen(Address{}, rand).(Address),
+ MaxProx: 5 + rand.Intn(2),
+ BucketSize: rand.Intn(3) + 1,
+ }
+ return reflect.ValueOf(t)
+type FindClosestTest struct {
+ Self Address
+ Target Address
+ All []Node
+ N int
+func (c FindClosestTest) String() string {
+ return fmt.Sprintf("A: %064x\nT: %064x\n(%d)\n", c.Self[:], c.Target[:], c.N)
+func (*FindClosestTest) Generate(rand *rand.Rand, size int) reflect.Value {
+ t := &FindClosestTest{
+ Self: gen(Address{}, rand).(Address),
+ Target: gen(Address{}, rand).(Address),
+ N: rand.Intn(bucketSize),
+ }
+ for _, a := range gen([]Address{}, rand).([]Address) {
+ t.All = append(t.All, &testNode{addr: a})
+ }
+ return reflect.ValueOf(t)
+func (*proxTest) Generate(rand *rand.Rand, size int) reflect.Value {
+ var add bool
+ if rand.Intn(1) == 0 {
+ add = true
+ }
+ var t *proxTest
+ if add {
+ t = &proxTest{
+ addr: gen(Address{}, rand).(Address),
+ add: add,
+ }
+ } else {
+ t = &proxTest{
+ index: rand.Intn(len(addresses)),
+ add: add,
+ }
+ }
+ return reflect.ValueOf(t)
+func hasDuplicates(slice []Node) bool {
+ seen := make(map[Address]bool)
+ for _, node := range slice {
+ if seen[node.Addr()] {
+ return true
+ }
+ seen[node.Addr()] = true
+ }
+ return false
+func contains(nodes []Node, addr Address) bool {
+ for _, n := range nodes {
+ if n.Addr() == addr {
+ return true
+ }
+ }
+ return false
+// gen wraps quick.Value so it's easier to use.
+// it generates a random value of the given value's type.
+func gen(typ interface{}, rand *rand.Rand) interface{} {
+ v, ok := quick.Value(reflect.TypeOf(typ), rand)
+ if !ok {
+ panic(fmt.Sprintf("couldn't generate random value of type %T", typ))
+ }
+ return v.Interface()