aboutsummaryrefslogtreecommitdiffstats
path: root/lds/serverpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'lds/serverpool.go')
-rw-r--r--lds/serverpool.go855
1 files changed, 855 insertions, 0 deletions
diff --git a/lds/serverpool.go b/lds/serverpool.go
new file mode 100644
index 000000000..8e06b2356
--- /dev/null
+++ b/lds/serverpool.go
@@ -0,0 +1,855 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package les implements the Light Ethereum Subprotocol.
+package lds
+
+import (
+ "crypto/ecdsa"
+ "fmt"
+ "io"
+ "math"
+ "math/rand"
+ "net"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common/mclock"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/log"
+ "github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/discv5"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/rlp"
+)
+
+const (
+ // After a connection has been ended or timed out, there is a waiting period
+ // before it can be selected for connection again.
+ // waiting period = base delay * (1 + random(1))
+ // base delay = shortRetryDelay for the first shortRetryCnt times after a
+ // successful connection, after that longRetryDelay is applied
+ shortRetryCnt = 5
+ shortRetryDelay = time.Second * 5
+ longRetryDelay = time.Minute * 10
+ // maxNewEntries is the maximum number of newly discovered (never connected) nodes.
+ // If the limit is reached, the least recently discovered one is thrown out.
+ maxNewEntries = 1000
+ // maxKnownEntries is the maximum number of known (already connected) nodes.
+ // If the limit is reached, the least recently connected one is thrown out.
+ // (not that unlike new entries, known entries are persistent)
+ maxKnownEntries = 1000
+ // target for simultaneously connected servers
+ targetServerCount = 5
+ // target for servers selected from the known table
+ // (we leave room for trying new ones if there is any)
+ targetKnownSelect = 3
+ // after dialTimeout, consider the server unavailable and adjust statistics
+ dialTimeout = time.Second * 30
+ // targetConnTime is the minimum expected connection duration before a server
+ // drops a client without any specific reason
+ targetConnTime = time.Minute * 10
+ // new entry selection weight calculation based on most recent discovery time:
+ // unity until discoverExpireStart, then exponential decay with discoverExpireConst
+ discoverExpireStart = time.Minute * 20
+ discoverExpireConst = time.Minute * 20
+ // known entry selection weight is dropped by a factor of exp(-failDropLn) after
+ // each unsuccessful connection (restored after a successful one)
+ failDropLn = 0.1
+ // known node connection success and quality statistics have a long term average
+ // and a short term value which is adjusted exponentially with a factor of
+ // pstatRecentAdjust with each dial/connection and also returned exponentially
+ // to the average with the time constant pstatReturnToMeanTC
+ pstatReturnToMeanTC = time.Hour
+ // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
+ // each unsuccessful connection (restored after a successful one)
+ addrFailDropLn = math.Ln2
+ // responseScoreTC and delayScoreTC are exponential decay time constants for
+ // calculating selection chances from response times and block delay times
+ responseScoreTC = time.Millisecond * 100
+ delayScoreTC = time.Second * 5
+ timeoutPow = 10
+ // initStatsWeight is used to initialize previously unknown peers with good
+ // statistics to give a chance to prove themselves
+ initStatsWeight = 1
+)
+
+// connReq represents a request for peer connection.
+type connReq struct {
+ p *peer
+ node *enode.Node
+ result chan *poolEntry
+}
+
+// disconnReq represents a request for peer disconnection.
+type disconnReq struct {
+ entry *poolEntry
+ stopped bool
+ done chan struct{}
+}
+
+// registerReq represents a request for peer registration.
+type registerReq struct {
+ entry *poolEntry
+ done chan struct{}
+}
+
+// serverPool implements a pool for storing and selecting newly discovered and already
+// known light server nodes. It received discovered nodes, stores statistics about
+// known nodes and takes care of always having enough good quality servers connected.
+type serverPool struct {
+ db ethdb.Database
+ dbKey []byte
+ server *p2p.Server
+ quit chan struct{}
+ wg *sync.WaitGroup
+ connWg sync.WaitGroup
+
+ topic discv5.Topic
+
+ discSetPeriod chan time.Duration
+ discNodes chan *enode.Node
+ discLookups chan bool
+
+ entries map[enode.ID]*poolEntry
+ timeout, enableRetry chan *poolEntry
+ adjustStats chan poolStatAdjust
+
+ connCh chan *connReq
+ disconnCh chan *disconnReq
+ registerCh chan *registerReq
+
+ knownQueue, newQueue poolEntryQueue
+ knownSelect, newSelect *weightedRandomSelect
+ knownSelected, newSelected int
+ fastDiscover bool
+}
+
+// newServerPool creates a new serverPool instance
+func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
+ pool := &serverPool{
+ db: db,
+ quit: quit,
+ wg: wg,
+ entries: make(map[enode.ID]*poolEntry),
+ timeout: make(chan *poolEntry, 1),
+ adjustStats: make(chan poolStatAdjust, 100),
+ enableRetry: make(chan *poolEntry, 1),
+ connCh: make(chan *connReq),
+ disconnCh: make(chan *disconnReq),
+ registerCh: make(chan *registerReq),
+ knownSelect: newWeightedRandomSelect(),
+ newSelect: newWeightedRandomSelect(),
+ fastDiscover: true,
+ }
+ pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
+ pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
+ return pool
+}
+
+func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
+ pool.server = server
+ pool.topic = topic
+ pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
+ pool.wg.Add(1)
+ pool.loadNodes()
+
+ if pool.server.DiscV5 != nil {
+ pool.discSetPeriod = make(chan time.Duration, 1)
+ pool.discNodes = make(chan *enode.Node, 100)
+ pool.discLookups = make(chan bool, 100)
+ go pool.discoverNodes()
+ }
+ pool.checkDial()
+ go pool.eventLoop()
+}
+
+// discoverNodes wraps SearchTopic, converting result nodes to enode.Node.
+func (pool *serverPool) discoverNodes() {
+ ch := make(chan *discv5.Node)
+ go func() {
+ pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups)
+ close(ch)
+ }()
+ for n := range ch {
+ pubkey, err := decodePubkey64(n.ID[:])
+ if err != nil {
+ continue
+ }
+ pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP))
+ }
+}
+
+// connect should be called upon any incoming connection. If the connection has been
+// dialed by the server pool recently, the appropriate pool entry is returned.
+// Otherwise, the connection should be rejected.
+// Note that whenever a connection has been accepted and a pool entry has been returned,
+// disconnect should also always be called.
+func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry {
+ log.Debug("Connect new entry", "enode", p.id)
+ req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)}
+ select {
+ case pool.connCh <- req:
+ case <-pool.quit:
+ return nil
+ }
+ return <-req.result
+}
+
+// registered should be called after a successful handshake
+func (pool *serverPool) registered(entry *poolEntry) {
+ log.Debug("Registered new entry", "enode", entry.node.ID())
+ req := &registerReq{entry: entry, done: make(chan struct{})}
+ select {
+ case pool.registerCh <- req:
+ case <-pool.quit:
+ return
+ }
+ <-req.done
+}
+
+// disconnect should be called when ending a connection. Service quality statistics
+// can be updated optionally (not updated if no registration happened, in this case
+// only connection statistics are updated, just like in case of timeout)
+func (pool *serverPool) disconnect(entry *poolEntry) {
+ stopped := false
+ select {
+ case <-pool.quit:
+ stopped = true
+ default:
+ }
+ log.Debug("Disconnected old entry", "enode", entry.node.ID())
+ req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
+
+ // Block until disconnection request is served.
+ pool.disconnCh <- req
+ <-req.done
+}
+
+const (
+ pseBlockDelay = iota
+ pseResponseTime
+ pseResponseTimeout
+)
+
+// poolStatAdjust records are sent to adjust peer block delay/response time statistics
+type poolStatAdjust struct {
+ adjustType int
+ entry *poolEntry
+ time time.Duration
+}
+
+// adjustBlockDelay adjusts the block announce delay statistics of a node
+func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
+ if entry == nil {
+ return
+ }
+ pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
+}
+
+// adjustResponseTime adjusts the request response time statistics of a node
+func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
+ if entry == nil {
+ return
+ }
+ if timeout {
+ pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
+ } else {
+ pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
+ }
+}
+
+// eventLoop handles pool events and mutex locking for all internal functions
+func (pool *serverPool) eventLoop() {
+ lookupCnt := 0
+ var convTime mclock.AbsTime
+ if pool.discSetPeriod != nil {
+ pool.discSetPeriod <- time.Millisecond * 100
+ }
+
+ // disconnect updates service quality statistics depending on the connection time
+ // and disconnection initiator.
+ disconnect := func(req *disconnReq, stopped bool) {
+ // Handle peer disconnection requests.
+ entry := req.entry
+ if entry.state == psRegistered {
+ connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
+ if connAdjust > 1 {
+ connAdjust = 1
+ }
+ if stopped {
+ // disconnect requested by ourselves.
+ entry.connectStats.add(1, connAdjust)
+ } else {
+ // disconnect requested by server side.
+ entry.connectStats.add(connAdjust, 1)
+ }
+ }
+ entry.state = psNotConnected
+
+ if entry.knownSelected {
+ pool.knownSelected--
+ } else {
+ pool.newSelected--
+ }
+ pool.setRetryDial(entry)
+ pool.connWg.Done()
+ close(req.done)
+ }
+
+ for {
+ select {
+ case entry := <-pool.timeout:
+ if !entry.removed {
+ pool.checkDialTimeout(entry)
+ }
+
+ case entry := <-pool.enableRetry:
+ if !entry.removed {
+ entry.delayedRetry = false
+ pool.updateCheckDial(entry)
+ }
+
+ case adj := <-pool.adjustStats:
+ switch adj.adjustType {
+ case pseBlockDelay:
+ adj.entry.delayStats.add(float64(adj.time), 1)
+ case pseResponseTime:
+ adj.entry.responseStats.add(float64(adj.time), 1)
+ adj.entry.timeoutStats.add(0, 1)
+ case pseResponseTimeout:
+ adj.entry.timeoutStats.add(1, 1)
+ }
+
+ case node := <-pool.discNodes:
+ entry := pool.findOrNewNode(node)
+ pool.updateCheckDial(entry)
+
+ case conv := <-pool.discLookups:
+ if conv {
+ if lookupCnt == 0 {
+ convTime = mclock.Now()
+ }
+ lookupCnt++
+ if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
+ pool.fastDiscover = false
+ if pool.discSetPeriod != nil {
+ pool.discSetPeriod <- time.Minute
+ }
+ }
+ }
+
+ case req := <-pool.connCh:
+ // Handle peer connection requests.
+ entry := pool.entries[req.p.ID()]
+ if entry == nil {
+ entry = pool.findOrNewNode(req.node)
+ }
+ if entry.state == psConnected || entry.state == psRegistered {
+ req.result <- nil
+ continue
+ }
+ pool.connWg.Add(1)
+ entry.peer = req.p
+ entry.state = psConnected
+ addr := &poolEntryAddress{
+ ip: req.node.IP(),
+ port: uint16(req.node.TCP()),
+ lastSeen: mclock.Now(),
+ }
+ entry.lastConnected = addr
+ entry.addr = make(map[string]*poolEntryAddress)
+ entry.addr[addr.strKey()] = addr
+ entry.addrSelect = *newWeightedRandomSelect()
+ entry.addrSelect.update(addr)
+ req.result <- entry
+
+ case req := <-pool.registerCh:
+ // Handle peer registration requests.
+ entry := req.entry
+ entry.state = psRegistered
+ entry.regTime = mclock.Now()
+ if !entry.known {
+ pool.newQueue.remove(entry)
+ entry.known = true
+ }
+ pool.knownQueue.setLatest(entry)
+ entry.shortRetry = shortRetryCnt
+ close(req.done)
+
+ case req := <-pool.disconnCh:
+ // Handle peer disconnection requests.
+ disconnect(req, req.stopped)
+
+ case <-pool.quit:
+ if pool.discSetPeriod != nil {
+ close(pool.discSetPeriod)
+ }
+
+ // Spawn a goroutine to close the disconnCh after all connections are disconnected.
+ go func() {
+ pool.connWg.Wait()
+ close(pool.disconnCh)
+ }()
+
+ // Handle all remaining disconnection requests before exit.
+ for req := range pool.disconnCh {
+ disconnect(req, true)
+ }
+ pool.saveNodes()
+ pool.wg.Done()
+ return
+ }
+ }
+}
+
+func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
+ now := mclock.Now()
+ entry := pool.entries[node.ID()]
+ if entry == nil {
+ log.Debug("Discovered new entry", "id", node.ID())
+ entry = &poolEntry{
+ node: node,
+ addr: make(map[string]*poolEntryAddress),
+ addrSelect: *newWeightedRandomSelect(),
+ shortRetry: shortRetryCnt,
+ }
+ pool.entries[node.ID()] = entry
+ // initialize previously unknown peers with good statistics to give a chance to prove themselves
+ entry.connectStats.add(1, initStatsWeight)
+ entry.delayStats.add(0, initStatsWeight)
+ entry.responseStats.add(0, initStatsWeight)
+ entry.timeoutStats.add(0, initStatsWeight)
+ }
+ entry.lastDiscovered = now
+ addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())}
+ if a, ok := entry.addr[addr.strKey()]; ok {
+ addr = a
+ } else {
+ entry.addr[addr.strKey()] = addr
+ }
+ addr.lastSeen = now
+ entry.addrSelect.update(addr)
+ if !entry.known {
+ pool.newQueue.setLatest(entry)
+ }
+ return entry
+}
+
+// loadNodes loads known nodes and their statistics from the database
+func (pool *serverPool) loadNodes() {
+ enc, err := pool.db.Get(pool.dbKey)
+ if err != nil {
+ return
+ }
+ var list []*poolEntry
+ err = rlp.DecodeBytes(enc, &list)
+ if err != nil {
+ log.Debug("Failed to decode node list", "err", err)
+ return
+ }
+ for _, e := range list {
+ log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails,
+ "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
+ "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
+ "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
+ "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
+ pool.entries[e.node.ID()] = e
+ pool.knownQueue.setLatest(e)
+ pool.knownSelect.update((*knownEntry)(e))
+ }
+}
+
+// saveNodes saves known nodes and their statistics into the database. Nodes are
+// ordered from least to most recently connected.
+func (pool *serverPool) saveNodes() {
+ list := make([]*poolEntry, len(pool.knownQueue.queue))
+ for i := range list {
+ list[i] = pool.knownQueue.fetchOldest()
+ }
+ enc, err := rlp.EncodeToBytes(list)
+ if err == nil {
+ pool.db.Put(pool.dbKey, enc)
+ }
+}
+
+// removeEntry removes a pool entry when the entry count limit is reached.
+// Note that it is called by the new/known queues from which the entry has already
+// been removed so removing it from the queues is not necessary.
+func (pool *serverPool) removeEntry(entry *poolEntry) {
+ pool.newSelect.remove((*discoveredEntry)(entry))
+ pool.knownSelect.remove((*knownEntry)(entry))
+ entry.removed = true
+ delete(pool.entries, entry.node.ID())
+}
+
+// setRetryDial starts the timer which will enable dialing a certain node again
+func (pool *serverPool) setRetryDial(entry *poolEntry) {
+ delay := longRetryDelay
+ if entry.shortRetry > 0 {
+ entry.shortRetry--
+ delay = shortRetryDelay
+ }
+ delay += time.Duration(rand.Int63n(int64(delay) + 1))
+ entry.delayedRetry = true
+ go func() {
+ select {
+ case <-pool.quit:
+ case <-time.After(delay):
+ select {
+ case <-pool.quit:
+ case pool.enableRetry <- entry:
+ }
+ }
+ }()
+}
+
+// updateCheckDial is called when an entry can potentially be dialed again. It updates
+// its selection weights and checks if new dials can/should be made.
+func (pool *serverPool) updateCheckDial(entry *poolEntry) {
+ pool.newSelect.update((*discoveredEntry)(entry))
+ pool.knownSelect.update((*knownEntry)(entry))
+ pool.checkDial()
+}
+
+// checkDial checks if new dials can/should be made. It tries to select servers both
+// based on good statistics and recent discovery.
+func (pool *serverPool) checkDial() {
+ fillWithKnownSelects := !pool.fastDiscover
+ for pool.knownSelected < targetKnownSelect {
+ entry := pool.knownSelect.choose()
+ if entry == nil {
+ fillWithKnownSelects = false
+ break
+ }
+ pool.dial((*poolEntry)(entry.(*knownEntry)), true)
+ }
+ for pool.knownSelected+pool.newSelected < targetServerCount {
+ entry := pool.newSelect.choose()
+ if entry == nil {
+ break
+ }
+ pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
+ }
+ if fillWithKnownSelects {
+ // no more newly discovered nodes to select and since fast discover period
+ // is over, we probably won't find more in the near future so select more
+ // known entries if possible
+ for pool.knownSelected < targetServerCount {
+ entry := pool.knownSelect.choose()
+ if entry == nil {
+ break
+ }
+ pool.dial((*poolEntry)(entry.(*knownEntry)), true)
+ }
+ }
+}
+
+// dial initiates a new connection
+func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
+ if pool.server == nil || entry.state != psNotConnected {
+ return
+ }
+ entry.state = psDialed
+ entry.knownSelected = knownSelected
+ if knownSelected {
+ pool.knownSelected++
+ } else {
+ pool.newSelected++
+ }
+ addr := entry.addrSelect.choose().(*poolEntryAddress)
+ log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
+ entry.dialed = addr
+ go func() {
+ pool.server.AddPeer(entry.node)
+ select {
+ case <-pool.quit:
+ case <-time.After(dialTimeout):
+ select {
+ case <-pool.quit:
+ case pool.timeout <- entry:
+ }
+ }
+ }()
+}
+
+// checkDialTimeout checks if the node is still in dialed state and if so, resets it
+// and adjusts connection statistics accordingly.
+func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
+ if entry.state != psDialed {
+ return
+ }
+ log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey())
+ entry.state = psNotConnected
+ if entry.knownSelected {
+ pool.knownSelected--
+ } else {
+ pool.newSelected--
+ }
+ entry.connectStats.add(0, 1)
+ entry.dialed.fails++
+ pool.setRetryDial(entry)
+}
+
+const (
+ psNotConnected = iota
+ psDialed
+ psConnected
+ psRegistered
+)
+
+// poolEntry represents a server node and stores its current state and statistics.
+type poolEntry struct {
+ peer *peer
+ pubkey [64]byte // secp256k1 key of the node
+ addr map[string]*poolEntryAddress
+ node *enode.Node
+ lastConnected, dialed *poolEntryAddress
+ addrSelect weightedRandomSelect
+
+ lastDiscovered mclock.AbsTime
+ known, knownSelected bool
+ connectStats, delayStats poolStats
+ responseStats, timeoutStats poolStats
+ state int
+ regTime mclock.AbsTime
+ queueIdx int
+ removed bool
+
+ delayedRetry bool
+ shortRetry int
+}
+
+// poolEntryEnc is the RLP encoding of poolEntry.
+type poolEntryEnc struct {
+ Pubkey []byte
+ IP net.IP
+ Port uint16
+ Fails uint
+ CStat, DStat, RStat, TStat poolStats
+}
+
+func (e *poolEntry) EncodeRLP(w io.Writer) error {
+ return rlp.Encode(w, &poolEntryEnc{
+ Pubkey: encodePubkey64(e.node.Pubkey()),
+ IP: e.lastConnected.ip,
+ Port: e.lastConnected.port,
+ Fails: e.lastConnected.fails,
+ CStat: e.connectStats,
+ DStat: e.delayStats,
+ RStat: e.responseStats,
+ TStat: e.timeoutStats,
+ })
+}
+
+func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
+ var entry poolEntryEnc
+ if err := s.Decode(&entry); err != nil {
+ return err
+ }
+ pubkey, err := decodePubkey64(entry.Pubkey)
+ if err != nil {
+ return err
+ }
+ addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
+ e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
+ e.addr = make(map[string]*poolEntryAddress)
+ e.addr[addr.strKey()] = addr
+ e.addrSelect = *newWeightedRandomSelect()
+ e.addrSelect.update(addr)
+ e.lastConnected = addr
+ e.connectStats = entry.CStat
+ e.delayStats = entry.DStat
+ e.responseStats = entry.RStat
+ e.timeoutStats = entry.TStat
+ e.shortRetry = shortRetryCnt
+ e.known = true
+ return nil
+}
+
+func encodePubkey64(pub *ecdsa.PublicKey) []byte {
+ return crypto.FromECDSAPub(pub)[1:]
+}
+
+func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) {
+ return crypto.UnmarshalPubkey(append([]byte{0x04}, b...))
+}
+
+// discoveredEntry implements wrsItem
+type discoveredEntry poolEntry
+
+// Weight calculates random selection weight for newly discovered entries
+func (e *discoveredEntry) Weight() int64 {
+ if e.state != psNotConnected || e.delayedRetry {
+ return 0
+ }
+ t := time.Duration(mclock.Now() - e.lastDiscovered)
+ if t <= discoverExpireStart {
+ return 1000000000
+ }
+ return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
+}
+
+// knownEntry implements wrsItem
+type knownEntry poolEntry
+
+// Weight calculates random selection weight for known entries
+func (e *knownEntry) Weight() int64 {
+ if e.state != psNotConnected || !e.known || e.delayedRetry {
+ return 0
+ }
+ return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow))
+}
+
+// poolEntryAddress is a separate object because currently it is necessary to remember
+// multiple potential network addresses for a pool entry. This will be removed after
+// the final implementation of v5 discovery which will retrieve signed and serial
+// numbered advertisements, making it clear which IP/port is the latest one.
+type poolEntryAddress struct {
+ ip net.IP
+ port uint16
+ lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
+ fails uint // connection failures since last successful connection (persistent)
+}
+
+func (a *poolEntryAddress) Weight() int64 {
+ t := time.Duration(mclock.Now() - a.lastSeen)
+ return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
+}
+
+func (a *poolEntryAddress) strKey() string {
+ return a.ip.String() + ":" + strconv.Itoa(int(a.port))
+}
+
+// poolStats implement statistics for a certain quantity with a long term average
+// and a short term value which is adjusted exponentially with a factor of
+// pstatRecentAdjust with each update and also returned exponentially to the
+// average with the time constant pstatReturnToMeanTC
+type poolStats struct {
+ sum, weight, avg, recent float64
+ lastRecalc mclock.AbsTime
+}
+
+// init initializes stats with a long term sum/update count pair retrieved from the database
+func (s *poolStats) init(sum, weight float64) {
+ s.sum = sum
+ s.weight = weight
+ var avg float64
+ if weight > 0 {
+ avg = s.sum / weight
+ }
+ s.avg = avg
+ s.recent = avg
+ s.lastRecalc = mclock.Now()
+}
+
+// recalc recalculates recent value return-to-mean and long term average
+func (s *poolStats) recalc() {
+ now := mclock.Now()
+ s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
+ if s.sum == 0 {
+ s.avg = 0
+ } else {
+ if s.sum > s.weight*1e30 {
+ s.avg = 1e30
+ } else {
+ s.avg = s.sum / s.weight
+ }
+ }
+ s.lastRecalc = now
+}
+
+// add updates the stats with a new value
+func (s *poolStats) add(value, weight float64) {
+ s.weight += weight
+ s.sum += value * weight
+ s.recalc()
+}
+
+// recentAvg returns the short-term adjusted average
+func (s *poolStats) recentAvg() float64 {
+ s.recalc()
+ return s.recent
+}
+
+func (s *poolStats) EncodeRLP(w io.Writer) error {
+ return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
+}
+
+func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
+ var stats struct {
+ SumUint, WeightUint uint64
+ }
+ if err := st.Decode(&stats); err != nil {
+ return err
+ }
+ s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
+ return nil
+}
+
+// poolEntryQueue keeps track of its least recently accessed entries and removes
+// them when the number of entries reaches the limit
+type poolEntryQueue struct {
+ queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value
+ newPtr, oldPtr, maxCnt int
+ removeFromPool func(*poolEntry)
+}
+
+// newPoolEntryQueue returns a new poolEntryQueue
+func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
+ return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
+}
+
+// fetchOldest returns and removes the least recently accessed entry
+func (q *poolEntryQueue) fetchOldest() *poolEntry {
+ if len(q.queue) == 0 {
+ return nil
+ }
+ for {
+ if e := q.queue[q.oldPtr]; e != nil {
+ delete(q.queue, q.oldPtr)
+ q.oldPtr++
+ return e
+ }
+ q.oldPtr++
+ }
+}
+
+// remove removes an entry from the queue
+func (q *poolEntryQueue) remove(entry *poolEntry) {
+ if q.queue[entry.queueIdx] == entry {
+ delete(q.queue, entry.queueIdx)
+ }
+}
+
+// setLatest adds or updates a recently accessed entry. It also checks if an old entry
+// needs to be removed and removes it from the parent pool too with a callback function.
+func (q *poolEntryQueue) setLatest(entry *poolEntry) {
+ if q.queue[entry.queueIdx] == entry {
+ delete(q.queue, entry.queueIdx)
+ } else {
+ if len(q.queue) == q.maxCnt {
+ e := q.fetchOldest()
+ q.remove(e)
+ q.removeFromPool(e)
+ }
+ }
+ entry.queueIdx = q.newPtr
+ q.queue[entry.queueIdx] = entry
+ q.newPtr++
+}