diff options
Diffstat (limited to 'p2p/discover')
-rw-r--r-- | p2p/discover/database.go | 96 | ||||
-rw-r--r-- | p2p/discover/database_test.go | 103 | ||||
-rw-r--r-- | p2p/discover/table.go | 161 | ||||
-rw-r--r-- | p2p/discover/table_test.go | 3 | ||||
-rw-r--r-- | p2p/discover/udp.go | 7 |
5 files changed, 198 insertions, 172 deletions
diff --git a/p2p/discover/database.go b/p2p/discover/database.go index d5c594364..e8e3371ff 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -21,6 +21,7 @@ package discover import ( "bytes" + "crypto/rand" "encoding/binary" "os" "sync" @@ -46,11 +47,8 @@ var ( // nodeDB stores all nodes we know about. type nodeDB struct { - lvl *leveldb.DB // Interface to the database itself - seeder iterator.Iterator // Iterator for fetching possible seed nodes - - self NodeID // Own node id to prevent adding it into the database - + lvl *leveldb.DB // Interface to the database itself + self NodeID // Own node id to prevent adding it into the database runner sync.Once // Ensures we can start at most one expirer quit chan struct{} // Channel to signal the expiring thread to stop } @@ -302,52 +300,70 @@ func (db *nodeDB) updateFindFails(id NodeID, fails int) error { return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails)) } -// querySeeds retrieves a batch of nodes to be used as potential seed servers -// during bootstrapping the node into the network. -// -// Ideal seeds are the most recently seen nodes (highest probability to be still -// alive), but yet untried. However, since leveldb only supports dumb iteration -// we will instead start pulling in potential seeds that haven't been yet pinged -// since the start of the boot procedure. -// -// If the database runs out of potential seeds, we restart the startup counter -// and start iterating over the peers again. -func (db *nodeDB) querySeeds(n int) []*Node { - // Create a new seed iterator if none exists - if db.seeder == nil { - db.seeder = db.lvl.NewIterator(nil, nil) +// querySeeds retrieves random nodes to be used as potential seed nodes +// for bootstrapping. +func (db *nodeDB) querySeeds(n int, maxAge time.Duration) []*Node { + var ( + now = time.Now() + nodes = make([]*Node, 0, n) + it = db.lvl.NewIterator(nil, nil) + id NodeID + ) + defer it.Release() + +seek: + for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ { + // Seek to a random entry. The first byte is incremented by a + // random amount each time in order to increase the likelihood + // of hitting all existing nodes in very small databases. + ctr := id[0] + rand.Read(id[:]) + id[0] = ctr + id[0]%16 + it.Seek(makeKey(id, nodeDBDiscoverRoot)) + + n := nextNode(it) + if n == nil { + id[0] = 0 + continue seek // iterator exhausted + } + if n.ID == db.self { + continue seek + } + if now.Sub(db.lastPong(n.ID)) > maxAge { + continue seek + } + for i := range nodes { + if nodes[i].ID == n.ID { + continue seek // duplicate + } + } + nodes = append(nodes, n) } - // Iterate over the nodes and find suitable seeds - nodes := make([]*Node, 0, n) - for len(nodes) < n && db.seeder.Next() { - // Iterate until a discovery node is found - id, field := splitKey(db.seeder.Key()) + return nodes +} + +// reads the next node record from the iterator, skipping over other +// database entries. +func nextNode(it iterator.Iterator) *Node { + for end := false; !end; end = !it.Next() { + id, field := splitKey(it.Key()) if field != nodeDBDiscoverRoot { continue } - // Dump it if its a self reference - if bytes.Compare(id[:], db.self[:]) == 0 { - db.deleteNode(id) + var n Node + if err := rlp.DecodeBytes(it.Value(), &n); err != nil { + if glog.V(logger.Warn) { + glog.Errorf("invalid node %x: %v", id, err) + } continue } - // Load it as a potential seed - if node := db.node(id); node != nil { - nodes = append(nodes, node) - } - } - // Release the iterator if we reached the end - if len(nodes) == 0 { - db.seeder.Release() - db.seeder = nil + return &n } - return nodes + return nil } // close flushes and closes the database files. func (db *nodeDB) close() { - if db.seeder != nil { - db.seeder.Release() - } close(db.quit) db.lvl.Close() } diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index 569585903..80c1a6ff2 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -162,9 +162,33 @@ var nodeDBSeedQueryNodes = []struct { node *Node pong time.Time }{ + // This one should not be in the result set because its last + // pong time is too far in the past. { node: newNode( - MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + MustHexID("0x84d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + net.IP{127, 0, 0, 3}, + 30303, + 30303, + ), + pong: time.Now().Add(-3 * time.Hour), + }, + // This one shouldn't be in in the result set because its + // nodeID is the local node's ID. + { + node: newNode( + MustHexID("0x57d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + net.IP{127, 0, 0, 3}, + 30303, + 30303, + ), + pong: time.Now().Add(-4 * time.Second), + }, + + // These should be in the result set. + { + node: newNode( + MustHexID("0x22d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 1}, 30303, 30303, @@ -173,7 +197,7 @@ var nodeDBSeedQueryNodes = []struct { }, { node: newNode( - MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + MustHexID("0x44d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 2}, 30303, 30303, @@ -182,7 +206,7 @@ var nodeDBSeedQueryNodes = []struct { }, { node: newNode( - MustHexID("0x03d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + MustHexID("0xe2d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 3}, 30303, 30303, @@ -192,7 +216,7 @@ var nodeDBSeedQueryNodes = []struct { } func TestNodeDBSeedQuery(t *testing.T) { - db, _ := newNodeDB("", Version, NodeID{}) + db, _ := newNodeDB("", Version, nodeDBSeedQueryNodes[1].node.ID) defer db.close() // Insert a batch of nodes for querying @@ -200,20 +224,24 @@ func TestNodeDBSeedQuery(t *testing.T) { if err := db.updateNode(seed.node); err != nil { t.Fatalf("node %d: failed to insert: %v", i, err) } + if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil { + t.Fatalf("node %d: failed to insert lastPong: %v", i, err) + } } + // Retrieve the entire batch and check for duplicates - seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes)) - if len(seeds) != len(nodeDBSeedQueryNodes) { - t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)) - } + seeds := db.querySeeds(len(nodeDBSeedQueryNodes)*2, time.Hour) have := make(map[NodeID]struct{}) for _, seed := range seeds { have[seed.ID] = struct{}{} } want := make(map[NodeID]struct{}) - for _, seed := range nodeDBSeedQueryNodes { + for _, seed := range nodeDBSeedQueryNodes[2:] { want[seed.node.ID] = struct{}{} } + if len(seeds) != len(want) { + t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(want)) + } for id, _ := range have { if _, ok := want[id]; !ok { t.Errorf("extra seed: %v", id) @@ -224,63 +252,6 @@ func TestNodeDBSeedQuery(t *testing.T) { t.Errorf("missing seed: %v", id) } } - // Make sure the next batch is empty (seed EOF) - seeds = db.querySeeds(2 * len(nodeDBSeedQueryNodes)) - if len(seeds) != 0 { - t.Errorf("seed count mismatch: have %v, want %v", len(seeds), 0) - } -} - -func TestNodeDBSeedQueryContinuation(t *testing.T) { - db, _ := newNodeDB("", Version, NodeID{}) - defer db.close() - - // Insert a batch of nodes for querying - for i, seed := range nodeDBSeedQueryNodes { - if err := db.updateNode(seed.node); err != nil { - t.Fatalf("node %d: failed to insert: %v", i, err) - } - } - // Iteratively retrieve the batch, checking for an empty batch on reset - for i := 0; i < len(nodeDBSeedQueryNodes); i++ { - if seeds := db.querySeeds(1); len(seeds) != 1 { - t.Errorf("1st iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) - } - } - if seeds := db.querySeeds(1); len(seeds) != 0 { - t.Errorf("reset: seed count mismatch: have %v, want %v", len(seeds), 0) - } - for i := 0; i < len(nodeDBSeedQueryNodes); i++ { - if seeds := db.querySeeds(1); len(seeds) != 1 { - t.Errorf("2nd iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) - } - } -} - -func TestNodeDBSelfSeedQuery(t *testing.T) { - // Assign a node as self to verify evacuation - self := nodeDBSeedQueryNodes[0].node.ID - db, _ := newNodeDB("", Version, self) - defer db.close() - - // Insert a batch of nodes for querying - for i, seed := range nodeDBSeedQueryNodes { - if err := db.updateNode(seed.node); err != nil { - t.Fatalf("node %d: failed to insert: %v", i, err) - } - } - // Retrieve the entire batch and check that self was evacuated - seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes)) - if len(seeds) != len(nodeDBSeedQueryNodes)-1 { - t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)-1) - } - have := make(map[NodeID]struct{}) - for _, seed := range seeds { - have[seed.ID] = struct{}{} - } - if _, ok := have[self]; ok { - t.Errorf("self not evacuated") - } } func TestNodeDBPersistency(t *testing.T) { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 972bc1077..66afa52ea 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -44,6 +44,10 @@ const ( maxBondingPingPongs = 16 maxFindnodeFailures = 5 + + autoRefreshInterval = 1 * time.Hour + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) type Table struct { @@ -52,6 +56,10 @@ type Table struct { nursery []*Node // bootstrap nodes db *nodeDB // database of known nodes + refreshReq chan struct{} + closeReq chan struct{} + closed chan struct{} + bondmu sync.Mutex bonding map[NodeID]*bondproc bondslots chan struct{} // limits total number of active bonding processes @@ -93,11 +101,14 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string db, _ = newNodeDB("", Version, ourID) } tab := &Table{ - net: t, - db: db, - self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), - bonding: make(map[NodeID]*bondproc), - bondslots: make(chan struct{}, maxBondingPingPongs), + net: t, + db: db, + self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), + bonding: make(map[NodeID]*bondproc), + bondslots: make(chan struct{}, maxBondingPingPongs), + refreshReq: make(chan struct{}), + closeReq: make(chan struct{}), + closed: make(chan struct{}), } for i := 0; i < cap(tab.bondslots); i++ { tab.bondslots <- struct{}{} @@ -105,6 +116,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string for i := range tab.buckets { tab.buckets[i] = new(bucket) } + go tab.refreshLoop() return tab } @@ -163,10 +175,12 @@ func randUint(max uint32) uint32 { // Close terminates the network listener and flushes the node database. func (tab *Table) Close() { - if tab.net != nil { - tab.net.close() + select { + case <-tab.closed: + // already closed. + case tab.closeReq <- struct{}{}: + <-tab.closed // wait for refreshLoop to end. } - tab.db.close() } // Bootstrap sets the bootstrap nodes. These nodes are used to connect @@ -183,7 +197,7 @@ func (tab *Table) Bootstrap(nodes []*Node) { tab.nursery = append(tab.nursery, &cpy) } tab.mutex.Unlock() - tab.refresh() + tab.requestRefresh() } // Lookup performs a network search for nodes close @@ -210,9 +224,9 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { result := tab.closest(target, bucketSize) tab.mutex.Unlock() - // If the result set is empty, all nodes were dropped, refresh + // If the result set is empty, all nodes were dropped, refresh. if len(result.entries) == 0 { - tab.refresh() + tab.requestRefresh() return nil } @@ -257,56 +271,86 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { return result.entries } -// refresh performs a lookup for a random target to keep buckets full, or seeds -// the table if it is empty (initial bootstrap or discarded faulty peers). -func (tab *Table) refresh() { - seed := true +func (tab *Table) requestRefresh() { + select { + case tab.refreshReq <- struct{}{}: + case <-tab.closed: + } +} - // If the discovery table is empty, seed with previously known nodes - tab.mutex.Lock() - for _, bucket := range tab.buckets { - if len(bucket.entries) > 0 { - seed = false - break +func (tab *Table) refreshLoop() { + defer func() { + tab.db.close() + if tab.net != nil { + tab.net.close() } - } - tab.mutex.Unlock() + close(tab.closed) + }() - // If the table is not empty, try to refresh using the live entries - if !seed { - // The Kademlia paper specifies that the bucket refresh should - // perform a refresh in the least recently used bucket. We cannot - // adhere to this because the findnode target is a 512bit value - // (not hash-sized) and it is not easily possible to generate a - // sha3 preimage that falls into a chosen bucket. - // - // We perform a lookup with a random target instead. - var target NodeID - rand.Read(target[:]) - - result := tab.Lookup(target) - if len(result) == 0 { - // Lookup failed, seed after all - seed = true + timer := time.NewTicker(autoRefreshInterval) + var done chan struct{} + for { + select { + case <-timer.C: + if done == nil { + done = make(chan struct{}) + go tab.doRefresh(done) + } + case <-tab.refreshReq: + if done == nil { + done = make(chan struct{}) + go tab.doRefresh(done) + } + case <-done: + done = nil + case <-tab.closeReq: + if done != nil { + <-done + } + return } } +} - if seed { - // Pick a batch of previously know seeds to lookup with - seeds := tab.db.querySeeds(10) - for _, seed := range seeds { - glog.V(logger.Debug).Infoln("Seeding network with", seed) - } - nodes := append(tab.nursery, seeds...) +// doRefresh performs a lookup for a random target to keep buckets +// full. seed nodes are inserted if the table is empty (initial +// bootstrap or discarded faulty peers). +func (tab *Table) doRefresh(done chan struct{}) { + defer close(done) + + // The Kademlia paper specifies that the bucket refresh should + // perform a lookup in the least recently used bucket. We cannot + // adhere to this because the findnode target is a 512bit value + // (not hash-sized) and it is not easily possible to generate a + // sha3 preimage that falls into a chosen bucket. + // We perform a lookup with a random target instead. + var target NodeID + rand.Read(target[:]) + result := tab.Lookup(target) + if len(result) > 0 { + return + } - // Bond with all the seed nodes (will pingpong only if failed recently) - bonded := tab.bondall(nodes) - if len(bonded) > 0 { - tab.Lookup(tab.self.ID) + // The table is empty. Load nodes from the database and insert + // them. This should yield a few previously seen nodes that are + // (hopefully) still alive. + seeds := tab.db.querySeeds(seedCount, seedMaxAge) + seeds = tab.bondall(append(seeds, tab.nursery...)) + if glog.V(logger.Debug) { + if len(seeds) == 0 { + glog.Infof("no seed nodes found") + } + for _, n := range seeds { + age := time.Since(tab.db.lastPong(n.ID)) + glog.Infof("seed node (age %v): %v", age, n) } - // TODO: the Kademlia paper says that we're supposed to perform - // random lookups in all buckets further away than our closest neighbor. } + tab.mutex.Lock() + tab.stuff(seeds) + tab.mutex.Unlock() + + // Finally, do a self lookup to fill up the buckets. + tab.Lookup(tab.self.ID) } // closest returns the n nodes in the table that are closest to the @@ -373,8 +417,9 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 } // If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch var result error - if node == nil || fails > 0 { - glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails) + age := time.Since(tab.db.lastPong(id)) + if node == nil || fails > 0 || age > nodeDBNodeExpiration { + glog.V(logger.Detail).Infof("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age) tab.bondmu.Lock() w := tab.bonding[id] @@ -435,13 +480,17 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd // ping a remote endpoint and wait for a reply, also updating the node // database accordingly. func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { - // Update the last ping and send the message tab.db.updateLastPing(id, time.Now()) if err := tab.net.ping(id, addr); err != nil { return err } - // Pong received, update the database and return tab.db.updateLastPong(id, time.Now()) + + // Start the background expiration goroutine after the first + // successful communication. Subsequent calls have no effect if it + // is already running. We do this here instead of somewhere else + // so that the search for seed nodes also considers older nodes + // that would otherwise be removed by the expiration. tab.db.ensureExpirer() return nil } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 426f4e9cc..84962a1a5 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -514,9 +514,6 @@ func (tn *preminedTestnet) findnode(toid NodeID, toaddr *net.UDPAddr, target Nod if toaddr.Port == 0 { panic("query to node at distance 0") } - if target != tn.target { - panic("findnode with wrong target") - } next := uint16(toaddr.Port) - 1 var result []*Node for i, id := range tn.dists[toaddr.Port] { diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index afb31ee69..8f62598f2 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -52,8 +52,6 @@ const ( respTimeout = 500 * time.Millisecond sendTimeout = 500 * time.Millisecond expiration = 20 * time.Second - - refreshInterval = 1 * time.Hour ) // RPC packet types @@ -312,10 +310,8 @@ func (t *udp) loop() { plist = list.New() timeout = time.NewTimer(0) nextTimeout *pending // head of plist when timeout was last reset - refresh = time.NewTicker(refreshInterval) ) <-timeout.C // ignore first timeout - defer refresh.Stop() defer timeout.Stop() resetTimeout := func() { @@ -344,9 +340,6 @@ func (t *udp) loop() { resetTimeout() select { - case <-refresh.C: - go t.refresh() - case <-t.closing: for el := plist.Front(); el != nil; el = el.Next() { el.Value.(*pending).errc <- errClosed |