aboutsummaryrefslogtreecommitdiffstats
path: root/les/serverpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/serverpool.go')
-rw-r--r--les/serverpool.go121
1 files changed, 80 insertions, 41 deletions
diff --git a/les/serverpool.go b/les/serverpool.go
index 1a4c75229..0fe6e49b6 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -18,6 +18,7 @@
package les
import (
+ "crypto/ecdsa"
"fmt"
"io"
"math"
@@ -28,11 +29,12 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -90,8 +92,7 @@ const (
// connReq represents a request for peer connection.
type connReq struct {
p *peer
- ip net.IP
- port uint16
+ node *enode.Node
result chan *poolEntry
}
@@ -122,10 +123,10 @@ type serverPool struct {
topic discv5.Topic
discSetPeriod chan time.Duration
- discNodes chan *discv5.Node
+ discNodes chan *enode.Node
discLookups chan bool
- entries map[discover.NodeID]*poolEntry
+ entries map[enode.ID]*poolEntry
timeout, enableRetry chan *poolEntry
adjustStats chan poolStatAdjust
@@ -145,7 +146,7 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s
db: db,
quit: quit,
wg: wg,
- entries: make(map[discover.NodeID]*poolEntry),
+ entries: make(map[enode.ID]*poolEntry),
timeout: make(chan *poolEntry, 1),
adjustStats: make(chan poolStatAdjust, 100),
enableRetry: make(chan *poolEntry, 1),
@@ -170,22 +171,38 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
if pool.server.DiscV5 != nil {
pool.discSetPeriod = make(chan time.Duration, 1)
- pool.discNodes = make(chan *discv5.Node, 100)
+ pool.discNodes = make(chan *enode.Node, 100)
pool.discLookups = make(chan bool, 100)
- go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
+ go pool.discoverNodes()
}
pool.checkDial()
go pool.eventLoop()
}
+// discoverNodes wraps SearchTopic, converting result nodes to enode.Node.
+func (pool *serverPool) discoverNodes() {
+ ch := make(chan *discv5.Node)
+ go func() {
+ pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups)
+ close(ch)
+ }()
+ for n := range ch {
+ pubkey, err := decodePubkey64(n.ID[:])
+ if err != nil {
+ continue
+ }
+ pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP))
+ }
+}
+
// connect should be called upon any incoming connection. If the connection has been
// dialed by the server pool recently, the appropriate pool entry is returned.
// Otherwise, the connection should be rejected.
// Note that whenever a connection has been accepted and a pool entry has been returned,
// disconnect should also always be called.
-func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
+func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry {
log.Debug("Connect new entry", "enode", p.id)
- req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)}
+ req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)}
select {
case pool.connCh <- req:
case <-pool.quit:
@@ -196,7 +213,7 @@ func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
// registered should be called after a successful handshake
func (pool *serverPool) registered(entry *poolEntry) {
- log.Debug("Registered new entry", "enode", entry.id)
+ log.Debug("Registered new entry", "enode", entry.node.ID())
req := &registerReq{entry: entry, done: make(chan struct{})}
select {
case pool.registerCh <- req:
@@ -216,7 +233,7 @@ func (pool *serverPool) disconnect(entry *poolEntry) {
stopped = true
default:
}
- log.Debug("Disconnected old entry", "enode", entry.id)
+ log.Debug("Disconnected old entry", "enode", entry.node.ID())
req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
// Block until disconnection request is served.
@@ -320,7 +337,7 @@ func (pool *serverPool) eventLoop() {
}
case node := <-pool.discNodes:
- entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
+ entry := pool.findOrNewNode(node)
pool.updateCheckDial(entry)
case conv := <-pool.discLookups:
@@ -341,7 +358,7 @@ func (pool *serverPool) eventLoop() {
// Handle peer connection requests.
entry := pool.entries[req.p.ID()]
if entry == nil {
- entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port)
+ entry = pool.findOrNewNode(req.node)
}
if entry.state == psConnected || entry.state == psRegistered {
req.result <- nil
@@ -351,8 +368,8 @@ func (pool *serverPool) eventLoop() {
entry.peer = req.p
entry.state = psConnected
addr := &poolEntryAddress{
- ip: req.ip,
- port: req.port,
+ ip: req.node.IP(),
+ port: uint16(req.node.TCP()),
lastSeen: mclock.Now(),
}
entry.lastConnected = addr
@@ -401,18 +418,18 @@ func (pool *serverPool) eventLoop() {
}
}
-func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
+func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
now := mclock.Now()
- entry := pool.entries[id]
+ entry := pool.entries[node.ID()]
if entry == nil {
- log.Debug("Discovered new entry", "id", id)
+ log.Debug("Discovered new entry", "id", node.ID())
entry = &poolEntry{
- id: id,
+ node: node,
addr: make(map[string]*poolEntryAddress),
addrSelect: *newWeightedRandomSelect(),
shortRetry: shortRetryCnt,
}
- pool.entries[id] = entry
+ pool.entries[node.ID()] = entry
// initialize previously unknown peers with good statistics to give a chance to prove themselves
entry.connectStats.add(1, initStatsWeight)
entry.delayStats.add(0, initStatsWeight)
@@ -420,10 +437,7 @@ func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16
entry.timeoutStats.add(0, initStatsWeight)
}
entry.lastDiscovered = now
- addr := &poolEntryAddress{
- ip: ip,
- port: port,
- }
+ addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())}
if a, ok := entry.addr[addr.strKey()]; ok {
addr = a
} else {
@@ -450,12 +464,12 @@ func (pool *serverPool) loadNodes() {
return
}
for _, e := range list {
- log.Debug("Loaded server stats", "id", e.id, "fails", e.lastConnected.fails,
+ log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails,
"conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
"delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
"response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
"timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
- pool.entries[e.id] = e
+ pool.entries[e.node.ID()] = e
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
}
@@ -481,7 +495,7 @@ func (pool *serverPool) removeEntry(entry *poolEntry) {
pool.newSelect.remove((*discoveredEntry)(entry))
pool.knownSelect.remove((*knownEntry)(entry))
entry.removed = true
- delete(pool.entries, entry.id)
+ delete(pool.entries, entry.node.ID())
}
// setRetryDial starts the timer which will enable dialing a certain node again
@@ -559,10 +573,10 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
pool.newSelected++
}
addr := entry.addrSelect.choose().(*poolEntryAddress)
- log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
+ log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
entry.dialed = addr
go func() {
- pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port))
+ pool.server.AddPeer(entry.node)
select {
case <-pool.quit:
case <-time.After(dialTimeout):
@@ -580,7 +594,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
if entry.state != psDialed {
return
}
- log.Debug("Dial timeout", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey())
+ log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey())
entry.state = psNotConnected
if entry.knownSelected {
pool.knownSelected--
@@ -602,8 +616,9 @@ const (
// poolEntry represents a server node and stores its current state and statistics.
type poolEntry struct {
peer *peer
- id discover.NodeID
+ pubkey [64]byte // secp256k1 key of the node
addr map[string]*poolEntryAddress
+ node *enode.Node
lastConnected, dialed *poolEntryAddress
addrSelect weightedRandomSelect
@@ -620,23 +635,39 @@ type poolEntry struct {
shortRetry int
}
+// poolEntryEnc is the RLP encoding of poolEntry.
+type poolEntryEnc struct {
+ Pubkey []byte
+ IP net.IP
+ Port uint16
+ Fails uint
+ CStat, DStat, RStat, TStat poolStats
+}
+
func (e *poolEntry) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
+ return rlp.Encode(w, &poolEntryEnc{
+ Pubkey: encodePubkey64(e.node.Pubkey()),
+ IP: e.lastConnected.ip,
+ Port: e.lastConnected.port,
+ Fails: e.lastConnected.fails,
+ CStat: e.connectStats,
+ DStat: e.delayStats,
+ RStat: e.responseStats,
+ TStat: e.timeoutStats,
+ })
}
func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
- var entry struct {
- ID discover.NodeID
- IP net.IP
- Port uint16
- Fails uint
- CStat, DStat, RStat, TStat poolStats
- }
+ var entry poolEntryEnc
if err := s.Decode(&entry); err != nil {
return err
}
+ pubkey, err := decodePubkey64(entry.Pubkey)
+ if err != nil {
+ return err
+ }
addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
- e.id = entry.ID
+ e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
e.addr = make(map[string]*poolEntryAddress)
e.addr[addr.strKey()] = addr
e.addrSelect = *newWeightedRandomSelect()
@@ -651,6 +682,14 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
return nil
}
+func encodePubkey64(pub *ecdsa.PublicKey) []byte {
+ return crypto.FromECDSAPub(pub)[:1]
+}
+
+func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) {
+ return crypto.UnmarshalPubkey(append([]byte{0x04}, b...))
+}
+
// discoveredEntry implements wrsItem
type discoveredEntry poolEntry