aboutsummaryrefslogtreecommitdiffstats
path: root/les/serverpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/serverpool.go')
-rw-r--r--les/serverpool.go204
1 files changed, 158 insertions, 46 deletions
diff --git a/les/serverpool.go b/les/serverpool.go
index 95968eda2..f5e880460 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -59,6 +59,9 @@ const (
targetKnownSelect = 3
// after dialTimeout, consider the server unavailable and adjust statistics
dialTimeout = time.Second * 30
+ // targetConnTime is the minimum expected connection duration before a server
+ // drops a client without any specific reason
+ targetConnTime = time.Minute * 10
// new entry selection weight calculation based on most recent discovery time:
// unity until discoverExpireStart, then exponential decay with discoverExpireConst
discoverExpireStart = time.Minute * 20
@@ -75,6 +78,17 @@ const (
// node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
// each unsuccessful connection (restored after a successful one)
addrFailDropLn = math.Ln2
+ // responseScoreTC and delayScoreTC are exponential decay time constants for
+ // calculating selection chances from response times and block delay times
+ responseScoreTC = time.Millisecond * 100
+ delayScoreTC = time.Second * 5
+ timeoutPow = 10
+ // peerSelectMinWeight is added to calculated weights at request peer selection
+ // to give poorly performing peers a little chance of coming back
+ peerSelectMinWeight = 0.005
+ // initStatsWeight is used to initialize previously unknown peers with good
+ // statistics to give a chance to prove themselves
+ initStatsWeight = 1
)
// serverPool implements a pool for storing and selecting newly discovered and already
@@ -95,6 +109,7 @@ type serverPool struct {
entries map[discover.NodeID]*poolEntry
lock sync.Mutex
timeout, enableRetry chan *poolEntry
+ adjustStats chan poolStatAdjust
knownQueue, newQueue poolEntryQueue
knownSelect, newSelect *weightedRandomSelect
@@ -112,6 +127,7 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
wg: wg,
entries: make(map[discover.NodeID]*poolEntry),
timeout: make(chan *poolEntry, 1),
+ adjustStats: make(chan poolStatAdjust, 100),
enableRetry: make(chan *poolEntry, 1),
knownSelect: newWeightedRandomSelect(),
newSelect: newWeightedRandomSelect(),
@@ -139,18 +155,19 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
// Otherwise, the connection should be rejected.
// Note that whenever a connection has been accepted and a pool entry has been returned,
// disconnect should also always be called.
-func (pool *serverPool) connect(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
+func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
pool.lock.Lock()
defer pool.lock.Unlock()
- entry := pool.entries[id]
+ entry := pool.entries[p.ID()]
if entry == nil {
return nil
}
- glog.V(logger.Debug).Infof("connecting to %v, state: %v", id.String(), entry.state)
+ glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
if entry.state != psDialed {
return nil
}
pool.connWg.Add(1)
+ entry.peer = p
entry.state = psConnected
addr := &poolEntryAddress{
ip: ip,
@@ -172,42 +189,111 @@ func (pool *serverPool) registered(entry *poolEntry) {
defer pool.lock.Unlock()
entry.state = psRegistered
+ entry.regTime = mclock.Now()
if !entry.known {
pool.newQueue.remove(entry)
entry.known = true
}
pool.knownQueue.setLatest(entry)
entry.shortRetry = shortRetryCnt
- entry.connectStats.add(1)
}
// disconnect should be called when ending a connection. Service quality statistics
// can be updated optionally (not updated if no registration happened, in this case
// only connection statistics are updated, just like in case of timeout)
-func (pool *serverPool) disconnect(entry *poolEntry, quality float64, setQuality bool) {
+func (pool *serverPool) disconnect(entry *poolEntry) {
glog.V(logger.Debug).Infof("disconnected %v", entry.id.String())
pool.lock.Lock()
defer pool.lock.Unlock()
- if entry.state != psRegistered {
- setQuality = false
+ if entry.state == psRegistered {
+ connTime := mclock.Now() - entry.regTime
+ connAdjust := float64(connTime) / float64(targetConnTime)
+ if connAdjust > 1 {
+ connAdjust = 1
+ }
+ stopped := false
+ select {
+ case <-pool.quit:
+ stopped = true
+ default:
+ }
+ if stopped {
+ entry.connectStats.add(1, connAdjust)
+ } else {
+ entry.connectStats.add(connAdjust, 1)
+ }
}
+
entry.state = psNotConnected
if entry.knownSelected {
pool.knownSelected--
} else {
pool.newSelected--
}
- if setQuality {
- glog.V(logger.Debug).Infof("update quality %v %v", quality, entry.id.String())
- entry.qualityStats.add(quality)
- } else {
- glog.V(logger.Debug).Infof("do not update quality")
- }
pool.setRetryDial(entry)
pool.connWg.Done()
}
+const (
+ pseBlockDelay = iota
+ pseResponseTime
+ pseResponseTimeout
+)
+
+// poolStatAdjust records are sent to adjust peer block delay/response time statistics
+type poolStatAdjust struct {
+ adjustType int
+ entry *poolEntry
+ time time.Duration
+}
+
+// adjustBlockDelay adjusts the block announce delay statistics of a node
+func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
+ pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
+}
+
+// adjustResponseTime adjusts the request response time statistics of a node
+func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
+ if timeout {
+ pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
+ } else {
+ pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
+ }
+}
+
+type selectPeerItem struct {
+ peer *peer
+ weight int64
+}
+
+func (sp selectPeerItem) Weight() int64 {
+ return sp.weight
+}
+
+// selectPeer selects a suitable peer for a request
+func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer {
+ pool.lock.Lock()
+ defer pool.lock.Unlock()
+
+ sel := newWeightedRandomSelect()
+ for _, entry := range pool.entries {
+ if entry.state == psRegistered {
+ p := entry.peer
+ ok, cost := canSend(p)
+ if ok {
+ w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow)))
+ sel.update(selectPeerItem{peer: p, weight: w})
+ }
+ }
+ }
+ choice := sel.choose()
+ if choice == nil {
+ return nil
+ }
+ return choice.(selectPeerItem).peer
+}
+
// eventLoop handles pool events and mutex locking for all internal functions
func (pool *serverPool) eventLoop() {
lookupCnt := 0
@@ -230,6 +316,19 @@ func (pool *serverPool) eventLoop() {
}
pool.lock.Unlock()
+ case adj := <-pool.adjustStats:
+ pool.lock.Lock()
+ switch adj.adjustType {
+ case pseBlockDelay:
+ adj.entry.delayStats.add(float64(adj.time), 1)
+ case pseResponseTime:
+ adj.entry.responseStats.add(float64(adj.time), 1)
+ adj.entry.timeoutStats.add(0, 1)
+ case pseResponseTimeout:
+ adj.entry.timeoutStats.add(1, 1)
+ }
+ pool.lock.Unlock()
+
case node := <-pool.discNodes:
pool.lock.Lock()
now := mclock.Now()
@@ -244,6 +343,11 @@ func (pool *serverPool) eventLoop() {
shortRetry: shortRetryCnt,
}
pool.entries[id] = entry
+ // initialize previously unknown peers with good statistics to give a chance to prove themselves
+ entry.connectStats.add(1, initStatsWeight)
+ entry.delayStats.add(0, initStatsWeight)
+ entry.responseStats.add(0, initStatsWeight)
+ entry.timeoutStats.add(0, initStatsWeight)
}
entry.lastDiscovered = now
addr := &poolEntryAddress{
@@ -298,9 +402,8 @@ func (pool *serverPool) loadNodes() {
glog.V(logger.Debug).Infof("node list decode error: %v", err)
return
}
- glog.V(logger.Debug).Infof("loaded node list")
for _, e := range list {
- glog.V(logger.Debug).Infof(" adding node %v fails: %v connStats sum: %v cnt: %v qualityStats sum: %v cnt: %v", e.id.String()+"@"+e.lastConnected.strKey(), e.lastConnected.fails, e.connectStats.sum, e.connectStats.cnt, e.qualityStats.sum, e.qualityStats.cnt)
+ glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight)
pool.entries[e.id] = e
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
@@ -433,7 +536,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
} else {
pool.newSelected--
}
- entry.connectStats.add(0)
+ entry.connectStats.add(0, 1)
entry.dialed.fails++
pool.setRetryDial(entry)
}
@@ -447,33 +550,36 @@ const (
// poolEntry represents a server node and stores its current state and statistics.
type poolEntry struct {
+ peer *peer
id discover.NodeID
addr map[string]*poolEntryAddress
lastConnected, dialed *poolEntryAddress
addrSelect weightedRandomSelect
- lastDiscovered mclock.AbsTime
- known, knownSelected bool
- connectStats, qualityStats poolStats
- state int
- queueIdx int
- removed bool
+ lastDiscovered mclock.AbsTime
+ known, knownSelected bool
+ connectStats, delayStats poolStats
+ responseStats, timeoutStats poolStats
+ state int
+ regTime mclock.AbsTime
+ queueIdx int
+ removed bool
delayedRetry bool
shortRetry int
}
func (e *poolEntry) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.qualityStats})
+ return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
}
func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
var entry struct {
- ID discover.NodeID
- IP net.IP
- Port uint16
- Fails uint
- CStat, QStat poolStats
+ ID discover.NodeID
+ IP net.IP
+ Port uint16
+ Fails uint
+ CStat, DStat, RStat, TStat poolStats
}
if err := s.Decode(&entry); err != nil {
return err
@@ -486,7 +592,9 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
e.addrSelect.update(addr)
e.lastConnected = addr
e.connectStats = entry.CStat
- e.qualityStats = entry.QStat
+ e.delayStats = entry.DStat
+ e.responseStats = entry.RStat
+ e.timeoutStats = entry.TStat
e.shortRetry = shortRetryCnt
e.known = true
return nil
@@ -516,7 +624,7 @@ func (e *knownEntry) Weight() int64 {
if e.state != psNotConnected || !e.known || e.delayedRetry {
return 0
}
- return int64(1000000000 * e.connectStats.recentAvg() * (e.qualityStats.recentAvg() + 0.001) * math.Exp(-float64(e.lastConnected.fails)*failDropLn))
+ return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow))
}
// poolEntryAddress is a separate object because currently it is necessary to remember
@@ -544,18 +652,17 @@ func (a *poolEntryAddress) strKey() string {
// pstatRecentAdjust with each update and also returned exponentially to the
// average with the time constant pstatReturnToMeanTC
type poolStats struct {
- sum, avg, recent float64
- cnt uint
- lastRecalc mclock.AbsTime
+ sum, weight, avg, recent float64
+ lastRecalc mclock.AbsTime
}
// init initializes stats with a long term sum/update count pair retrieved from the database
-func (s *poolStats) init(sum float64, cnt uint) {
+func (s *poolStats) init(sum, weight float64) {
s.sum = sum
- s.cnt = cnt
+ s.weight = weight
var avg float64
- if cnt > 0 {
- avg = s.sum / float64(cnt)
+ if weight > 0 {
+ avg = s.sum / weight
}
s.avg = avg
s.recent = avg
@@ -566,16 +673,22 @@ func (s *poolStats) init(sum float64, cnt uint) {
func (s *poolStats) recalc() {
now := mclock.Now()
s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
- if s.cnt > 0 {
- s.avg = s.sum / float64(s.cnt)
+ if s.sum == 0 {
+ s.avg = 0
+ } else {
+ if s.sum > s.weight*1e30 {
+ s.avg = 1e30
+ } else {
+ s.avg = s.sum / s.weight
+ }
}
s.lastRecalc = now
}
// add updates the stats with a new value
-func (s *poolStats) add(val float64) {
- s.cnt++
- s.sum += val
+func (s *poolStats) add(value, weight float64) {
+ s.weight += weight
+ s.sum += value * weight
s.recalc()
}
@@ -586,18 +699,17 @@ func (s *poolStats) recentAvg() float64 {
}
func (s *poolStats) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), s.cnt})
+ return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
}
func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
var stats struct {
- SumUint uint64
- Cnt uint
+ SumUint, WeightUint uint64
}
if err := st.Decode(&stats); err != nil {
return err
}
- s.init(math.Float64frombits(stats.SumUint), stats.Cnt)
+ s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
return nil
}