From 437cf4b3acf1b5a4efde64aacaacdf14289010d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 17:38:28 +0300 Subject: p2p/discover: add node expirer and related tests --- p2p/discover/database.go | 71 ++++++++++++++++++++++++++++++++++++++++--- p2p/discover/database_test.go | 47 ++++++++++++++++++++++++++++ p2p/discover/table.go | 2 ++ 3 files changed, 116 insertions(+), 4 deletions(-) (limited to 'p2p') diff --git a/p2p/discover/database.go b/p2p/discover/database.go index b7c0c0498..48539a6c9 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/binary" "os" + "sync" "time" "github.com/ethereum/go-ethereum/logger" @@ -17,13 +18,19 @@ import ( "github.com/syndtr/goleveldb/leveldb/storage" ) -// Special node ID to use as a nil element. -var nodeDBNilNodeID = NodeID{} +var ( + nodeDBNilNodeID = NodeID{} // Special node ID to use as a nil element. + nodeDBNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped. + nodeDBCleanupCycle = time.Hour // Time period for running the expiration task. +) // nodeDB stores all nodes we know about. type nodeDB struct { lvl *leveldb.DB // Interface to the database itself seeder iterator.Iterator // Iterator for fetching possible seed nodes + + runner sync.Once // Ensures we can start at most one expirer + quit chan struct{} // Channel to signal the expiring thread to stop } // Schema layout for the node database @@ -53,7 +60,10 @@ func newMemoryNodeDB() (*nodeDB, error) { if err != nil { return nil, err } - return &nodeDB{lvl: db}, nil + return &nodeDB{ + lvl: db, + quit: make(chan struct{}), + }, nil } // newPersistentNodeDB creates/opens a leveldb backed persistent node database, @@ -91,7 +101,10 @@ func newPersistentNodeDB(path string, version int) (*nodeDB, error) { return newPersistentNodeDB(path, version) } } - return &nodeDB{lvl: db}, nil + return &nodeDB{ + lvl: db, + quit: make(chan struct{}), + }, nil } // makeKey generates the leveldb key-blob from a node id and its particular @@ -164,6 +177,55 @@ func (db *nodeDB) updateNode(node *Node) error { return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil) } +// expirer should be started in a go routine, and is responsible for looping ad +// infinitum and dropping stale data from the database. +func (db *nodeDB) expirer() { + db.runner.Do(func() { + tick := time.Tick(nodeDBCleanupCycle) + for { + select { + case <-tick: + if err := db.expireNodes(); err != nil { + glog.V(logger.Error).Infof("Failed to expire nodedb items: %v", err) + } + + case <-db.quit: + return + } + } + }) +} + +// expireNodes iterates over the database and deletes all nodes that have not +// been seen (i.e. received a pong from) for some alloted time. +func (db *nodeDB) expireNodes() error { + threshold := time.Now().Add(-nodeDBNodeExpiration) + + // Find discovered nodes that are older than the allowance + it := db.lvl.NewIterator(nil, nil) + defer it.Release() + + for it.Next() { + // Skip the item if not a discovery node + id, field := splitKey(it.Key()) + if field != nodeDBDiscoverRoot { + continue + } + // Skip the node if not expired yet + if seen := db.lastPong(id); seen.After(threshold) { + continue + } + // Otherwise delete all associated information + prefix := makeKey(id, "") + for ok := it.Seek(prefix); ok && bytes.HasPrefix(it.Key(), prefix); ok = it.Next() { + if err := db.lvl.Delete(it.Key(), nil); err != nil { + return err + } + } + } + return nil +} + // lastPing retrieves the time of the last ping packet send to a remote node, // requesting binding. func (db *nodeDB) lastPing(id NodeID) time.Time { @@ -226,5 +288,6 @@ func (db *nodeDB) close() { if db.seeder != nil { db.seeder.Release() } + close(db.quit) db.lvl.Close() } diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index 0412a4770..f327cf73b 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -264,3 +264,50 @@ func TestNodeDBPersistency(t *testing.T) { } db.close() } + +var nodeDBExpirationNodes = []struct { + node Node + pong time.Time + exp bool +}{ + { + node: Node{ + ID: MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 1}, + }, + pong: time.Now().Add(-nodeDBNodeExpiration + time.Minute), + exp: false, + }, { + node: Node{ + ID: MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 2}, + }, + pong: time.Now().Add(-nodeDBNodeExpiration - time.Minute), + exp: true, + }, +} + +func TestNodeDBExpiration(t *testing.T) { + db, _ := newNodeDB("", Version) + defer db.close() + + // Add all the test nodes and set their last pong time + for i, seed := range nodeDBExpirationNodes { + if err := db.updateNode(&seed.node); err != nil { + t.Fatalf("node %d: failed to insert: %v", i, err) + } + if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil { + t.Fatalf("node %d: failed to update pong: %v", i, err) + } + } + // Expire some of them, and check the rest + if err := db.expireNodes(); err != nil { + t.Fatalf("failed to expire nodes: %v", err) + } + for i, seed := range nodeDBExpirationNodes { + node := db.node(seed.node.ID) + if (node == nil && !seed.exp) || (node != nil && seed.exp) { + t.Errorf("node %d: expiration mismatch: have %v, want %v", i, node, seed.exp) + } + } +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 11bdff198..060aa7c09 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -335,6 +335,8 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { } // Pong received, update the database and return tab.db.updateLastPong(id, time.Now()) + go tab.db.expirer() + return nil } -- cgit v1.2.3