From 6def110c37d4d43402c4b658ce6b291400f840e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 24 Apr 2015 11:19:33 +0300 Subject: cmd/bootnode, eth, p2p, p2p/discover: clean up the seeder and mesh into eth. --- cmd/bootnode/main.go | 2 +- eth/backend.go | 13 ++++- p2p/discover/cache.go | 134 +++++++++++++++++++++++++++++++++++++++++++++ p2p/discover/node.go | 114 -------------------------------------- p2p/discover/table.go | 23 ++++---- p2p/discover/table_test.go | 6 +- p2p/discover/udp.go | 10 ++-- p2p/discover/udp_test.go | 4 +- p2p/server.go | 6 +- 9 files changed, 168 insertions(+), 144 deletions(-) create mode 100644 p2p/discover/cache.go diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index 26912525d..826604cdc 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -71,7 +71,7 @@ func main() { } } - if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, ""); err != nil { + if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, nil); err != nil { log.Fatal(err) } select {} diff --git a/eth/backend.go b/eth/backend.go index 382cfc832..039f730f1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -125,6 +125,8 @@ type Ethereum struct { blockDb common.Database // Block chain database stateDb common.Database // State changes database extraDb common.Database // Extra database (txs, etc) + seedDb *discover.Cache // Peer database seeding the bootstrap + // Closed when databases are flushed and closed databasesClosed chan bool @@ -179,7 +181,10 @@ func New(config *Config) (*Ethereum, error) { if err != nil { return nil, err } - seedDbPath := path.Join(config.DataDir, "seeds") + seedDb, err := discover.NewPersistentCache(path.Join(config.DataDir, "seeds")) + if err != nil { + return nil, err + } // Perform database sanity checks d, _ := blockDb.Get([]byte("ProtocolVersion")) @@ -207,6 +212,7 @@ func New(config *Config) (*Ethereum, error) { blockDb: blockDb, stateDb: stateDb, extraDb: extraDb, + seedDb: seedDb, eventMux: &event.TypeMux{}, accountManager: config.AccountManager, DataDir: config.DataDir, @@ -244,7 +250,7 @@ func New(config *Config) (*Ethereum, error) { NAT: config.NAT, NoDial: !config.Dial, BootstrapNodes: config.parseBootNodes(), - SeedCache: seedDbPath, + SeedCache: seedDb, } if len(config.Port) > 0 { eth.net.ListenAddr = ":" + config.Port @@ -423,6 +429,7 @@ done: } } + s.seedDb.Close() s.blockDb.Close() s.stateDb.Close() s.extraDb.Close() @@ -450,7 +457,7 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - s.txSub.Unsubscribe() // quits txBroadcastLoop + s.txSub.Unsubscribe() // quits txBroadcastLoop s.protocolManager.Stop() s.txPool.Stop() diff --git a/p2p/discover/cache.go b/p2p/discover/cache.go new file mode 100644 index 000000000..f6bab4591 --- /dev/null +++ b/p2p/discover/cache.go @@ -0,0 +1,134 @@ +// Contains the discovery cache, storing previously seen nodes to act as seed +// servers during bootstrapping the network. + +package discover + +import ( + "bytes" + "encoding/binary" + "net" + "os" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +// Cache stores all nodes we know about. +type Cache struct { + db *leveldb.DB +} + +// Cache version to allow dumping old data if it changes. +var cacheVersionKey = []byte("pv") + +// NewMemoryCache creates a new in-memory peer cache without a persistent backend. +func NewMemoryCache() (*Cache, error) { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + return nil, err + } + return &Cache{db: db}, nil +} + +// NewPersistentCache creates/opens a leveldb backed persistent peer cache, also +// flushing its contents in case of a version mismatch. +func NewPersistentCache(path string) (*Cache, error) { + // Try to open the cache, recovering any corruption + db, err := leveldb.OpenFile(path, nil) + if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { + db, err = leveldb.RecoverFile(path, nil) + } + if err != nil { + return nil, err + } + // The nodes contained in the cache correspond to a certain protocol version. + // Flush all nodes if the version doesn't match. + currentVer := make([]byte, binary.MaxVarintLen64) + currentVer = currentVer[:binary.PutVarint(currentVer, Version)] + + blob, err := db.Get(cacheVersionKey, nil) + switch err { + case leveldb.ErrNotFound: + // Version not found (i.e. empty cache), insert it + err = db.Put(cacheVersionKey, currentVer, nil) + + case nil: + // Version present, flush if different + if !bytes.Equal(blob, currentVer) { + db.Close() + if err = os.RemoveAll(path); err != nil { + return nil, err + } + return NewPersistentCache(path) + } + } + // Clean up in case of an error + if err != nil { + db.Close() + return nil, err + } + return &Cache{db: db}, nil +} + +// get retrieves a node with a given id from the seed da +func (c *Cache) get(id NodeID) *Node { + blob, err := c.db.Get(id[:], nil) + if err != nil { + return nil + } + node := new(Node) + if err := rlp.DecodeBytes(blob, node); err != nil { + return nil + } + return node +} + +// list retrieves a batch of nodes from the database. +func (c *Cache) list(n int) []*Node { + it := c.db.NewIterator(nil, nil) + defer it.Release() + + nodes := make([]*Node, 0, n) + for i := 0; i < n && it.Next(); i++ { + var id NodeID + copy(id[:], it.Key()) + + if node := c.get(id); node != nil { + nodes = append(nodes, node) + } + } + return nodes +} + +// update inserts - potentially overwriting - a node in the seed database. +func (c *Cache) update(node *Node) error { + blob, err := rlp.EncodeToBytes(node) + if err != nil { + return err + } + return c.db.Put(node.ID[:], blob, nil) +} + +// add inserts a new node into the seed database. +func (c *Cache) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { + node := &Node{ + ID: id, + IP: addr.IP, + DiscPort: addr.Port, + TCPPort: int(tcpPort), + } + c.update(node) + + return node +} + +// delete removes a node from the database. +func (c *Cache) delete(id NodeID) error { + return c.db.Delete(id[:], nil) +} + +// Close flushes and closes the database files. +func (c *Cache) Close() { + c.db.Close() +} diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 0ec9630d3..e66ca37a4 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -1,10 +1,8 @@ package discover import ( - "bytes" "crypto/ecdsa" "crypto/elliptic" - "encoding/binary" "encoding/hex" "errors" "fmt" @@ -13,16 +11,12 @@ import ( "math/rand" "net" "net/url" - "os" "strconv" "strings" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/ethereum/go-ethereum/rlp" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/storage" ) const nodeIDBits = 512 @@ -310,111 +304,3 @@ func randomID(a NodeID, n int) (b NodeID) { } return b } - -// nodeDB stores all nodes we know about. -type nodeDB struct { - ldb *leveldb.DB -} - -var dbVersionKey = []byte("pv") - -// Opens the backing LevelDB. If path is "", we use an in-memory database. -func newNodeDB(path string, version int64) (db *nodeDB, err error) { - db = new(nodeDB) - opts := new(opt.Options) - if path == "" { - db.ldb, err = leveldb.Open(storage.NewMemStorage(), opts) - } else { - db.ldb, err = openNodeDB(path, opts, version) - } - return db, err -} - -// openNodeDB opens a persistent seed cache, flushing old versions. -func openNodeDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) { - ldb, err := leveldb.OpenFile(path, opts) - if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { - ldb, err = leveldb.RecoverFile(path, opts) - } - if err != nil { - return nil, err - } - // The nodes contained in the database correspond to a certain - // protocol version. Flush all nodes if the DB version doesn't match. - // There is no need to do this for memory databases because they - // won't ever be used with a different protocol version. - shouldVal := make([]byte, binary.MaxVarintLen64) - shouldVal = shouldVal[:binary.PutVarint(shouldVal, version)] - val, err := ldb.Get(dbVersionKey, nil) - if err == leveldb.ErrNotFound { - err = ldb.Put(dbVersionKey, shouldVal, nil) - } else if err == nil && !bytes.Equal(val, shouldVal) { - // Delete and start over. - ldb.Close() - if err = os.RemoveAll(path); err != nil { - return nil, err - } - return openNodeDB(path, opts, version) - } - if err != nil { - ldb.Close() - ldb = nil - } - return ldb, err -} - -// get retrieves a node with a given id from the seed da -func (db *nodeDB) get(id NodeID) *Node { - v, err := db.ldb.Get(id[:], nil) - if err != nil { - return nil - } - n := new(Node) - if err := rlp.DecodeBytes(v, n); err != nil { - return nil - } - return n -} - -// list retrieves a batch of nodes from the database. -func (db *nodeDB) list(n int) []*Node { - it := db.ldb.NewIterator(nil, nil) - defer it.Release() - - nodes := make([]*Node, 0, n) - for i := 0; i < n && it.Next(); i++ { - var id NodeID - copy(id[:], it.Key()) - - if node := db.get(id); node != nil { - nodes = append(nodes, node) - } - } - return nodes -} - -// update inserts - potentially overwriting - a node in the seed database. -func (db *nodeDB) update(n *Node) error { - v, err := rlp.EncodeToBytes(n) - if err != nil { - return err - } - return db.ldb.Put(n.ID[:], v, nil) -} - -// add inserts a new node into the seed database. -func (db *nodeDB) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { - n := &Node{ID: id, IP: addr.IP, DiscPort: addr.Port, TCPPort: int(tcpPort)} - db.update(n) - return n -} - -// delete removes a node from the database. -func (db *nodeDB) delete(id NodeID) error { - return db.ldb.Delete(id[:], nil) -} - -// close flushes and closes the database files. -func (db *nodeDB) close() { - db.ldb.Close() -} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index fa791c9f3..98371d6f9 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -27,6 +27,7 @@ type Table struct { mutex sync.Mutex // protects buckets, their content, and nursery buckets [nBuckets]*bucket // index of known nodes by distance nursery []*Node // bootstrap nodes + cache *Cache // cache of known nodes bondmu sync.Mutex bonding map[NodeID]*bondproc @@ -34,7 +35,6 @@ type Table struct { net transport self *Node // metadata of the local node - db *nodeDB } type bondproc struct { @@ -61,17 +61,15 @@ type bucket struct { entries []*Node } -func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seedCache string) *Table { - // Load the bootstrap seed cache (use in memory db upon failure) - db, err := newNodeDB(seedCache, Version) - if err != nil { - glog.V(logger.Warn).Infoln("Failed to open bootstrap seed cache:", err) - db, _ = newNodeDB("", Version) +func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seeder *Cache) *Table { + // If no seed cache was given, use an in-memory one + if seeder == nil { + seeder, _ = NewMemoryCache() } // Create the bootstrap table tab := &Table{ net: t, - db: db, + cache: seeder, self: newNode(ourID, ourAddr), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), @@ -93,7 +91,6 @@ func (tab *Table) Self() *Node { // Close terminates the network listener and flushes the seed cache. func (tab *Table) Close() { tab.net.close() - tab.db.close() } // Bootstrap sets the bootstrap nodes. These nodes are used to connect @@ -178,10 +175,10 @@ func (tab *Table) refresh() { result := tab.Lookup(randomID(tab.self.ID, ld)) if len(result) == 0 { // Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live) - seeds := tab.db.list(10) + seeds := tab.cache.list(10) for _, seed := range seeds { glog.V(logger.Debug).Infoln("Seeding network with:", seed) - tab.db.delete(seed.ID) + tab.cache.delete(seed.ID) } // Bootstrap the table with a self lookup all := tab.bondall(append(tab.nursery, seeds...)) @@ -252,7 +249,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) { // of the process can be skipped. func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) { var n *Node - if n = tab.db.get(id); n == nil { + if n = tab.cache.get(id); n == nil { tab.bondmu.Lock() w := tab.bonding[id] if w != nil { @@ -297,7 +294,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd // waitping will simply time out. tab.net.waitping(id) } - w.n = tab.db.add(id, addr, tcpPort) + w.n = tab.cache.add(id, addr, tcpPort) close(w.done) } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index e2bd3c8ad..8274731e3 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -15,7 +15,7 @@ import ( func TestTable_pingReplace(t *testing.T) { doit := func(newNodeIsResponding, lastInBucketIsResponding bool) { transport := newPingRecorder() - tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "") + tab := newTable(transport, NodeID{}, &net.UDPAddr{}, nil) last := fillBucket(tab, 200) pingSender := randomID(tab.self.ID, 200) @@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) { test := func(test *closeTest) bool { // for any node table, Target and N - tab := newTable(nil, test.Self, &net.UDPAddr{}, "") + tab := newTable(nil, test.Self, &net.UDPAddr{}, nil) tab.add(test.All) // check that doClosest(Target, N) returns nodes @@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) { self := gen(NodeID{}, quickrand).(NodeID) target := randomID(self, 200) transport := findnodeOracle{t, target} - tab := newTable(transport, self, &net.UDPAddr{}, "") + tab := newTable(transport, self, &net.UDPAddr{}, nil) // lookup on empty table returns no nodes if results := tab.Lookup(target); len(results) > 0 { diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index c26703f19..6805fb686 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -144,7 +144,7 @@ type reply struct { } // ListenUDP returns a new table that listens for UDP packets on laddr. -func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seedCache string) (*Table, error) { +func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder *Cache) (*Table, error) { addr, err := net.ResolveUDPAddr("udp", laddr) if err != nil { return nil, err @@ -153,12 +153,12 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seedCac if err != nil { return nil, err } - tab, _ := newUDP(priv, conn, natm, seedCache) + tab, _ := newUDP(priv, conn, natm, seeder) glog.V(logger.Info).Infoln("Listening,", tab.self) return tab, nil } -func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seedCache string) (*Table, *udp) { +func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (*Table, *udp) { udp := &udp{ conn: c, priv: priv, @@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seedCache string realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port} } } - udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seedCache) + udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seeder) go udp.loop() go udp.readLoop() return udp.Table, udp @@ -449,7 +449,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte if expired(req.Expiration) { return errExpired } - if t.db.get(fromID) == nil { + if t.cache.get(fromID) == nil { // No bond exists, we don't process the packet. This prevents // an attack vector where the discovery protocol could be used // to amplify traffic in a DDOS attack. A malicious actor diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index 782895e46..299f94543 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest { remotekey: newkey(), remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303}, } - test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "") + test.table, test.udp = newUDP(test.localkey, test.pipe, nil, nil) return test } @@ -157,7 +157,7 @@ func TestUDP_findnode(t *testing.T) { // ensure there's a bond with the test node, // findnode won't be accepted otherwise. - test.table.db.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99) + test.table.cache.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99) // check that closest neighbors are returned. test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp}) diff --git a/p2p/server.go b/p2p/server.go index 39b0b8b6e..5f1b80f51 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -59,9 +59,9 @@ type Server struct { // with the rest of the network. BootstrapNodes []*discover.Node - // SeedCache is the path to the database containing the previously seen live - // nodes in the network to use as potential bootstrap seeds. - SeedCache string + // SeedCache is the database containing the previously seen live nodes in + // the network to use as potential bootstrap seeds. + SeedCache *discover.Cache // Protocols should contain the protocols supported // by the server. Matching protocols are launched for -- cgit v1.2.3