diff options
author | Janos Guljas <janos@resenje.org> | 2018-02-09 19:23:30 +0800 |
---|---|---|
committer | Janos Guljas <janos@resenje.org> | 2018-02-22 21:23:17 +0800 |
commit | a3a07350dcef0ba39829a20d8ddba4bd3463d293 (patch) | |
tree | 100f2515cadd92105537a12e6981fab2193435ee /p2p/discv5 | |
parent | 820cf09c98706f71d4b02b6c25e62db15830f3fb (diff) | |
parent | 1a4e68721a901e86322631fed1191025a6d14c52 (diff) | |
download | dexon-a3a07350dcef0ba39829a20d8ddba4bd3463d293.tar dexon-a3a07350dcef0ba39829a20d8ddba4bd3463d293.tar.gz dexon-a3a07350dcef0ba39829a20d8ddba4bd3463d293.tar.bz2 dexon-a3a07350dcef0ba39829a20d8ddba4bd3463d293.tar.lz dexon-a3a07350dcef0ba39829a20d8ddba4bd3463d293.tar.xz dexon-a3a07350dcef0ba39829a20d8ddba4bd3463d293.tar.zst dexon-a3a07350dcef0ba39829a20d8ddba4bd3463d293.zip |
swarm, cmd/swarm: Merge branch 'master' into multiple-ens-endpoints
Diffstat (limited to 'p2p/discv5')
-rw-r--r-- | p2p/discv5/database.go | 6 | ||||
-rw-r--r-- | p2p/discv5/net.go | 59 | ||||
-rw-r--r-- | p2p/discv5/net_test.go | 2 | ||||
-rw-r--r-- | p2p/discv5/node.go | 5 | ||||
-rw-r--r-- | p2p/discv5/nodeevent_string.go | 6 | ||||
-rw-r--r-- | p2p/discv5/sim_test.go | 2 | ||||
-rw-r--r-- | p2p/discv5/ticket.go | 227 | ||||
-rw-r--r-- | p2p/discv5/topic.go | 3 | ||||
-rw-r--r-- | p2p/discv5/udp.go | 47 |
9 files changed, 172 insertions, 185 deletions
diff --git a/p2p/discv5/database.go b/p2p/discv5/database.go index a3b044ec1..3c2d5744c 100644 --- a/p2p/discv5/database.go +++ b/p2p/discv5/database.go @@ -239,14 +239,14 @@ func (db *nodeDB) ensureExpirer() { // expirer should be started in a go routine, and is responsible for looping ad // infinitum and dropping stale data from the database. func (db *nodeDB) expirer() { - tick := time.Tick(nodeDBCleanupCycle) + tick := time.NewTicker(nodeDBCleanupCycle) + defer tick.Stop() for { select { - case <-tick: + case <-tick.C: if err := db.expireNodes(); err != nil { log.Error(fmt.Sprintf("Failed to expire nodedb items: %v", err)) } - case <-db.quit: return } diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index 2fbb60824..f9baf126f 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/rlp" ) @@ -51,16 +50,9 @@ const ( const testTopic = "foo" const ( - printDebugLogs = false printTestImgLogs = false ) -func debugLog(s string) { - if printDebugLogs { - fmt.Println(s) - } -} - // Network manages the table and all protocol interaction. type Network struct { db *nodeDB // database of known nodes @@ -141,7 +133,7 @@ type timeoutEvent struct { node *Node } -func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, dbPath string, netrestrict *netutil.Netlist) (*Network, error) { +func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, dbPath string, netrestrict *netutil.Netlist) (*Network, error) { ourID := PubkeyID(&ourPubkey) var db *nodeDB @@ -388,14 +380,14 @@ func (net *Network) loop() { } }() resetNextTicket := func() { - t, timeout := net.ticketStore.nextFilteredTicket() - if t != nextTicket { - nextTicket = t + ticket, timeout := net.ticketStore.nextFilteredTicket() + if nextTicket != ticket { + nextTicket = ticket if nextRegisterTimer != nil { nextRegisterTimer.Stop() nextRegisterTime = nil } - if t != nil { + if ticket != nil { nextRegisterTimer = time.NewTimer(timeout) nextRegisterTime = nextRegisterTimer.C } @@ -423,13 +415,13 @@ loop: select { case <-net.closeReq: - debugLog("<-net.closeReq") + log.Trace("<-net.closeReq") break loop // Ingress packet handling. case pkt := <-net.read: //fmt.Println("read", pkt.ev) - debugLog("<-net.read") + log.Trace("<-net.read") n := net.internNode(&pkt) prestate := n.state status := "ok" @@ -444,7 +436,7 @@ loop: // State transition timeouts. case timeout := <-net.timeout: - debugLog("<-net.timeout") + log.Trace("<-net.timeout") if net.timeoutTimers[timeout] == nil { // Stale timer (was aborted). continue @@ -462,20 +454,20 @@ loop: // Querying. case q := <-net.queryReq: - debugLog("<-net.queryReq") + log.Trace("<-net.queryReq") if !q.start(net) { q.remote.deferQuery(q) } // Interacting with the table. case f := <-net.tableOpReq: - debugLog("<-net.tableOpReq") + log.Trace("<-net.tableOpReq") f() net.tableOpResp <- struct{}{} // Topic registration stuff. case req := <-net.topicRegisterReq: - debugLog("<-net.topicRegisterReq") + log.Trace("<-net.topicRegisterReq") if !req.add { net.ticketStore.removeRegisterTopic(req.topic) continue @@ -486,7 +478,7 @@ loop: // determination for new topics. // if topicRegisterLookupDone == nil { if topicRegisterLookupTarget.target == (common.Hash{}) { - debugLog("topicRegisterLookupTarget == null") + log.Trace("topicRegisterLookupTarget == null") if topicRegisterLookupTick.Stop() { <-topicRegisterLookupTick.C } @@ -496,7 +488,7 @@ loop: } case nodes := <-topicRegisterLookupDone: - debugLog("<-topicRegisterLookupDone") + log.Trace("<-topicRegisterLookupDone") net.ticketStore.registerLookupDone(topicRegisterLookupTarget, nodes, func(n *Node) []byte { net.ping(n, n.addr()) return n.pingEcho @@ -507,7 +499,7 @@ loop: topicRegisterLookupDone = nil case <-topicRegisterLookupTick.C: - debugLog("<-topicRegisterLookupTick") + log.Trace("<-topicRegisterLookupTick") if (topicRegisterLookupTarget.target == common.Hash{}) { target, delay := net.ticketStore.nextRegisterLookup() topicRegisterLookupTarget = target @@ -520,14 +512,14 @@ loop: } case <-nextRegisterTime: - debugLog("<-nextRegisterTime") + log.Trace("<-nextRegisterTime") net.ticketStore.ticketRegistered(*nextTicket) //fmt.Println("sendTopicRegister", nextTicket.t.node.addr().String(), nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong) net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong) case req := <-net.topicSearchReq: if refreshDone == nil { - debugLog("<-net.topicSearchReq") + log.Trace("<-net.topicSearchReq") info, ok := searchInfo[req.topic] if ok { if req.delay == time.Duration(0) { @@ -588,7 +580,7 @@ loop: }) case <-statsDump.C: - debugLog("<-statsDump.C") + log.Trace("<-statsDump.C") /*r, ok := net.ticketStore.radius[testTopic] if !ok { fmt.Printf("(%x) no radius @ %v\n", net.tab.self.ID[:8], time.Now()) @@ -617,7 +609,7 @@ loop: // Periodic / lookup-initiated bucket refresh. case <-refreshTimer.C: - debugLog("<-refreshTimer.C") + log.Trace("<-refreshTimer.C") // TODO: ideally we would start the refresh timer after // fallback nodes have been set for the first time. if refreshDone == nil { @@ -631,7 +623,7 @@ loop: bucketRefreshTimer.Reset(bucketRefreshInterval) }() case newNursery := <-net.refreshReq: - debugLog("<-net.refreshReq") + log.Trace("<-net.refreshReq") if newNursery != nil { net.nursery = newNursery } @@ -641,7 +633,7 @@ loop: } net.refreshResp <- refreshDone case <-refreshDone: - debugLog("<-net.refreshDone") + log.Trace("<-net.refreshDone") refreshDone = nil list := searchReqWhenRefreshDone searchReqWhenRefreshDone = nil @@ -652,7 +644,7 @@ loop: }() } } - debugLog("loop stopped") + log.Trace("loop stopped") log.Debug(fmt.Sprintf("shutting down")) if net.conn != nil { @@ -1109,14 +1101,14 @@ func (net *Network) ping(n *Node, addr *net.UDPAddr) { //fmt.Println(" not sent") return } - debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8])) + log.Trace("Pinging remote node", "node", n.ID) n.pingTopics = net.ticketStore.regTopicSet() n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics) net.timedEvent(respTimeout, n, pongTimeout) } func (net *Network) handlePing(n *Node, pkt *ingressPacket) { - debugLog(fmt.Sprintf("handlePing(node = %x)", n.ID[:8])) + log.Trace("Handling remote ping", "node", n.ID) ping := pkt.data.(*ping) n.TCP = ping.From.TCP t := net.topictab.getTicket(n, ping.Topics) @@ -1131,7 +1123,7 @@ func (net *Network) handlePing(n *Node, pkt *ingressPacket) { } func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error { - debugLog(fmt.Sprintf("handleKnownPong(node = %x)", n.ID[:8])) + log.Trace("Handling known pong", "node", n.ID) net.abortTimedEvent(n, pongTimeout) now := mclock.Now() ticket, err := pongToTicket(now, n.pingTopics, n, pkt) @@ -1139,9 +1131,8 @@ func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error { // fmt.Printf("(%x) ticket: %+v\n", net.tab.self.ID[:8], pkt.data) net.ticketStore.addTicket(now, pkt.data.(*pong).ReplyTok, ticket) } else { - debugLog(fmt.Sprintf(" error: %v", err)) + log.Trace("Failed to convert pong to ticket", "err", err) } - n.pingEcho = nil n.pingTopics = nil return err diff --git a/p2p/discv5/net_test.go b/p2p/discv5/net_test.go index bd234f5ba..369282ca9 100644 --- a/p2p/discv5/net_test.go +++ b/p2p/discv5/net_test.go @@ -28,7 +28,7 @@ import ( func TestNetwork_Lookup(t *testing.T) { key, _ := crypto.GenerateKey() - network, err := newNetwork(lookupTestnet, key.PublicKey, nil, "", nil) + network, err := newNetwork(lookupTestnet, key.PublicKey, "", nil) if err != nil { t.Fatal(err) } diff --git a/p2p/discv5/node.go b/p2p/discv5/node.go index 2db7a508f..fd88a55b1 100644 --- a/p2p/discv5/node.go +++ b/p2p/discv5/node.go @@ -273,6 +273,11 @@ func (n NodeID) GoString() string { return fmt.Sprintf("discover.HexID(\"%x\")", n[:]) } +// TerminalString returns a shortened hex string for terminal logging. +func (n NodeID) TerminalString() string { + return hex.EncodeToString(n[:8]) +} + // HexID converts a hex string to a NodeID. // The string may be prefixed with 0x. func HexID(in string) (NodeID, error) { diff --git a/p2p/discv5/nodeevent_string.go b/p2p/discv5/nodeevent_string.go index fde9045c5..eb696fb8b 100644 --- a/p2p/discv5/nodeevent_string.go +++ b/p2p/discv5/nodeevent_string.go @@ -1,8 +1,8 @@ -// Code generated by "stringer -type nodeEvent"; DO NOT EDIT +// Code generated by "stringer -type=nodeEvent"; DO NOT EDIT. package discv5 -import "fmt" +import "strconv" const ( _nodeEvent_name_0 = "invalidEventpingPacketpongPacketfindnodePacketneighborsPacketfindnodeHashPackettopicRegisterPackettopicQueryPackettopicNodesPacket" @@ -22,6 +22,6 @@ func (i nodeEvent) String() string { i -= 265 return _nodeEvent_name_1[_nodeEvent_index_1[i]:_nodeEvent_index_1[i+1]] default: - return fmt.Sprintf("nodeEvent(%d)", i) + return "nodeEvent(" + strconv.FormatInt(int64(i), 10) + ")" } } diff --git a/p2p/discv5/sim_test.go b/p2p/discv5/sim_test.go index bf57872e2..543faecd4 100644 --- a/p2p/discv5/sim_test.go +++ b/p2p/discv5/sim_test.go @@ -282,7 +282,7 @@ func (s *simulation) launchNode(log bool) *Network { addr := &net.UDPAddr{IP: ip, Port: 30303} transport := &simTransport{joinTime: time.Now(), sender: id, senderAddr: addr, sim: s, priv: key} - net, err := newNetwork(transport, key.PublicKey, nil, "<no database>", nil) + net, err := newNetwork(transport, key.PublicKey, "<no database>", nil) if err != nil { panic("cannot launch new node: " + err.Error()) } diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index 193cef4be..37ce8d23c 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" ) const ( @@ -128,8 +129,11 @@ type ticketStore struct { // Contains buckets (for each absolute minute) of tickets // that can be used in that minute. // This is only set if the topic is being registered. - tickets map[Topic]topicTickets - regtopics []Topic + tickets map[Topic]*topicTickets + + regQueue []Topic // Topic registration queue for round robin attempts + regSet map[Topic]struct{} // Topic registration queue contents for fast filling + nodes map[*Node]*ticket nodeLastReq map[*Node]reqInfo @@ -152,14 +156,16 @@ type sentQuery struct { } type topicTickets struct { - buckets map[timeBucket][]ticketRef - nextLookup, nextReg mclock.AbsTime + buckets map[timeBucket][]ticketRef + nextLookup mclock.AbsTime + nextReg mclock.AbsTime } func newTicketStore() *ticketStore { return &ticketStore{ radius: make(map[Topic]*topicRadius), - tickets: make(map[Topic]topicTickets), + tickets: make(map[Topic]*topicTickets), + regSet: make(map[Topic]struct{}), nodes: make(map[*Node]*ticket), nodeLastReq: make(map[*Node]reqInfo), searchTopicMap: make(map[Topic]searchTopic), @@ -169,13 +175,13 @@ func newTicketStore() *ticketStore { // addTopic starts tracking a topic. If register is true, // the local node will register the topic and tickets will be collected. -func (s *ticketStore) addTopic(t Topic, register bool) { - debugLog(fmt.Sprintf(" addTopic(%v, %v)", t, register)) - if s.radius[t] == nil { - s.radius[t] = newTopicRadius(t) +func (s *ticketStore) addTopic(topic Topic, register bool) { + log.Trace("Adding discovery topic", "topic", topic, "register", register) + if s.radius[topic] == nil { + s.radius[topic] = newTopicRadius(topic) } - if register && s.tickets[t].buckets == nil { - s.tickets[t] = topicTickets{buckets: make(map[timeBucket][]ticketRef)} + if register && s.tickets[topic] == nil { + s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)} } } @@ -194,7 +200,11 @@ func (s *ticketStore) removeSearchTopic(t Topic) { // removeRegisterTopic deletes all tickets for the given topic. func (s *ticketStore) removeRegisterTopic(topic Topic) { - debugLog(fmt.Sprintf(" removeRegisterTopic(%v)", topic)) + log.Trace("Removing discovery topic", "topic", topic) + if s.tickets[topic] == nil { + log.Warn("Removing non-existent discovery topic", "topic", topic) + return + } for _, list := range s.tickets[topic].buckets { for _, ref := range list { ref.t.refCnt-- @@ -216,23 +226,35 @@ func (s *ticketStore) regTopicSet() []Topic { } // nextRegisterLookup returns the target of the next lookup for ticket collection. -func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Duration) { - debugLog("nextRegisterLookup()") - firstTopic, ok := s.iterRegTopics() - for topic := firstTopic; ok; { - debugLog(fmt.Sprintf(" checking topic %v, len(s.tickets[topic]) = %d", topic, len(s.tickets[topic].buckets))) - if s.tickets[topic].buckets != nil && s.needMoreTickets(topic) { - next := s.radius[topic].nextTarget(false) - debugLog(fmt.Sprintf(" %x 1s", next.target[:8])) - return next, 100 * time.Millisecond +func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) { + // Queue up any new topics (or discarded ones), preserving iteration order + for topic := range s.tickets { + if _, ok := s.regSet[topic]; !ok { + s.regQueue = append(s.regQueue, topic) + s.regSet[topic] = struct{}{} + } + } + // Iterate over the set of all topics and look up the next suitable one + for len(s.regQueue) > 0 { + // Fetch the next topic from the queue, and ensure it still exists + topic := s.regQueue[0] + s.regQueue = s.regQueue[1:] + delete(s.regSet, topic) + + if s.tickets[topic] == nil { + continue } - topic, ok = s.iterRegTopics() - if topic == firstTopic { - break // We have checked all topics. + // If the topic needs more tickets, return it + if s.tickets[topic].nextLookup < mclock.Now() { + next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond + log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay) + return next, delay } } - debugLog(" null, 40s") - return lookupInfo{}, 40 * time.Second + // No registration topics found or all exhausted, sleep + delay := 40 * time.Second + log.Trace("No topic found to register", "delay", delay) + return lookupInfo{}, delay } func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { @@ -246,40 +268,22 @@ func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { return target } -// iterRegTopics returns topics to register in arbitrary order. -// The second return value is false if there are no topics. -func (s *ticketStore) iterRegTopics() (Topic, bool) { - debugLog("iterRegTopics()") - if len(s.regtopics) == 0 { - if len(s.tickets) == 0 { - debugLog(" false") - return "", false - } - // Refill register list. - for t := range s.tickets { - s.regtopics = append(s.regtopics, t) - } +// ticketsInWindow returns the tickets of a given topic in the registration window. +func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef { + // Sanity check that the topic still exists before operating on it + if s.tickets[topic] == nil { + log.Warn("Listing non-existing discovery tickets", "topic", topic) + return nil } - topic := s.regtopics[len(s.regtopics)-1] - s.regtopics = s.regtopics[:len(s.regtopics)-1] - debugLog(" " + string(topic) + " true") - return topic, true -} - -func (s *ticketStore) needMoreTickets(t Topic) bool { - return s.tickets[t].nextLookup < mclock.Now() -} + // Gather all the tickers in the next time window + var tickets []ticketRef -// ticketsInWindow returns the tickets of a given topic in the registration window. -func (s *ticketStore) ticketsInWindow(t Topic) []ticketRef { - ltBucket := s.lastBucketFetched - var res []ticketRef - tickets := s.tickets[t].buckets - for g := ltBucket; g < ltBucket+timeWindow; g++ { - res = append(res, tickets[g]...) + buckets := s.tickets[topic].buckets + for idx := timeBucket(0); idx < timeWindow; idx++ { + tickets = append(tickets, buckets[s.lastBucketFetched+idx]...) } - debugLog(fmt.Sprintf("ticketsInWindow(%v) = %v", t, len(res))) - return res + log.Trace("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(tickets)) + return tickets } func (s *ticketStore) removeExcessTickets(t Topic) { @@ -317,53 +321,55 @@ func (s ticketRefByWaitTime) Swap(i, j int) { func (s *ticketStore) addTicketRef(r ticketRef) { topic := r.t.topics[r.idx] - t := s.tickets[topic] - if t.buckets == nil { + tickets := s.tickets[topic] + if tickets == nil { + log.Warn("Adding ticket to non-existent topic", "topic", topic) return } bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen)) - t.buckets[bucket] = append(t.buckets[bucket], r) + tickets.buckets[bucket] = append(tickets.buckets[bucket], r) r.t.refCnt++ min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt - if t.nextLookup < min { - t.nextLookup = min + if tickets.nextLookup < min { + tickets.nextLookup = min } - t.nextLookup += mclock.AbsTime(collectFrequency) - s.tickets[topic] = t + tickets.nextLookup += mclock.AbsTime(collectFrequency) //s.removeExcessTickets(topic) } -func (s *ticketStore) nextFilteredTicket() (t *ticketRef, wait time.Duration) { +func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) { now := mclock.Now() for { - t, wait = s.nextRegisterableTicket() - if t == nil { - return + ticket, wait := s.nextRegisterableTicket() + if ticket == nil { + return ticket, wait } + log.Trace("Found discovery ticket to register", "node", ticket.t.node, "serial", ticket.t.serial, "wait", wait) + regTime := now + mclock.AbsTime(wait) - topic := t.t.topics[t.idx] - if regTime >= s.tickets[topic].nextReg { - return + topic := ticket.t.topics[ticket.idx] + if s.tickets[topic] != nil && regTime >= s.tickets[topic].nextReg { + return ticket, wait } - s.removeTicketRef(*t) + s.removeTicketRef(*ticket) } } -func (s *ticketStore) ticketRegistered(t ticketRef) { +func (s *ticketStore) ticketRegistered(ref ticketRef) { now := mclock.Now() - topic := t.t.topics[t.idx] - tt := s.tickets[topic] + topic := ref.t.topics[ref.idx] + tickets := s.tickets[topic] min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt - if min > tt.nextReg { - tt.nextReg = min + if min > tickets.nextReg { + tickets.nextReg = min } - tt.nextReg += mclock.AbsTime(registerFrequency) - s.tickets[topic] = tt + tickets.nextReg += mclock.AbsTime(registerFrequency) + s.tickets[topic] = tickets - s.removeTicketRef(t) + s.removeTicketRef(ref) } // nextRegisterableTicket returns the next ticket that can be used @@ -374,16 +380,7 @@ func (s *ticketStore) ticketRegistered(t ticketRef) { // // A ticket can be returned more than once with <= zero wait time in case // the ticket contains multiple topics. -func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration) { - defer func() { - if t == nil { - debugLog(" nil") - } else { - debugLog(fmt.Sprintf(" node = %x sn = %v wait = %v", t.t.node.ID[:8], t.t.serial, wait)) - } - }() - - debugLog("nextRegisterableTicket()") +func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) { now := mclock.Now() if s.nextTicketCached != nil { return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now) @@ -412,9 +409,8 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration return nil, 0 } if nextTicket.t != nil { - wait = time.Duration(nextTicket.topicRegTime() - now) s.nextTicketCached = &nextTicket - return &nextTicket, wait + return &nextTicket, time.Duration(nextTicket.topicRegTime() - now) } s.lastBucketFetched = bucket } @@ -422,14 +418,20 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration // removeTicket removes a ticket from the ticket store func (s *ticketStore) removeTicketRef(ref ticketRef) { - debugLog(fmt.Sprintf("removeTicketRef(node = %x sn = %v)", ref.t.node.ID[:8], ref.t.serial)) + log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial) + + // Make nextRegisterableTicket return the next available ticket. + s.nextTicketCached = nil + topic := ref.topic() - tickets := s.tickets[topic].buckets + tickets := s.tickets[topic] + if tickets == nil { + log.Trace("Removing tickets from unknown topic", "topic", topic) return } bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen)) - list := tickets[bucket] + list := tickets.buckets[bucket] idx := -1 for i, bt := range list { if bt.t == ref.t { @@ -442,18 +444,15 @@ func (s *ticketStore) removeTicketRef(ref ticketRef) { } list = append(list[:idx], list[idx+1:]...) if len(list) != 0 { - tickets[bucket] = list + tickets.buckets[bucket] = list } else { - delete(tickets, bucket) + delete(tickets.buckets, bucket) } ref.t.refCnt-- if ref.t.refCnt == 0 { delete(s.nodes, ref.t.node) delete(s.nodeLastReq, ref.t.node) } - - // Make nextRegisterableTicket return the next available ticket. - s.nextTicketCached = nil } type lookupInfo struct { @@ -523,21 +522,21 @@ func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Has } } -func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, t *ticket) { - debugLog(fmt.Sprintf("add(node = %x sn = %v)", t.node.ID[:8], t.serial)) +func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) { + log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial) - lastReq, ok := s.nodeLastReq[t.node] + lastReq, ok := s.nodeLastReq[ticket.node] if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) { return } - s.adjustWithTicket(localTime, lastReq.lookup.target, t) + s.adjustWithTicket(localTime, lastReq.lookup.target, ticket) - if lastReq.lookup.radiusLookup || s.nodes[t.node] != nil { + if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil { return } topic := lastReq.lookup.topic - topicIdx := t.findIdx(topic) + topicIdx := ticket.findIdx(topic) if topicIdx == -1 { return } @@ -548,29 +547,29 @@ func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, t *ti } if _, ok := s.tickets[topic]; ok { - wait := t.regTime[topicIdx] - localTime + wait := ticket.regTime[topicIdx] - localTime rnd := rand.ExpFloat64() if rnd > 10 { rnd = 10 } if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd { // use the ticket to register this topic - //fmt.Println("addTicket", t.node.ID[:8], t.node.addr().String(), t.serial, t.pong) - s.addTicketRef(ticketRef{t, topicIdx}) + //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong) + s.addTicketRef(ticketRef{ticket, topicIdx}) } } - if t.refCnt > 0 { + if ticket.refCnt > 0 { s.nextTicketCached = nil - s.nodes[t.node] = t + s.nodes[ticket.node] = ticket } } func (s *ticketStore) getNodeTicket(node *Node) *ticket { if s.nodes[node] == nil { - debugLog(fmt.Sprintf("getNodeTicket(%x) sn = nil", node.ID[:8])) + log.Trace("Retrieving node ticket", "node", node.ID, "serial", nil) } else { - debugLog(fmt.Sprintf("getNodeTicket(%x) sn = %v", node.ID[:8], s.nodes[node].serial)) + log.Trace("Retrieving node ticket", "node", node.ID, "serial", s.nodes[node].serial) } return s.nodes[node] } @@ -643,7 +642,7 @@ func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNod if ip.IsUnspecified() || ip.IsLoopback() { ip = from.IP } - n := NewNode(node.ID, ip, node.UDP-1, node.TCP-1) // subtract one from port while discv5 is running in test mode on UDPport+1 + n := NewNode(node.ID, ip, node.UDP, node.TCP) select { case chn <- n: default: diff --git a/p2p/discv5/topic.go b/p2p/discv5/topic.go index b6bea013c..e7a7f8e02 100644 --- a/p2p/discv5/topic.go +++ b/p2p/discv5/topic.go @@ -24,6 +24,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/log" ) const ( @@ -235,7 +236,7 @@ func (t *topicTable) deleteEntry(e *topicEntry) { // It is assumed that topics and waitPeriods have the same length. func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) { - debugLog(fmt.Sprintf("useTicket %v %v %v", serialNo, topics, waitPeriods)) + log.Trace("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods) //fmt.Println("useTicket", serialNo, topics, waitPeriods) t.collectGarbage() diff --git a/p2p/discv5/udp.go b/p2p/discv5/udp.go index 26087cd8e..543771817 100644 --- a/p2p/discv5/udp.go +++ b/p2p/discv5/udp.go @@ -37,7 +37,7 @@ const Version = 4 // Errors var ( errPacketTooSmall = errors.New("too small") - errBadHash = errors.New("bad hash") + errBadPrefix = errors.New("bad prefix") errExpired = errors.New("expired") errUnsolicitedReply = errors.New("unsolicited reply") errUnknownNode = errors.New("unknown node") @@ -145,10 +145,11 @@ type ( } ) -const ( - macSize = 256 / 8 - sigSize = 520 / 8 - headSize = macSize + sigSize // space of packet frame data +var ( + versionPrefix = []byte("temporary discovery v5") + versionPrefixSize = len(versionPrefix) + sigSize = 520 / 8 + headSize = versionPrefixSize + sigSize // space of packet frame data ) // Neighbors replies are sent across multiple packets to @@ -237,30 +238,23 @@ type udp struct { } // ListenUDP returns a new table that listens for UDP packets on laddr. -func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Network, error) { - transport, err := listenUDP(priv, laddr) +func ListenUDP(priv *ecdsa.PrivateKey, conn conn, realaddr *net.UDPAddr, nodeDBPath string, netrestrict *netutil.Netlist) (*Network, error) { + transport, err := listenUDP(priv, conn, realaddr) if err != nil { return nil, err } - net, err := newNetwork(transport, priv.PublicKey, natm, nodeDBPath, netrestrict) + net, err := newNetwork(transport, priv.PublicKey, nodeDBPath, netrestrict) if err != nil { return nil, err } + log.Info("UDP listener up", "net", net.tab.self) transport.net = net go transport.readLoop() return net, nil } -func listenUDP(priv *ecdsa.PrivateKey, laddr string) (*udp, error) { - addr, err := net.ResolveUDPAddr("udp", laddr) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - return &udp{conn: conn, priv: priv, ourEndpoint: makeEndpoint(addr, uint16(addr.Port))}, nil +func listenUDP(priv *ecdsa.PrivateKey, conn conn, realaddr *net.UDPAddr) (*udp, error) { + return &udp{conn: conn, priv: priv, ourEndpoint: makeEndpoint(realaddr, uint16(realaddr.Port))}, nil } func (t *udp) localAddr() *net.UDPAddr { @@ -372,11 +366,9 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (p, hash log.Error(fmt.Sprint("could not sign packet:", err)) return nil, nil, err } - copy(packet[macSize:], sig) - // add the hash to the front. Note: this doesn't protect the - // packet in any way. - hash = crypto.Keccak256(packet[macSize:]) - copy(packet, hash) + copy(packet, versionPrefix) + copy(packet[versionPrefixSize:], sig) + hash = crypto.Keccak256(packet[versionPrefixSize:]) return packet, hash, nil } @@ -420,17 +412,16 @@ func decodePacket(buffer []byte, pkt *ingressPacket) error { } buf := make([]byte, len(buffer)) copy(buf, buffer) - hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:] - shouldhash := crypto.Keccak256(buf[macSize:]) - if !bytes.Equal(hash, shouldhash) { - return errBadHash + prefix, sig, sigdata := buf[:versionPrefixSize], buf[versionPrefixSize:headSize], buf[headSize:] + if !bytes.Equal(prefix, versionPrefix) { + return errBadPrefix } fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig) if err != nil { return err } pkt.rawData = buf - pkt.hash = hash + pkt.hash = crypto.Keccak256(buf[versionPrefixSize:]) pkt.remoteID = fromID switch pkt.ev = nodeEvent(sigdata[0]); pkt.ev { case pingPacket: |