aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-27 22:38:28 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-27 22:38:28 +0800
commit437cf4b3acf1b5a4efde64aacaacdf14289010d1 (patch)
treeaea6cf153def4fae3ef217ac9112334fc8de2067
parenta136e2bb222ed0eaa9b5e5a31a07fcc664de8eb7 (diff)
downloaddexon-437cf4b3acf1b5a4efde64aacaacdf14289010d1.tar
dexon-437cf4b3acf1b5a4efde64aacaacdf14289010d1.tar.gz
dexon-437cf4b3acf1b5a4efde64aacaacdf14289010d1.tar.bz2
dexon-437cf4b3acf1b5a4efde64aacaacdf14289010d1.tar.lz
dexon-437cf4b3acf1b5a4efde64aacaacdf14289010d1.tar.xz
dexon-437cf4b3acf1b5a4efde64aacaacdf14289010d1.tar.zst
dexon-437cf4b3acf1b5a4efde64aacaacdf14289010d1.zip
p2p/discover: add node expirer and related tests
-rw-r--r--p2p/discover/database.go71
-rw-r--r--p2p/discover/database_test.go47
-rw-r--r--p2p/discover/table.go2
3 files changed, 116 insertions, 4 deletions
diff --git a/p2p/discover/database.go b/p2p/discover/database.go
index b7c0c0498..48539a6c9 100644
--- a/p2p/discover/database.go
+++ b/p2p/discover/database.go
@@ -7,6 +7,7 @@ import (
"bytes"
"encoding/binary"
"os"
+ "sync"
"time"
"github.com/ethereum/go-ethereum/logger"
@@ -17,13 +18,19 @@ import (
"github.com/syndtr/goleveldb/leveldb/storage"
)
-// Special node ID to use as a nil element.
-var nodeDBNilNodeID = NodeID{}
+var (
+ nodeDBNilNodeID = NodeID{} // Special node ID to use as a nil element.
+ nodeDBNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
+ nodeDBCleanupCycle = time.Hour // Time period for running the expiration task.
+)
// nodeDB stores all nodes we know about.
type nodeDB struct {
lvl *leveldb.DB // Interface to the database itself
seeder iterator.Iterator // Iterator for fetching possible seed nodes
+
+ runner sync.Once // Ensures we can start at most one expirer
+ quit chan struct{} // Channel to signal the expiring thread to stop
}
// Schema layout for the node database
@@ -53,7 +60,10 @@ func newMemoryNodeDB() (*nodeDB, error) {
if err != nil {
return nil, err
}
- return &nodeDB{lvl: db}, nil
+ return &nodeDB{
+ lvl: db,
+ quit: make(chan struct{}),
+ }, nil
}
// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
@@ -91,7 +101,10 @@ func newPersistentNodeDB(path string, version int) (*nodeDB, error) {
return newPersistentNodeDB(path, version)
}
}
- return &nodeDB{lvl: db}, nil
+ return &nodeDB{
+ lvl: db,
+ quit: make(chan struct{}),
+ }, nil
}
// makeKey generates the leveldb key-blob from a node id and its particular
@@ -164,6 +177,55 @@ func (db *nodeDB) updateNode(node *Node) error {
return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)
}
+// expirer should be started in a go routine, and is responsible for looping ad
+// infinitum and dropping stale data from the database.
+func (db *nodeDB) expirer() {
+ db.runner.Do(func() {
+ tick := time.Tick(nodeDBCleanupCycle)
+ for {
+ select {
+ case <-tick:
+ if err := db.expireNodes(); err != nil {
+ glog.V(logger.Error).Infof("Failed to expire nodedb items: %v", err)
+ }
+
+ case <-db.quit:
+ return
+ }
+ }
+ })
+}
+
+// expireNodes iterates over the database and deletes all nodes that have not
+// been seen (i.e. received a pong from) for some alloted time.
+func (db *nodeDB) expireNodes() error {
+ threshold := time.Now().Add(-nodeDBNodeExpiration)
+
+ // Find discovered nodes that are older than the allowance
+ it := db.lvl.NewIterator(nil, nil)
+ defer it.Release()
+
+ for it.Next() {
+ // Skip the item if not a discovery node
+ id, field := splitKey(it.Key())
+ if field != nodeDBDiscoverRoot {
+ continue
+ }
+ // Skip the node if not expired yet
+ if seen := db.lastPong(id); seen.After(threshold) {
+ continue
+ }
+ // Otherwise delete all associated information
+ prefix := makeKey(id, "")
+ for ok := it.Seek(prefix); ok && bytes.HasPrefix(it.Key(), prefix); ok = it.Next() {
+ if err := db.lvl.Delete(it.Key(), nil); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
// lastPing retrieves the time of the last ping packet send to a remote node,
// requesting binding.
func (db *nodeDB) lastPing(id NodeID) time.Time {
@@ -226,5 +288,6 @@ func (db *nodeDB) close() {
if db.seeder != nil {
db.seeder.Release()
}
+ close(db.quit)
db.lvl.Close()
}
diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go
index 0412a4770..f327cf73b 100644
--- a/p2p/discover/database_test.go
+++ b/p2p/discover/database_test.go
@@ -264,3 +264,50 @@ func TestNodeDBPersistency(t *testing.T) {
}
db.close()
}
+
+var nodeDBExpirationNodes = []struct {
+ node Node
+ pong time.Time
+ exp bool
+}{
+ {
+ node: Node{
+ ID: MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ IP: []byte{127, 0, 0, 1},
+ },
+ pong: time.Now().Add(-nodeDBNodeExpiration + time.Minute),
+ exp: false,
+ }, {
+ node: Node{
+ ID: MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ IP: []byte{127, 0, 0, 2},
+ },
+ pong: time.Now().Add(-nodeDBNodeExpiration - time.Minute),
+ exp: true,
+ },
+}
+
+func TestNodeDBExpiration(t *testing.T) {
+ db, _ := newNodeDB("", Version)
+ defer db.close()
+
+ // Add all the test nodes and set their last pong time
+ for i, seed := range nodeDBExpirationNodes {
+ if err := db.updateNode(&seed.node); err != nil {
+ t.Fatalf("node %d: failed to insert: %v", i, err)
+ }
+ if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil {
+ t.Fatalf("node %d: failed to update pong: %v", i, err)
+ }
+ }
+ // Expire some of them, and check the rest
+ if err := db.expireNodes(); err != nil {
+ t.Fatalf("failed to expire nodes: %v", err)
+ }
+ for i, seed := range nodeDBExpirationNodes {
+ node := db.node(seed.node.ID)
+ if (node == nil && !seed.exp) || (node != nil && seed.exp) {
+ t.Errorf("node %d: expiration mismatch: have %v, want %v", i, node, seed.exp)
+ }
+ }
+}
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 11bdff198..060aa7c09 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -335,6 +335,8 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
}
// Pong received, update the database and return
tab.db.updateLastPong(id, time.Now())
+ go tab.db.expirer()
+
return nil
}