diff options
Diffstat (limited to 'les/serverpool.go')
-rw-r--r-- | les/serverpool.go | 121 |
1 files changed, 80 insertions, 41 deletions
diff --git a/les/serverpool.go b/les/serverpool.go index 1a4c75229..0fe6e49b6 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -18,6 +18,7 @@ package les import ( + "crypto/ecdsa" "fmt" "io" "math" @@ -28,11 +29,12 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" ) @@ -90,8 +92,7 @@ const ( // connReq represents a request for peer connection. type connReq struct { p *peer - ip net.IP - port uint16 + node *enode.Node result chan *poolEntry } @@ -122,10 +123,10 @@ type serverPool struct { topic discv5.Topic discSetPeriod chan time.Duration - discNodes chan *discv5.Node + discNodes chan *enode.Node discLookups chan bool - entries map[discover.NodeID]*poolEntry + entries map[enode.ID]*poolEntry timeout, enableRetry chan *poolEntry adjustStats chan poolStatAdjust @@ -145,7 +146,7 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s db: db, quit: quit, wg: wg, - entries: make(map[discover.NodeID]*poolEntry), + entries: make(map[enode.ID]*poolEntry), timeout: make(chan *poolEntry, 1), adjustStats: make(chan poolStatAdjust, 100), enableRetry: make(chan *poolEntry, 1), @@ -170,22 +171,38 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { if pool.server.DiscV5 != nil { pool.discSetPeriod = make(chan time.Duration, 1) - pool.discNodes = make(chan *discv5.Node, 100) + pool.discNodes = make(chan *enode.Node, 100) pool.discLookups = make(chan bool, 100) - go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) + go pool.discoverNodes() } pool.checkDial() go pool.eventLoop() } +// discoverNodes wraps SearchTopic, converting result nodes to enode.Node. +func (pool *serverPool) discoverNodes() { + ch := make(chan *discv5.Node) + go func() { + pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups) + close(ch) + }() + for n := range ch { + pubkey, err := decodePubkey64(n.ID[:]) + if err != nil { + continue + } + pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP)) + } +} + // connect should be called upon any incoming connection. If the connection has been // dialed by the server pool recently, the appropriate pool entry is returned. // Otherwise, the connection should be rejected. // Note that whenever a connection has been accepted and a pool entry has been returned, // disconnect should also always be called. -func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { +func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry { log.Debug("Connect new entry", "enode", p.id) - req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)} + req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)} select { case pool.connCh <- req: case <-pool.quit: @@ -196,7 +213,7 @@ func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { // registered should be called after a successful handshake func (pool *serverPool) registered(entry *poolEntry) { - log.Debug("Registered new entry", "enode", entry.id) + log.Debug("Registered new entry", "enode", entry.node.ID()) req := ®isterReq{entry: entry, done: make(chan struct{})} select { case pool.registerCh <- req: @@ -216,7 +233,7 @@ func (pool *serverPool) disconnect(entry *poolEntry) { stopped = true default: } - log.Debug("Disconnected old entry", "enode", entry.id) + log.Debug("Disconnected old entry", "enode", entry.node.ID()) req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})} // Block until disconnection request is served. @@ -320,7 +337,7 @@ func (pool *serverPool) eventLoop() { } case node := <-pool.discNodes: - entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP) + entry := pool.findOrNewNode(node) pool.updateCheckDial(entry) case conv := <-pool.discLookups: @@ -341,7 +358,7 @@ func (pool *serverPool) eventLoop() { // Handle peer connection requests. entry := pool.entries[req.p.ID()] if entry == nil { - entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port) + entry = pool.findOrNewNode(req.node) } if entry.state == psConnected || entry.state == psRegistered { req.result <- nil @@ -351,8 +368,8 @@ func (pool *serverPool) eventLoop() { entry.peer = req.p entry.state = psConnected addr := &poolEntryAddress{ - ip: req.ip, - port: req.port, + ip: req.node.IP(), + port: uint16(req.node.TCP()), lastSeen: mclock.Now(), } entry.lastConnected = addr @@ -401,18 +418,18 @@ func (pool *serverPool) eventLoop() { } } -func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry { +func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { now := mclock.Now() - entry := pool.entries[id] + entry := pool.entries[node.ID()] if entry == nil { - log.Debug("Discovered new entry", "id", id) + log.Debug("Discovered new entry", "id", node.ID()) entry = &poolEntry{ - id: id, + node: node, addr: make(map[string]*poolEntryAddress), addrSelect: *newWeightedRandomSelect(), shortRetry: shortRetryCnt, } - pool.entries[id] = entry + pool.entries[node.ID()] = entry // initialize previously unknown peers with good statistics to give a chance to prove themselves entry.connectStats.add(1, initStatsWeight) entry.delayStats.add(0, initStatsWeight) @@ -420,10 +437,7 @@ func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16 entry.timeoutStats.add(0, initStatsWeight) } entry.lastDiscovered = now - addr := &poolEntryAddress{ - ip: ip, - port: port, - } + addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())} if a, ok := entry.addr[addr.strKey()]; ok { addr = a } else { @@ -450,12 +464,12 @@ func (pool *serverPool) loadNodes() { return } for _, e := range list { - log.Debug("Loaded server stats", "id", e.id, "fails", e.lastConnected.fails, + log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails, "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight), "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight), "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight), "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight)) - pool.entries[e.id] = e + pool.entries[e.node.ID()] = e pool.knownQueue.setLatest(e) pool.knownSelect.update((*knownEntry)(e)) } @@ -481,7 +495,7 @@ func (pool *serverPool) removeEntry(entry *poolEntry) { pool.newSelect.remove((*discoveredEntry)(entry)) pool.knownSelect.remove((*knownEntry)(entry)) entry.removed = true - delete(pool.entries, entry.id) + delete(pool.entries, entry.node.ID()) } // setRetryDial starts the timer which will enable dialing a certain node again @@ -559,10 +573,10 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { pool.newSelected++ } addr := entry.addrSelect.choose().(*poolEntryAddress) - log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) + log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) entry.dialed = addr go func() { - pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port)) + pool.server.AddPeer(entry.node) select { case <-pool.quit: case <-time.After(dialTimeout): @@ -580,7 +594,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) { if entry.state != psDialed { return } - log.Debug("Dial timeout", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey()) + log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey()) entry.state = psNotConnected if entry.knownSelected { pool.knownSelected-- @@ -602,8 +616,9 @@ const ( // poolEntry represents a server node and stores its current state and statistics. type poolEntry struct { peer *peer - id discover.NodeID + pubkey [64]byte // secp256k1 key of the node addr map[string]*poolEntryAddress + node *enode.Node lastConnected, dialed *poolEntryAddress addrSelect weightedRandomSelect @@ -620,23 +635,39 @@ type poolEntry struct { shortRetry int } +// poolEntryEnc is the RLP encoding of poolEntry. +type poolEntryEnc struct { + Pubkey []byte + IP net.IP + Port uint16 + Fails uint + CStat, DStat, RStat, TStat poolStats +} + func (e *poolEntry) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats}) + return rlp.Encode(w, &poolEntryEnc{ + Pubkey: encodePubkey64(e.node.Pubkey()), + IP: e.lastConnected.ip, + Port: e.lastConnected.port, + Fails: e.lastConnected.fails, + CStat: e.connectStats, + DStat: e.delayStats, + RStat: e.responseStats, + TStat: e.timeoutStats, + }) } func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { - var entry struct { - ID discover.NodeID - IP net.IP - Port uint16 - Fails uint - CStat, DStat, RStat, TStat poolStats - } + var entry poolEntryEnc if err := s.Decode(&entry); err != nil { return err } + pubkey, err := decodePubkey64(entry.Pubkey) + if err != nil { + return err + } addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()} - e.id = entry.ID + e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port)) e.addr = make(map[string]*poolEntryAddress) e.addr[addr.strKey()] = addr e.addrSelect = *newWeightedRandomSelect() @@ -651,6 +682,14 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { return nil } +func encodePubkey64(pub *ecdsa.PublicKey) []byte { + return crypto.FromECDSAPub(pub)[:1] +} + +func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) { + return crypto.UnmarshalPubkey(append([]byte{0x04}, b...)) +} + // discoveredEntry implements wrsItem type discoveredEntry poolEntry |