1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package kademlia
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
)
// NodeData is any value that can both marshal itself to JSON and unmarshal
// itself from JSON.
type NodeData interface {
	json.Unmarshaler
	json.Marshaler
}
// NodeRecord is a single entry of the node record database (KadDb).
// Its exported fields are what encoding/json writes and reads when the
// database is saved and loaded; the unexported node field is not serialised.
type NodeRecord struct {
Addr Address // address of node
Url string // URL used to connect to the node
After time.Time // earliest time at which the node should be tried again
Seen time.Time // time the node was last seen (both timestamps are set by setSeen)
Meta *json.RawMessage // arbitrary metadata saved for a peer
node Node // live peer handle; findBest treats a non-nil value as "already connected"
}
// setSeen stamps the record with the current time, both as the moment it was
// last seen and as the earliest moment it may be tried again.
func (self *NodeRecord) setSeen() {
	now := time.Now()
	self.Seen, self.After = now, now
}
// String renders the record as its address wrapped in angle brackets.
func (self *NodeRecord) String() string {
	return "<" + fmt.Sprint(self.Addr) + ">"
}
// KadDb is the persisted node record database.
// Only the exported fields (Address and Nodes) are serialised by save and
// restored by load; everything else is in-memory bookkeeping.
type KadDb struct {
Address Address // own address; add skips records carrying this address
Nodes [][]*NodeRecord // node records grouped in rows, indexed by proximity bin
index map[Address]*NodeRecord // lookup of every known record by address
cursors []int // per-row position where findBest resumes scanning and add inserts
lock sync.RWMutex // taken by the methods that touch the fields above (delete expects the caller to hold it)
purgeInterval time.Duration // findBest removes records last seen longer ago than this
initialRetryInterval time.Duration // lower bound on the time-since-seen used when scheduling a retry
connRetryExp int // factor the time-since-seen is multiplied by to schedule the next retry
}
// newKadDb builds an empty node record database for the node with address
// addr. It allocates params.MaxProx+1 rows (and as many row cursors) and
// copies the purge/retry settings from params.
func newKadDb(addr Address, params *KadParams) *KadDb {
	rows := params.MaxProx + 1

	db := &KadDb{Address: addr}
	db.Nodes = make([][]*NodeRecord, rows) // overwritten by load
	db.cursors = make([]int, rows)
	db.index = make(map[Address]*NodeRecord)
	db.purgeInterval = params.PurgeInterval
	db.initialRetryInterval = params.InitialRetryInterval
	db.connRetryExp = params.ConnRetryExp
	return db
}
// findOrCreate returns the record stored for address a, creating it and
// appending it to row index when it is not known yet. In both cases the
// record is marked as seen now and its Url is overwritten with url.
func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord {
	self.lock.Lock()
	defer self.lock.Unlock()

	record, known := self.index[a]
	if known {
		log.Info(fmt.Sprintf("found record %v in kaddb", record))
	} else {
		record = &NodeRecord{Addr: a, Url: url}
		log.Info(fmt.Sprintf("add new record %v to kaddb", record))
		// register the fresh record in the lookup map and in its row
		self.index[a] = record
		self.Nodes[index] = append(self.Nodes[index], record)
	}

	// refresh the last-seen timestamp
	record.setSeen()
	// the peer may have moved to another IP/port, so always take the latest url
	record.Url = url
	return record
}
// add adds node records to kaddb (persisted node record db).
//
// Records whose address is already indexed, or equals the database's own
// address, are skipped. Every other record is marked as seen now, indexed,
// and inserted into the row chosen by proximityBin at that row's cursor
// position, so that it is considered before older entries.
func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) {
	self.lock.Lock()
	defer self.lock.Unlock()

	var n int
	for _, node := range nrs {
		if _, found := self.index[node.Addr]; found || node.Addr == self.Address {
			continue
		}
		node.setSeen()
		self.index[node.Addr] = node

		index := proximityBin(node.Addr)
		dbcursor := self.cursors[index]
		nodes := self.Nodes[index]
		// this is inefficient for allocation, need to just append then shift
		newnodes := make([]*NodeRecord, len(nodes)+1)
		copy(newnodes, nodes[:dbcursor])
		newnodes[dbcursor] = node
		copy(newnodes[dbcursor+1:], nodes[dbcursor:])
		// the format string must carry exactly one verb per argument
		log.Trace(fmt.Sprintf("new nodes: %v, nodes: %v", newnodes, nodes))
		self.Nodes[index] = newnodes
		n++
	}
	if n > 0 {
		log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs)))
	}
}
/*
findBest returns the node record with the highest priority for a desired
connection.
This is used to pick candidates for live nodes that are most wanted for
a highly connected, low centrality network structure for Swarm, which best
suits Kademlia-style routing.
* Starting as a naive node with an empty db, this implements Kademlia bootstrapping
* As a mature node, it fills short lines. All on demand.
The candidate is chosen using the following strategy:
We check for missing online nodes in the buckets for 1 up to maxBinSize rounds.
On each round we proceed from the low to high proximity order buckets.
If the number of active nodes (=connected peers) in a bucket is < rounds, then we
start looking for a known candidate. To determine if there is a candidate to recommend,
the kaddb node record database row corresponding to the bucket is checked.
If the row cursor is on position i, the ith element in the row is chosen.
If the record is scheduled not to be retried before NOW, the next element is taken.
If the record is due to be retried, it is set as checked, scheduled for the next
check and returned. The time of the next check is in X (duration) such that
X = ConnRetryExp * delta, where delta is the time passed since the record was last
seen (but at least the initial retry interval) and ConnRetryExp is a constant
obsoletion factor. (Note that when node records are added
from peer messages, they are marked as checked and placed at the cursor, i.e.
given priority over older entries). Entries which were last seen more than
purgeInterval ago are deleted from the kaddb row. If no candidate is found after
a full round of checking, the next bucket up is considered. If no candidate is
found when we reach the maximum-proximity bucket, the next round starts.
Node record a is favoured over b (a > b) iff a is a passive node (record of
an offline past peer) and
|proxBin(a)| < |proxBin(b)|
|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|)
|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b))
The second value returned (need) reports whether a missing slot was found; the
third value (proxLimit) names the proximity order of the first missing slot found.
*/
// findBest returns the next record to try connecting to, or nil when no
// record is currently eligible.
//
// maxBinSize is the number of rounds; binSize(po) must report the number of
// connected peers in proximity bin po. need is true when a bin with fewer
// connected peers than the current round was encountered, and proxLimit is
// the proximity order of the first such bin. A nil record with need == false
// indicates that all buckets are filled.
func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) {
	self.lock.Lock()
	defer self.lock.Unlock()

	// iterate over columns maximum bucketsize times
	for rounds := 1; rounds <= maxBinSize; rounds++ {
		// iterate over rows from PO 0 upto MaxProx
		for po := range self.Nodes {
			// if row has rounds connected peers, then take the next
			if binSize(po) >= rounds {
				continue
			}
			if !need {
				// set proxlimit to the PO where the first missing slot is found
				proxLimit = po
				need = true
			}
			// there is a missing slot - finding a node to connect to
			if candidate := self.selectFromRow(po, rounds); candidate != nil {
				return candidate, need, proxLimit
			}
		}
	}
	return nil, need, proxLimit
}

// selectFromRow scans the kaddb row of proximity order po at most once,
// starting at the row cursor, and returns the first record that is due for a
// connection attempt (nil if there is none). Records last seen more than
// purgeInterval ago are removed from the row, and the row cursor is left just
// behind the last record examined. rounds is only used for logging.
// The caller must hold the lock.
func (self *KadDb) selectFromRow(po int, rounds int) *NodeRecord {
	dbrow := self.Nodes[po]
	purge := make([]bool, len(dbrow))

	// delete can leave the cursor one past the end of a shortened row;
	// wrap it so that indexing the row below cannot go out of range
	cursor := self.cursors[po]
	if cursor >= len(dbrow) {
		cursor = 0
	}

	var candidate *NodeRecord
	// the counter is local to the row, so every row gets one full scan
	for count := 0; candidate == nil && count < len(dbrow); count++ {
		pos := cursor
		record := dbrow[pos]
		cursor = (cursor + 1) % len(dbrow)

		// skip already connected nodes
		if record.node != nil {
			log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", record.Addr, po, pos, len(dbrow)))
			continue
		}

		delta := time.Since(record.Seen)
		// skip records that are not yet due for another attempt
		if record.After.After(time.Now()) {
			log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", record.Addr, po, pos, record.Seen, delta, record.After))
			continue
		}

		if delta < self.initialRetryInterval {
			delta = self.initialRetryInterval
		}
		if delta > self.purgeInterval {
			// remove node
			purge[pos] = true
			log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", record.Addr, po, pos, record.Seen))
			continue
		}

		log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", record.Addr, po, pos, record.Seen, delta, record.After))

		// scheduling next check
		interval := delta * time.Duration(self.connRetryExp)
		after := time.Now().Add(interval)
		log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", record.Addr, po, pos, rounds, record.Seen, delta, record.After, after, interval))
		record.After = after
		candidate = record
	}

	self.cursors[po] = cursor
	self.delete(po, purge)
	return candidate
}
// delete drops from kaddb row `row` every record whose position is flagged in
// purge, removing it from the address index as well, and moves the row cursor
// so that it keeps pointing at the same surviving record (or at the next
// survivor when the record under the cursor is dropped).
// The caller must hold the db lock.
// The call is unsafe: purge is not checked against the row length.
func (self *KadDb) delete(row int, purge []bool) {
	old := self.Nodes[row]
	var kept []*NodeRecord
	for i := range purge {
		// translate the cursor into its position within the filtered row
		if self.cursors[row] == i {
			self.cursors[row] = len(kept)
		}
		if purge[i] {
			// forget the purged entry
			delete(self.index, old[i].Addr)
		} else {
			// carry the survivor over to the new row
			kept = append(kept, old[i])
		}
	}
	self.Nodes[row] = kept
}
// save persists kaddb on disk (written to the file on path in JSON format).
//
// Before serialising, every record has its After and Seen timestamps set to
// the current time and, if cb is non-nil, cb is invoked with the record and
// its live node so the caller can update the record. The error from
// marshalling or from writing the file is returned; a write failure is also
// logged.
func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error {
	self.lock.Lock()
	defer self.lock.Unlock()

	var n int
	for _, b := range self.Nodes {
		for _, node := range b {
			n++
			node.After = time.Now()
			node.Seen = time.Now()
			if cb != nil {
				cb(node, node.node)
			}
		}
	}

	data, err := json.MarshalIndent(self, "", " ")
	if err != nil {
		return err
	}
	// NOTE(review): os.ModePerm requests 0777 (subject to umask) for a data
	// file — confirm this is intended before tightening it.
	err = ioutil.WriteFile(path, data, os.ModePerm)
	if err != nil {
		// every argument needs its own verb, otherwise the error is rendered
		// as %!(EXTRA ...) noise
		log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: %v", n, path, err))
	} else {
		log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path))
	}
	return err
}
// load reads the node record database (kaddb) from the JSON file on path.
//
// For every loaded record cb (if non-nil) is invoked with the record and its
// node; a record for which cb returns an error is dropped from the database.
// Records without an After timestamp get the current time. Surviving records
// are added to the address index.
//
// The returned error is non-nil only when the file cannot be read or parsed;
// a callback error is handled by dropping the record and is not returned.
func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) {
	self.lock.Lock()
	defer self.lock.Unlock()

	data, readErr := ioutil.ReadFile(path)
	if readErr != nil {
		return readErr
	}
	if err = json.Unmarshal(data, self); err != nil {
		return err
	}

	var n int
	for po, b := range self.Nodes {
		purge := make([]bool, len(b))
		for i, node := range b {
			if cb != nil {
				// keep the callback error local: it is dealt with by purging
				// the record and must not leak into the return value
				if cbErr := cb(node, node.node); cbErr != nil {
					log.Debug(fmt.Sprintf("kaddb record %v dropped on load: %v", node.Addr, cbErr))
					purge[i] = true
					continue
				}
			}
			n++
			if node.After.IsZero() {
				node.After = time.Now()
			}
			self.index[node.Addr] = node
		}
		self.delete(po, purge)
	}
	log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path))
	return nil
}
// count reports how many node records the KAD offline db currently indexes.
func (self *KadDb) count() int {
	self.lock.Lock()
	defer self.lock.Unlock()

	total := len(self.index)
	return total
}
|