diff options
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r-- | p2p/discover/table.go | 239 |
1 files changed, 61 insertions, 178 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 18920ccfd..8803daa56 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -25,7 +25,6 @@ package discover import ( crand "crypto/rand" "encoding/binary" - "errors" "fmt" mrand "math/rand" "net" @@ -54,15 +53,13 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 - maxBondingPingPongs = 16 // Limit on the number of concurrent ping/pong interactions - maxFindnodeFailures = 5 // Nodes exceeding this limit are dropped - - refreshInterval = 30 * time.Minute - revalidateInterval = 10 * time.Second - copyNodesInterval = 30 * time.Second - seedMinTableTime = 5 * time.Minute - seedCount = 30 - seedMaxAge = 5 * 24 * time.Hour + maxFindnodeFailures = 5 // Nodes exceeding this limit are dropped + refreshInterval = 30 * time.Minute + revalidateInterval = 10 * time.Second + copyNodesInterval = 30 * time.Second + seedMinTableTime = 5 * time.Minute + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) type Table struct { @@ -78,28 +75,17 @@ type Table struct { closeReq chan struct{} closed chan struct{} - bondmu sync.Mutex - bonding map[NodeID]*bondproc - bondslots chan struct{} // limits total number of active bonding processes - nodeAddedHook func(*Node) // for testing net transport self *Node // metadata of the local node } -type bondproc struct { - err error - n *Node - done chan struct{} -} - // transport is implemented by the UDP transport. // it is an interface so we can test without opening lots of UDP // sockets and without generating a private key. type transport interface { ping(NodeID, *net.UDPAddr) error - waitping(NodeID) error findnode(toid NodeID, addr *net.UDPAddr, target NodeID) ([]*Node, error) close() } @@ -114,7 +100,7 @@ type bucket struct { func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) { // If no node database was given, use an in-memory one - db, err := newNodeDB(nodeDBPath, Version, ourID) + db, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID) if err != nil { return nil, err } @@ -122,8 +108,6 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string net: t, db: db, self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), - bonding: make(map[NodeID]*bondproc), - bondslots: make(chan struct{}, maxBondingPingPongs), refreshReq: make(chan chan struct{}), initDone: make(chan struct{}), closeReq: make(chan struct{}), @@ -134,16 +118,13 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string if err := tab.setFallbackNodes(bootnodes); err != nil { return nil, err } - for i := 0; i < cap(tab.bondslots); i++ { - tab.bondslots <- struct{}{} - } for i := range tab.buckets { tab.buckets[i] = &bucket{ ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, } } tab.seedRand() - tab.loadSeedNodes(false) + tab.loadSeedNodes() // Start the background expiration goroutine after loading seeds so that the search for // seed nodes also considers older nodes that would otherwise be removed by the // expiration. @@ -315,22 +296,7 @@ func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node { if !asked[n.ID] { asked[n.ID] = true pendingQueries++ - go func() { - // Find potential neighbors to bond with - r, err := tab.net.findnode(n.ID, n.addr(), targetID) - if err != nil { - // Bump the failure counter to detect and evacuate non-bonded entries - fails := tab.db.findFails(n.ID) + 1 - tab.db.updateFindFails(n.ID, fails) - log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails) - - if fails >= maxFindnodeFailures { - log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails) - tab.delete(n) - } - } - reply <- tab.bondall(r) - }() + go tab.findnode(n, targetID, reply) } } if pendingQueries == 0 { @@ -349,6 +315,29 @@ func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node { return result.entries } +func (tab *Table) findnode(n *Node, targetID NodeID, reply chan<- []*Node) { + fails := tab.db.findFails(n.ID) + r, err := tab.net.findnode(n.ID, n.addr(), targetID) + if err != nil || len(r) == 0 { + fails++ + tab.db.updateFindFails(n.ID, fails) + log.Trace("Findnode failed", "id", n.ID, "failcount", fails, "err", err) + if fails >= maxFindnodeFailures { + log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails) + tab.delete(n) + } + } else if fails > 0 { + tab.db.updateFindFails(n.ID, fails-1) + } + + // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll + // just remove those again during revalidation. + for _, n := range r { + tab.add(n) + } + reply <- r +} + func (tab *Table) refresh() <-chan struct{} { done := make(chan struct{}) select { @@ -401,7 +390,7 @@ loop: case <-revalidateDone: revalidate.Reset(tab.nextRevalidateTime()) case <-copyNodes.C: - go tab.copyBondedNodes() + go tab.copyLiveNodes() case <-tab.closeReq: break loop } @@ -429,7 +418,7 @@ func (tab *Table) doRefresh(done chan struct{}) { // Load nodes from the database and insert // them. This should yield a few previously seen nodes that are // (hopefully) still alive. - tab.loadSeedNodes(true) + tab.loadSeedNodes() // Run self lookup to discover new neighbor nodes. tab.lookup(tab.self.ID, false) @@ -447,15 +436,12 @@ func (tab *Table) doRefresh(done chan struct{}) { } } -func (tab *Table) loadSeedNodes(bond bool) { +func (tab *Table) loadSeedNodes() { seeds := tab.db.querySeeds(seedCount, seedMaxAge) seeds = append(seeds, tab.nursery...) - if bond { - seeds = tab.bondall(seeds) - } for i := range seeds { seed := seeds[i] - age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }} + age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.lastPongReceived(seed.ID)) }} log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age) tab.add(seed) } @@ -473,7 +459,7 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { } // Ping the selected node and wait for a pong. - err := tab.ping(last.ID, last.addr()) + err := tab.net.ping(last.ID, last.addr()) tab.mutex.Lock() defer tab.mutex.Unlock() @@ -515,9 +501,9 @@ func (tab *Table) nextRevalidateTime() time.Duration { return time.Duration(tab.rand.Int63n(int64(revalidateInterval))) } -// copyBondedNodes adds nodes from the table to the database if they have been in the table +// copyLiveNodes adds nodes from the table to the database if they have been in the table // longer then minTableTime. -func (tab *Table) copyBondedNodes() { +func (tab *Table) copyLiveNodes() { tab.mutex.Lock() defer tab.mutex.Unlock() @@ -553,120 +539,6 @@ func (tab *Table) len() (n int) { return n } -// bondall bonds with all given nodes concurrently and returns -// those nodes for which bonding has probably succeeded. -func (tab *Table) bondall(nodes []*Node) (result []*Node) { - rc := make(chan *Node, len(nodes)) - for i := range nodes { - go func(n *Node) { - nn, _ := tab.bond(false, n.ID, n.addr(), n.TCP) - rc <- nn - }(nodes[i]) - } - for range nodes { - if n := <-rc; n != nil { - result = append(result, n) - } - } - return result -} - -// bond ensures the local node has a bond with the given remote node. -// It also attempts to insert the node into the table if bonding succeeds. -// The caller must not hold tab.mutex. -// -// A bond is must be established before sending findnode requests. -// Both sides must have completed a ping/pong exchange for a bond to -// exist. The total number of active bonding processes is limited in -// order to restrain network use. -// -// bond is meant to operate idempotently in that bonding with a remote -// node which still remembers a previously established bond will work. -// The remote node will simply not send a ping back, causing waitping -// to time out. -// -// If pinged is true, the remote node has just pinged us and one half -// of the process can be skipped. -func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) { - if id == tab.self.ID { - return nil, errors.New("is self") - } - if pinged && !tab.isInitDone() { - return nil, errors.New("still initializing") - } - // Start bonding if we haven't seen this node for a while or if it failed findnode too often. - node, fails := tab.db.node(id), tab.db.findFails(id) - age := time.Since(tab.db.bondTime(id)) - var result error - if fails > 0 || age > nodeDBNodeExpiration { - log.Trace("Starting bonding ping/pong", "id", id, "known", node != nil, "failcount", fails, "age", age) - - tab.bondmu.Lock() - w := tab.bonding[id] - if w != nil { - // Wait for an existing bonding process to complete. - tab.bondmu.Unlock() - <-w.done - } else { - // Register a new bonding process. - w = &bondproc{done: make(chan struct{})} - tab.bonding[id] = w - tab.bondmu.Unlock() - // Do the ping/pong. The result goes into w. - tab.pingpong(w, pinged, id, addr, tcpPort) - // Unregister the process after it's done. - tab.bondmu.Lock() - delete(tab.bonding, id) - tab.bondmu.Unlock() - } - // Retrieve the bonding results - result = w.err - if result == nil { - node = w.n - } - } - // Add the node to the table even if the bonding ping/pong - // fails. It will be relaced quickly if it continues to be - // unresponsive. - if node != nil { - tab.add(node) - tab.db.updateFindFails(id, 0) - } - return node, result -} - -func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) { - // Request a bonding slot to limit network usage - <-tab.bondslots - defer func() { tab.bondslots <- struct{}{} }() - - // Ping the remote side and wait for a pong. - if w.err = tab.ping(id, addr); w.err != nil { - close(w.done) - return - } - if !pinged { - // Give the remote node a chance to ping us before we start - // sending findnode requests. If they still remember us, - // waitping will simply time out. - tab.net.waitping(id) - } - // Bonding succeeded, update the node database. - w.n = NewNode(id, addr.IP, uint16(addr.Port), tcpPort) - close(w.done) -} - -// ping a remote endpoint and wait for a reply, also updating the node -// database accordingly. -func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { - tab.db.updateLastPing(id, time.Now()) - if err := tab.net.ping(id, addr); err != nil { - return err - } - tab.db.updateBondTime(id, time.Now()) - return nil -} - // bucket returns the bucket for the given node ID hash. func (tab *Table) bucket(sha common.Hash) *bucket { d := logdist(tab.self.sha, sha) @@ -676,21 +548,33 @@ func (tab *Table) bucket(sha common.Hash) *bucket { return tab.buckets[d-bucketMinDistance-1] } -// add attempts to add the given node its corresponding bucket. If the -// bucket has space available, adding the node succeeds immediately. -// Otherwise, the node is added if the least recently active node in -// the bucket does not respond to a ping packet. +// add attempts to add the given node to its corresponding bucket. If the bucket has space +// available, adding the node succeeds immediately. Otherwise, the node is added if the +// least recently active node in the bucket does not respond to a ping packet. // // The caller must not hold tab.mutex. -func (tab *Table) add(new *Node) { +func (tab *Table) add(n *Node) { tab.mutex.Lock() defer tab.mutex.Unlock() - b := tab.bucket(new.sha) - if !tab.bumpOrAdd(b, new) { + b := tab.bucket(n.sha) + if !tab.bumpOrAdd(b, n) { // Node is not in table. Add it to the replacement list. - tab.addReplacement(b, new) + tab.addReplacement(b, n) + } +} + +// addThroughPing adds the given node to the table. Compared to plain +// 'add' there is an additional safety measure: if the table is still +// initializing the node is not added. This prevents an attack where the +// table could be filled by just sending ping repeatedly. +// +// The caller must not hold tab.mutex. +func (tab *Table) addThroughPing(n *Node) { + if !tab.isInitDone() { + return } + tab.add(n) } // stuff adds nodes the table to the end of their corresponding bucket @@ -710,8 +594,7 @@ func (tab *Table) stuff(nodes []*Node) { } } -// delete removes an entry from the node table (used to evacuate -// failed/non-bonded discovery peers). +// delete removes an entry from the node table. It is used to evacuate dead nodes. func (tab *Table) delete(node *Node) { tab.mutex.Lock() defer tab.mutex.Unlock() |