// Copyright 2016 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package kademlia import ( "encoding/json" "fmt" "io/ioutil" "os" "sync" "time" "github.com/ethereum/go-ethereum/log" ) type NodeData interface { json.Marshaler json.Unmarshaler } // allow inactive peers under type NodeRecord struct { Addr Address // address of node Url string // Url, used to connect to node After time.Time // next call after time Seen time.Time // last connected at time Meta *json.RawMessage // arbitrary metadata saved for a peer node Node } func (self *NodeRecord) setSeen() { t := time.Now() self.Seen = t self.After = t } func (self *NodeRecord) String() string { return fmt.Sprintf("<%v>", self.Addr) } // persisted node record database () type KadDb struct { Address Address Nodes [][]*NodeRecord index map[Address]*NodeRecord cursors []int lock sync.RWMutex purgeInterval time.Duration initialRetryInterval time.Duration connRetryExp int } func newKadDb(addr Address, params *KadParams) *KadDb { return &KadDb{ Address: addr, Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load cursors: make([]int, params.MaxProx+1), index: make(map[Address]*NodeRecord), purgeInterval: params.PurgeInterval, initialRetryInterval: params.InitialRetryInterval, connRetryExp: params.ConnRetryExp, } } func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord { defer self.lock.Unlock() self.lock.Lock() record, found := self.index[a] if !found { record = &NodeRecord{ Addr: a, Url: url, } log.Info(fmt.Sprintf("add new record %v to kaddb", record)) // insert in kaddb self.index[a] = record self.Nodes[index] = append(self.Nodes[index], record) } else { log.Info(fmt.Sprintf("found record %v in kaddb", record)) } // update last seen time record.setSeen() // update with url in case IP/port changes record.Url = url return record } // add adds node records to kaddb (persisted node record db) func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) { defer self.lock.Unlock() self.lock.Lock() var n int var nodes []*NodeRecord for _, node := range nrs { _, found := self.index[node.Addr] if !found && node.Addr != self.Address { node.setSeen() self.index[node.Addr] = node index := proximityBin(node.Addr) dbcursor := self.cursors[index] nodes = self.Nodes[index] // this is inefficient for allocation, need to just append then shift newnodes := make([]*NodeRecord, len(nodes)+1) copy(newnodes[:], nodes[:dbcursor]) newnodes[dbcursor] = node copy(newnodes[dbcursor+1:], nodes[dbcursor:]) log.Trace(fmt.Sprintf("new nodes: %v, nodes: %v", newnodes, nodes)) self.Nodes[index] = newnodes n++ } } if n > 0 { log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs))) } } /* next return one node record with the highest priority for desired connection. This is used to pick candidates for live nodes that are most wanted for a higly connected low centrality network structure for Swarm which best suits for a Kademlia-style routing. * Starting as naive node with empty db, this implements Kademlia bootstrapping * As a mature node, it fills short lines. All on demand. The candidate is chosen using the following strategy: We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds. On each round we proceed from the low to high proximity order buckets. If the number of active nodes (=connected peers) is < rounds, then start looking for a known candidate. To determine if there is a candidate to recommend the kaddb node record database row corresponding to the bucket is checked. If the row cursor is on position i, the ith element in the row is chosen. If the record is scheduled not to be retried before NOW, the next element is taken. If the record is scheduled to be retried, it is set as checked, scheduled for checking and is returned. The time of the next check is in X (duration) such that X = ConnRetryExp * delta where delta is the time past since the last check and ConnRetryExp is constant obsoletion factor. (Note that when node records are added from peer messages, they are marked as checked and placed at the cursor, ie. given priority over older entries). Entries which were checked more than purgeInterval ago are deleted from the kaddb row. If no candidate is found after a full round of checking the next bucket up is considered. If no candidate is found when we reach the maximum-proximity bucket, the next round starts. node record a is more favoured to b a > b iff a is a passive node (record of offline past peer) |proxBin(a)| < |proxBin(b)| || (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|) || (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b)) The second argument returned names the first missing slot found */ func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) { // return nil, proxLimit indicates that all buckets are filled defer self.lock.Unlock() self.lock.Lock() var interval time.Duration var found bool var purge []bool var delta time.Duration var cursor int var count int var after time.Time // iterate over columns maximum bucketsize times for rounds := 1; rounds <= maxBinSize; rounds++ { ROUND: // iterate over rows from PO 0 upto MaxProx for po, dbrow := range self.Nodes { // if row has rounds connected peers, then take the next if binSize(po) >= rounds { continue ROUND } if !need { // set proxlimit to the PO where the first missing slot is found proxLimit = po need = true } purge = make([]bool, len(dbrow)) // there is a missing slot - finding a node to connect to // select a node record from the relavant kaddb row (of identical prox order) ROW: for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) { count++ node = dbrow[cursor] // skip already connected nodes if node.node != nil { log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow))) continue ROW } // if node is scheduled to connect if node.After.After(time.Now()) { log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)) continue ROW } delta = time.Since(node.Seen) if delta < self.initialRetryInterval { delta = self.initialRetryInterval } if delta > self.purgeInterval { // remove node purge[cursor] = true log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen)) continue ROW } log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)) // scheduling next check interval = delta * time.Duration(self.connRetryExp) after = time.Now().Add(interval) log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval)) node.After = after found = true } // ROW self.cursors[po] = cursor self.delete(po, purge) if found { return node, need, proxLimit } } // ROUND } // ROUNDS return nil, need, proxLimit } // deletes the noderecords of a kaddb row corresponding to the indexes // caller must hold the dblock // the call is unsafe, no index checks func (self *KadDb) delete(row int, purge []bool) { var nodes []*NodeRecord dbrow := self.Nodes[row] for i, del := range purge { if i == self.cursors[row] { //reset cursor self.cursors[row] = len(nodes) } // delete the entry to be purged if del { delete(self.index, dbrow[i].Addr) continue } // otherwise append to new list nodes = append(nodes, dbrow[i]) } self.Nodes[row] = nodes } // save persists kaddb on disk (written to file on path in json format. func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error { defer self.lock.Unlock() self.lock.Lock() var n int for _, b := range self.Nodes { for _, node := range b { n++ node.After = time.Now() node.Seen = time.Now() if cb != nil { cb(node, node.node) } } } data, err := json.MarshalIndent(self, "", " ") if err != nil { return err } err = ioutil.WriteFile(path, data, os.ModePerm) if err != nil { log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: %v", n, path, err)) } else { log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path)) } return err } // Load(path) loads the node record database (kaddb) from file on path. func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) { defer self.lock.Unlock() self.lock.Lock() var data []byte data, err = ioutil.ReadFile(path) if err != nil { return } err = json.Unmarshal(data, self) if err != nil { return } var n int var purge []bool for po, b := range self.Nodes { purge = make([]bool, len(b)) ROW: for i, node := range b { if cb != nil { err = cb(node, node.node) if err != nil { purge[i] = true continue ROW } } n++ if (node.After == time.Time{}) { node.After = time.Now() } self.index[node.Addr] = node } self.delete(po, purge) } log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path)) return } // accessor for KAD offline db count func (self *KadDb) count() int { defer self.lock.Unlock() self.lock.Lock() return len(self.index) }