diff options
Diffstat (limited to 'p2p/discv5/net.go')
-rw-r--r-- | p2p/discv5/net.go | 121 |
1 files changed, 86 insertions, 35 deletions
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index d1c48904e..74d485836 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -126,8 +126,15 @@ type topicRegisterReq struct { } type topicSearchReq struct { - topic Topic - found chan<- string + topic Topic + found chan<- *Node + lookup chan<- bool + delay time.Duration +} + +type topicSearchResult struct { + target lookupInfo + nodes []*Node } type timeoutEvent struct { @@ -263,16 +270,23 @@ func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node { break } // Wait for the next reply. - for _, n := range <-reply { - if n != nil && !seen[n.ID] { - seen[n.ID] = true - result.push(n, bucketSize) - if stopOnMatch && n.sha == target { - return result.entries + select { + case nodes := <-reply: + for _, n := range nodes { + if n != nil && !seen[n.ID] { + seen[n.ID] = true + result.push(n, bucketSize) + if stopOnMatch && n.sha == target { + return result.entries + } } } + pendingQueries-- + case <-time.After(respTimeout): + // forget all pending requests, start new ones + pendingQueries = 0 + reply = make(chan []*Node, alpha) } - pendingQueries-- } return result.entries } @@ -293,18 +307,20 @@ func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) { } } -func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) { - select { - case net.topicSearchReq <- topicSearchReq{topic, found}: - case <-net.closed: - return - } - select { - case <-net.closed: - case <-stop: +func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) { + for { select { - case net.topicSearchReq <- topicSearchReq{topic, nil}: case <-net.closed: + return + case delay, ok := <-setPeriod: + select { + case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}: + case <-net.closed: + return + } + if !ok { + return + } } } } @@ -347,6 +363,13 @@ func (net *Network) reqTableOp(f func()) (called bool) { // TODO: external address handling. +type topicSearchInfo struct { + lookupChn chan<- bool + period time.Duration +} + +const maxSearchCount = 5 + func (net *Network) loop() { var ( refreshTimer = time.NewTicker(autoRefreshInterval) @@ -385,10 +408,12 @@ func (net *Network) loop() { topicRegisterLookupTarget lookupInfo topicRegisterLookupDone chan []*Node topicRegisterLookupTick = time.NewTimer(0) - topicSearchLookupTarget lookupInfo searchReqWhenRefreshDone []topicSearchReq + searchInfo = make(map[Topic]topicSearchInfo) + activeSearchCount int ) - topicSearchLookupDone := make(chan []*Node, 1) + topicSearchLookupDone := make(chan topicSearchResult, 100) + topicSearch := make(chan Topic, 100) <-topicRegisterLookupTick.C statsDump := time.NewTicker(10 * time.Second) @@ -504,21 +529,52 @@ loop: case req := <-net.topicSearchReq: if refreshDone == nil { debugLog("<-net.topicSearchReq") - if req.found == nil { - net.ticketStore.removeSearchTopic(req.topic) + info, ok := searchInfo[req.topic] + if ok { + if req.delay == time.Duration(0) { + delete(searchInfo, req.topic) + net.ticketStore.removeSearchTopic(req.topic) + } else { + info.period = req.delay + searchInfo[req.topic] = info + } continue } - net.ticketStore.addSearchTopic(req.topic, req.found) - if (topicSearchLookupTarget.target == common.Hash{}) { - topicSearchLookupDone <- nil + if req.delay != time.Duration(0) { + var info topicSearchInfo + info.period = req.delay + info.lookupChn = req.lookup + searchInfo[req.topic] = info + net.ticketStore.addSearchTopic(req.topic, req.found) + topicSearch <- req.topic } } else { searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req) } - case nodes := <-topicSearchLookupDone: - debugLog("<-topicSearchLookupDone") - net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte { + case topic := <-topicSearch: + if activeSearchCount < maxSearchCount { + activeSearchCount++ + target := net.ticketStore.nextSearchLookup(topic) + go func() { + nodes := net.lookup(target.target, false) + topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes} + }() + } + period := searchInfo[topic].period + if period != time.Duration(0) { + go func() { + time.Sleep(period) + topicSearch <- topic + }() + } + + case res := <-topicSearchLookupDone: + activeSearchCount-- + if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil { + lookupChn <- net.ticketStore.radius[res.target.topic].converged + } + net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte { net.ping(n, n.addr()) return n.pingEcho }, func(n *Node, topic Topic) []byte { @@ -531,11 +587,6 @@ loop: return nil } }) - topicSearchLookupTarget = net.ticketStore.nextSearchLookup() - target := topicSearchLookupTarget.target - if (target != common.Hash{}) { - go func() { topicSearchLookupDone <- net.lookup(target, false) }() - } case <-statsDump.C: debugLog("<-statsDump.C") @@ -708,7 +759,7 @@ func (net *Network) internNodeFromNeighbours(sender *net.UDPAddr, rn rpcNode) (n } return n, err } - if !bytes.Equal(n.IP, rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP { + if !n.IP.Equal(rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP { err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n) } return n, err |