aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discv5/net.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discv5/net.go')
-rw-r--r--p2p/discv5/net.go121
1 files changed, 86 insertions, 35 deletions
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go
index d1c48904e..74d485836 100644
--- a/p2p/discv5/net.go
+++ b/p2p/discv5/net.go
@@ -126,8 +126,15 @@ type topicRegisterReq struct {
}
type topicSearchReq struct {
- topic Topic
- found chan<- string
+ topic Topic
+ found chan<- *Node
+ lookup chan<- bool
+ delay time.Duration
+}
+
+type topicSearchResult struct {
+ target lookupInfo
+ nodes []*Node
}
type timeoutEvent struct {
@@ -263,16 +270,23 @@ func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node {
break
}
// Wait for the next reply.
- for _, n := range <-reply {
- if n != nil && !seen[n.ID] {
- seen[n.ID] = true
- result.push(n, bucketSize)
- if stopOnMatch && n.sha == target {
- return result.entries
+ select {
+ case nodes := <-reply:
+ for _, n := range nodes {
+ if n != nil && !seen[n.ID] {
+ seen[n.ID] = true
+ result.push(n, bucketSize)
+ if stopOnMatch && n.sha == target {
+ return result.entries
+ }
}
}
+ pendingQueries--
+ case <-time.After(respTimeout):
+ // forget all pending requests, start new ones
+ pendingQueries = 0
+ reply = make(chan []*Node, alpha)
}
- pendingQueries--
}
return result.entries
}
@@ -293,18 +307,20 @@ func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) {
}
}
-func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) {
- select {
- case net.topicSearchReq <- topicSearchReq{topic, found}:
- case <-net.closed:
- return
- }
- select {
- case <-net.closed:
- case <-stop:
+func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) {
+ for {
select {
- case net.topicSearchReq <- topicSearchReq{topic, nil}:
case <-net.closed:
+ return
+ case delay, ok := <-setPeriod:
+ select {
+ case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}:
+ case <-net.closed:
+ return
+ }
+ if !ok {
+ return
+ }
}
}
}
@@ -347,6 +363,13 @@ func (net *Network) reqTableOp(f func()) (called bool) {
// TODO: external address handling.
+type topicSearchInfo struct {
+ lookupChn chan<- bool
+ period time.Duration
+}
+
+const maxSearchCount = 5
+
func (net *Network) loop() {
var (
refreshTimer = time.NewTicker(autoRefreshInterval)
@@ -385,10 +408,12 @@ func (net *Network) loop() {
topicRegisterLookupTarget lookupInfo
topicRegisterLookupDone chan []*Node
topicRegisterLookupTick = time.NewTimer(0)
- topicSearchLookupTarget lookupInfo
searchReqWhenRefreshDone []topicSearchReq
+ searchInfo = make(map[Topic]topicSearchInfo)
+ activeSearchCount int
)
- topicSearchLookupDone := make(chan []*Node, 1)
+ topicSearchLookupDone := make(chan topicSearchResult, 100)
+ topicSearch := make(chan Topic, 100)
<-topicRegisterLookupTick.C
statsDump := time.NewTicker(10 * time.Second)
@@ -504,21 +529,52 @@ loop:
case req := <-net.topicSearchReq:
if refreshDone == nil {
debugLog("<-net.topicSearchReq")
- if req.found == nil {
- net.ticketStore.removeSearchTopic(req.topic)
+ info, ok := searchInfo[req.topic]
+ if ok {
+ if req.delay == time.Duration(0) {
+ delete(searchInfo, req.topic)
+ net.ticketStore.removeSearchTopic(req.topic)
+ } else {
+ info.period = req.delay
+ searchInfo[req.topic] = info
+ }
continue
}
- net.ticketStore.addSearchTopic(req.topic, req.found)
- if (topicSearchLookupTarget.target == common.Hash{}) {
- topicSearchLookupDone <- nil
+ if req.delay != time.Duration(0) {
+ var info topicSearchInfo
+ info.period = req.delay
+ info.lookupChn = req.lookup
+ searchInfo[req.topic] = info
+ net.ticketStore.addSearchTopic(req.topic, req.found)
+ topicSearch <- req.topic
}
} else {
searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
}
- case nodes := <-topicSearchLookupDone:
- debugLog("<-topicSearchLookupDone")
- net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte {
+ case topic := <-topicSearch:
+ if activeSearchCount < maxSearchCount {
+ activeSearchCount++
+ target := net.ticketStore.nextSearchLookup(topic)
+ go func() {
+ nodes := net.lookup(target.target, false)
+ topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes}
+ }()
+ }
+ period := searchInfo[topic].period
+ if period != time.Duration(0) {
+ go func() {
+ time.Sleep(period)
+ topicSearch <- topic
+ }()
+ }
+
+ case res := <-topicSearchLookupDone:
+ activeSearchCount--
+ if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil {
+ lookupChn <- net.ticketStore.radius[res.target.topic].converged
+ }
+ net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte {
net.ping(n, n.addr())
return n.pingEcho
}, func(n *Node, topic Topic) []byte {
@@ -531,11 +587,6 @@ loop:
return nil
}
})
- topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
- target := topicSearchLookupTarget.target
- if (target != common.Hash{}) {
- go func() { topicSearchLookupDone <- net.lookup(target, false) }()
- }
case <-statsDump.C:
debugLog("<-statsDump.C")
@@ -708,7 +759,7 @@ func (net *Network) internNodeFromNeighbours(sender *net.UDPAddr, rn rpcNode) (n
}
return n, err
}
- if !bytes.Equal(n.IP, rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP {
+ if !n.IP.Equal(rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP {
err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n)
}
return n, err