aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discv5/net.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discv5/net.go')
-rw-r--r--p2p/discv5/net.go90
1 files changed, 53 insertions, 37 deletions
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go
index b08cd2bc7..5aedb39a0 100644
--- a/p2p/discv5/net.go
+++ b/p2p/discv5/net.go
@@ -41,9 +41,10 @@ var (
)
const (
- autoRefreshInterval = 1 * time.Hour
- seedCount = 30
- seedMaxAge = 5 * 24 * time.Hour
+ autoRefreshInterval = 1 * time.Hour
+ bucketRefreshInterval = 1 * time.Minute
+ seedCount = 30
+ seedMaxAge = 5 * 24 * time.Hour
)
const testTopic = "foo"
@@ -82,7 +83,6 @@ type Network struct {
tableOpResp chan struct{}
topicRegisterReq chan topicRegisterReq
topicSearchReq chan topicSearchReq
- bucketFillChn chan chan struct{}
// State of the main loop.
tab *Table
@@ -169,7 +169,6 @@ func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, d
queryReq: make(chan *findnodeQuery),
topicRegisterReq: make(chan topicRegisterReq),
topicSearchReq: make(chan topicSearchReq),
- bucketFillChn: make(chan chan struct{}, 1),
nodes: make(map[NodeID]*Node),
}
go net.loop()
@@ -353,8 +352,9 @@ func (net *Network) reqTableOp(f func()) (called bool) {
func (net *Network) loop() {
var (
- refreshTimer = time.NewTicker(autoRefreshInterval)
- refreshDone chan struct{} // closed when the 'refresh' lookup has ended
+ refreshTimer = time.NewTicker(autoRefreshInterval)
+ bucketRefreshTimer = time.NewTimer(bucketRefreshInterval)
+ refreshDone chan struct{} // closed when the 'refresh' lookup has ended
)
// Tracking the next ticket to register.
@@ -389,6 +389,7 @@ func (net *Network) loop() {
topicRegisterLookupDone chan []*Node
topicRegisterLookupTick = time.NewTimer(0)
topicSearchLookupTarget lookupInfo
+ searchReqWhenRefreshDone []topicSearchReq
)
topicSearchLookupDone := make(chan []*Node, 1)
<-topicRegisterLookupTick.C
@@ -406,6 +407,7 @@ loop:
// Ingress packet handling.
case pkt := <-net.read:
+ //fmt.Println("read", pkt.ev)
debugLog("<-net.read")
n := net.internNode(&pkt)
prestate := n.state
@@ -503,14 +505,18 @@ loop:
net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
case req := <-net.topicSearchReq:
- debugLog("<-net.topicSearchReq")
- if req.found == nil {
- net.ticketStore.removeSearchTopic(req.topic)
- continue
- }
- net.ticketStore.addSearchTopic(req.topic, req.found)
- if (topicSearchLookupTarget.target == common.Hash{}) {
- topicSearchLookupDone <- nil
+ if refreshDone == nil {
+ debugLog("<-net.topicSearchReq")
+ if req.found == nil {
+ net.ticketStore.removeSearchTopic(req.topic)
+ continue
+ }
+ net.ticketStore.addSearchTopic(req.topic, req.found)
+ if (topicSearchLookupTarget.target == common.Hash{}) {
+ topicSearchLookupDone <- nil
+ }
+ } else {
+ searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
}
case nodes := <-topicSearchLookupDone:
@@ -519,7 +525,14 @@ loop:
net.ping(n, n.addr())
return n.pingEcho
}, func(n *Node, topic Topic) []byte {
- return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
+ if n.state == known {
+ return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
+ } else {
+ if n.state == unknown {
+ net.ping(n, n.addr())
+ }
+ return nil
+ }
})
topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
target := topicSearchLookupTarget.target
@@ -564,9 +577,12 @@ loop:
refreshDone = make(chan struct{})
net.refresh(refreshDone)
}
- case doneChn := <-net.bucketFillChn:
- debugLog("bucketFill")
- net.bucketFill(doneChn)
+ case <-bucketRefreshTimer.C:
+ target := net.tab.chooseBucketRefreshTarget()
+ go func() {
+ net.lookup(target, false)
+ bucketRefreshTimer.Reset(bucketRefreshInterval)
+ }()
case newNursery := <-net.refreshReq:
debugLog("<-net.refreshReq")
if newNursery != nil {
@@ -580,6 +596,13 @@ loop:
case <-refreshDone:
debugLog("<-net.refreshDone")
refreshDone = nil
+ list := searchReqWhenRefreshDone
+ searchReqWhenRefreshDone = nil
+ go func() {
+ for _, req := range list {
+ net.topicSearchReq <- req
+ }
+ }()
}
}
debugLog("loop stopped")
@@ -643,28 +666,13 @@ func (net *Network) refresh(done chan<- struct{}) {
}()
}
-func (net *Network) bucketFill(done chan<- struct{}) {
- target := net.tab.chooseBucketFillTarget()
- go func() {
- net.lookup(target, false)
- close(done)
- }()
-}
-
-func (net *Network) BucketFill() {
- done := make(chan struct{})
- select {
- case net.bucketFillChn <- done:
- <-done
- case <-net.closed:
- close(done)
- }
-}
-
// Node Interning.
func (net *Network) internNode(pkt *ingressPacket) *Node {
if n := net.nodes[pkt.remoteID]; n != nil {
+ n.IP = pkt.remoteAddr.IP
+ n.UDP = uint16(pkt.remoteAddr.Port)
+ n.TCP = uint16(pkt.remoteAddr.Port)
return n
}
n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
@@ -967,8 +975,10 @@ func init() {
// handle processes packets sent by n and events related to n.
func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
+ //fmt.Println("handle", n.addr().String(), n.state, ev)
if pkt != nil {
if err := net.checkPacket(n, ev, pkt); err != nil {
+ //fmt.Println("check err:", err)
return err
}
// Start the background expiration goroutine after the first
@@ -985,6 +995,7 @@ func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
}
next, err := n.state.handle(net, n, ev, pkt)
net.transition(n, next)
+ //fmt.Println("new state:", n.state)
return err
}
@@ -1040,6 +1051,11 @@ func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
}
func (net *Network) ping(n *Node, addr *net.UDPAddr) {
+ //fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
+ if n.pingEcho != nil || n.ID == net.tab.self.ID {
+ //fmt.Println(" not sent")
+ return
+ }
debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
n.pingTopics = net.ticketStore.regTopicSet()
n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)