diff options
Diffstat (limited to 'p2p/discv5')
-rw-r--r-- | p2p/discv5/net.go | 56 | ||||
-rw-r--r-- | p2p/discv5/node.go | 5 | ||||
-rw-r--r-- | p2p/discv5/ticket.go | 217 | ||||
-rw-r--r-- | p2p/discv5/topic.go | 3 |
4 files changed, 139 insertions, 142 deletions
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index 2fbb60824..cd9981584 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -51,16 +51,9 @@ const ( const testTopic = "foo" const ( - printDebugLogs = false printTestImgLogs = false ) -func debugLog(s string) { - if printDebugLogs { - fmt.Println(s) - } -} - // Network manages the table and all protocol interaction. type Network struct { db *nodeDB // database of known nodes @@ -388,14 +381,14 @@ func (net *Network) loop() { } }() resetNextTicket := func() { - t, timeout := net.ticketStore.nextFilteredTicket() - if t != nextTicket { - nextTicket = t + ticket, timeout := net.ticketStore.nextFilteredTicket() + if nextTicket != ticket { + nextTicket = ticket if nextRegisterTimer != nil { nextRegisterTimer.Stop() nextRegisterTime = nil } - if t != nil { + if ticket != nil { nextRegisterTimer = time.NewTimer(timeout) nextRegisterTime = nextRegisterTimer.C } @@ -423,13 +416,13 @@ loop: select { case <-net.closeReq: - debugLog("<-net.closeReq") + log.Trace("<-net.closeReq") break loop // Ingress packet handling. case pkt := <-net.read: //fmt.Println("read", pkt.ev) - debugLog("<-net.read") + log.Trace("<-net.read") n := net.internNode(&pkt) prestate := n.state status := "ok" @@ -444,7 +437,7 @@ loop: // State transition timeouts. case timeout := <-net.timeout: - debugLog("<-net.timeout") + log.Trace("<-net.timeout") if net.timeoutTimers[timeout] == nil { // Stale timer (was aborted). continue @@ -462,20 +455,20 @@ loop: // Querying. case q := <-net.queryReq: - debugLog("<-net.queryReq") + log.Trace("<-net.queryReq") if !q.start(net) { q.remote.deferQuery(q) } // Interacting with the table. case f := <-net.tableOpReq: - debugLog("<-net.tableOpReq") + log.Trace("<-net.tableOpReq") f() net.tableOpResp <- struct{}{} // Topic registration stuff. case req := <-net.topicRegisterReq: - debugLog("<-net.topicRegisterReq") + log.Trace("<-net.topicRegisterReq") if !req.add { net.ticketStore.removeRegisterTopic(req.topic) continue @@ -486,7 +479,7 @@ loop: // determination for new topics. // if topicRegisterLookupDone == nil { if topicRegisterLookupTarget.target == (common.Hash{}) { - debugLog("topicRegisterLookupTarget == null") + log.Trace("topicRegisterLookupTarget == null") if topicRegisterLookupTick.Stop() { <-topicRegisterLookupTick.C } @@ -496,7 +489,7 @@ loop: } case nodes := <-topicRegisterLookupDone: - debugLog("<-topicRegisterLookupDone") + log.Trace("<-topicRegisterLookupDone") net.ticketStore.registerLookupDone(topicRegisterLookupTarget, nodes, func(n *Node) []byte { net.ping(n, n.addr()) return n.pingEcho @@ -507,7 +500,7 @@ loop: topicRegisterLookupDone = nil case <-topicRegisterLookupTick.C: - debugLog("<-topicRegisterLookupTick") + log.Trace("<-topicRegisterLookupTick") if (topicRegisterLookupTarget.target == common.Hash{}) { target, delay := net.ticketStore.nextRegisterLookup() topicRegisterLookupTarget = target @@ -520,14 +513,14 @@ loop: } case <-nextRegisterTime: - debugLog("<-nextRegisterTime") + log.Trace("<-nextRegisterTime") net.ticketStore.ticketRegistered(*nextTicket) //fmt.Println("sendTopicRegister", nextTicket.t.node.addr().String(), nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong) net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong) case req := <-net.topicSearchReq: if refreshDone == nil { - debugLog("<-net.topicSearchReq") + log.Trace("<-net.topicSearchReq") info, ok := searchInfo[req.topic] if ok { if req.delay == time.Duration(0) { @@ -588,7 +581,7 @@ loop: }) case <-statsDump.C: - debugLog("<-statsDump.C") + log.Trace("<-statsDump.C") /*r, ok := net.ticketStore.radius[testTopic] if !ok { fmt.Printf("(%x) no radius @ %v\n", net.tab.self.ID[:8], time.Now()) @@ -617,7 +610,7 @@ loop: // Periodic / lookup-initiated bucket refresh. case <-refreshTimer.C: - debugLog("<-refreshTimer.C") + log.Trace("<-refreshTimer.C") // TODO: ideally we would start the refresh timer after // fallback nodes have been set for the first time. if refreshDone == nil { @@ -631,7 +624,7 @@ loop: bucketRefreshTimer.Reset(bucketRefreshInterval) }() case newNursery := <-net.refreshReq: - debugLog("<-net.refreshReq") + log.Trace("<-net.refreshReq") if newNursery != nil { net.nursery = newNursery } @@ -641,7 +634,7 @@ loop: } net.refreshResp <- refreshDone case <-refreshDone: - debugLog("<-net.refreshDone") + log.Trace("<-net.refreshDone") refreshDone = nil list := searchReqWhenRefreshDone searchReqWhenRefreshDone = nil @@ -652,7 +645,7 @@ loop: }() } } - debugLog("loop stopped") + log.Trace("loop stopped") log.Debug(fmt.Sprintf("shutting down")) if net.conn != nil { @@ -1109,14 +1102,14 @@ func (net *Network) ping(n *Node, addr *net.UDPAddr) { //fmt.Println(" not sent") return } - debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8])) + log.Trace("Pinging remote node", "node", n.ID) n.pingTopics = net.ticketStore.regTopicSet() n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics) net.timedEvent(respTimeout, n, pongTimeout) } func (net *Network) handlePing(n *Node, pkt *ingressPacket) { - debugLog(fmt.Sprintf("handlePing(node = %x)", n.ID[:8])) + log.Trace("Handling remote ping", "node", n.ID) ping := pkt.data.(*ping) n.TCP = ping.From.TCP t := net.topictab.getTicket(n, ping.Topics) @@ -1131,7 +1124,7 @@ func (net *Network) handlePing(n *Node, pkt *ingressPacket) { } func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error { - debugLog(fmt.Sprintf("handleKnownPong(node = %x)", n.ID[:8])) + log.Trace("Handling known pong", "node", n.ID) net.abortTimedEvent(n, pongTimeout) now := mclock.Now() ticket, err := pongToTicket(now, n.pingTopics, n, pkt) @@ -1139,9 +1132,8 @@ func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error { // fmt.Printf("(%x) ticket: %+v\n", net.tab.self.ID[:8], pkt.data) net.ticketStore.addTicket(now, pkt.data.(*pong).ReplyTok, ticket) } else { - debugLog(fmt.Sprintf(" error: %v", err)) + log.Trace("Failed to convert pong to ticket", "err", err) } - n.pingEcho = nil n.pingTopics = nil return err diff --git a/p2p/discv5/node.go b/p2p/discv5/node.go index 2db7a508f..fd88a55b1 100644 --- a/p2p/discv5/node.go +++ b/p2p/discv5/node.go @@ -273,6 +273,11 @@ func (n NodeID) GoString() string { return fmt.Sprintf("discover.HexID(\"%x\")", n[:]) } +// TerminalString returns a shortened hex string for terminal logging. +func (n NodeID) TerminalString() string { + return hex.EncodeToString(n[:8]) +} + // HexID converts a hex string to a NodeID. // The string may be prefixed with 0x. func HexID(in string) (NodeID, error) { diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index 193cef4be..b45ec4d2b 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" ) const ( @@ -128,8 +129,11 @@ type ticketStore struct { // Contains buckets (for each absolute minute) of tickets // that can be used in that minute. // This is only set if the topic is being registered. - tickets map[Topic]topicTickets - regtopics []Topic + tickets map[Topic]*topicTickets + + regQueue []Topic // Topic registration queue for round robin attempts + regSet map[Topic]struct{} // Topic registration queue contents for fast filling + nodes map[*Node]*ticket nodeLastReq map[*Node]reqInfo @@ -152,14 +156,16 @@ type sentQuery struct { } type topicTickets struct { - buckets map[timeBucket][]ticketRef - nextLookup, nextReg mclock.AbsTime + buckets map[timeBucket][]ticketRef + nextLookup mclock.AbsTime + nextReg mclock.AbsTime } func newTicketStore() *ticketStore { return &ticketStore{ radius: make(map[Topic]*topicRadius), - tickets: make(map[Topic]topicTickets), + tickets: make(map[Topic]*topicTickets), + regSet: make(map[Topic]struct{}), nodes: make(map[*Node]*ticket), nodeLastReq: make(map[*Node]reqInfo), searchTopicMap: make(map[Topic]searchTopic), @@ -169,13 +175,13 @@ func newTicketStore() *ticketStore { // addTopic starts tracking a topic. If register is true, // the local node will register the topic and tickets will be collected. -func (s *ticketStore) addTopic(t Topic, register bool) { - debugLog(fmt.Sprintf(" addTopic(%v, %v)", t, register)) - if s.radius[t] == nil { - s.radius[t] = newTopicRadius(t) +func (s *ticketStore) addTopic(topic Topic, register bool) { + log.Trace("Adding discovery topic", "topic", topic, "register", register) + if s.radius[topic] == nil { + s.radius[topic] = newTopicRadius(topic) } - if register && s.tickets[t].buckets == nil { - s.tickets[t] = topicTickets{buckets: make(map[timeBucket][]ticketRef)} + if register && s.tickets[topic] == nil { + s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)} } } @@ -194,7 +200,11 @@ func (s *ticketStore) removeSearchTopic(t Topic) { // removeRegisterTopic deletes all tickets for the given topic. func (s *ticketStore) removeRegisterTopic(topic Topic) { - debugLog(fmt.Sprintf(" removeRegisterTopic(%v)", topic)) + log.Trace("Removing discovery topic", "topic", topic) + if s.tickets[topic] == nil { + log.Warn("Removing non-existent discovery topic", "topic", topic) + return + } for _, list := range s.tickets[topic].buckets { for _, ref := range list { ref.t.refCnt-- @@ -216,23 +226,35 @@ func (s *ticketStore) regTopicSet() []Topic { } // nextRegisterLookup returns the target of the next lookup for ticket collection. -func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Duration) { - debugLog("nextRegisterLookup()") - firstTopic, ok := s.iterRegTopics() - for topic := firstTopic; ok; { - debugLog(fmt.Sprintf(" checking topic %v, len(s.tickets[topic]) = %d", topic, len(s.tickets[topic].buckets))) - if s.tickets[topic].buckets != nil && s.needMoreTickets(topic) { - next := s.radius[topic].nextTarget(false) - debugLog(fmt.Sprintf(" %x 1s", next.target[:8])) - return next, 100 * time.Millisecond +func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) { + // Queue up any new topics (or discarded ones), preserving iteration order + for topic := range s.tickets { + if _, ok := s.regSet[topic]; !ok { + s.regQueue = append(s.regQueue, topic) + s.regSet[topic] = struct{}{} } - topic, ok = s.iterRegTopics() - if topic == firstTopic { - break // We have checked all topics. + } + // Iterate over the set of all topics and look up the next suitable one + for len(s.regQueue) > 0 { + // Fetch the next topic from the queue, and ensure it still exists + topic := s.regQueue[0] + s.regQueue = s.regQueue[1:] + delete(s.regSet, topic) + + if s.tickets[topic] == nil { + continue + } + // If the topic needs more tickets, return it + if s.tickets[topic].nextLookup < mclock.Now() { + next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond + log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay) + return next, delay } } - debugLog(" null, 40s") - return lookupInfo{}, 40 * time.Second + // No registration topics found or all exhausted, sleep + delay := 40 * time.Second + log.Trace("No topic found to register", "delay", delay) + return lookupInfo{}, delay } func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { @@ -246,40 +268,22 @@ func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { return target } -// iterRegTopics returns topics to register in arbitrary order. -// The second return value is false if there are no topics. -func (s *ticketStore) iterRegTopics() (Topic, bool) { - debugLog("iterRegTopics()") - if len(s.regtopics) == 0 { - if len(s.tickets) == 0 { - debugLog(" false") - return "", false - } - // Refill register list. - for t := range s.tickets { - s.regtopics = append(s.regtopics, t) - } +// ticketsInWindow returns the tickets of a given topic in the registration window. +func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef { + // Sanity check that the topic still exists before operating on it + if s.tickets[topic] == nil { + log.Warn("Listing non-existing discovery tickets", "topic", topic) + return nil } - topic := s.regtopics[len(s.regtopics)-1] - s.regtopics = s.regtopics[:len(s.regtopics)-1] - debugLog(" " + string(topic) + " true") - return topic, true -} - -func (s *ticketStore) needMoreTickets(t Topic) bool { - return s.tickets[t].nextLookup < mclock.Now() -} + // Gather all the tickers in the next time window + var tickets []ticketRef -// ticketsInWindow returns the tickets of a given topic in the registration window. -func (s *ticketStore) ticketsInWindow(t Topic) []ticketRef { - ltBucket := s.lastBucketFetched - var res []ticketRef - tickets := s.tickets[t].buckets - for g := ltBucket; g < ltBucket+timeWindow; g++ { - res = append(res, tickets[g]...) + buckets := s.tickets[topic].buckets + for idx := timeBucket(0); idx < timeWindow; idx++ { + tickets = append(tickets, buckets[s.lastBucketFetched+idx]...) } - debugLog(fmt.Sprintf("ticketsInWindow(%v) = %v", t, len(res))) - return res + log.Trace("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(tickets)) + return tickets } func (s *ticketStore) removeExcessTickets(t Topic) { @@ -317,53 +321,55 @@ func (s ticketRefByWaitTime) Swap(i, j int) { func (s *ticketStore) addTicketRef(r ticketRef) { topic := r.t.topics[r.idx] - t := s.tickets[topic] - if t.buckets == nil { + tickets := s.tickets[topic] + if tickets == nil { + log.Warn("Adding ticket to non-existent topic", "topic", topic) return } bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen)) - t.buckets[bucket] = append(t.buckets[bucket], r) + tickets.buckets[bucket] = append(tickets.buckets[bucket], r) r.t.refCnt++ min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt - if t.nextLookup < min { - t.nextLookup = min + if tickets.nextLookup < min { + tickets.nextLookup = min } - t.nextLookup += mclock.AbsTime(collectFrequency) - s.tickets[topic] = t + tickets.nextLookup += mclock.AbsTime(collectFrequency) //s.removeExcessTickets(topic) } -func (s *ticketStore) nextFilteredTicket() (t *ticketRef, wait time.Duration) { +func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) { now := mclock.Now() for { - t, wait = s.nextRegisterableTicket() - if t == nil { - return + ticket, wait := s.nextRegisterableTicket() + if ticket == nil { + return ticket, wait } + log.Trace("Found discovery ticket to register", "node", ticket.t.node, "serial", ticket.t.serial, "wait", wait) + regTime := now + mclock.AbsTime(wait) - topic := t.t.topics[t.idx] + topic := ticket.t.topics[ticket.idx] if regTime >= s.tickets[topic].nextReg { - return + return ticket, wait } - s.removeTicketRef(*t) + s.removeTicketRef(*ticket) } } -func (s *ticketStore) ticketRegistered(t ticketRef) { +func (s *ticketStore) ticketRegistered(ref ticketRef) { now := mclock.Now() - topic := t.t.topics[t.idx] - tt := s.tickets[topic] + topic := ref.t.topics[ref.idx] + tickets := s.tickets[topic] min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt - if min > tt.nextReg { - tt.nextReg = min + if min > tickets.nextReg { + tickets.nextReg = min } - tt.nextReg += mclock.AbsTime(registerFrequency) - s.tickets[topic] = tt + tickets.nextReg += mclock.AbsTime(registerFrequency) + s.tickets[topic] = tickets - s.removeTicketRef(t) + s.removeTicketRef(ref) } // nextRegisterableTicket returns the next ticket that can be used @@ -374,16 +380,7 @@ func (s *ticketStore) ticketRegistered(t ticketRef) { // // A ticket can be returned more than once with <= zero wait time in case // the ticket contains multiple topics. -func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration) { - defer func() { - if t == nil { - debugLog(" nil") - } else { - debugLog(fmt.Sprintf(" node = %x sn = %v wait = %v", t.t.node.ID[:8], t.t.serial, wait)) - } - }() - - debugLog("nextRegisterableTicket()") +func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) { now := mclock.Now() if s.nextTicketCached != nil { return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now) @@ -412,9 +409,8 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration return nil, 0 } if nextTicket.t != nil { - wait = time.Duration(nextTicket.topicRegTime() - now) s.nextTicketCached = &nextTicket - return &nextTicket, wait + return &nextTicket, time.Duration(nextTicket.topicRegTime() - now) } s.lastBucketFetched = bucket } @@ -422,14 +418,17 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration // removeTicket removes a ticket from the ticket store func (s *ticketStore) removeTicketRef(ref ticketRef) { - debugLog(fmt.Sprintf("removeTicketRef(node = %x sn = %v)", ref.t.node.ID[:8], ref.t.serial)) + log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial) + topic := ref.topic() - tickets := s.tickets[topic].buckets + tickets := s.tickets[topic] + if tickets == nil { + log.Warn("Removing tickets from unknown topic", "topic", topic) return } bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen)) - list := tickets[bucket] + list := tickets.buckets[bucket] idx := -1 for i, bt := range list { if bt.t == ref.t { @@ -442,9 +441,9 @@ func (s *ticketStore) removeTicketRef(ref ticketRef) { } list = append(list[:idx], list[idx+1:]...) if len(list) != 0 { - tickets[bucket] = list + tickets.buckets[bucket] = list } else { - delete(tickets, bucket) + delete(tickets.buckets, bucket) } ref.t.refCnt-- if ref.t.refCnt == 0 { @@ -523,21 +522,21 @@ func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Has } } -func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, t *ticket) { - debugLog(fmt.Sprintf("add(node = %x sn = %v)", t.node.ID[:8], t.serial)) +func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) { + log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial) - lastReq, ok := s.nodeLastReq[t.node] + lastReq, ok := s.nodeLastReq[ticket.node] if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) { return } - s.adjustWithTicket(localTime, lastReq.lookup.target, t) + s.adjustWithTicket(localTime, lastReq.lookup.target, ticket) - if lastReq.lookup.radiusLookup || s.nodes[t.node] != nil { + if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil { return } topic := lastReq.lookup.topic - topicIdx := t.findIdx(topic) + topicIdx := ticket.findIdx(topic) if topicIdx == -1 { return } @@ -548,29 +547,29 @@ func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, t *ti } if _, ok := s.tickets[topic]; ok { - wait := t.regTime[topicIdx] - localTime + wait := ticket.regTime[topicIdx] - localTime rnd := rand.ExpFloat64() if rnd > 10 { rnd = 10 } if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd { // use the ticket to register this topic - //fmt.Println("addTicket", t.node.ID[:8], t.node.addr().String(), t.serial, t.pong) - s.addTicketRef(ticketRef{t, topicIdx}) + //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong) + s.addTicketRef(ticketRef{ticket, topicIdx}) } } - if t.refCnt > 0 { + if ticket.refCnt > 0 { s.nextTicketCached = nil - s.nodes[t.node] = t + s.nodes[ticket.node] = ticket } } func (s *ticketStore) getNodeTicket(node *Node) *ticket { if s.nodes[node] == nil { - debugLog(fmt.Sprintf("getNodeTicket(%x) sn = nil", node.ID[:8])) + log.Trace("Retrieving node ticket", "node", node.ID, "serial", nil) } else { - debugLog(fmt.Sprintf("getNodeTicket(%x) sn = %v", node.ID[:8], s.nodes[node].serial)) + log.Trace("Retrieving node ticket", "node", node.ID, "serial", s.nodes[node].serial) } return s.nodes[node] } diff --git a/p2p/discv5/topic.go b/p2p/discv5/topic.go index b6bea013c..e7a7f8e02 100644 --- a/p2p/discv5/topic.go +++ b/p2p/discv5/topic.go @@ -24,6 +24,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/log" ) const ( @@ -235,7 +236,7 @@ func (t *topicTable) deleteEntry(e *topicEntry) { // It is assumed that topics and waitPeriods have the same length. func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) { - debugLog(fmt.Sprintf("useTicket %v %v %v", serialNo, topics, waitPeriods)) + log.Trace("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods) //fmt.Println("useTicket", serialNo, topics, waitPeriods) t.collectGarbage() |