aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-24 16:19:33 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-24 16:33:55 +0800
commit6def110c37d4d43402c4b658ce6b291400f840e5 (patch)
treefc1dbd81120ba8a51599bfb3dbc41f0eed9a2005 /p2p/discover
parent971702e7a1a5e698721fa6147c444abad9c20141 (diff)
downloadgo-tangerine-6def110c37d4d43402c4b658ce6b291400f840e5.tar
go-tangerine-6def110c37d4d43402c4b658ce6b291400f840e5.tar.gz
go-tangerine-6def110c37d4d43402c4b658ce6b291400f840e5.tar.bz2
go-tangerine-6def110c37d4d43402c4b658ce6b291400f840e5.tar.lz
go-tangerine-6def110c37d4d43402c4b658ce6b291400f840e5.tar.xz
go-tangerine-6def110c37d4d43402c4b658ce6b291400f840e5.tar.zst
go-tangerine-6def110c37d4d43402c4b658ce6b291400f840e5.zip
cmd/bootnode, eth, p2p, p2p/discover: clean up the seeder and mesh into eth.
Diffstat (limited to 'p2p/discover')
-rw-r--r--p2p/discover/cache.go134
-rw-r--r--p2p/discover/node.go114
-rw-r--r--p2p/discover/table.go23
-rw-r--r--p2p/discover/table_test.go6
-rw-r--r--p2p/discover/udp.go10
-rw-r--r--p2p/discover/udp_test.go4
6 files changed, 154 insertions, 137 deletions
diff --git a/p2p/discover/cache.go b/p2p/discover/cache.go
new file mode 100644
index 000000000..f6bab4591
--- /dev/null
+++ b/p2p/discover/cache.go
@@ -0,0 +1,134 @@
+// Contains the discovery cache, storing previously seen nodes to act as seed
+// servers during bootstrapping the network.
+
+package discover
+
+import (
+ "bytes"
+ "encoding/binary"
+ "net"
+ "os"
+
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+// Cache stores all nodes we know about.
+type Cache struct {
+ db *leveldb.DB
+}
+
+// Cache version to allow dumping old data if it changes.
+var cacheVersionKey = []byte("pv")
+
+// NewMemoryCache creates a new in-memory peer cache without a persistent backend.
+func NewMemoryCache() (*Cache, error) {
+ db, err := leveldb.Open(storage.NewMemStorage(), nil)
+ if err != nil {
+ return nil, err
+ }
+ return &Cache{db: db}, nil
+}
+
+// NewPersistentCache creates/opens a leveldb backed persistent peer cache, also
+// flushing its contents in case of a version mismatch.
+func NewPersistentCache(path string) (*Cache, error) {
+ // Try to open the cache, recovering any corruption
+ db, err := leveldb.OpenFile(path, nil)
+ if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
+ db, err = leveldb.RecoverFile(path, nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+ // The nodes contained in the cache correspond to a certain protocol version.
+ // Flush all nodes if the version doesn't match.
+ currentVer := make([]byte, binary.MaxVarintLen64)
+ currentVer = currentVer[:binary.PutVarint(currentVer, Version)]
+
+ blob, err := db.Get(cacheVersionKey, nil)
+ switch err {
+ case leveldb.ErrNotFound:
+ // Version not found (i.e. empty cache), insert it
+ err = db.Put(cacheVersionKey, currentVer, nil)
+
+ case nil:
+ // Version present, flush if different
+ if !bytes.Equal(blob, currentVer) {
+ db.Close()
+ if err = os.RemoveAll(path); err != nil {
+ return nil, err
+ }
+ return NewPersistentCache(path)
+ }
+ }
+ // Clean up in case of an error
+ if err != nil {
+ db.Close()
+ return nil, err
+ }
+ return &Cache{db: db}, nil
+}
+
+// get retrieves a node with a given id from the seed da
+func (c *Cache) get(id NodeID) *Node {
+ blob, err := c.db.Get(id[:], nil)
+ if err != nil {
+ return nil
+ }
+ node := new(Node)
+ if err := rlp.DecodeBytes(blob, node); err != nil {
+ return nil
+ }
+ return node
+}
+
+// list retrieves a batch of nodes from the database.
+func (c *Cache) list(n int) []*Node {
+ it := c.db.NewIterator(nil, nil)
+ defer it.Release()
+
+ nodes := make([]*Node, 0, n)
+ for i := 0; i < n && it.Next(); i++ {
+ var id NodeID
+ copy(id[:], it.Key())
+
+ if node := c.get(id); node != nil {
+ nodes = append(nodes, node)
+ }
+ }
+ return nodes
+}
+
+// update inserts - potentially overwriting - a node in the seed database.
+func (c *Cache) update(node *Node) error {
+ blob, err := rlp.EncodeToBytes(node)
+ if err != nil {
+ return err
+ }
+ return c.db.Put(node.ID[:], blob, nil)
+}
+
+// add inserts a new node into the seed database.
+func (c *Cache) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node {
+ node := &Node{
+ ID: id,
+ IP: addr.IP,
+ DiscPort: addr.Port,
+ TCPPort: int(tcpPort),
+ }
+ c.update(node)
+
+ return node
+}
+
+// delete removes a node from the database.
+func (c *Cache) delete(id NodeID) error {
+ return c.db.Delete(id[:], nil)
+}
+
+// Close flushes and closes the database files.
+func (c *Cache) Close() {
+ c.db.Close()
+}
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index 0ec9630d3..e66ca37a4 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -1,10 +1,8 @@
package discover
import (
- "bytes"
"crypto/ecdsa"
"crypto/elliptic"
- "encoding/binary"
"encoding/hex"
"errors"
"fmt"
@@ -13,16 +11,12 @@ import (
"math/rand"
"net"
"net/url"
- "os"
"strconv"
"strings"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/rlp"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/storage"
)
const nodeIDBits = 512
@@ -310,111 +304,3 @@ func randomID(a NodeID, n int) (b NodeID) {
}
return b
}
-
-// nodeDB stores all nodes we know about.
-type nodeDB struct {
- ldb *leveldb.DB
-}
-
-var dbVersionKey = []byte("pv")
-
-// Opens the backing LevelDB. If path is "", we use an in-memory database.
-func newNodeDB(path string, version int64) (db *nodeDB, err error) {
- db = new(nodeDB)
- opts := new(opt.Options)
- if path == "" {
- db.ldb, err = leveldb.Open(storage.NewMemStorage(), opts)
- } else {
- db.ldb, err = openNodeDB(path, opts, version)
- }
- return db, err
-}
-
-// openNodeDB opens a persistent seed cache, flushing old versions.
-func openNodeDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) {
- ldb, err := leveldb.OpenFile(path, opts)
- if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
- ldb, err = leveldb.RecoverFile(path, opts)
- }
- if err != nil {
- return nil, err
- }
- // The nodes contained in the database correspond to a certain
- // protocol version. Flush all nodes if the DB version doesn't match.
- // There is no need to do this for memory databases because they
- // won't ever be used with a different protocol version.
- shouldVal := make([]byte, binary.MaxVarintLen64)
- shouldVal = shouldVal[:binary.PutVarint(shouldVal, version)]
- val, err := ldb.Get(dbVersionKey, nil)
- if err == leveldb.ErrNotFound {
- err = ldb.Put(dbVersionKey, shouldVal, nil)
- } else if err == nil && !bytes.Equal(val, shouldVal) {
- // Delete and start over.
- ldb.Close()
- if err = os.RemoveAll(path); err != nil {
- return nil, err
- }
- return openNodeDB(path, opts, version)
- }
- if err != nil {
- ldb.Close()
- ldb = nil
- }
- return ldb, err
-}
-
-// get retrieves a node with a given id from the seed da
-func (db *nodeDB) get(id NodeID) *Node {
- v, err := db.ldb.Get(id[:], nil)
- if err != nil {
- return nil
- }
- n := new(Node)
- if err := rlp.DecodeBytes(v, n); err != nil {
- return nil
- }
- return n
-}
-
-// list retrieves a batch of nodes from the database.
-func (db *nodeDB) list(n int) []*Node {
- it := db.ldb.NewIterator(nil, nil)
- defer it.Release()
-
- nodes := make([]*Node, 0, n)
- for i := 0; i < n && it.Next(); i++ {
- var id NodeID
- copy(id[:], it.Key())
-
- if node := db.get(id); node != nil {
- nodes = append(nodes, node)
- }
- }
- return nodes
-}
-
-// update inserts - potentially overwriting - a node in the seed database.
-func (db *nodeDB) update(n *Node) error {
- v, err := rlp.EncodeToBytes(n)
- if err != nil {
- return err
- }
- return db.ldb.Put(n.ID[:], v, nil)
-}
-
-// add inserts a new node into the seed database.
-func (db *nodeDB) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node {
- n := &Node{ID: id, IP: addr.IP, DiscPort: addr.Port, TCPPort: int(tcpPort)}
- db.update(n)
- return n
-}
-
-// delete removes a node from the database.
-func (db *nodeDB) delete(id NodeID) error {
- return db.ldb.Delete(id[:], nil)
-}
-
-// close flushes and closes the database files.
-func (db *nodeDB) close() {
- db.ldb.Close()
-}
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index fa791c9f3..98371d6f9 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -27,6 +27,7 @@ type Table struct {
mutex sync.Mutex // protects buckets, their content, and nursery
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*Node // bootstrap nodes
+ cache *Cache // cache of known nodes
bondmu sync.Mutex
bonding map[NodeID]*bondproc
@@ -34,7 +35,6 @@ type Table struct {
net transport
self *Node // metadata of the local node
- db *nodeDB
}
type bondproc struct {
@@ -61,17 +61,15 @@ type bucket struct {
entries []*Node
}
-func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seedCache string) *Table {
- // Load the bootstrap seed cache (use in memory db upon failure)
- db, err := newNodeDB(seedCache, Version)
- if err != nil {
- glog.V(logger.Warn).Infoln("Failed to open bootstrap seed cache:", err)
- db, _ = newNodeDB("", Version)
+func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seeder *Cache) *Table {
+ // If no seed cache was given, use an in-memory one
+ if seeder == nil {
+ seeder, _ = NewMemoryCache()
}
// Create the bootstrap table
tab := &Table{
net: t,
- db: db,
+ cache: seeder,
self: newNode(ourID, ourAddr),
bonding: make(map[NodeID]*bondproc),
bondslots: make(chan struct{}, maxBondingPingPongs),
@@ -93,7 +91,6 @@ func (tab *Table) Self() *Node {
// Close terminates the network listener and flushes the seed cache.
func (tab *Table) Close() {
tab.net.close()
- tab.db.close()
}
// Bootstrap sets the bootstrap nodes. These nodes are used to connect
@@ -178,10 +175,10 @@ func (tab *Table) refresh() {
result := tab.Lookup(randomID(tab.self.ID, ld))
if len(result) == 0 {
// Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live)
- seeds := tab.db.list(10)
+ seeds := tab.cache.list(10)
for _, seed := range seeds {
glog.V(logger.Debug).Infoln("Seeding network with:", seed)
- tab.db.delete(seed.ID)
+ tab.cache.delete(seed.ID)
}
// Bootstrap the table with a self lookup
all := tab.bondall(append(tab.nursery, seeds...))
@@ -252,7 +249,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
// of the process can be skipped.
func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
var n *Node
- if n = tab.db.get(id); n == nil {
+ if n = tab.cache.get(id); n == nil {
tab.bondmu.Lock()
w := tab.bonding[id]
if w != nil {
@@ -297,7 +294,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
// waitping will simply time out.
tab.net.waitping(id)
}
- w.n = tab.db.add(id, addr, tcpPort)
+ w.n = tab.cache.add(id, addr, tcpPort)
close(w.done)
}
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index e2bd3c8ad..8274731e3 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -15,7 +15,7 @@ import (
func TestTable_pingReplace(t *testing.T) {
doit := func(newNodeIsResponding, lastInBucketIsResponding bool) {
transport := newPingRecorder()
- tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "")
+ tab := newTable(transport, NodeID{}, &net.UDPAddr{}, nil)
last := fillBucket(tab, 200)
pingSender := randomID(tab.self.ID, 200)
@@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) {
test := func(test *closeTest) bool {
// for any node table, Target and N
- tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
+ tab := newTable(nil, test.Self, &net.UDPAddr{}, nil)
tab.add(test.All)
// check that doClosest(Target, N) returns nodes
@@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) {
self := gen(NodeID{}, quickrand).(NodeID)
target := randomID(self, 200)
transport := findnodeOracle{t, target}
- tab := newTable(transport, self, &net.UDPAddr{}, "")
+ tab := newTable(transport, self, &net.UDPAddr{}, nil)
// lookup on empty table returns no nodes
if results := tab.Lookup(target); len(results) > 0 {
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index c26703f19..6805fb686 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -144,7 +144,7 @@ type reply struct {
}
// ListenUDP returns a new table that listens for UDP packets on laddr.
-func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seedCache string) (*Table, error) {
+func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder *Cache) (*Table, error) {
addr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, err
@@ -153,12 +153,12 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seedCac
if err != nil {
return nil, err
}
- tab, _ := newUDP(priv, conn, natm, seedCache)
+ tab, _ := newUDP(priv, conn, natm, seeder)
glog.V(logger.Info).Infoln("Listening,", tab.self)
return tab, nil
}
-func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seedCache string) (*Table, *udp) {
+func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (*Table, *udp) {
udp := &udp{
conn: c,
priv: priv,
@@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seedCache string
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
- udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seedCache)
+ udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seeder)
go udp.loop()
go udp.readLoop()
return udp.Table, udp
@@ -449,7 +449,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
if expired(req.Expiration) {
return errExpired
}
- if t.db.get(fromID) == nil {
+ if t.cache.get(fromID) == nil {
// No bond exists, we don't process the packet. This prevents
// an attack vector where the discovery protocol could be used
// to amplify traffic in a DDOS attack. A malicious actor
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index 782895e46..299f94543 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest {
remotekey: newkey(),
remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303},
}
- test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "")
+ test.table, test.udp = newUDP(test.localkey, test.pipe, nil, nil)
return test
}
@@ -157,7 +157,7 @@ func TestUDP_findnode(t *testing.T) {
// ensure there's a bond with the test node,
// findnode won't be accepted otherwise.
- test.table.db.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99)
+ test.table.cache.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99)
// check that closest neighbors are returned.
test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})