aboutsummaryrefslogblamecommitdiffstats
path: root/les/serverpool.go
blob: 95a8242b3279843d13f93f33fe735f5df3828d69 (plain) (tree)



















                                                                                  
             









                                                       
                                             




























                                                                                           


                                                                                     















                                                                                             










                                                                                       



















                                                                                      
                                                
















                                                                                                                                                    
                                                             


























                                                                                                              
                                                                             

                                
                                     
                         
                                                            
         
                                                                                
                                                                      


                          
                      















                                                           
                                                                  



                                  
                                    





                                           




                                                                                   
                                                      
                                                                    


                                
















                                                                         
         
 





                                    



                                














                                                                                      


                         




                                                                                                


                         









                                                                                   
                            





                                         



                                                                                                                          
                        




                                    


                                                

                                                                                                                                            


                         








                                                                                                                                                                        

                              

































                                                                                                                              
         

 



                                                                             


                                                            
















                                                            












                                                                                 

                                              
                                                                                                










                                                                                                                                 


                                                                                 



                                 


                                                         








                                          



                                                                                              
                                                                    






























                                                                                                                








                                                                     
                                                                         

                      
                                
                                                                                                                                                                                                                                                                                                                                                                                                                









                                                                                
                             




























































































                                                                                          
                                                                                                                                      



















                                                                                              
                                                                                         





                                    
                                    












                                                                                  
                                   




                                                          







                                                  





                                                  
                                                                                                                                                                                     



                                                    




                                                          











                                                                                                             


                                     




























                                                                                                                 
                                                                                                                                                                                                                                                                               


























                                                                                                                    

                                               


                                                                                            
                                               
                   
                         
                       

                                    









                                                                                                             







                                                




                                         


                                                









                                                    
                                                                                                



                                                     
                                          



                                                 
                                                                                           





















































                                                                                                              
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package les implements the Light Ethereum Subprotocol.
package les

import (
    "fmt"
    "io"
    "math"
    "math/rand"
    "net"
    "strconv"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common/mclock"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/p2p/discover"
    "github.com/ethereum/go-ethereum/p2p/discv5"
    "github.com/ethereum/go-ethereum/rlp"
)

// Tuning parameters of the server pool: retry delays, table size limits,
// connection targets and the constants used for selection weight calculation.
const (
    // After a connection has been ended or timed out, there is a waiting period
    // before it can be selected for connection again.
    // waiting period = base delay * (1 + random(1))
    // base delay = shortRetryDelay for the first shortRetryCnt times after a
    // successful connection, after that longRetryDelay is applied
    shortRetryCnt   = 5
    shortRetryDelay = time.Second * 5
    longRetryDelay  = time.Minute * 10
    // maxNewEntries is the maximum number of newly discovered (never connected) nodes.
    // If the limit is reached, the least recently discovered one is thrown out.
    maxNewEntries = 1000
    // maxKnownEntries is the maximum number of known (already connected) nodes.
    // If the limit is reached, the least recently connected one is thrown out.
    // (note that unlike new entries, known entries are persistent)
    maxKnownEntries = 1000
    // target for simultaneously connected servers
    targetServerCount = 5
    // target for servers selected from the known table
    // (we leave room for trying new ones if there is any)
    targetKnownSelect = 3
    // after dialTimeout, consider the server unavailable and adjust statistics
    dialTimeout = time.Second * 30
    // targetConnTime is the minimum expected connection duration before a server
    // drops a client without any specific reason
    targetConnTime = time.Minute * 10
    // new entry selection weight calculation based on most recent discovery time:
    // unity until discoverExpireStart, then exponential decay with discoverExpireConst
    discoverExpireStart = time.Minute * 20
    discoverExpireConst = time.Minute * 20
    // known entry selection weight is dropped by a factor of exp(-failDropLn) after
    // each unsuccessful connection (restored after a successful one)
    failDropLn = 0.1
    // known node connection success and quality statistics have a long term average
    // and a short term value which is adjusted exponentially with a factor of
    // pstatRecentAdjust with each dial/connection and also returned exponentially
    // to the average with the time constant pstatReturnToMeanTC
    pstatRecentAdjust   = 0.1
    pstatReturnToMeanTC = time.Hour
    // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
    // each unsuccessful connection (restored after a successful one)
    addrFailDropLn = math.Ln2
    // responseScoreTC and delayScoreTC are exponential decay time constants for
    // calculating selection chances from response times and block delay times
    responseScoreTC = time.Millisecond * 100
    delayScoreTC    = time.Second * 5
    // timeoutPow is the exponent applied to (1 - recent timeout average) when
    // calculating selection weights
    timeoutPow      = 10
    // peerSelectMinWeight is added to calculated weights at request peer selection
    // to give poorly performing peers a little chance of coming back
    peerSelectMinWeight = 0.005
    // initStatsWeight is used to initialize previously unknown peers with good
    // statistics to give a chance to prove themselves
    initStatsWeight = 1
)

// serverPool is a pool that stores newly discovered and already known light
// server nodes and selects among them. It receives discovered nodes, keeps
// statistics about known nodes and takes care of always having enough good
// quality servers connected.
type serverPool struct {
    // db and dbKey locate the persisted list of known nodes.
    db    ethdb.Database
    dbKey []byte
    // server is the p2p server used for dialing selected nodes.
    server *p2p.Server
    // quit is watched by the event loop and the helper goroutines for shutdown.
    quit chan struct{}
    // wg is released by the event loop once the node list has been saved.
    wg *sync.WaitGroup
    // connWg counts the connections accepted by connect and not yet disconnected.
    connWg sync.WaitGroup

    // Channels shared with the discv5 topic search (only created if DiscV5 is set).
    discSetPeriod chan time.Duration
    discNodes     chan *discv5.Node
    discLookups   chan bool

    // entries holds every pool entry by node ID; lock guards the pool state.
    entries map[discover.NodeID]*poolEntry
    lock    sync.Mutex
    // timeout delivers entries whose dial timeout has elapsed.
    timeout chan *poolEntry
    // enableRetry delivers entries whose retry delay has elapsed.
    enableRetry chan *poolEntry
    // adjustStats delivers statistics updates to the event loop.
    adjustStats chan poolStatAdjust

    // Queues and weighted selectors for known and newly discovered entries.
    knownQueue  poolEntryQueue
    newQueue    poolEntryQueue
    knownSelect *weightedRandomSelect
    newSelect   *weightedRandomSelect
    // Number of currently selected (dialed or connected) entries of each kind.
    knownSelected int
    newSelected   int
    // fastDiscover is true until the event loop ends the initial discovery phase.
    fastDiscover bool
}

// newServerPool creates a new serverPool instance. It loads the persisted known
// nodes from db (stored under dbPrefix followed by the topic), starts dialing,
// launches the discv5 topic search if the server has DiscV5 enabled and starts
// the event loop. One unit is added to wg; it is released by the event loop
// after quit has been closed and the node list has been saved.
func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
    // Build the key in a freshly allocated slice. Appending directly to dbPrefix
    // could write into spare capacity of the caller's backing array and corrupt
    // other keys derived from the same prefix.
    dbKey := make([]byte, 0, len(dbPrefix)+len(topic))
    dbKey = append(dbKey, dbPrefix...)
    dbKey = append(dbKey, []byte(topic)...)

    pool := &serverPool{
        db:           db,
        dbKey:        dbKey,
        server:       server,
        quit:         quit,
        wg:           wg,
        entries:      make(map[discover.NodeID]*poolEntry),
        timeout:      make(chan *poolEntry, 1),
        adjustStats:  make(chan poolStatAdjust, 100),
        enableRetry:  make(chan *poolEntry, 1),
        knownSelect:  newWeightedRandomSelect(),
        newSelect:    newWeightedRandomSelect(),
        fastDiscover: true,
    }
    pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
    pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
    wg.Add(1)
    pool.loadNodes()
    pool.checkDial()

    if pool.server.DiscV5 != nil {
        pool.discSetPeriod = make(chan time.Duration, 1)
        pool.discNodes = make(chan *discv5.Node, 100)
        pool.discLookups = make(chan bool, 100)
        go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
    }

    go pool.eventLoop()
    return pool
}

// connect must be called for every incoming connection. It returns the pool entry
// belonging to the peer, creating one if the node is not in the pool yet. If the
// entry is already in connected or registered state, nil is returned and the
// connection should be rejected.
// Whenever a pool entry has been returned, disconnect must eventually be called
// for it as well.
func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
    pool.lock.Lock()
    defer pool.lock.Unlock()

    entry := pool.entries[p.ID()]
    if entry == nil {
        entry = pool.findOrNewNode(p.ID(), ip, port)
    }
    log.Debug(fmt.Sprintf("connecting to %v, state: %v", p.id, entry.state))

    switch entry.state {
    case psConnected, psRegistered:
        // there is already a live connection to this node
        return nil
    }

    pool.connWg.Add(1)
    entry.peer, entry.state = p, psConnected

    // The address the connection came from replaces all previously remembered ones.
    connAddr := &poolEntryAddress{ip: ip, port: port, lastSeen: mclock.Now()}
    entry.lastConnected = connAddr
    entry.addr = map[string]*poolEntryAddress{connAddr.strKey(): connAddr}
    entry.addrSelect = *newWeightedRandomSelect()
    entry.addrSelect.update(connAddr)
    return entry
}

// registered must be called once the handshake with the peer has succeeded. It
// marks the entry as registered and known, makes it the latest element of the
// known queue and resets its short retry counter.
func (pool *serverPool) registered(entry *poolEntry) {
    log.Debug(fmt.Sprintf("registered %v", entry.id.String()))

    pool.lock.Lock()
    defer pool.lock.Unlock()

    entry.state, entry.regTime = psRegistered, mclock.Now()
    entry.shortRetry = shortRetryCnt
    if !entry.known {
        // first successful registration: move it from the new table to the known one
        pool.newQueue.remove(entry)
        entry.known = true
    }
    pool.knownQueue.setLatest(entry)
}

// disconnect must be called when a connection accepted by connect ends. The
// connection statistics are only updated if the peer had been registered;
// otherwise the entry is simply reset and scheduled for a later retry.
func (pool *serverPool) disconnect(entry *poolEntry) {
    log.Debug(fmt.Sprintf("disconnected %v", entry.id.String()))

    pool.lock.Lock()
    defer pool.lock.Unlock()

    if entry.state == psRegistered {
        // score the connection by its duration relative to targetConnTime, capped at 1
        connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
        if connAdjust > 1 {
            connAdjust = 1
        }
        select {
        case <-pool.quit:
            // the pool is shutting down
            entry.connectStats.add(1, connAdjust)
        default:
            entry.connectStats.add(connAdjust, 1)
        }
    }

    entry.state = psNotConnected
    if entry.knownSelected {
        pool.knownSelected--
    } else {
        pool.newSelected--
    }
    pool.setRetryDial(entry)
    pool.connWg.Done()
}

// Adjustment types carried by poolStatAdjust records.
const (
    // pseBlockDelay adds a sample to the block delay statistics
    pseBlockDelay = iota
    // pseResponseTime adds a sample to the response time statistics and records
    // a non-timeout in the timeout statistics
    pseResponseTime
    // pseResponseTimeout records a timeout in the timeout statistics
    pseResponseTimeout
)

// poolStatAdjust records are sent to adjust peer block delay/response time statistics
type poolStatAdjust struct {
    adjustType int           // one of the pse* constants
    entry      *poolEntry    // entry whose statistics are adjusted
    time       time.Duration // measured duration (not used by the event loop for pseResponseTimeout)
}

// adjustBlockDelay adjusts the block announce delay statistics of a node. The
// update is handed over to the event loop; a nil entry is ignored. If the pool
// is shutting down the update is dropped instead of blocking the caller, since
// the event loop no longer drains the channel at that point.
func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
    if entry == nil {
        return
    }
    select {
    case pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}:
    case <-pool.quit:
    }
}

// adjustResponseTime adjusts the request response time statistics of a node.
// If timeout is true a timeout is recorded, otherwise a response time sample.
// The update is handed over to the event loop; a nil entry is ignored. If the
// pool is shutting down the update is dropped instead of blocking the caller,
// since the event loop no longer drains the channel at that point.
func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
    if entry == nil {
        return
    }
    adjustType := pseResponseTime
    if timeout {
        adjustType = pseResponseTimeout
    }
    select {
    case pool.adjustStats <- poolStatAdjust{adjustType, entry, time}:
    case <-pool.quit:
    }
}

// selectPeerItem is a candidate for request peer selection: a peer together
// with its selection weight and the waiting time reported for it.
type selectPeerItem struct {
    peer   *peer
    weight int64
    wait   time.Duration
}

// Weight returns the selection weight of the candidate.
func (item selectPeerItem) Weight() int64 {
    return item.weight
}

// selectPeer picks a peer for the request identified by reqID by weighted random
// selection among the registered peers that have no request assigned and for
// which canSend reports true. It returns the chosen peer, a waiting time and a
// "locked" flag. If locked is true, the request has been assigned to the peer
// and can be performed after the returned waiting time. If locked is false,
// selectPeer should be called again after the waiting time. When no peer is
// eligible, it returns (nil, 0, false).
func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
    // retryWait is both the limit under which an assignment is attempted and the
    // waiting time suggested when the assignment cannot be made.
    const retryWait = time.Millisecond * 100

    // candidate is a snapshot of a peer's statistics taken under the pool lock
    type candidate struct {
        peer         *peer
        rstat, tstat float64
    }
    var candidates []candidate

    pool.lock.Lock()
    sel := newWeightedRandomSelect()
    for _, entry := range pool.entries {
        if entry.state != psRegistered || entry.peer.fcServer.IsAssigned() {
            continue
        }
        candidates = append(candidates, candidate{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
    }
    pool.lock.Unlock()

    // canSend is called without holding the pool lock
    for _, c := range candidates {
        ok, wait := canSend(c.peer)
        if !ok {
            continue
        }
        w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(c.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-c.tstat), timeoutPow)))
        sel.update(selectPeerItem{peer: c.peer, weight: w, wait: wait})
    }

    choice := sel.choose()
    if choice == nil {
        return nil, 0, false
    }
    chosen := choice.(selectPeerItem)

    if chosen.wait >= retryWait {
        return chosen.peer, retryWait, false
    }
    if !chosen.peer.fcServer.AssignRequest(reqID) {
        return chosen.peer, chosen.wait, false
    }
    // re-check after the assignment, the conditions may have changed meanwhile
    if ok, w := canSend(chosen.peer); ok && w < retryWait {
        return chosen.peer, w, true
    }
    chosen.peer.fcServer.DeassignRequest(reqID)
    return chosen.peer, retryWait, false
}

// selectPeerWait selects a suitable peer for a request, waiting until an assignment
// to the request is guaranteed or the process is aborted. It returns nil if abort
// is signalled before a peer could be locked.
func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
    // minRetryWait bounds the polling rate. selectPeer returns a zero waiting
    // time when no peer is available at all (and may return one when the request
    // could not be assigned); retrying immediately in those cases would spin on
    // the pool lock at full speed.
    const minRetryWait = time.Millisecond * 10

    for {
        peer, wait, locked := pool.selectPeer(reqID, canSend)
        if locked {
            return peer
        }
        if wait < minRetryWait {
            wait = minRetryWait
        }
        timer := time.NewTimer(wait)
        select {
        case <-abort:
            // release the timer right away instead of leaving it pending
            timer.Stop()
            return nil
        case <-timer.C:
        }
    }
}

// eventLoop handles pool events and mutex locking for all internal functions.
// It runs until pool.quit is signalled; then it waits for all accepted
// connections to be disconnected, saves the known nodes and releases pool.wg.
func (pool *serverPool) eventLoop() {
    // lookupCnt counts the lookups reported with a true flag on discLookups,
    // convTime is the time when the first of them was seen
    lookupCnt := 0
    var convTime mclock.AbsTime
    if pool.discSetPeriod != nil {
        // start with a short period (presumably the topic search lookup period)
        pool.discSetPeriod <- time.Millisecond * 100
    }
    for {
        select {
        case entry := <-pool.timeout:
            // dial timeout elapsed for the entry
            pool.lock.Lock()
            if !entry.removed {
                pool.checkDialTimeout(entry)
            }
            pool.lock.Unlock()

        case entry := <-pool.enableRetry:
            // retry delay elapsed, the entry may be selected for dialing again
            pool.lock.Lock()
            if !entry.removed {
                entry.delayedRetry = false
                pool.updateCheckDial(entry)
            }
            pool.lock.Unlock()

        case adj := <-pool.adjustStats:
            pool.lock.Lock()
            switch adj.adjustType {
            case pseBlockDelay:
                adj.entry.delayStats.add(float64(adj.time), 1)
            case pseResponseTime:
                // a response also counts as a non-timeout sample
                adj.entry.responseStats.add(float64(adj.time), 1)
                adj.entry.timeoutStats.add(0, 1)
            case pseResponseTimeout:
                adj.entry.timeoutStats.add(1, 1)
            }
            pool.lock.Unlock()

        case node := <-pool.discNodes:
            // a node has been received from the topic search
            pool.lock.Lock()
            entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
            pool.updateCheckDial(entry)
            pool.lock.Unlock()

        case conv := <-pool.discLookups:
            if conv {
                if lookupCnt == 0 {
                    convTime = mclock.Now()
                }
                lookupCnt++
                // leave fast discovery mode after 50 such lookups or when more than
                // a minute has passed since the first one, switching to a one minute period
                if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
                    pool.fastDiscover = false
                    if pool.discSetPeriod != nil {
                        pool.discSetPeriod <- time.Minute
                    }
                }
            }

        case <-pool.quit:
            if pool.discSetPeriod != nil {
                close(pool.discSetPeriod)
            }
            // wait until disconnect has been called for every accepted connection
            // so that the saved statistics are final
            pool.connWg.Wait()
            pool.saveNodes()
            pool.wg.Done()
            return

        }
    }
}

// findOrNewNode returns the pool entry of the given node, creating and adding it
// to the pool if it does not exist yet. The given address is remembered for the
// entry (or refreshed if already present) and, unless the entry is a known one,
// the entry becomes the latest element of the new queue.
func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
    now := mclock.Now()

    entry := pool.entries[id]
    if entry == nil {
        log.Debug(fmt.Sprintf("discovered %v", id.String()))
        entry = &poolEntry{
            id:         id,
            addr:       make(map[string]*poolEntryAddress),
            addrSelect: *newWeightedRandomSelect(),
            shortRetry: shortRetryCnt,
        }
        pool.entries[id] = entry
        // previously unknown peers start with good statistics so that they get
        // a chance to prove themselves
        entry.connectStats.add(1, initStatsWeight)
        entry.delayStats.add(0, initStatsWeight)
        entry.responseStats.add(0, initStatsWeight)
        entry.timeoutStats.add(0, initStatsWeight)
    }
    entry.lastDiscovered = now

    // reuse the stored address object if this ip/port pair has been seen before
    candidate := &poolEntryAddress{ip: ip, port: port}
    key := candidate.strKey()
    addr, seen := entry.addr[key]
    if !seen {
        addr = candidate
        entry.addr[key] = addr
    }
    addr.lastSeen = now
    entry.addrSelect.update(addr)

    if !entry.known {
        pool.newQueue.setLatest(entry)
    }
    return entry
}

// loadNodes reads the list of known nodes and their statistics from the database
// and adds them to the pool. Nothing is loaded if the key cannot be read or the
// stored list cannot be decoded.
func (pool *serverPool) loadNodes() {
    enc, err := pool.db.Get(pool.dbKey)
    if err != nil {
        return
    }

    var loaded []*poolEntry
    if err = rlp.DecodeBytes(enc, &loaded); err != nil {
        log.Debug(fmt.Sprintf("node list decode error: %v", err))
        return
    }

    for _, entry := range loaded {
        log.Debug(fmt.Sprintf(
            "loaded server stats %016x  fails: %v  connStats: %v / %v  delayStats: %v / %v  responseStats: %v / %v  timeoutStats: %v / %v",
            entry.id[0:8],
            entry.lastConnected.fails,
            entry.connectStats.avg, entry.connectStats.weight,
            time.Duration(entry.delayStats.avg), entry.delayStats.weight,
            time.Duration(entry.responseStats.avg), entry.responseStats.weight,
            entry.timeoutStats.avg, entry.timeoutStats.weight,
        ))
        pool.entries[entry.id] = entry
        pool.knownQueue.setLatest(entry)
        pool.knownSelect.update((*knownEntry)(entry))
    }
}

// saveNodes saves known nodes and their statistics into the database. Nodes are
// ordered from least to most recently connected. Encoding and database errors
// are logged; in that case the previously stored list (if any) is left as is.
func (pool *serverPool) saveNodes() {
    list := make([]*poolEntry, len(pool.knownQueue.queue))
    for i := range list {
        list[i] = pool.knownQueue.fetchOldest()
    }
    enc, err := rlp.EncodeToBytes(list)
    if err != nil {
        log.Debug(fmt.Sprintf("node list encode error: %v", err))
        return
    }
    if err := pool.db.Put(pool.dbKey, enc); err != nil {
        log.Debug(fmt.Sprintf("node list save error: %v", err))
    }
}

// removeEntry drops an entry from the pool when the entry count limit has been
// reached. It is invoked by the new/known queues, which have already removed the
// entry from themselves, so only the selectors and the entry map are cleaned up.
func (pool *serverPool) removeEntry(entry *poolEntry) {
    asDiscovered := (*discoveredEntry)(entry)
    asKnown := (*knownEntry)(entry)

    pool.newSelect.remove(asDiscovered)
    pool.knownSelect.remove(asKnown)

    // pending timeout/retry events check this flag before touching the entry
    entry.removed = true
    delete(pool.entries, entry.id)
}

// setRetryDial marks the entry as waiting for retry and starts a goroutine that
// hands it to the event loop through enableRetry once the waiting period is over.
// The waiting period is the base delay plus a random amount of up to the base
// delay; the short base delay is used while the entry has short retries left.
func (pool *serverPool) setRetryDial(entry *poolEntry) {
    base := longRetryDelay
    if entry.shortRetry > 0 {
        entry.shortRetry--
        base = shortRetryDelay
    }
    wait := base + time.Duration(rand.Int63n(int64(base)+1))

    entry.delayedRetry = true
    go func() {
        select {
        case <-pool.quit:
            return
        case <-time.After(wait):
        }
        // deliver the entry unless the pool is stopped in the meantime
        select {
        case <-pool.quit:
        case pool.enableRetry <- entry:
        }
    }()
}

// updateCheckDial is called when an entry may have become eligible for dialing
// again. It refreshes the entry in both selectors and then checks whether new
// dials can/should be made.
func (pool *serverPool) updateCheckDial(entry *poolEntry) {
    asDiscovered := (*discoveredEntry)(entry)
    pool.newSelect.update(asDiscovered)

    asKnown := (*knownEntry)(entry)
    pool.knownSelect.update(asKnown)

    pool.checkDial()
}

// checkDial checks whether new dials can/should be made and initiates them. It
// selects servers both by good statistics (known entries) and by recent
// discovery (new entries).
func (pool *serverPool) checkDial() {
    // dialKnown dials entries chosen by the known selector until limit known
    // entries are selected. It reports true if the selector had nothing to offer.
    dialKnown := func(limit int) bool {
        for pool.knownSelected < limit {
            item := pool.knownSelect.choose()
            if item == nil {
                return true
            }
            pool.dial((*poolEntry)(item.(*knownEntry)), true)
        }
        return false
    }

    fastDiscover := pool.fastDiscover
    knownExhausted := dialKnown(targetKnownSelect)

    // fill the remaining slots with newly discovered entries
    for pool.knownSelected+pool.newSelected < targetServerCount {
        item := pool.newSelect.choose()
        if item == nil {
            break
        }
        pool.dial((*poolEntry)(item.(*discoveredEntry)), false)
    }

    if !fastDiscover && !knownExhausted {
        // no more newly discovered nodes to select and, as the fast discover
        // period is over, more are unlikely to show up soon, so select more
        // known entries if possible
        dialKnown(targetServerCount)
    }
}

// dial initiates a new connection to the entry unless it is not in the not
// connected state. knownSelected tells which selection counter the dial is
// accounted to. A goroutine adds the node to the p2p server and reports the
// entry on the timeout channel after dialTimeout.
func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
    if entry.state != psNotConnected {
        return
    }
    entry.state, entry.knownSelected = psDialed, knownSelected
    if knownSelected {
        pool.knownSelected++
    } else {
        pool.newSelected++
    }

    target := entry.addrSelect.choose().(*poolEntryAddress)
    log.Debug(fmt.Sprintf("dialing %v out of %v, known: %v", entry.id.String()+"@"+target.strKey(), len(entry.addr), knownSelected))
    entry.dialed = target

    go func() {
        pool.server.AddPeer(discover.NewNode(entry.id, target.ip, target.port, target.port))
        select {
        case <-pool.quit:
            return
        case <-time.After(dialTimeout):
        }
        // let the event loop check whether the dial has succeeded
        select {
        case <-pool.quit:
        case pool.timeout <- entry:
        }
    }()
}

// checkDialTimeout handles an elapsed dial timeout. If the entry is still in
// dialed state, it is reset to not connected, the failure is recorded in the
// connection statistics and on the dialed address, and a retry is scheduled.
func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
    if stillDialing := entry.state == psDialed; !stillDialing {
        return
    }
    log.Debug(fmt.Sprintf("timeout %v", entry.id.String()+"@"+entry.dialed.strKey()))

    entry.state = psNotConnected
    switch {
    case entry.knownSelected:
        pool.knownSelected--
    default:
        pool.newSelected--
    }

    entry.connectStats.add(0, 1)
    entry.dialed.fails++
    pool.setRetryDial(entry)
}

// Connection states of a pool entry.
const (
    // psNotConnected is the initial state; entries return to it after a
    // disconnect or a dial timeout
    psNotConnected = iota
    // psDialed is set by dial
    psDialed
    // psConnected is set by connect
    psConnected
    // psRegistered is set by registered
    psRegistered
)

// poolEntry describes a single server node together with its current connection
// state and its statistics.
type poolEntry struct {
    // peer is the peer assigned by connect.
    peer *peer
    id   discover.NodeID
    // addr holds the remembered addresses of the node, keyed by strKey.
    addr map[string]*poolEntryAddress
    // lastConnected is the address of the last accepted connection (or the one
    // loaded from the database); dialed is the address chosen by the last dial.
    lastConnected *poolEntryAddress
    dialed        *poolEntryAddress
    // addrSelect chooses among the remembered addresses when dialing.
    addrSelect weightedRandomSelect

    // lastDiscovered is refreshed by findOrNewNode.
    lastDiscovered mclock.AbsTime
    // known is set after the first registration or when loaded from the database.
    known bool
    // knownSelected tells which selection counter the current dial belongs to.
    knownSelected bool
    connectStats  poolStats
    delayStats    poolStats
    responseStats poolStats
    timeoutStats  poolStats
    // state is one of the ps* constants.
    state int
    // regTime is the time of the last registration.
    regTime mclock.AbsTime
    // queueIdx is presumably maintained by poolEntryQueue (not used in this part of the file).
    queueIdx int
    // removed is set when the entry has been dropped from the pool.
    removed bool

    // delayedRetry is true while the entry is waiting for its retry delay.
    delayedRetry bool
    // shortRetry is the number of remaining retries with the short delay.
    shortRetry int
}

// EncodeRLP writes the persistent part of the entry: the node ID, the last
// connected address with its failure count, and the four statistics.
func (e *poolEntry) EncodeRLP(w io.Writer) error {
    last := e.lastConnected
    fields := []interface{}{
        e.id,
        last.ip,
        last.port,
        last.fails,
        &e.connectStats,
        &e.delayStats,
        &e.responseStats,
        &e.timeoutStats,
    }
    return rlp.Encode(w, fields)
}

// DecodeRLP restores an entry from the form written by EncodeRLP. The decoded
// address becomes the only remembered address of the entry and the entry is
// marked as known.
func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
    var dec struct {
        ID                         discover.NodeID
        IP                         net.IP
        Port                       uint16
        Fails                      uint
        CStat, DStat, RStat, TStat poolStats
    }
    if err := s.Decode(&dec); err != nil {
        return err
    }

    stored := &poolEntryAddress{
        ip:       dec.IP,
        port:     dec.Port,
        fails:    dec.Fails,
        lastSeen: mclock.Now(),
    }

    e.id = dec.ID
    e.known = true
    e.shortRetry = shortRetryCnt
    e.connectStats, e.delayStats = dec.CStat, dec.DStat
    e.responseStats, e.timeoutStats = dec.RStat, dec.TStat

    e.lastConnected = stored
    e.addr = map[string]*poolEntryAddress{stored.strKey(): stored}
    e.addrSelect = *newWeightedRandomSelect()
    e.addrSelect.update(stored)
    return nil
}

// discoveredEntry is a poolEntry viewed as a newly discovered node; it implements wrsItem
type discoveredEntry poolEntry

// Weight returns the random selection weight used for newly discovered entries.
// It is zero while the entry is not in the not connected state or is waiting for
// a retry; otherwise it is 1000000000 until discoverExpireStart has passed since
// the last discovery and decays exponentially with discoverExpireConst afterwards.
func (e *discoveredEntry) Weight() int64 {
    selectable := e.state == psNotConnected && !e.delayedRetry
    if !selectable {
        return 0
    }
    age := time.Duration(mclock.Now() - e.lastDiscovered)
    if age <= discoverExpireStart {
        return 1000000000
    }
    return int64(1000000000 * math.Exp(-float64(age-discoverExpireStart)/float64(discoverExpireConst)))
}

// knownEntry is poolEntry under a distinct type so that it can carry the
// Weight method used for already known nodes (implements wrsItem).
type knownEntry poolEntry

// Weight returns the random selection weight of a known entry. It is zero
// unless the entry is known, in psNotConnected state and has no delayed retry
// pending. Otherwise the weight grows with the recent connect average and
// shrinks with the fail count of lastConnected, the recent response and delay
// averages and the recent timeout average. lastConnected must be non-nil for
// eligible entries.
func (e *knownEntry) Weight() int64 {
    eligible := e.state == psNotConnected && e.known && !e.delayedRetry
    if !eligible {
        return 0
    }
    // The statistics are read in the order connect, response, delay, timeout;
    // each recentAvg call also refreshes the corresponding poolStats.
    connect := e.connectStats.recentAvg()
    penalty := math.Exp(-float64(e.lastConnected.fails)*failDropLn - e.responseStats.recentAvg()/float64(responseScoreTC) - e.delayStats.recentAvg()/float64(delayScoreTC))
    reliability := math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow)
    return int64(1000000000 * connect * penalty * reliability)
}

// poolEntryAddress is one candidate network address of a pool entry. It is a
// separate object because a pool entry currently has to remember several
// potential addresses. This will be removed after the final implementation of
// v5 discovery, which will retrieve signed and serial numbered advertisements
// that make it clear which IP/port is the latest one.
type poolEntryAddress struct {
    ip   net.IP
    port uint16

    // lastSeen is the last time the address was discovered, connected or
    // loaded from the database.
    lastSeen mclock.AbsTime

    // fails counts connection failures since the last successful connection;
    // it is persisted.
    fails uint
}

// Weight returns the random selection weight of the address. It decays
// exponentially with the time elapsed since lastSeen (time constant
// discoverExpireConst) and with the number of failures (factor
// addrFailDropLn per failure). The trailing +1 keeps the result positive even
// when the exponential term truncates to zero.
func (a *poolEntryAddress) Weight() int64 {
    age := time.Duration(mclock.Now() - a.lastSeen)
    exponent := -float64(age)/float64(discoverExpireConst) - float64(a.fails)*addrFailDropLn
    return int64(1000000*math.Exp(exponent)) + 1
}

// strKey returns the "ip:port" string that identifies the address; it is the
// key under which the address is stored in poolEntry.addr.
func (a *poolEntryAddress) strKey() string {
    host := a.ip.String()
    port := strconv.FormatUint(uint64(a.port), 10)
    return host + ":" + port
}

// poolStats implements statistics for a certain quantity with a long term
// average and a short term ("recent") value. The recent value is pulled back
// exponentially towards the average with the time constant
// pstatReturnToMeanTC each time recalc runs.
// NOTE(review): the earlier comment also described a per-update adjustment by
// pstatRecentAdjust; no such adjustment is visible in add or recalc — confirm.
type poolStats struct {
    // sum is the weighted sum of all added values and weight the sum of their
    // weights; only these two fields are persisted.
    sum float64
    weight float64
    // avg is the long term average as of the last recalc.
    avg float64
    // recent is the short term value returned by recentAvg.
    recent float64
    // lastRecalc is the time of the last recalc (or init).
    lastRecalc mclock.AbsTime
}

// init seeds the stats with a long term sum/weight pair retrieved from the
// database. Both the average and the recent value start at sum/weight, or at
// zero when weight is not positive.
func (s *poolStats) init(sum, weight float64) {
    mean := 0.0
    if weight > 0 {
        mean = sum / weight
    }
    s.sum, s.weight = sum, weight
    s.avg, s.recent = mean, mean
    s.lastRecalc = mclock.Now()
}

// recalc applies the return-to-mean decay to the recent value and then
// refreshes the long term average from sum and weight.
func (s *poolStats) recalc() {
    now := mclock.Now()
    elapsed := float64(now - s.lastRecalc)

    // The decay is relative to the average of the previous recalc; the
    // average itself is only updated afterwards.
    s.recent = s.avg + (s.recent-s.avg)*math.Exp(-elapsed/float64(pstatReturnToMeanTC))

    switch {
    case s.sum == 0:
        s.avg = 0
    case s.sum > s.weight*1e30:
        // Cap the average instead of dividing by a vanishing weight.
        s.avg = 1e30
    default:
        s.avg = s.sum / s.weight
    }
    s.lastRecalc = now
}

// add folds a new value with the given weight into the long term sum and
// weight, then recalculates the averages.
func (s *poolStats) add(value, weight float64) {
    s.sum += value * weight
    s.weight += weight
    s.recalc()
}

// recentAvg returns the short-term adjusted average. It is not a pure getter:
// it calls recalc first, so the recent value, the long term average and
// lastRecalc are all brought up to date before the value is read.
func (s *poolStats) recentAvg() float64 {
    s.recalc()
    return s.recent
}

// EncodeRLP writes the long term sum and weight to w as a two item RLP list,
// each float64 being stored as its IEEE 754 bit pattern. The average and the
// recent value are not persisted.
func (s *poolStats) EncodeRLP(w io.Writer) error {
    sumBits := math.Float64bits(s.sum)
    weightBits := math.Float64bits(s.weight)
    return rlp.Encode(w, []interface{}{sumBits, weightBits})
}

// DecodeRLP reads the sum/weight bit patterns written by EncodeRLP and
// re-initializes the stats from them via init.
func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
    var raw struct {
        SumUint    uint64
        WeightUint uint64
    }
    err := st.Decode(&raw)
    if err != nil {
        return err
    }
    sum := math.Float64frombits(raw.SumUint)
    weight := math.Float64frombits(raw.WeightUint)
    s.init(sum, weight)
    return nil
}

// poolEntryQueue keeps track of its least recently accessed entries and
// removes them when the number of entries reaches the limit.
type poolEntryQueue struct {
    // queue holds the entries indexed by their current queueIdx.
    queue map[int]*poolEntry
    // newPtr is the index that the next setLatest call assigns.
    newPtr int
    // oldPtr is where fetchOldest starts scanning for the oldest entry.
    oldPtr int
    // maxCnt is the entry count at which setLatest evicts the oldest entry
    // before inserting a new one.
    maxCnt int
    // removeFromPool is called by setLatest with every evicted entry.
    removeFromPool func(*poolEntry)
}

// newPoolEntryQueue returns an empty poolEntryQueue limited to maxCnt entries.
// removeFromPool is the callback invoked for entries evicted by setLatest.
func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
    var q poolEntryQueue
    q.queue = make(map[int]*poolEntry)
    q.maxCnt = maxCnt
    q.removeFromPool = removeFromPool
    return q
}

// fetchOldest removes the least recently accessed entry from the queue and
// returns it, or returns nil if the queue is empty.
func (q *poolEntryQueue) fetchOldest() *poolEntry {
    if len(q.queue) == 0 {
        return nil
    }
    // Walk forward from oldPtr, skipping the gaps left by removed or
    // re-inserted entries, until an occupied slot is found.
    for {
        idx := q.oldPtr
        q.oldPtr++
        if oldest := q.queue[idx]; oldest != nil {
            delete(q.queue, idx)
            return oldest
        }
    }
}

// remove deletes entry from the queue. It does nothing if the slot at the
// entry's queueIdx is not occupied by this very entry.
func (q *poolEntryQueue) remove(entry *poolEntry) {
    idx := entry.queueIdx
    if q.queue[idx] != entry {
        return
    }
    delete(q.queue, idx)
}

// setLatest adds entry to the queue as the most recently accessed one, or
// moves it to that position if it is already queued. When a new entry would
// push the queue past maxCnt, the oldest entry is evicted first and reported
// to the parent pool through the removeFromPool callback.
func (q *poolEntryQueue) setLatest(entry *poolEntry) {
    if q.queue[entry.queueIdx] == entry {
        // Already queued: free the old slot, the entry is re-inserted below.
        delete(q.queue, entry.queueIdx)
    } else if len(q.queue) >= q.maxCnt {
        // fetchOldest has already deleted the entry from the queue, so no
        // further removal is needed. It returns nil when the queue is empty
        // (only possible with maxCnt <= 0); there is nothing to evict then,
        // and the callback must not be handed a nil entry.
        if oldest := q.fetchOldest(); oldest != nil {
            q.removeFromPool(oldest)
        }
    }
    entry.queueIdx = q.newPtr
    q.queue[entry.queueIdx] = entry
    q.newPtr++
}