diff options
Diffstat (limited to 'p2p/discv5/net.go')
-rw-r--r-- | p2p/discv5/net.go | 93 |
1 files changed, 55 insertions, 38 deletions
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index b08cd2bc7..71eaec3c4 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -41,9 +41,10 @@ var ( ) const ( - autoRefreshInterval = 1 * time.Hour - seedCount = 30 - seedMaxAge = 5 * 24 * time.Hour + autoRefreshInterval = 1 * time.Hour + bucketRefreshInterval = 1 * time.Minute + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) const testTopic = "foo" @@ -62,8 +63,9 @@ func debugLog(s string) { // BootNodes are the enode URLs of the P2P bootstrap nodes for the experimental RLPx v5 "Topic Discovery" network // warning: local bootnodes for testing!!! var BootNodes = []*Node{ - //MustParseNode("enode://6f974ede10d07334e7e651c1501cb540d087dd3a6dea81432620895c913f281790b49459d72cb8011bfbbfbd24fad956356189c31b7181a96cd44ccfb68bfc71@127.0.0.1:30301"), MustParseNode("enode://0cc5f5ffb5d9098c8b8c62325f3797f56509bff942704687b6530992ac706e2cb946b90a34f1f19548cd3c7baccbcaea354531e5983c7d1bc0dee16ce4b6440b@40.118.3.223:30305"), + MustParseNode("enode://1c7a64d76c0334b0418c004af2f67c50e36a3be60b5e4790bdac0439d21603469a85fad36f2473c9a80eb043ae60936df905fa28f1ff614c3e5dc34f15dcd2dc@40.118.3.223:30308"), + MustParseNode("enode://85c85d7143ae8bb96924f2b54f1b3e70d8c4d367af305325d30a61385a432f247d2c75c45c6b4a60335060d072d7f5b35dd1d4c45f76941f62a4f83b6e75daaf@40.118.3.223:30309"), } // Network manages the table and all protocol interaction. @@ -82,7 +84,6 @@ type Network struct { tableOpResp chan struct{} topicRegisterReq chan topicRegisterReq topicSearchReq chan topicSearchReq - bucketFillChn chan chan struct{} // State of the main loop. tab *Table @@ -169,7 +170,6 @@ func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, d queryReq: make(chan *findnodeQuery), topicRegisterReq: make(chan topicRegisterReq), topicSearchReq: make(chan topicSearchReq), - bucketFillChn: make(chan chan struct{}, 1), nodes: make(map[NodeID]*Node), } go net.loop() @@ -353,8 +353,9 @@ func (net *Network) reqTableOp(f func()) (called bool) { func (net *Network) loop() { var ( - refreshTimer = time.NewTicker(autoRefreshInterval) - refreshDone chan struct{} // closed when the 'refresh' lookup has ended + refreshTimer = time.NewTicker(autoRefreshInterval) + bucketRefreshTimer = time.NewTimer(bucketRefreshInterval) + refreshDone chan struct{} // closed when the 'refresh' lookup has ended ) // Tracking the next ticket to register. @@ -389,6 +390,7 @@ func (net *Network) loop() { topicRegisterLookupDone chan []*Node topicRegisterLookupTick = time.NewTimer(0) topicSearchLookupTarget lookupInfo + searchReqWhenRefreshDone []topicSearchReq ) topicSearchLookupDone := make(chan []*Node, 1) <-topicRegisterLookupTick.C @@ -406,6 +408,7 @@ loop: // Ingress packet handling. case pkt := <-net.read: + //fmt.Println("read", pkt.ev) debugLog("<-net.read") n := net.internNode(&pkt) prestate := n.state @@ -503,14 +506,18 @@ loop: net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong) case req := <-net.topicSearchReq: - debugLog("<-net.topicSearchReq") - if req.found == nil { - net.ticketStore.removeSearchTopic(req.topic) - continue - } - net.ticketStore.addSearchTopic(req.topic, req.found) - if (topicSearchLookupTarget.target == common.Hash{}) { - topicSearchLookupDone <- nil + if refreshDone == nil { + debugLog("<-net.topicSearchReq") + if req.found == nil { + net.ticketStore.removeSearchTopic(req.topic) + continue + } + net.ticketStore.addSearchTopic(req.topic, req.found) + if (topicSearchLookupTarget.target == common.Hash{}) { + topicSearchLookupDone <- nil + } + } else { + searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req) } case nodes := <-topicSearchLookupDone: @@ -519,7 +526,14 @@ loop: net.ping(n, n.addr()) return n.pingEcho }, func(n *Node, topic Topic) []byte { - return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration + if n.state == known { + return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration + } else { + if n.state == unknown { + net.ping(n, n.addr()) + } + return nil + } }) topicSearchLookupTarget = net.ticketStore.nextSearchLookup() target := topicSearchLookupTarget.target @@ -564,9 +578,12 @@ loop: refreshDone = make(chan struct{}) net.refresh(refreshDone) } - case doneChn := <-net.bucketFillChn: - debugLog("bucketFill") - net.bucketFill(doneChn) + case <-bucketRefreshTimer.C: + target := net.tab.chooseBucketRefreshTarget() + go func() { + net.lookup(target, false) + bucketRefreshTimer.Reset(bucketRefreshInterval) + }() case newNursery := <-net.refreshReq: debugLog("<-net.refreshReq") if newNursery != nil { @@ -580,6 +597,13 @@ loop: case <-refreshDone: debugLog("<-net.refreshDone") refreshDone = nil + list := searchReqWhenRefreshDone + searchReqWhenRefreshDone = nil + go func() { + for _, req := range list { + net.topicSearchReq <- req + } + }() } } debugLog("loop stopped") @@ -643,28 +667,13 @@ func (net *Network) refresh(done chan<- struct{}) { }() } -func (net *Network) bucketFill(done chan<- struct{}) { - target := net.tab.chooseBucketFillTarget() - go func() { - net.lookup(target, false) - close(done) - }() -} - -func (net *Network) BucketFill() { - done := make(chan struct{}) - select { - case net.bucketFillChn <- done: - <-done - case <-net.closed: - close(done) - } -} - // Node Interning. func (net *Network) internNode(pkt *ingressPacket) *Node { if n := net.nodes[pkt.remoteID]; n != nil { + n.IP = pkt.remoteAddr.IP + n.UDP = uint16(pkt.remoteAddr.Port) + n.TCP = uint16(pkt.remoteAddr.Port) return n } n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port)) @@ -967,8 +976,10 @@ func init() { // handle processes packets sent by n and events related to n. func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error { + //fmt.Println("handle", n.addr().String(), n.state, ev) if pkt != nil { if err := net.checkPacket(n, ev, pkt); err != nil { + //fmt.Println("check err:", err) return err } // Start the background expiration goroutine after the first @@ -985,6 +996,7 @@ func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error { } next, err := n.state.handle(net, n, ev, pkt) net.transition(n, next) + //fmt.Println("new state:", n.state) return err } @@ -1040,6 +1052,11 @@ func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) { } func (net *Network) ping(n *Node, addr *net.UDPAddr) { + //fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex()) + if n.pingEcho != nil || n.ID == net.tab.self.ID { + //fmt.Println(" not sent") + return + } debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8])) n.pingTopics = net.ticketStore.regTopicSet() n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics) |