diff options
Diffstat (limited to 'swarm/network/kademlia')
-rw-r--r-- | swarm/network/kademlia/address.go | 173 | ||||
-rw-r--r-- | swarm/network/kademlia/address_test.go | 96 | ||||
-rw-r--r-- | swarm/network/kademlia/kaddb.go | 350 | ||||
-rw-r--r-- | swarm/network/kademlia/kademlia.go | 454 | ||||
-rw-r--r-- | swarm/network/kademlia/kademlia_test.go | 392 |
5 files changed, 0 insertions, 1465 deletions
diff --git a/swarm/network/kademlia/address.go b/swarm/network/kademlia/address.go deleted file mode 100644 index ef82d2e8b..000000000 --- a/swarm/network/kademlia/address.go +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package kademlia - -import ( - "fmt" - "math/rand" - "strings" - - "github.com/ethereum/go-ethereum/common" -) - -type Address common.Hash - -func (a Address) String() string { - return fmt.Sprintf("%x", a[:]) -} - -func (a *Address) MarshalJSON() (out []byte, err error) { - return []byte(`"` + a.String() + `"`), nil -} - -func (a *Address) UnmarshalJSON(value []byte) error { - *a = Address(common.HexToHash(string(value[1 : len(value)-1]))) - return nil -} - -// the string form of the binary representation of an address (only first 8 bits) -func (a Address) Bin() string { - var bs []string - for _, b := range a[:] { - bs = append(bs, fmt.Sprintf("%08b", b)) - } - return strings.Join(bs, "") -} - -/* -Proximity(x, y) returns the proximity order of the MSB distance between x and y - -The distance metric MSB(x, y) of two equal length byte sequences x and y is the -value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. -the binary cast is big endian: most significant bit first (=MSB). - -Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. -It is defined as the reverse rank of the integer part of the base 2 -logarithm of the distance. -It is calculated by counting the number of common leading zeros in the (MSB) -binary representation of the x^y. - -(0 farthest, 255 closest, 256 self) -*/ -func proximity(one, other Address) (ret int) { - for i := 0; i < len(one); i++ { - oxo := one[i] ^ other[i] - for j := 0; j < 8; j++ { - if (oxo>>uint8(7-j))&0x01 != 0 { - return i*8 + j - } - } - } - return len(one) * 8 -} - -// Address.ProxCmp compares the distances a->target and b->target. -// Returns -1 if a is closer to target, 1 if b is closer to target -// and 0 if they are equal. -func (target Address) ProxCmp(a, b Address) int { - for i := range target { - da := a[i] ^ target[i] - db := b[i] ^ target[i] - if da > db { - return 1 - } else if da < db { - return -1 - } - } - return 0 -} - -// randomAddressAt(address, prox) generates a random address -// at proximity order prox relative to address -// if prox is negative a random address is generated -func RandomAddressAt(self Address, prox int) (addr Address) { - addr = self - var pos int - if prox >= 0 { - pos = prox / 8 - trans := prox % 8 - transbytea := byte(0) - for j := 0; j <= trans; j++ { - transbytea |= 1 << uint8(7-j) - } - flipbyte := byte(1 << uint8(7-trans)) - transbyteb := transbytea ^ byte(255) - randbyte := byte(rand.Intn(255)) - addr[pos] = ((addr[pos] & transbytea) ^ flipbyte) | randbyte&transbyteb - } - for i := pos + 1; i < len(addr); i++ { - addr[i] = byte(rand.Intn(255)) - } - - return -} - -// KeyRange(a0, a1, proxLimit) returns the address inclusive address -// range that contain addresses closer to one than other -func KeyRange(one, other Address, proxLimit int) (start, stop Address) { - prox := proximity(one, other) - if prox >= proxLimit { - prox = proxLimit - } - start = CommonBitsAddrByte(one, other, byte(0x00), prox) - stop = CommonBitsAddrByte(one, other, byte(0xff), prox) - return -} - -func CommonBitsAddrF(self, other Address, f func() byte, p int) (addr Address) { - prox := proximity(self, other) - var pos int - if p <= prox { - prox = p - } - pos = prox / 8 - addr = self - trans := byte(prox % 8) - var transbytea byte - if p > prox { - transbytea = byte(0x7f) - } else { - transbytea = byte(0xff) - } - transbytea >>= trans - transbyteb := transbytea ^ byte(0xff) - addrpos := addr[pos] - addrpos &= transbyteb - if p > prox { - addrpos ^= byte(0x80 >> trans) - } - addrpos |= transbytea & f() - addr[pos] = addrpos - for i := pos + 1; i < len(addr); i++ { - addr[i] = f() - } - - return -} - -func CommonBitsAddr(self, other Address, prox int) (addr Address) { - return CommonBitsAddrF(self, other, func() byte { return byte(rand.Intn(255)) }, prox) -} - -func CommonBitsAddrByte(self, other Address, b byte, prox int) (addr Address) { - return CommonBitsAddrF(self, other, func() byte { return b }, prox) -} - -// randomAddressAt() generates a random address -func RandomAddress() Address { - return RandomAddressAt(Address{}, -1) -} diff --git a/swarm/network/kademlia/address_test.go b/swarm/network/kademlia/address_test.go deleted file mode 100644 index c062c8eaf..000000000 --- a/swarm/network/kademlia/address_test.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package kademlia - -import ( - "math/rand" - "reflect" - "testing" - - "github.com/ethereum/go-ethereum/common" -) - -func (Address) Generate(rand *rand.Rand, size int) reflect.Value { - var id Address - for i := 0; i < len(id); i++ { - id[i] = byte(uint8(rand.Intn(255))) - } - return reflect.ValueOf(id) -} - -func TestCommonBitsAddrF(t *testing.T) { - a := Address(common.HexToHash("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) - b := Address(common.HexToHash("0x8123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) - c := Address(common.HexToHash("0x4123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) - d := Address(common.HexToHash("0x0023456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) - e := Address(common.HexToHash("0x01A3456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) - ab := CommonBitsAddrF(a, b, func() byte { return byte(0x00) }, 10) - expab := Address(common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000")) - - if ab != expab { - t.Fatalf("%v != %v", ab, expab) - } - ac := CommonBitsAddrF(a, c, func() byte { return byte(0x00) }, 10) - expac := Address(common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000")) - - if ac != expac { - t.Fatalf("%v != %v", ac, expac) - } - ad := CommonBitsAddrF(a, d, func() byte { return byte(0x00) }, 10) - expad := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")) - - if ad != expad { - t.Fatalf("%v != %v", ad, expad) - } - ae := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 10) - expae := Address(common.HexToHash("0x0180000000000000000000000000000000000000000000000000000000000000")) - - if ae != expae { - t.Fatalf("%v != %v", ae, expae) - } - acf := CommonBitsAddrF(a, c, func() byte { return byte(0xff) }, 10) - expacf := Address(common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) - - if acf != expacf { - t.Fatalf("%v != %v", acf, expacf) - } - aeo := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 2) - expaeo := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")) - - if aeo != expaeo { - t.Fatalf("%v != %v", aeo, expaeo) - } - aep := CommonBitsAddrF(a, e, func() byte { return byte(0xff) }, 2) - expaep := Address(common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) - - if aep != expaep { - t.Fatalf("%v != %v", aep, expaep) - } - -} - -func TestRandomAddressAt(t *testing.T) { - var a Address - for i := 0; i < 100; i++ { - a = RandomAddress() - prox := rand.Intn(255) - b := RandomAddressAt(a, prox) - if proximity(a, b) != prox { - t.Fatalf("incorrect address prox(%v, %v) == %v (expected %v)", a, b, proximity(a, b), prox) - } - } -} diff --git a/swarm/network/kademlia/kaddb.go b/swarm/network/kademlia/kaddb.go deleted file mode 100644 index b37ced5ba..000000000 --- a/swarm/network/kademlia/kaddb.go +++ /dev/null @@ -1,350 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package kademlia - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "sync" - "time" - - "github.com/ethereum/go-ethereum/log" -) - -type NodeData interface { - json.Marshaler - json.Unmarshaler -} - -// allow inactive peers under -type NodeRecord struct { - Addr Address // address of node - Url string // Url, used to connect to node - After time.Time // next call after time - Seen time.Time // last connected at time - Meta *json.RawMessage // arbitrary metadata saved for a peer - - node Node -} - -func (self *NodeRecord) setSeen() { - t := time.Now() - self.Seen = t - self.After = t -} - -func (self *NodeRecord) String() string { - return fmt.Sprintf("<%v>", self.Addr) -} - -// persisted node record database () -type KadDb struct { - Address Address - Nodes [][]*NodeRecord - index map[Address]*NodeRecord - cursors []int - lock sync.RWMutex - purgeInterval time.Duration - initialRetryInterval time.Duration - connRetryExp int -} - -func newKadDb(addr Address, params *KadParams) *KadDb { - return &KadDb{ - Address: addr, - Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load - cursors: make([]int, params.MaxProx+1), - index: make(map[Address]*NodeRecord), - purgeInterval: params.PurgeInterval, - initialRetryInterval: params.InitialRetryInterval, - connRetryExp: params.ConnRetryExp, - } -} - -func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord { - defer self.lock.Unlock() - self.lock.Lock() - - record, found := self.index[a] - if !found { - record = &NodeRecord{ - Addr: a, - Url: url, - } - log.Info(fmt.Sprintf("add new record %v to kaddb", record)) - // insert in kaddb - self.index[a] = record - self.Nodes[index] = append(self.Nodes[index], record) - } else { - log.Info(fmt.Sprintf("found record %v in kaddb", record)) - } - // update last seen time - record.setSeen() - // update with url in case IP/port changes - record.Url = url - return record -} - -// add adds node records to kaddb (persisted node record db) -func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) { - defer self.lock.Unlock() - self.lock.Lock() - var n int - var nodes []*NodeRecord - for _, node := range nrs { - _, found := self.index[node.Addr] - if !found && node.Addr != self.Address { - node.setSeen() - self.index[node.Addr] = node - index := proximityBin(node.Addr) - dbcursor := self.cursors[index] - nodes = self.Nodes[index] - // this is inefficient for allocation, need to just append then shift - newnodes := make([]*NodeRecord, len(nodes)+1) - copy(newnodes[:], nodes[:dbcursor]) - newnodes[dbcursor] = node - copy(newnodes[dbcursor+1:], nodes[dbcursor:]) - log.Trace(fmt.Sprintf("new nodes: %v, nodes: %v", newnodes, nodes)) - self.Nodes[index] = newnodes - n++ - } - } - if n > 0 { - log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs))) - } -} - -/* -next return one node record with the highest priority for desired -connection. -This is used to pick candidates for live nodes that are most wanted for -a higly connected low centrality network structure for Swarm which best suits -for a Kademlia-style routing. - -* Starting as naive node with empty db, this implements Kademlia bootstrapping -* As a mature node, it fills short lines. All on demand. - -The candidate is chosen using the following strategy: -We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds. -On each round we proceed from the low to high proximity order buckets. -If the number of active nodes (=connected peers) is < rounds, then start looking -for a known candidate. To determine if there is a candidate to recommend the -kaddb node record database row corresponding to the bucket is checked. - -If the row cursor is on position i, the ith element in the row is chosen. -If the record is scheduled not to be retried before NOW, the next element is taken. -If the record is scheduled to be retried, it is set as checked, scheduled for -checking and is returned. The time of the next check is in X (duration) such that -X = ConnRetryExp * delta where delta is the time past since the last check and -ConnRetryExp is constant obsoletion factor. (Note that when node records are added -from peer messages, they are marked as checked and placed at the cursor, ie. -given priority over older entries). Entries which were checked more than -purgeInterval ago are deleted from the kaddb row. If no candidate is found after -a full round of checking the next bucket up is considered. If no candidate is -found when we reach the maximum-proximity bucket, the next round starts. - -node record a is more favoured to b a > b iff a is a passive node (record of -offline past peer) -|proxBin(a)| < |proxBin(b)| -|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|) -|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b)) - - -The second argument returned names the first missing slot found -*/ -func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) { - // return nil, proxLimit indicates that all buckets are filled - defer self.lock.Unlock() - self.lock.Lock() - - var interval time.Duration - var found bool - var purge []bool - var delta time.Duration - var cursor int - var count int - var after time.Time - - // iterate over columns maximum bucketsize times - for rounds := 1; rounds <= maxBinSize; rounds++ { - ROUND: - // iterate over rows from PO 0 upto MaxProx - for po, dbrow := range self.Nodes { - // if row has rounds connected peers, then take the next - if binSize(po) >= rounds { - continue ROUND - } - if !need { - // set proxlimit to the PO where the first missing slot is found - proxLimit = po - need = true - } - purge = make([]bool, len(dbrow)) - - // there is a missing slot - finding a node to connect to - // select a node record from the relavant kaddb row (of identical prox order) - ROW: - for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) { - count++ - node = dbrow[cursor] - - // skip already connected nodes - if node.node != nil { - log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow))) - continue ROW - } - - // if node is scheduled to connect - if node.After.After(time.Now()) { - log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)) - continue ROW - } - - delta = time.Since(node.Seen) - if delta < self.initialRetryInterval { - delta = self.initialRetryInterval - } - if delta > self.purgeInterval { - // remove node - purge[cursor] = true - log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen)) - continue ROW - } - - log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)) - - // scheduling next check - interval = delta * time.Duration(self.connRetryExp) - after = time.Now().Add(interval) - - log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval)) - node.After = after - found = true - } // ROW - self.cursors[po] = cursor - self.delete(po, purge) - if found { - return node, need, proxLimit - } - } // ROUND - } // ROUNDS - - return nil, need, proxLimit -} - -// deletes the noderecords of a kaddb row corresponding to the indexes -// caller must hold the dblock -// the call is unsafe, no index checks -func (self *KadDb) delete(row int, purge []bool) { - var nodes []*NodeRecord - dbrow := self.Nodes[row] - for i, del := range purge { - if i == self.cursors[row] { - //reset cursor - self.cursors[row] = len(nodes) - } - // delete the entry to be purged - if del { - delete(self.index, dbrow[i].Addr) - continue - } - // otherwise append to new list - nodes = append(nodes, dbrow[i]) - } - self.Nodes[row] = nodes -} - -// save persists kaddb on disk (written to file on path in json format. -func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error { - defer self.lock.Unlock() - self.lock.Lock() - - var n int - - for _, b := range self.Nodes { - for _, node := range b { - n++ - node.After = time.Now() - node.Seen = time.Now() - if cb != nil { - cb(node, node.node) - } - } - } - - data, err := json.MarshalIndent(self, "", " ") - if err != nil { - return err - } - err = ioutil.WriteFile(path, data, os.ModePerm) - if err != nil { - log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: %v", n, path, err)) - } else { - log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path)) - } - return err -} - -// Load(path) loads the node record database (kaddb) from file on path. -func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) { - defer self.lock.Unlock() - self.lock.Lock() - - var data []byte - data, err = ioutil.ReadFile(path) - if err != nil { - return - } - - err = json.Unmarshal(data, self) - if err != nil { - return - } - var n int - var purge []bool - for po, b := range self.Nodes { - purge = make([]bool, len(b)) - ROW: - for i, node := range b { - if cb != nil { - err = cb(node, node.node) - if err != nil { - purge[i] = true - continue ROW - } - } - n++ - if node.After.IsZero() { - node.After = time.Now() - } - self.index[node.Addr] = node - } - self.delete(po, purge) - } - log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path)) - - return -} - -// accessor for KAD offline db count -func (self *KadDb) count() int { - defer self.lock.Unlock() - self.lock.Lock() - return len(self.index) -} diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go deleted file mode 100644 index b5999b52d..000000000 --- a/swarm/network/kademlia/kademlia.go +++ /dev/null @@ -1,454 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package kademlia - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" -) - -//metrics variables -//For metrics, we want to count how many times peers are added/removed -//at a certain index. Thus we do that with an array of counters with -//entry for each index -var ( - bucketAddIndexCount []metrics.Counter - bucketRmIndexCount []metrics.Counter -) - -const ( - bucketSize = 4 - proxBinSize = 2 - maxProx = 8 - connRetryExp = 2 - maxPeers = 100 -) - -var ( - purgeInterval = 42 * time.Hour - initialRetryInterval = 42 * time.Millisecond - maxIdleInterval = 42 * 1000 * time.Millisecond - // maxIdleInterval = 42 * 10 0 * time.Millisecond -) - -type KadParams struct { - // adjustable parameters - MaxProx int - ProxBinSize int - BucketSize int - PurgeInterval time.Duration - InitialRetryInterval time.Duration - MaxIdleInterval time.Duration - ConnRetryExp int -} - -func NewDefaultKadParams() *KadParams { - return &KadParams{ - MaxProx: maxProx, - ProxBinSize: proxBinSize, - BucketSize: bucketSize, - PurgeInterval: purgeInterval, - InitialRetryInterval: initialRetryInterval, - MaxIdleInterval: maxIdleInterval, - ConnRetryExp: connRetryExp, - } -} - -// Kademlia is a table of active nodes -type Kademlia struct { - addr Address // immutable baseaddress of the table - *KadParams // Kademlia configuration parameters - proxLimit int // state, the PO of the first row of the most proximate bin - proxSize int // state, the number of peers in the most proximate bin - count int // number of active peers (w live connection) - buckets [][]Node // the actual bins - db *KadDb // kaddb, node record database - lock sync.RWMutex // mutex to access buckets -} - -type Node interface { - Addr() Address - Url() string - LastActive() time.Time - Drop() -} - -// public constructor -// add is the base address of the table -// params is KadParams configuration -func New(addr Address, params *KadParams) *Kademlia { - buckets := make([][]Node, params.MaxProx+1) - kad := &Kademlia{ - addr: addr, - KadParams: params, - buckets: buckets, - db: newKadDb(addr, params), - } - kad.initMetricsVariables() - return kad -} - -// accessor for KAD base address -func (self *Kademlia) Addr() Address { - return self.addr -} - -// accessor for KAD active node count -func (self *Kademlia) Count() int { - defer self.lock.Unlock() - self.lock.Lock() - return self.count -} - -// accessor for KAD active node count -func (self *Kademlia) DBCount() int { - return self.db.count() -} - -// On is the entry point called when a new nodes is added -// unsafe in that node is not checked to be already active node (to be called once) -func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) { - log.Debug(fmt.Sprintf("%v", self)) - defer self.lock.Unlock() - self.lock.Lock() - - index := self.proximityBin(node.Addr()) - record := self.db.findOrCreate(index, node.Addr(), node.Url()) - - if cb != nil { - err = cb(record, node) - log.Trace(fmt.Sprintf("cb(%v, %v) ->%v", record, node, err)) - if err != nil { - return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err) - } - log.Debug(fmt.Sprintf("add node record %v with node %v", record, node)) - } - - // insert in kademlia table of active nodes - bucket := self.buckets[index] - // if bucket is full insertion replaces the worst node - // TODO: give priority to peers with active traffic - if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation - self.buckets[index] = append(bucket, node) - bucketAddIndexCount[index].Inc(1) - log.Debug(fmt.Sprintf("add node %v to table", node)) - self.setProxLimit(index, true) - record.node = node - self.count++ - return nil - } - - // always rotate peers - idle := self.MaxIdleInterval - var pos int - var replaced Node - for i, p := range bucket { - idleInt := time.Since(p.LastActive()) - if idleInt > idle { - idle = idleInt - pos = i - replaced = p - } - } - if replaced == nil { - log.Debug(fmt.Sprintf("all peers wanted, PO%03d bucket full", index)) - return fmt.Errorf("bucket full") - } - log.Debug(fmt.Sprintf("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval)) - replaced.Drop() - // actually replace in the row. When off(node) is called, the peer is no longer in the row - bucket[pos] = node - // there is no change in bucket cardinalities so no prox limit adjustment is needed - record.node = node - self.count++ - return nil - -} - -// Off is the called when a node is taken offline (from the protocol main loop exit) -func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) { - self.lock.Lock() - defer self.lock.Unlock() - - index := self.proximityBin(node.Addr()) - bucketRmIndexCount[index].Inc(1) - bucket := self.buckets[index] - for i := 0; i < len(bucket); i++ { - if node.Addr() == bucket[i].Addr() { - self.buckets[index] = append(bucket[:i], bucket[(i+1):]...) - self.setProxLimit(index, false) - break - } - } - - record := self.db.index[node.Addr()] - // callback on remove - if cb != nil { - cb(record, record.node) - } - record.node = nil - self.count-- - log.Debug(fmt.Sprintf("remove node %v from table, population now is %v", node, self.count)) - - return -} - -// proxLimit is dynamically adjusted so that -// 1) there is no empty buckets in bin < proxLimit and -// 2) the sum of all items are the minimum possible but higher than ProxBinSize -// adjust Prox (proxLimit and proxSize after an insertion/removal of nodes) -// caller holds the lock -func (self *Kademlia) setProxLimit(r int, on bool) { - // if the change is outside the core (PO lower) - // and the change does not leave a bucket empty then - // no adjustment needed - if r < self.proxLimit && len(self.buckets[r]) > 0 { - return - } - // if on=a node was added, then r must be within prox limit so increment cardinality - if on { - self.proxSize++ - curr := len(self.buckets[self.proxLimit]) - // if now core is big enough without the furthest bucket, then contract - // this can result in more than one bucket change - for self.proxSize >= self.ProxBinSize+curr && curr > 0 { - self.proxSize -= curr - self.proxLimit++ - curr = len(self.buckets[self.proxLimit]) - - log.Trace(fmt.Sprintf("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)) - } - return - } - // otherwise - if r >= self.proxLimit { - self.proxSize-- - } - // expand core by lowering prox limit until hit zero or cover the empty bucket or reached target cardinality - for (self.proxSize < self.ProxBinSize || r < self.proxLimit) && - self.proxLimit > 0 { - // - self.proxLimit-- - self.proxSize += len(self.buckets[self.proxLimit]) - log.Trace(fmt.Sprintf("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)) - } -} - -/* -returns the list of nodes belonging to the same proximity bin -as the target. The most proximate bin will be the union of the bins between -proxLimit and MaxProx. -*/ -func (self *Kademlia) FindClosest(target Address, max int) []Node { - self.lock.Lock() - defer self.lock.Unlock() - - r := nodesByDistance{ - target: target, - } - - po := self.proximityBin(target) - index := po - step := 1 - log.Trace(fmt.Sprintf("serving %v nodes at %v (PO%02d)", max, index, po)) - - // if max is set to 0, just want a full bucket, dynamic number - min := max - // set limit to max - limit := max - if max == 0 { - min = 1 - limit = maxPeers - } - - var n int - for index >= 0 { - // add entire bucket - for _, p := range self.buckets[index] { - r.push(p, limit) - n++ - } - // terminate if index reached the bottom or enough peers > min - log.Trace(fmt.Sprintf("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po)) - if n >= min && (step < 0 || max == 0) { - break - } - // reach top most non-empty PO bucket, turn around - if index == self.MaxProx { - index = po - step = -1 - } - index += step - } - log.Trace(fmt.Sprintf("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po)) - return r.nodes -} - -func (self *Kademlia) Suggest() (*NodeRecord, bool, int) { - defer self.lock.RUnlock() - self.lock.RLock() - return self.db.findBest(self.BucketSize, func(i int) int { return len(self.buckets[i]) }) -} - -// adds node records to kaddb (persisted node record db) -func (self *Kademlia) Add(nrs []*NodeRecord) { - self.db.add(nrs, self.proximityBin) -} - -// nodesByDistance is a list of nodes, ordered by distance to target. -type nodesByDistance struct { - nodes []Node - target Address -} - -func sortedByDistanceTo(target Address, slice []Node) bool { - var last Address - for i, node := range slice { - if i > 0 { - if target.ProxCmp(node.Addr(), last) < 0 { - return false - } - } - last = node.Addr() - } - return true -} - -// push(node, max) adds the given node to the list, keeping the total size -// below max elements. -func (h *nodesByDistance) push(node Node, max int) { - // returns the firt index ix such that func(i) returns true - ix := sort.Search(len(h.nodes), func(i int) bool { - return h.target.ProxCmp(h.nodes[i].Addr(), node.Addr()) >= 0 - }) - - if len(h.nodes) < max { - h.nodes = append(h.nodes, node) - } - if ix < len(h.nodes) { - copy(h.nodes[ix+1:], h.nodes[ix:]) - h.nodes[ix] = node - } -} - -/* -Taking the proximity order relative to a fix point x classifies the points in -the space (n byte long byte sequences) into bins. Items in each are at -most half as distant from x as items in the previous bin. Given a sample of -uniformly distributed items (a hash function over arbitrary sequence) the -proximity scale maps onto series of subsets with cardinalities on a negative -exponential scale. - -It also has the property that any two item belonging to the same bin are at -most half as distant from each other as they are from x. - -If we think of random sample of items in the bins as connections in a network of interconnected nodes than relative proximity can serve as the basis for local -decisions for graph traversal where the task is to find a route between two -points. Since in every hop, the finite distance halves, there is -a guaranteed constant maximum limit on the number of hops needed to reach one -node from the other. -*/ - -func (self *Kademlia) proximityBin(other Address) (ret int) { - ret = proximity(self.addr, other) - if ret > self.MaxProx { - ret = self.MaxProx - } - return -} - -// provides keyrange for chunk db iteration -func (self *Kademlia) KeyRange(other Address) (start, stop Address) { - defer self.lock.RUnlock() - self.lock.RLock() - return KeyRange(self.addr, other, self.proxLimit) -} - -// save persists kaddb on disk (written to file on path in json format. -func (self *Kademlia) Save(path string, cb func(*NodeRecord, Node)) error { - return self.db.save(path, cb) -} - -// Load(path) loads the node record database (kaddb) from file on path. -func (self *Kademlia) Load(path string, cb func(*NodeRecord, Node) error) (err error) { - return self.db.load(path, cb) -} - -// kademlia table + kaddb table displayed with ascii -func (self *Kademlia) String() string { - defer self.lock.RUnlock() - self.lock.RLock() - defer self.db.lock.RUnlock() - self.db.lock.RLock() - - var rows []string - rows = append(rows, "=========================================================================") - rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %v", time.Now().UTC().Format(time.UnixDate), self.addr.String()[:6])) - rows = append(rows, fmt.Sprintf("population: %d (%d), proxLimit: %d, proxSize: %d", self.count, len(self.db.index), self.proxLimit, self.proxSize)) - rows = append(rows, fmt.Sprintf("MaxProx: %d, ProxBinSize: %d, BucketSize: %d", self.MaxProx, self.ProxBinSize, self.BucketSize)) - - for i, bucket := range self.buckets { - - if i == self.proxLimit { - rows = append(rows, fmt.Sprintf("============ PROX LIMIT: %d ==========================================", i)) - } - row := []string{fmt.Sprintf("%03d", i), fmt.Sprintf("%2d", len(bucket))} - var k int - c := self.db.cursors[i] - for ; k < len(bucket); k++ { - p := bucket[(c+k)%len(bucket)] - row = append(row, p.Addr().String()[:6]) - if k == 4 { - break - } - } - for ; k < 4; k++ { - row = append(row, " ") - } - row = append(row, fmt.Sprintf("| %2d %2d", len(self.db.Nodes[i]), self.db.cursors[i])) - - for j, p := range self.db.Nodes[i] { - row = append(row, p.Addr.String()[:6]) - if j == 3 { - break - } - } - rows = append(rows, strings.Join(row, " ")) - if i == self.MaxProx { - } - } - rows = append(rows, "=========================================================================") - return strings.Join(rows, "\n") -} - -//We have to build up the array of counters for each index -func (self *Kademlia) initMetricsVariables() { - //create the arrays - bucketAddIndexCount = make([]metrics.Counter, self.MaxProx+1) - bucketRmIndexCount = make([]metrics.Counter, self.MaxProx+1) - //at each index create a metrics counter - for i := 0; i < (self.KadParams.MaxProx + 1); i++ { - bucketAddIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.add.%d.index", i), nil) - bucketRmIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.rm.%d.index", i), nil) - } -} diff --git a/swarm/network/kademlia/kademlia_test.go b/swarm/network/kademlia/kademlia_test.go deleted file mode 100644 index 88858908a..000000000 --- a/swarm/network/kademlia/kademlia_test.go +++ /dev/null @@ -1,392 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package kademlia - -import ( - "fmt" - "math" - "math/rand" - "os" - "path/filepath" - "reflect" - "testing" - "testing/quick" - "time" -) - -var ( - quickrand = rand.New(rand.NewSource(time.Now().Unix())) - quickcfgFindClosest = &quick.Config{MaxCount: 50, Rand: quickrand} - quickcfgBootStrap = &quick.Config{MaxCount: 100, Rand: quickrand} -) - -type testNode struct { - addr Address -} - -func (n *testNode) String() string { - return fmt.Sprintf("%x", n.addr[:]) -} - -func (n *testNode) Addr() Address { - return n.addr -} - -func (n *testNode) Drop() { -} - -func (n *testNode) Url() string { - return "" -} - -func (n *testNode) LastActive() time.Time { - return time.Now() -} - -func TestOn(t *testing.T) { - addr, ok1 := gen(Address{}, quickrand).(Address) - other, ok2 := gen(Address{}, quickrand).(Address) - if !ok1 || !ok2 { - t.Errorf("oops") - } - kad := New(addr, NewDefaultKadParams()) - err := kad.On(&testNode{addr: other}, nil) - _ = err -} - -func TestBootstrap(t *testing.T) { - - test := func(test *bootstrapTest) bool { - // for any node kad.le, Target and N - params := NewDefaultKadParams() - params.MaxProx = test.MaxProx - params.BucketSize = test.BucketSize - params.ProxBinSize = test.BucketSize - kad := New(test.Self, params) - var err error - - for p := 0; p < 9; p++ { - var nrs []*NodeRecord - n := math.Pow(float64(2), float64(7-p)) - for i := 0; i < int(n); i++ { - addr := RandomAddressAt(test.Self, p) - nrs = append(nrs, &NodeRecord{ - Addr: addr, - }) - } - kad.Add(nrs) - } - - node := &testNode{test.Self} - - n := 0 - for n < 100 { - err = kad.On(node, nil) - if err != nil { - t.Fatalf("backend not accepting node: %v", err) - } - - record, need, _ := kad.Suggest() - if !need { - break - } - n++ - if record == nil { - continue - } - node = &testNode{record.Addr} - } - exp := test.BucketSize * (test.MaxProx + 1) - if kad.Count() != exp { - t.Errorf("incorrect number of peers, expected %d, got %d\n%v", exp, kad.Count(), kad) - return false - } - return true - } - if err := quick.Check(test, quickcfgBootStrap); err != nil { - t.Error(err) - } - -} - -func TestFindClosest(t *testing.T) { - - test := func(test *FindClosestTest) bool { - // for any node kad.le, Target and N - params := NewDefaultKadParams() - params.MaxProx = 7 - kad := New(test.Self, params) - var err error - for _, node := range test.All { - err = kad.On(node, nil) - if err != nil && err.Error() != "bucket full" { - t.Fatalf("backend not accepting node: %v", err) - } - } - - if len(test.All) == 0 || test.N == 0 { - return true - } - nodes := kad.FindClosest(test.Target, test.N) - - // check that the number of results is min(N, kad.len) - wantN := test.N - if tlen := kad.Count(); tlen < test.N { - wantN = tlen - } - - if len(nodes) != wantN { - t.Errorf("wrong number of nodes: got %d, want %d", len(nodes), wantN) - return false - } - - if hasDuplicates(nodes) { - t.Errorf("result contains duplicates") - return false - } - - if !sortedByDistanceTo(test.Target, nodes) { - t.Errorf("result is not sorted by distance to target") - return false - } - - // check that the result nodes have minimum distance to target. - farthestResult := nodes[len(nodes)-1].Addr() - for i, b := range kad.buckets { - for j, n := range b { - if contains(nodes, n.Addr()) { - continue // don't run the check below for nodes in result - } - if test.Target.ProxCmp(n.Addr(), farthestResult) < 0 { - _ = i * j - t.Errorf("kad.le contains node that is closer to target but it's not in result") - return false - } - } - } - return true - } - if err := quick.Check(test, quickcfgFindClosest); err != nil { - t.Error(err) - } -} - -type proxTest struct { - add bool - index int - addr Address -} - -var ( - addresses []Address -) - -func TestProxAdjust(t *testing.T) { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - self := gen(Address{}, r).(Address) - params := NewDefaultKadParams() - params.MaxProx = 7 - kad := New(self, params) - - var err error - for i := 0; i < 100; i++ { - a := gen(Address{}, r).(Address) - addresses = append(addresses, a) - err = kad.On(&testNode{addr: a}, nil) - if err != nil && err.Error() != "bucket full" { - t.Fatalf("backend not accepting node: %v", err) - } - if !kad.proxCheck(t) { - return - } - } - test := func(test *proxTest) bool { - node := &testNode{test.addr} - if test.add { - kad.On(node, nil) - } else { - kad.Off(node, nil) - } - return kad.proxCheck(t) - } - if err := quick.Check(test, quickcfgFindClosest); err != nil { - t.Error(err) - } -} - -func TestSaveLoad(t *testing.T) { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - addresses := gen([]Address{}, r).([]Address) - self := RandomAddress() - params := NewDefaultKadParams() - params.MaxProx = 7 - kad := New(self, params) - - var err error - - for _, a := range addresses { - err = kad.On(&testNode{addr: a}, nil) - if err != nil && err.Error() != "bucket full" { - t.Fatalf("backend not accepting node: %v", err) - } - } - nodes := kad.FindClosest(self, 100) - - path := filepath.Join(os.TempDir(), "bzz-kad-test-save-load.peers") - err = kad.Save(path, nil) - if err != nil && err.Error() != "bucket full" { - t.Fatalf("unepected error saving kaddb: %v", err) - } - kad = New(self, params) - err = kad.Load(path, nil) - if err != nil && err.Error() != "bucket full" { - t.Fatalf("unepected error loading kaddb: %v", err) - } - for _, b := range kad.db.Nodes { - for _, node := range b { - err = kad.On(&testNode{node.Addr}, nil) - if err != nil && err.Error() != "bucket full" { - t.Fatalf("backend not accepting node: %v", err) - } - } - } - loadednodes := kad.FindClosest(self, 100) - for i, node := range loadednodes { - if nodes[i].Addr() != node.Addr() { - t.Errorf("node mismatch at %d/%d: %v != %v", i, len(nodes), nodes[i].Addr(), node.Addr()) - } - } -} - -func (self *Kademlia) proxCheck(t *testing.T) bool { - var sum int - for i, b := range self.buckets { - l := len(b) - // if we are in the high prox multibucket - if i >= self.proxLimit { - sum += l - } else if l == 0 { - t.Errorf("bucket %d empty, yet proxLimit is %d\n%v", len(b), self.proxLimit, self) - return false - } - } - // check if merged high prox bucket does not exceed size - if sum > 0 { - if sum != self.proxSize { - t.Errorf("proxSize incorrect, expected %v, got %v", sum, self.proxSize) - return false - } - last := len(self.buckets[self.proxLimit]) - if last > 0 && sum >= self.ProxBinSize+last { - t.Errorf("proxLimit %v incorrect, redundant non-empty bucket %d added to proxBin with %v (target %v)\n%v", self.proxLimit, last, sum-last, self.ProxBinSize, self) - return false - } - if self.proxLimit > 0 && sum < self.ProxBinSize { - t.Errorf("proxLimit %v incorrect. proxSize %v is less than target %v, yet there is more peers", self.proxLimit, sum, self.ProxBinSize) - return false - } - } - return true -} - -type bootstrapTest struct { - MaxProx int - BucketSize int - Self Address -} - -func (*bootstrapTest) Generate(rand *rand.Rand, size int) reflect.Value { - t := &bootstrapTest{ - Self: gen(Address{}, rand).(Address), - MaxProx: 5 + rand.Intn(2), - BucketSize: rand.Intn(3) + 1, - } - return reflect.ValueOf(t) -} - -type FindClosestTest struct { - Self Address - Target Address - All []Node - N int -} - -func (c FindClosestTest) String() string { - return fmt.Sprintf("A: %064x\nT: %064x\n(%d)\n", c.Self[:], c.Target[:], c.N) -} - -func (*FindClosestTest) Generate(rand *rand.Rand, size int) reflect.Value { - t := &FindClosestTest{ - Self: gen(Address{}, rand).(Address), - Target: gen(Address{}, rand).(Address), - N: rand.Intn(bucketSize), - } - for _, a := range gen([]Address{}, rand).([]Address) { - t.All = append(t.All, &testNode{addr: a}) - } - return reflect.ValueOf(t) -} - -func (*proxTest) Generate(rand *rand.Rand, size int) reflect.Value { - var add bool - if rand.Intn(1) == 0 { - add = true - } - var t *proxTest - if add { - t = &proxTest{ - addr: gen(Address{}, rand).(Address), - add: add, - } - } else { - t = &proxTest{ - index: rand.Intn(len(addresses)), - add: add, - } - } - return reflect.ValueOf(t) -} - -func hasDuplicates(slice []Node) bool { - seen := make(map[Address]bool) - for _, node := range slice { - if seen[node.Addr()] { - return true - } - seen[node.Addr()] = true - } - return false -} - -func contains(nodes []Node, addr Address) bool { - for _, n := range nodes { - if n.Addr() == addr { - return true - } - } - return false -} - -// gen wraps quick.Value so it's easier to use. -// it generates a random value of the given value's type. -func gen(typ interface{}, rand *rand.Rand) interface{} { - v, ok := quick.Value(reflect.TypeOf(typ), rand) - if !ok { - panic(fmt.Sprintf("couldn't generate random value of type %T", typ)) - } - return v.Interface() -} |