aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/table.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r--p2p/discover/table.go168
1 files changed, 106 insertions, 62 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 972bc1077..c128c2ed1 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -44,6 +44,10 @@ const (
maxBondingPingPongs = 16
maxFindnodeFailures = 5
+
+ autoRefreshInterval = 1 * time.Hour
+ seedCount = 30
+ seedMaxAge = 5 * 24 * time.Hour
)
type Table struct {
@@ -52,6 +56,10 @@ type Table struct {
nursery []*Node // bootstrap nodes
db *nodeDB // database of known nodes
+ refreshReq chan struct{}
+ closeReq chan struct{}
+ closed chan struct{}
+
bondmu sync.Mutex
bonding map[NodeID]*bondproc
bondslots chan struct{} // limits total number of active bonding processes
@@ -80,10 +88,7 @@ type transport interface {
// bucket contains nodes, ordered by their last activity. the entry
// that was most recently active is the first element in entries.
-type bucket struct {
- lastLookup time.Time
- entries []*Node
-}
+type bucket struct{ entries []*Node }
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
// If no node database was given, use an in-memory one
@@ -93,11 +98,14 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
db, _ = newNodeDB("", Version, ourID)
}
tab := &Table{
- net: t,
- db: db,
- self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
- bonding: make(map[NodeID]*bondproc),
- bondslots: make(chan struct{}, maxBondingPingPongs),
+ net: t,
+ db: db,
+ self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
+ bonding: make(map[NodeID]*bondproc),
+ bondslots: make(chan struct{}, maxBondingPingPongs),
+ refreshReq: make(chan struct{}),
+ closeReq: make(chan struct{}),
+ closed: make(chan struct{}),
}
for i := 0; i < cap(tab.bondslots); i++ {
tab.bondslots <- struct{}{}
@@ -105,6 +113,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
for i := range tab.buckets {
tab.buckets[i] = new(bucket)
}
+ go tab.refreshLoop()
return tab
}
@@ -163,10 +172,12 @@ func randUint(max uint32) uint32 {
// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
- if tab.net != nil {
- tab.net.close()
+ select {
+ case <-tab.closed:
+ // already closed.
+ case tab.closeReq <- struct{}{}:
+ <-tab.closed // wait for refreshLoop to end.
}
- tab.db.close()
}
// Bootstrap sets the bootstrap nodes. These nodes are used to connect
@@ -183,7 +194,7 @@ func (tab *Table) Bootstrap(nodes []*Node) {
tab.nursery = append(tab.nursery, &cpy)
}
tab.mutex.Unlock()
- tab.refresh()
+ tab.requestRefresh()
}
// Lookup performs a network search for nodes close
@@ -204,15 +215,13 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
asked[tab.self.ID] = true
tab.mutex.Lock()
- // update last lookup stamp (for refresh logic)
- tab.buckets[logdist(tab.self.sha, target)].lastLookup = time.Now()
// generate initial result set
result := tab.closest(target, bucketSize)
tab.mutex.Unlock()
- // If the result set is empty, all nodes were dropped, refresh
+ // If the result set is empty, all nodes were dropped, refresh.
if len(result.entries) == 0 {
- tab.refresh()
+ tab.requestRefresh()
return nil
}
@@ -257,56 +266,86 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
return result.entries
}
-// refresh performs a lookup for a random target to keep buckets full, or seeds
-// the table if it is empty (initial bootstrap or discarded faulty peers).
-func (tab *Table) refresh() {
- seed := true
+func (tab *Table) requestRefresh() {
+ select {
+ case tab.refreshReq <- struct{}{}:
+ case <-tab.closed:
+ }
+}
- // If the discovery table is empty, seed with previously known nodes
- tab.mutex.Lock()
- for _, bucket := range tab.buckets {
- if len(bucket.entries) > 0 {
- seed = false
- break
+func (tab *Table) refreshLoop() {
+ defer func() {
+ tab.db.close()
+ if tab.net != nil {
+ tab.net.close()
}
- }
- tab.mutex.Unlock()
+ close(tab.closed)
+ }()
- // If the table is not empty, try to refresh using the live entries
- if !seed {
- // The Kademlia paper specifies that the bucket refresh should
- // perform a refresh in the least recently used bucket. We cannot
- // adhere to this because the findnode target is a 512bit value
- // (not hash-sized) and it is not easily possible to generate a
- // sha3 preimage that falls into a chosen bucket.
- //
- // We perform a lookup with a random target instead.
- var target NodeID
- rand.Read(target[:])
-
- result := tab.Lookup(target)
- if len(result) == 0 {
- // Lookup failed, seed after all
- seed = true
+ timer := time.NewTicker(autoRefreshInterval)
+ var done chan struct{}
+ for {
+ select {
+ case <-timer.C:
+ if done == nil {
+ done = make(chan struct{})
+ go tab.doRefresh(done)
+ }
+ case <-tab.refreshReq:
+ if done == nil {
+ done = make(chan struct{})
+ go tab.doRefresh(done)
+ }
+ case <-done:
+ done = nil
+ case <-tab.closeReq:
+ if done != nil {
+ <-done
+ }
+ return
}
}
+}
- if seed {
- // Pick a batch of previously know seeds to lookup with
- seeds := tab.db.querySeeds(10)
- for _, seed := range seeds {
- glog.V(logger.Debug).Infoln("Seeding network with", seed)
- }
- nodes := append(tab.nursery, seeds...)
+// doRefresh performs a lookup for a random target to keep buckets
+// full. seed nodes are inserted if the table is empty (initial
+// bootstrap or discarded faulty peers).
+func (tab *Table) doRefresh(done chan struct{}) {
+ defer close(done)
+
+ // The Kademlia paper specifies that the bucket refresh should
+ // perform a lookup in the least recently used bucket. We cannot
+ // adhere to this because the findnode target is a 512bit value
+ // (not hash-sized) and it is not easily possible to generate a
+ // sha3 preimage that falls into a chosen bucket.
+ // We perform a lookup with a random target instead.
+ var target NodeID
+ rand.Read(target[:])
+ result := tab.Lookup(target)
+ if len(result) > 0 {
+ return
+ }
- // Bond with all the seed nodes (will pingpong only if failed recently)
- bonded := tab.bondall(nodes)
- if len(bonded) > 0 {
- tab.Lookup(tab.self.ID)
+ // The table is empty. Load nodes from the database and insert
+ // them. This should yield a few previously seen nodes that are
+ // (hopefully) still alive.
+ seeds := tab.db.querySeeds(seedCount, seedMaxAge)
+ seeds = tab.bondall(append(seeds, tab.nursery...))
+ if glog.V(logger.Debug) {
+ if len(seeds) == 0 {
+ glog.Infof("no seed nodes found")
+ }
+ for _, n := range seeds {
+ age := time.Since(tab.db.lastPong(n.ID))
+ glog.Infof("seed node (age %v): %v", age, n)
}
- // TODO: the Kademlia paper says that we're supposed to perform
- // random lookups in all buckets further away than our closest neighbor.
}
+ tab.mutex.Lock()
+ tab.stuff(seeds)
+ tab.mutex.Unlock()
+
+ // Finally, do a self lookup to fill up the buckets.
+ tab.Lookup(tab.self.ID)
}
// closest returns the n nodes in the table that are closest to the
@@ -373,8 +412,9 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
}
// If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch
var result error
- if node == nil || fails > 0 {
- glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails)
+ age := time.Since(tab.db.lastPong(id))
+ if node == nil || fails > 0 || age > nodeDBNodeExpiration {
+ glog.V(logger.Detail).Infof("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age)
tab.bondmu.Lock()
w := tab.bonding[id]
@@ -435,13 +475,17 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
// ping a remote endpoint and wait for a reply, also updating the node
// database accordingly.
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
- // Update the last ping and send the message
tab.db.updateLastPing(id, time.Now())
if err := tab.net.ping(id, addr); err != nil {
return err
}
- // Pong received, update the database and return
tab.db.updateLastPong(id, time.Now())
+
+ // Start the background expiration goroutine after the first
+ // successful communication. Subsequent calls have no effect if it
+ // is already running. We do this here instead of somewhere else
+ // so that the search for seed nodes also considers older nodes
+ // that would otherwise be removed by the expiration.
tab.db.ensureExpirer()
return nil
}