diff options
author | ΞTHΞЯSPHΞЯΞ <{viktor.tron,nagydani,zsfelfoldi}@gmail.com> | 2016-08-30 03:18:00 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2016-08-31 22:19:40 +0800 |
commit | 4d300e4dece56535f56ccc32330340ce89e42581 (patch) | |
tree | 135838bfae03437eb2a50c6d66de4d66b20c3220 /swarm/network/kademlia | |
parent | 1f58b2d084b65eaec9aa2c2ecb1d3aae50d894b3 (diff) | |
download | go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.gz go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.bz2 go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.lz go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.xz go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.zst go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.zip |
swarm: plan bee for content storage and distribution on web3
This change imports the Swarm protocol codebase. Compared to the 'swarm'
branch, a few mostly cosmetic changes had to be made:
* The various redundant log message prefixes are gone.
* All files now have LGPLv3 license headers.
* Minor code changes were needed to please go vet and make the tests
pass on Windows.
* Further changes were required to adapt to the go-ethereum develop
branch and its new Go APIs.
Some code has not (yet) been brought over:
* swarm/cmd/bzzhash: will reappear as cmd/bzzhash later
* swarm/cmd/bzzup.sh: will be reimplemented in cmd/bzzup
* swarm/cmd/makegenesis: will reappear somehow
* swarm/examples/album: will move to a separate repository
* swarm/examples/filemanager: ditto
* swarm/examples/files: will not be merged
* swarm/test/*: will not be merged
* swarm/services/swear: will reappear as contracts/swear when needed
Diffstat (limited to 'swarm/network/kademlia')
-rw-r--r-- | swarm/network/kademlia/address.go | 173 | ||||
-rw-r--r-- | swarm/network/kademlia/address_test.go | 96 | ||||
-rw-r--r-- | swarm/network/kademlia/kaddb.go | 351 | ||||
-rw-r--r-- | swarm/network/kademlia/kademlia.go | 429 | ||||
-rw-r--r-- | swarm/network/kademlia/kademlia_test.go | 392 |
5 files changed, 1441 insertions, 0 deletions
diff --git a/swarm/network/kademlia/address.go b/swarm/network/kademlia/address.go new file mode 100644 index 000000000..16c5ce532 --- /dev/null +++ b/swarm/network/kademlia/address.go @@ -0,0 +1,173 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package kademlia + +import ( + "fmt" + "math/rand" + "strings" + + "github.com/ethereum/go-ethereum/common" +) + +type Address common.Hash + +func (a Address) String() string { + return fmt.Sprintf("%x", a[:]) +} + +func (a *Address) MarshalJSON() (out []byte, err error) { + return []byte(`"` + a.String() + `"`), nil +} + +func (a *Address) UnmarshalJSON(value []byte) error { + *a = Address(common.HexToHash(string(value[1 : len(value)-1]))) + return nil +} + +// the string form of the binary representation of an address (only first 8 bits) +func (a Address) Bin() string { + var bs []string + for _, b := range a[:] { + bs = append(bs, fmt.Sprintf("%08b", b)) + } + return strings.Join(bs, "") +} + +/* +Proximity(x, y) returns the proximity order of the MSB distance between x and y + +The distance metric MSB(x, y) of two equal length byte sequences x an y is the +value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. +the binary cast is big endian: most significant bit first (=MSB). + +Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. +It is defined as the reverse rank of the integer part of the base 2 +logarithm of the distance. +It is calculated by counting the number of common leading zeros in the (MSB) +binary representation of the x^y. + +(0 farthest, 255 closest, 256 self) +*/ +func proximity(one, other Address) (ret int) { + for i := 0; i < len(one); i++ { + oxo := one[i] ^ other[i] + for j := 0; j < 8; j++ { + if (uint8(oxo)>>uint8(7-j))&0x01 != 0 { + return i*8 + j + } + } + } + return len(one) * 8 +} + +// Address.ProxCmp compares the distances a->target and b->target. +// Returns -1 if a is closer to target, 1 if b is closer to target +// and 0 if they are equal. +func (target Address) ProxCmp(a, b Address) int { + for i := range target { + da := a[i] ^ target[i] + db := b[i] ^ target[i] + if da > db { + return 1 + } else if da < db { + return -1 + } + } + return 0 +} + +// randomAddressAt(address, prox) generates a random address +// at proximity order prox relative to address +// if prox is negative a random address is generated +func RandomAddressAt(self Address, prox int) (addr Address) { + addr = self + var pos int + if prox >= 0 { + pos = prox / 8 + trans := prox % 8 + transbytea := byte(0) + for j := 0; j <= trans; j++ { + transbytea |= 1 << uint8(7-j) + } + flipbyte := byte(1 << uint8(7-trans)) + transbyteb := transbytea ^ byte(255) + randbyte := byte(rand.Intn(255)) + addr[pos] = ((addr[pos] & transbytea) ^ flipbyte) | randbyte&transbyteb + } + for i := pos + 1; i < len(addr); i++ { + addr[i] = byte(rand.Intn(255)) + } + + return +} + +// KeyRange(a0, a1, proxLimit) returns the address inclusive address +// range that contain addresses closer to one than other +func KeyRange(one, other Address, proxLimit int) (start, stop Address) { + prox := proximity(one, other) + if prox >= proxLimit { + prox = proxLimit + } + start = CommonBitsAddrByte(one, other, byte(0x00), prox) + stop = CommonBitsAddrByte(one, other, byte(0xff), prox) + return +} + +func CommonBitsAddrF(self, other Address, f func() byte, p int) (addr Address) { + prox := proximity(self, other) + var pos int + if p <= prox { + prox = p + } + pos = prox / 8 + addr = self + trans := byte(prox % 8) + var transbytea byte + if p > prox { + transbytea = byte(0x7f) + } else { + transbytea = byte(0xff) + } + transbytea >>= trans + transbyteb := transbytea ^ byte(0xff) + addrpos := addr[pos] + addrpos &= transbyteb + if p > prox { + addrpos ^= byte(0x80 >> trans) + } + addrpos |= transbytea & f() + addr[pos] = addrpos + for i := pos + 1; i < len(addr); i++ { + addr[i] = f() + } + + return +} + +func CommonBitsAddr(self, other Address, prox int) (addr Address) { + return CommonBitsAddrF(self, other, func() byte { return byte(rand.Intn(255)) }, prox) +} + +func CommonBitsAddrByte(self, other Address, b byte, prox int) (addr Address) { + return CommonBitsAddrF(self, other, func() byte { return b }, prox) +} + +// randomAddressAt() generates a random address +func RandomAddress() Address { + return RandomAddressAt(Address{}, -1) +} diff --git a/swarm/network/kademlia/address_test.go b/swarm/network/kademlia/address_test.go new file mode 100644 index 000000000..c062c8eaf --- /dev/null +++ b/swarm/network/kademlia/address_test.go @@ -0,0 +1,96 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package kademlia + +import ( + "math/rand" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +func (Address) Generate(rand *rand.Rand, size int) reflect.Value { + var id Address + for i := 0; i < len(id); i++ { + id[i] = byte(uint8(rand.Intn(255))) + } + return reflect.ValueOf(id) +} + +func TestCommonBitsAddrF(t *testing.T) { + a := Address(common.HexToHash("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + b := Address(common.HexToHash("0x8123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + c := Address(common.HexToHash("0x4123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + d := Address(common.HexToHash("0x0023456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + e := Address(common.HexToHash("0x01A3456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + ab := CommonBitsAddrF(a, b, func() byte { return byte(0x00) }, 10) + expab := Address(common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000")) + + if ab != expab { + t.Fatalf("%v != %v", ab, expab) + } + ac := CommonBitsAddrF(a, c, func() byte { return byte(0x00) }, 10) + expac := Address(common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000")) + + if ac != expac { + t.Fatalf("%v != %v", ac, expac) + } + ad := CommonBitsAddrF(a, d, func() byte { return byte(0x00) }, 10) + expad := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")) + + if ad != expad { + t.Fatalf("%v != %v", ad, expad) + } + ae := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 10) + expae := Address(common.HexToHash("0x0180000000000000000000000000000000000000000000000000000000000000")) + + if ae != expae { + t.Fatalf("%v != %v", ae, expae) + } + acf := CommonBitsAddrF(a, c, func() byte { return byte(0xff) }, 10) + expacf := Address(common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + + if acf != expacf { + t.Fatalf("%v != %v", acf, expacf) + } + aeo := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 2) + expaeo := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")) + + if aeo != expaeo { + t.Fatalf("%v != %v", aeo, expaeo) + } + aep := CommonBitsAddrF(a, e, func() byte { return byte(0xff) }, 2) + expaep := Address(common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + + if aep != expaep { + t.Fatalf("%v != %v", aep, expaep) + } + +} + +func TestRandomAddressAt(t *testing.T) { + var a Address + for i := 0; i < 100; i++ { + a = RandomAddress() + prox := rand.Intn(255) + b := RandomAddressAt(a, prox) + if proximity(a, b) != prox { + t.Fatalf("incorrect address prox(%v, %v) == %v (expected %v)", a, b, proximity(a, b), prox) + } + } +} diff --git a/swarm/network/kademlia/kaddb.go b/swarm/network/kademlia/kaddb.go new file mode 100644 index 000000000..53a7db451 --- /dev/null +++ b/swarm/network/kademlia/kaddb.go @@ -0,0 +1,351 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package kademlia + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +type NodeData interface { + json.Marshaler + json.Unmarshaler +} + +// allow inactive peers under +type NodeRecord struct { + Addr Address // address of node + Url string // Url, used to connect to node + After time.Time // next call after time + Seen time.Time // last connected at time + Meta *json.RawMessage // arbitrary metadata saved for a peer + + node Node +} + +func (self *NodeRecord) setSeen() { + t := time.Now() + self.Seen = t + self.After = t +} + +func (self *NodeRecord) String() string { + return fmt.Sprintf("<%v>", self.Addr) +} + +// persisted node record database () +type KadDb struct { + Address Address + Nodes [][]*NodeRecord + index map[Address]*NodeRecord + cursors []int + lock sync.RWMutex + purgeInterval time.Duration + initialRetryInterval time.Duration + connRetryExp int +} + +func newKadDb(addr Address, params *KadParams) *KadDb { + return &KadDb{ + Address: addr, + Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load + cursors: make([]int, params.MaxProx+1), + index: make(map[Address]*NodeRecord), + purgeInterval: params.PurgeInterval, + initialRetryInterval: params.InitialRetryInterval, + connRetryExp: params.ConnRetryExp, + } +} + +func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord { + defer self.lock.Unlock() + self.lock.Lock() + + record, found := self.index[a] + if !found { + record = &NodeRecord{ + Addr: a, + Url: url, + } + glog.V(logger.Info).Infof("add new record %v to kaddb", record) + // insert in kaddb + self.index[a] = record + self.Nodes[index] = append(self.Nodes[index], record) + } else { + glog.V(logger.Info).Infof("found record %v in kaddb", record) + } + // update last seen time + record.setSeen() + // update with url in case IP/port changes + record.Url = url + return record +} + +// add adds node records to kaddb (persisted node record db) +func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) { + defer self.lock.Unlock() + self.lock.Lock() + var n int + var nodes []*NodeRecord + for _, node := range nrs { + _, found := self.index[node.Addr] + if !found && node.Addr != self.Address { + node.setSeen() + self.index[node.Addr] = node + index := proximityBin(node.Addr) + dbcursor := self.cursors[index] + nodes = self.Nodes[index] + // this is inefficient for allocation, need to just append then shift + newnodes := make([]*NodeRecord, len(nodes)+1) + copy(newnodes[:], nodes[:dbcursor]) + newnodes[dbcursor] = node + copy(newnodes[dbcursor+1:], nodes[dbcursor:]) + glog.V(logger.Detail).Infof("new nodes: %v (keys: %v)\nnodes: %v", newnodes, nodes) + self.Nodes[index] = newnodes + n++ + } + } + if n > 0 { + glog.V(logger.Debug).Infof("%d/%d node records (new/known)", n, len(nrs)) + } +} + +/* +next return one node record with the highest priority for desired +connection. +This is used to pick candidates for live nodes that are most wanted for +a higly connected low centrality network structure for Swarm which best suits +for a Kademlia-style routing. + +* Starting as naive node with empty db, this implements Kademlia bootstrapping +* As a mature node, it fills short lines. All on demand. + +The candidate is chosen using the following strategy: +We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds. +On each round we proceed from the low to high proximity order buckets. +If the number of active nodes (=connected peers) is < rounds, then start looking +for a known candidate. To determine if there is a candidate to recommend the +kaddb node record database row corresponding to the bucket is checked. + +If the row cursor is on position i, the ith element in the row is chosen. +If the record is scheduled not to be retried before NOW, the next element is taken. +If the record is scheduled to be retried, it is set as checked, scheduled for +checking and is returned. The time of the next check is in X (duration) such that +X = ConnRetryExp * delta where delta is the time past since the last check and +ConnRetryExp is constant obsoletion factor. (Note that when node records are added +from peer messages, they are marked as checked and placed at the cursor, ie. +given priority over older entries). Entries which were checked more than +purgeInterval ago are deleted from the kaddb row. If no candidate is found after +a full round of checking the next bucket up is considered. If no candidate is +found when we reach the maximum-proximity bucket, the next round starts. + +node record a is more favoured to b a > b iff a is a passive node (record of +offline past peer) +|proxBin(a)| < |proxBin(b)| +|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|) +|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b)) + + +The second argument returned names the first missing slot found +*/ +func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) { + // return nil, proxLimit indicates that all buckets are filled + defer self.lock.Unlock() + self.lock.Lock() + + var interval time.Duration + var found bool + var purge []bool + var delta time.Duration + var cursor int + var count int + var after time.Time + + // iterate over columns maximum bucketsize times + for rounds := 1; rounds <= maxBinSize; rounds++ { + ROUND: + // iterate over rows from PO 0 upto MaxProx + for po, dbrow := range self.Nodes { + // if row has rounds connected peers, then take the next + if binSize(po) >= rounds { + continue ROUND + } + if !need { + // set proxlimit to the PO where the first missing slot is found + proxLimit = po + need = true + } + purge = make([]bool, len(dbrow)) + + // there is a missing slot - finding a node to connect to + // select a node record from the relavant kaddb row (of identical prox order) + ROW: + for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) { + count++ + node = dbrow[cursor] + + // skip already connected nodes + if node.node != nil { + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow)) + continue ROW + } + + // if node is scheduled to connect + if time.Time(node.After).After(time.Now()) { + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After) + continue ROW + } + + delta = time.Since(time.Time(node.Seen)) + if delta < self.initialRetryInterval { + delta = self.initialRetryInterval + } + if delta > self.purgeInterval { + // remove node + purge[cursor] = true + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen) + continue ROW + } + + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After) + + // scheduling next check + interval = time.Duration(delta * time.Duration(self.connRetryExp)) + after = time.Now().Add(interval) + + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval) + node.After = after + found = true + } // ROW + self.cursors[po] = cursor + self.delete(po, purge) + if found { + return node, need, proxLimit + } + } // ROUND + } // ROUNDS + + return nil, need, proxLimit +} + +// deletes the noderecords of a kaddb row corresponding to the indexes +// caller must hold the dblock +// the call is unsafe, no index checks +func (self *KadDb) delete(row int, purge []bool) { + var nodes []*NodeRecord + dbrow := self.Nodes[row] + for i, del := range purge { + if i == self.cursors[row] { + //reset cursor + self.cursors[row] = len(nodes) + } + // delete the entry to be purged + if del { + delete(self.index, dbrow[i].Addr) + continue + } + // otherwise append to new list + nodes = append(nodes, dbrow[i]) + } + self.Nodes[row] = nodes +} + +// save persists kaddb on disk (written to file on path in json format. +func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error { + defer self.lock.Unlock() + self.lock.Lock() + + var n int + + for _, b := range self.Nodes { + for _, node := range b { + n++ + node.After = time.Now() + node.Seen = time.Now() + if cb != nil { + cb(node, node.node) + } + } + } + + data, err := json.MarshalIndent(self, "", " ") + if err != nil { + return err + } + err = ioutil.WriteFile(path, data, os.ModePerm) + if err != nil { + glog.V(logger.Warn).Infof("unable to save kaddb with %v nodes to %v: err", n, path, err) + } else { + glog.V(logger.Info).Infof("saved kaddb with %v nodes to %v", n, path) + } + return err +} + +// Load(path) loads the node record database (kaddb) from file on path. +func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) { + defer self.lock.Unlock() + self.lock.Lock() + + var data []byte + data, err = ioutil.ReadFile(path) + if err != nil { + return + } + + err = json.Unmarshal(data, self) + if err != nil { + return + } + var n int + var purge []bool + for po, b := range self.Nodes { + purge = make([]bool, len(b)) + ROW: + for i, node := range b { + if cb != nil { + err = cb(node, node.node) + if err != nil { + purge[i] = true + continue ROW + } + } + n++ + if (node.After == time.Time{}) { + node.After = time.Now() + } + self.index[node.Addr] = node + } + self.delete(po, purge) + } + glog.V(logger.Info).Infof("loaded kaddb with %v nodes from %v", n, path) + + return +} + +// accessor for KAD offline db count +func (self *KadDb) count() int { + defer self.lock.Unlock() + self.lock.Lock() + return len(self.index) +} diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go new file mode 100644 index 000000000..87c57cefe --- /dev/null +++ b/swarm/network/kademlia/kademlia.go @@ -0,0 +1,429 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package kademlia + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +const ( + bucketSize = 4 + proxBinSize = 2 + maxProx = 8 + connRetryExp = 2 + maxPeers = 100 +) + +var ( + purgeInterval = 42 * time.Hour + initialRetryInterval = 42 * time.Millisecond + maxIdleInterval = 42 * 1000 * time.Millisecond + // maxIdleInterval = 42 * 10 0 * time.Millisecond +) + +type KadParams struct { + // adjustable parameters + MaxProx int + ProxBinSize int + BucketSize int + PurgeInterval time.Duration + InitialRetryInterval time.Duration + MaxIdleInterval time.Duration + ConnRetryExp int +} + +func NewKadParams() *KadParams { + return &KadParams{ + MaxProx: maxProx, + ProxBinSize: proxBinSize, + BucketSize: bucketSize, + PurgeInterval: purgeInterval, + InitialRetryInterval: initialRetryInterval, + MaxIdleInterval: maxIdleInterval, + ConnRetryExp: connRetryExp, + } +} + +// Kademlia is a table of active nodes +type Kademlia struct { + addr Address // immutable baseaddress of the table + *KadParams // Kademlia configuration parameters + proxLimit int // state, the PO of the first row of the most proximate bin + proxSize int // state, the number of peers in the most proximate bin + count int // number of active peers (w live connection) + buckets [][]Node // the actual bins + db *KadDb // kaddb, node record database + lock sync.RWMutex // mutex to access buckets +} + +type Node interface { + Addr() Address + Url() string + LastActive() time.Time + Drop() +} + +// public constructor +// add is the base address of the table +// params is KadParams configuration +func New(addr Address, params *KadParams) *Kademlia { + buckets := make([][]Node, params.MaxProx+1) + return &Kademlia{ + addr: addr, + KadParams: params, + buckets: buckets, + db: newKadDb(addr, params), + } +} + +// accessor for KAD base address +func (self *Kademlia) Addr() Address { + return self.addr +} + +// accessor for KAD active node count +func (self *Kademlia) Count() int { + defer self.lock.Unlock() + self.lock.Lock() + return self.count +} + +// accessor for KAD active node count +func (self *Kademlia) DBCount() int { + return self.db.count() +} + +// On is the entry point called when a new nodes is added +// unsafe in that node is not checked to be already active node (to be called once) +func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) { + glog.V(logger.Warn).Infof("%v", self) + defer self.lock.Unlock() + self.lock.Lock() + + index := self.proximityBin(node.Addr()) + record := self.db.findOrCreate(index, node.Addr(), node.Url()) + + if cb != nil { + err = cb(record, node) + glog.V(logger.Detail).Infof("cb(%v, %v) ->%v", record, node, err) + if err != nil { + return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err) + } + glog.V(logger.Debug).Infof("add node record %v with node %v", record, node) + } + + // insert in kademlia table of active nodes + bucket := self.buckets[index] + // if bucket is full insertion replaces the worst node + // TODO: give priority to peers with active traffic + if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation + self.buckets[index] = append(bucket, node) + glog.V(logger.Debug).Infof("add node %v to table", node) + self.setProxLimit(index, true) + record.node = node + self.count++ + return nil + } + + // always rotate peers + idle := self.MaxIdleInterval + var pos int + var replaced Node + for i, p := range bucket { + idleInt := time.Since(p.LastActive()) + if idleInt > idle { + idle = idleInt + pos = i + replaced = p + } + } + if replaced == nil { + glog.V(logger.Debug).Infof("all peers wanted, PO%03d bucket full", index) + return fmt.Errorf("bucket full") + } + glog.V(logger.Debug).Infof("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval) + replaced.Drop() + // actually replace in the row. When off(node) is called, the peer is no longer in the row + bucket[pos] = node + // there is no change in bucket cardinalities so no prox limit adjustment is needed + record.node = node + self.count++ + return nil + +} + +// Off is the called when a node is taken offline (from the protocol main loop exit) +func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) { + self.lock.Lock() + defer self.lock.Unlock() + + index := self.proximityBin(node.Addr()) + bucket := self.buckets[index] + for i := 0; i < len(bucket); i++ { + if node.Addr() == bucket[i].Addr() { + self.buckets[index] = append(bucket[:i], bucket[(i+1):]...) + self.setProxLimit(index, false) + break + } + } + + record := self.db.index[node.Addr()] + // callback on remove + if cb != nil { + cb(record, record.node) + } + record.node = nil + self.count-- + glog.V(logger.Debug).Infof("remove node %v from table, population now is %v", node, self.count) + + return +} + +// proxLimit is dynamically adjusted so that +// 1) there is no empty buckets in bin < proxLimit and +// 2) the sum of all items are the minimum possible but higher than ProxBinSize +// adjust Prox (proxLimit and proxSize after an insertion/removal of nodes) +// caller holds the lock +func (self *Kademlia) setProxLimit(r int, on bool) { + // if the change is outside the core (PO lower) + // and the change does not leave a bucket empty then + // no adjustment needed + if r < self.proxLimit && len(self.buckets[r]) > 0 { + return + } + // if on=a node was added, then r must be within prox limit so increment cardinality + if on { + self.proxSize++ + curr := len(self.buckets[self.proxLimit]) + // if now core is big enough without the furthest bucket, then contract + // this can result in more than one bucket change + for self.proxSize >= self.ProxBinSize+curr && curr > 0 { + self.proxSize -= curr + self.proxLimit++ + curr = len(self.buckets[self.proxLimit]) + + glog.V(logger.Detail).Infof("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r) + } + return + } + // otherwise + if r >= self.proxLimit { + self.proxSize-- + } + // expand core by lowering prox limit until hit zero or cover the empty bucket or reached target cardinality + for (self.proxSize < self.ProxBinSize || r < self.proxLimit) && + self.proxLimit > 0 { + // + self.proxLimit-- + self.proxSize += len(self.buckets[self.proxLimit]) + glog.V(logger.Detail).Infof("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r) + } +} + +/* +returns the list of nodes belonging to the same proximity bin +as the target. The most proximate bin will be the union of the bins between +proxLimit and MaxProx. +*/ +func (self *Kademlia) FindClosest(target Address, max int) []Node { + self.lock.Lock() + defer self.lock.Unlock() + + r := nodesByDistance{ + target: target, + } + + po := self.proximityBin(target) + index := po + step := 1 + glog.V(logger.Detail).Infof("serving %v nodes at %v (PO%02d)", max, index, po) + + // if max is set to 0, just want a full bucket, dynamic number + min := max + // set limit to max + limit := max + if max == 0 { + min = 1 + limit = maxPeers + } + + var n int + for index >= 0 { + // add entire bucket + for _, p := range self.buckets[index] { + r.push(p, limit) + n++ + } + // terminate if index reached the bottom or enough peers > min + glog.V(logger.Detail).Infof("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po) + if n >= min && (step < 0 || max == 0) { + break + } + // reach top most non-empty PO bucket, turn around + if index == self.MaxProx { + index = po + step = -1 + } + index += step + } + glog.V(logger.Detail).Infof("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po) + return r.nodes +} + +func (self *Kademlia) Suggest() (*NodeRecord, bool, int) { + defer self.lock.RUnlock() + self.lock.RLock() + return self.db.findBest(self.BucketSize, func(i int) int { return len(self.buckets[i]) }) +} + +// adds node records to kaddb (persisted node record db) +func (self *Kademlia) Add(nrs []*NodeRecord) { + self.db.add(nrs, self.proximityBin) +} + +// nodesByDistance is a list of nodes, ordered by distance to target. +type nodesByDistance struct { + nodes []Node + target Address +} + +func sortedByDistanceTo(target Address, slice []Node) bool { + var last Address + for i, node := range slice { + if i > 0 { + if target.ProxCmp(node.Addr(), last) < 0 { + return false + } + } + last = node.Addr() + } + return true +} + +// push(node, max) adds the given node to the list, keeping the total size +// below max elements. +func (h *nodesByDistance) push(node Node, max int) { + // returns the firt index ix such that func(i) returns true + ix := sort.Search(len(h.nodes), func(i int) bool { + return h.target.ProxCmp(h.nodes[i].Addr(), node.Addr()) >= 0 + }) + + if len(h.nodes) < max { + h.nodes = append(h.nodes, node) + } + if ix < len(h.nodes) { + copy(h.nodes[ix+1:], h.nodes[ix:]) + h.nodes[ix] = node + } +} + +/* +Taking the proximity order relative to a fix point x classifies the points in +the space (n byte long byte sequences) into bins. Items in each are at +most half as distant from x as items in the previous bin. Given a sample of +uniformly distributed items (a hash function over arbitrary sequence) the +proximity scale maps onto series of subsets with cardinalities on a negative +exponential scale. + +It also has the property that any two item belonging to the same bin are at +most half as distant from each other as they are from x. + +If we think of random sample of items in the bins as connections in a network of interconnected nodes than relative proximity can serve as the basis for local +decisions for graph traversal where the task is to find a route between two +points. Since in every hop, the finite distance halves, there is +a guaranteed constant maximum limit on the number of hops needed to reach one +node from the other. +*/ + +func (self *Kademlia) proximityBin(other Address) (ret int) { + ret = proximity(self.addr, other) + if ret > self.MaxProx { + ret = self.MaxProx + } + return +} + +// provides keyrange for chunk db iteration +func (self *Kademlia) KeyRange(other Address) (start, stop Address) { + defer self.lock.RUnlock() + self.lock.RLock() + return KeyRange(self.addr, other, self.proxLimit) +} + +// save persists kaddb on disk (written to file on path in json format. +func (self *Kademlia) Save(path string, cb func(*NodeRecord, Node)) error { + return self.db.save(path, cb) +} + +// Load(path) loads the node record database (kaddb) from file on path. +func (self *Kademlia) Load(path string, cb func(*NodeRecord, Node) error) (err error) { + return self.db.load(path, cb) +} + +// kademlia table + kaddb table displayed with ascii +func (self *Kademlia) String() string { + defer self.lock.RUnlock() + self.lock.RLock() + defer self.db.lock.RUnlock() + self.db.lock.RLock() + + var rows []string + rows = append(rows, "=========================================================================") + rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %v", time.Now().UTC().Format(time.UnixDate), self.addr.String()[:6])) + rows = append(rows, fmt.Sprintf("population: %d (%d), proxLimit: %d, proxSize: %d", self.count, len(self.db.index), self.proxLimit, self.proxSize)) + rows = append(rows, fmt.Sprintf("MaxProx: %d, ProxBinSize: %d, BucketSize: %d", self.MaxProx, self.ProxBinSize, self.BucketSize)) + + for i, bucket := range self.buckets { + + if i == self.proxLimit { + rows = append(rows, fmt.Sprintf("============ PROX LIMIT: %d ==========================================", i)) + } + row := []string{fmt.Sprintf("%03d", i), fmt.Sprintf("%2d", len(bucket))} + var k int + c := self.db.cursors[i] + for ; k < len(bucket); k++ { + p := bucket[(c+k)%len(bucket)] + row = append(row, p.Addr().String()[:6]) + if k == 4 { + break + } + } + for ; k < 4; k++ { + row = append(row, " ") + } + row = append(row, fmt.Sprintf("| %2d %2d", len(self.db.Nodes[i]), self.db.cursors[i])) + + for j, p := range self.db.Nodes[i] { + row = append(row, p.Addr.String()[:6]) + if j == 3 { + break + } + } + rows = append(rows, strings.Join(row, " ")) + if i == self.MaxProx { + } + } + rows = append(rows, "=========================================================================") + return strings.Join(rows, "\n") +} diff --git a/swarm/network/kademlia/kademlia_test.go b/swarm/network/kademlia/kademlia_test.go new file mode 100644 index 000000000..66edfe654 --- /dev/null +++ b/swarm/network/kademlia/kademlia_test.go @@ -0,0 +1,392 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package kademlia + +import ( + "fmt" + "math" + "math/rand" + "os" + "path/filepath" + "reflect" + "testing" + "testing/quick" + "time" +) + +var ( + quickrand = rand.New(rand.NewSource(time.Now().Unix())) + quickcfgFindClosest = &quick.Config{MaxCount: 50, Rand: quickrand} + quickcfgBootStrap = &quick.Config{MaxCount: 100, Rand: quickrand} +) + +type testNode struct { + addr Address +} + +func (n *testNode) String() string { + return fmt.Sprintf("%x", n.addr[:]) +} + +func (n *testNode) Addr() Address { + return n.addr +} + +func (n *testNode) Drop() { +} + +func (n *testNode) Url() string { + return "" +} + +func (n *testNode) LastActive() time.Time { + return time.Now() +} + +func TestOn(t *testing.T) { + addr, ok := gen(Address{}, quickrand).(Address) + other, ok := gen(Address{}, quickrand).(Address) + if !ok { + t.Errorf("oops") + } + kad := New(addr, NewKadParams()) + err := kad.On(&testNode{addr: other}, nil) + _ = err +} + +func TestBootstrap(t *testing.T) { + + test := func(test *bootstrapTest) bool { + // for any node kad.le, Target and N + params := NewKadParams() + params.MaxProx = test.MaxProx + params.BucketSize = test.BucketSize + params.ProxBinSize = test.BucketSize + kad := New(test.Self, params) + var err error + + for p := 0; p < 9; p++ { + var nrs []*NodeRecord + n := math.Pow(float64(2), float64(7-p)) + for i := 0; i < int(n); i++ { + addr := RandomAddressAt(test.Self, p) + nrs = append(nrs, &NodeRecord{ + Addr: addr, + }) + } + kad.Add(nrs) + } + + node := &testNode{test.Self} + + n := 0 + for n < 100 { + err = kad.On(node, nil) + if err != nil { + t.Fatalf("backend not accepting node: %v", err) + } + + record, need, _ := kad.Suggest() + if !need { + break + } + n++ + if record == nil { + continue + } + node = &testNode{record.Addr} + } + exp := test.BucketSize * (test.MaxProx + 1) + if kad.Count() != exp { + t.Errorf("incorrect number of peers, expected %d, got %d\n%v", exp, kad.Count(), kad) + return false + } + return true + } + if err := quick.Check(test, quickcfgBootStrap); err != nil { + t.Error(err) + } + +} + +func TestFindClosest(t *testing.T) { + + test := func(test *FindClosestTest) bool { + // for any node kad.le, Target and N + params := NewKadParams() + params.MaxProx = 7 + kad := New(test.Self, params) + var err error + for _, node := range test.All { + err = kad.On(node, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + } + + if len(test.All) == 0 || test.N == 0 { + return true + } + nodes := kad.FindClosest(test.Target, test.N) + + // check that the number of results is min(N, kad.len) + wantN := test.N + if tlen := kad.Count(); tlen < test.N { + wantN = tlen + } + + if len(nodes) != wantN { + t.Errorf("wrong number of nodes: got %d, want %d", len(nodes), wantN) + return false + } + + if hasDuplicates(nodes) { + t.Errorf("result contains duplicates") + return false + } + + if !sortedByDistanceTo(test.Target, nodes) { + t.Errorf("result is not sorted by distance to target") + return false + } + + // check that the result nodes have minimum distance to target. + farthestResult := nodes[len(nodes)-1].Addr() + for i, b := range kad.buckets { + for j, n := range b { + if contains(nodes, n.Addr()) { + continue // don't run the check below for nodes in result + } + if test.Target.ProxCmp(n.Addr(), farthestResult) < 0 { + _ = i * j + t.Errorf("kad.le contains node that is closer to target but it's not in result") + return false + } + } + } + return true + } + if err := quick.Check(test, quickcfgFindClosest); err != nil { + t.Error(err) + } +} + +type proxTest struct { + add bool + index int + addr Address +} + +var ( + addresses []Address +) + +func TestProxAdjust(t *testing.T) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + self := gen(Address{}, r).(Address) + params := NewKadParams() + params.MaxProx = 7 + kad := New(self, params) + + var err error + for i := 0; i < 100; i++ { + a := gen(Address{}, r).(Address) + addresses = append(addresses, a) + err = kad.On(&testNode{addr: a}, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + if !kad.proxCheck(t) { + return + } + } + test := func(test *proxTest) bool { + node := &testNode{test.addr} + if test.add { + kad.On(node, nil) + } else { + kad.Off(node, nil) + } + return kad.proxCheck(t) + } + if err := quick.Check(test, quickcfgFindClosest); err != nil { + t.Error(err) + } +} + +func TestSaveLoad(t *testing.T) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + addresses := gen([]Address{}, r).([]Address) + self := RandomAddress() + params := NewKadParams() + params.MaxProx = 7 + kad := New(self, params) + + var err error + + for _, a := range addresses { + err = kad.On(&testNode{addr: a}, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + } + nodes := kad.FindClosest(self, 100) + + path := filepath.Join(os.TempDir(), "bzz-kad-test-save-load.peers") + err = kad.Save(path, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("unepected error saving kaddb: %v", err) + } + kad = New(self, params) + err = kad.Load(path, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("unepected error loading kaddb: %v", err) + } + for _, b := range kad.db.Nodes { + for _, node := range b { + err = kad.On(&testNode{node.Addr}, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + } + } + loadednodes := kad.FindClosest(self, 100) + for i, node := range loadednodes { + if nodes[i].Addr() != node.Addr() { + t.Errorf("node mismatch at %d/%d: %v != %v", i, len(nodes), nodes[i].Addr(), node.Addr()) + } + } +} + +func (self *Kademlia) proxCheck(t *testing.T) bool { + var sum int + for i, b := range self.buckets { + l := len(b) + // if we are in the high prox multibucket + if i >= self.proxLimit { + sum += l + } else if l == 0 { + t.Errorf("bucket %d empty, yet proxLimit is %d\n%v", len(b), self.proxLimit, self) + return false + } + } + // check if merged high prox bucket does not exceed size + if sum > 0 { + if sum != self.proxSize { + t.Errorf("proxSize incorrect, expected %v, got %v", sum, self.proxSize) + return false + } + last := len(self.buckets[self.proxLimit]) + if last > 0 && sum >= self.ProxBinSize+last { + t.Errorf("proxLimit %v incorrect, redundant non-empty bucket %d added to proxBin with %v (target %v)\n%v", self.proxLimit, last, sum-last, self.ProxBinSize, self) + return false + } + if self.proxLimit > 0 && sum < self.ProxBinSize { + t.Errorf("proxLimit %v incorrect. proxSize %v is less than target %v, yet there is more peers", self.proxLimit, sum, self.ProxBinSize) + return false + } + } + return true +} + +type bootstrapTest struct { + MaxProx int + BucketSize int + Self Address +} + +func (*bootstrapTest) Generate(rand *rand.Rand, size int) reflect.Value { + t := &bootstrapTest{ + Self: gen(Address{}, rand).(Address), + MaxProx: 5 + rand.Intn(2), + BucketSize: rand.Intn(3) + 1, + } + return reflect.ValueOf(t) +} + +type FindClosestTest struct { + Self Address + Target Address + All []Node + N int +} + +func (c FindClosestTest) String() string { + return fmt.Sprintf("A: %064x\nT: %064x\n(%d)\n", c.Self[:], c.Target[:], c.N) +} + +func (*FindClosestTest) Generate(rand *rand.Rand, size int) reflect.Value { + t := &FindClosestTest{ + Self: gen(Address{}, rand).(Address), + Target: gen(Address{}, rand).(Address), + N: rand.Intn(bucketSize), + } + for _, a := range gen([]Address{}, rand).([]Address) { + t.All = append(t.All, &testNode{addr: a}) + } + return reflect.ValueOf(t) +} + +func (*proxTest) Generate(rand *rand.Rand, size int) reflect.Value { + var add bool + if rand.Intn(1) == 0 { + add = true + } + var t *proxTest + if add { + t = &proxTest{ + addr: gen(Address{}, rand).(Address), + add: add, + } + } else { + t = &proxTest{ + index: rand.Intn(len(addresses)), + add: add, + } + } + return reflect.ValueOf(t) +} + +func hasDuplicates(slice []Node) bool { + seen := make(map[Address]bool) + for _, node := range slice { + if seen[node.Addr()] { + return true + } + seen[node.Addr()] = true + } + return false +} + +func contains(nodes []Node, addr Address) bool { + for _, n := range nodes { + if n.Addr() == addr { + return true + } + } + return false +} + +// gen wraps quick.Value so it's easier to use. +// it generates a random value of the given value's type. +func gen(typ interface{}, rand *rand.Rand) interface{} { + v, ok := quick.Value(reflect.TypeOf(typ), rand) + if !ok { + panic(fmt.Sprintf("couldn't generate random value of type %T", typ)) + } + return v.Interface() +} |