diff options
Diffstat (limited to 'les/serverpool.go')
-rw-r--r-- | les/serverpool.go | 204 |
1 files changed, 158 insertions, 46 deletions
diff --git a/les/serverpool.go b/les/serverpool.go index 95968eda2..f5e880460 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -59,6 +59,9 @@ const ( targetKnownSelect = 3 // after dialTimeout, consider the server unavailable and adjust statistics dialTimeout = time.Second * 30 + // targetConnTime is the minimum expected connection duration before a server + // drops a client without any specific reason + targetConnTime = time.Minute * 10 // new entry selection weight calculation based on most recent discovery time: // unity until discoverExpireStart, then exponential decay with discoverExpireConst discoverExpireStart = time.Minute * 20 @@ -75,6 +78,17 @@ const ( // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after // each unsuccessful connection (restored after a successful one) addrFailDropLn = math.Ln2 + // responseScoreTC and delayScoreTC are exponential decay time constants for + // calculating selection chances from response times and block delay times + responseScoreTC = time.Millisecond * 100 + delayScoreTC = time.Second * 5 + timeoutPow = 10 + // peerSelectMinWeight is added to calculated weights at request peer selection + // to give poorly performing peers a little chance of coming back + peerSelectMinWeight = 0.005 + // initStatsWeight is used to initialize previously unknown peers with good + // statistics to give a chance to prove themselves + initStatsWeight = 1 ) // serverPool implements a pool for storing and selecting newly discovered and already @@ -95,6 +109,7 @@ type serverPool struct { entries map[discover.NodeID]*poolEntry lock sync.Mutex timeout, enableRetry chan *poolEntry + adjustStats chan poolStatAdjust knownQueue, newQueue poolEntryQueue knownSelect, newSelect *weightedRandomSelect @@ -112,6 +127,7 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic wg: wg, entries: make(map[discover.NodeID]*poolEntry), timeout: make(chan *poolEntry, 1), + adjustStats: make(chan poolStatAdjust, 100), enableRetry: make(chan *poolEntry, 1), knownSelect: newWeightedRandomSelect(), newSelect: newWeightedRandomSelect(), @@ -139,18 +155,19 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic // Otherwise, the connection should be rejected. // Note that whenever a connection has been accepted and a pool entry has been returned, // disconnect should also always be called. -func (pool *serverPool) connect(id discover.NodeID, ip net.IP, port uint16) *poolEntry { +func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { pool.lock.Lock() defer pool.lock.Unlock() - entry := pool.entries[id] + entry := pool.entries[p.ID()] if entry == nil { return nil } - glog.V(logger.Debug).Infof("connecting to %v, state: %v", id.String(), entry.state) + glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state) if entry.state != psDialed { return nil } pool.connWg.Add(1) + entry.peer = p entry.state = psConnected addr := &poolEntryAddress{ ip: ip, @@ -172,42 +189,111 @@ func (pool *serverPool) registered(entry *poolEntry) { defer pool.lock.Unlock() entry.state = psRegistered + entry.regTime = mclock.Now() if !entry.known { pool.newQueue.remove(entry) entry.known = true } pool.knownQueue.setLatest(entry) entry.shortRetry = shortRetryCnt - entry.connectStats.add(1) } // disconnect should be called when ending a connection. Service quality statistics // can be updated optionally (not updated if no registration happened, in this case // only connection statistics are updated, just like in case of timeout) -func (pool *serverPool) disconnect(entry *poolEntry, quality float64, setQuality bool) { +func (pool *serverPool) disconnect(entry *poolEntry) { glog.V(logger.Debug).Infof("disconnected %v", entry.id.String()) pool.lock.Lock() defer pool.lock.Unlock() - if entry.state != psRegistered { - setQuality = false + if entry.state == psRegistered { + connTime := mclock.Now() - entry.regTime + connAdjust := float64(connTime) / float64(targetConnTime) + if connAdjust > 1 { + connAdjust = 1 + } + stopped := false + select { + case <-pool.quit: + stopped = true + default: + } + if stopped { + entry.connectStats.add(1, connAdjust) + } else { + entry.connectStats.add(connAdjust, 1) + } } + entry.state = psNotConnected if entry.knownSelected { pool.knownSelected-- } else { pool.newSelected-- } - if setQuality { - glog.V(logger.Debug).Infof("update quality %v %v", quality, entry.id.String()) - entry.qualityStats.add(quality) - } else { - glog.V(logger.Debug).Infof("do not update quality") - } pool.setRetryDial(entry) pool.connWg.Done() } +const ( + pseBlockDelay = iota + pseResponseTime + pseResponseTimeout +) + +// poolStatAdjust records are sent to adjust peer block delay/response time statistics +type poolStatAdjust struct { + adjustType int + entry *poolEntry + time time.Duration +} + +// adjustBlockDelay adjusts the block announce delay statistics of a node +func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) { + pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time} +} + +// adjustResponseTime adjusts the request response time statistics of a node +func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) { + if timeout { + pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time} + } else { + pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time} + } +} + +type selectPeerItem struct { + peer *peer + weight int64 +} + +func (sp selectPeerItem) Weight() int64 { + return sp.weight +} + +// selectPeer selects a suitable peer for a request +func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer { + pool.lock.Lock() + defer pool.lock.Unlock() + + sel := newWeightedRandomSelect() + for _, entry := range pool.entries { + if entry.state == psRegistered { + p := entry.peer + ok, cost := canSend(p) + if ok { + w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow))) + sel.update(selectPeerItem{peer: p, weight: w}) + } + } + } + choice := sel.choose() + if choice == nil { + return nil + } + return choice.(selectPeerItem).peer +} + // eventLoop handles pool events and mutex locking for all internal functions func (pool *serverPool) eventLoop() { lookupCnt := 0 @@ -230,6 +316,19 @@ func (pool *serverPool) eventLoop() { } pool.lock.Unlock() + case adj := <-pool.adjustStats: + pool.lock.Lock() + switch adj.adjustType { + case pseBlockDelay: + adj.entry.delayStats.add(float64(adj.time), 1) + case pseResponseTime: + adj.entry.responseStats.add(float64(adj.time), 1) + adj.entry.timeoutStats.add(0, 1) + case pseResponseTimeout: + adj.entry.timeoutStats.add(1, 1) + } + pool.lock.Unlock() + case node := <-pool.discNodes: pool.lock.Lock() now := mclock.Now() @@ -244,6 +343,11 @@ func (pool *serverPool) eventLoop() { shortRetry: shortRetryCnt, } pool.entries[id] = entry + // initialize previously unknown peers with good statistics to give a chance to prove themselves + entry.connectStats.add(1, initStatsWeight) + entry.delayStats.add(0, initStatsWeight) + entry.responseStats.add(0, initStatsWeight) + entry.timeoutStats.add(0, initStatsWeight) } entry.lastDiscovered = now addr := &poolEntryAddress{ @@ -298,9 +402,8 @@ func (pool *serverPool) loadNodes() { glog.V(logger.Debug).Infof("node list decode error: %v", err) return } - glog.V(logger.Debug).Infof("loaded node list") for _, e := range list { - glog.V(logger.Debug).Infof(" adding node %v fails: %v connStats sum: %v cnt: %v qualityStats sum: %v cnt: %v", e.id.String()+"@"+e.lastConnected.strKey(), e.lastConnected.fails, e.connectStats.sum, e.connectStats.cnt, e.qualityStats.sum, e.qualityStats.cnt) + glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight) pool.entries[e.id] = e pool.knownQueue.setLatest(e) pool.knownSelect.update((*knownEntry)(e)) @@ -433,7 +536,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) { } else { pool.newSelected-- } - entry.connectStats.add(0) + entry.connectStats.add(0, 1) entry.dialed.fails++ pool.setRetryDial(entry) } @@ -447,33 +550,36 @@ const ( // poolEntry represents a server node and stores its current state and statistics. type poolEntry struct { + peer *peer id discover.NodeID addr map[string]*poolEntryAddress lastConnected, dialed *poolEntryAddress addrSelect weightedRandomSelect - lastDiscovered mclock.AbsTime - known, knownSelected bool - connectStats, qualityStats poolStats - state int - queueIdx int - removed bool + lastDiscovered mclock.AbsTime + known, knownSelected bool + connectStats, delayStats poolStats + responseStats, timeoutStats poolStats + state int + regTime mclock.AbsTime + queueIdx int + removed bool delayedRetry bool shortRetry int } func (e *poolEntry) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.qualityStats}) + return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats}) } func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { var entry struct { - ID discover.NodeID - IP net.IP - Port uint16 - Fails uint - CStat, QStat poolStats + ID discover.NodeID + IP net.IP + Port uint16 + Fails uint + CStat, DStat, RStat, TStat poolStats } if err := s.Decode(&entry); err != nil { return err @@ -486,7 +592,9 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { e.addrSelect.update(addr) e.lastConnected = addr e.connectStats = entry.CStat - e.qualityStats = entry.QStat + e.delayStats = entry.DStat + e.responseStats = entry.RStat + e.timeoutStats = entry.TStat e.shortRetry = shortRetryCnt e.known = true return nil @@ -516,7 +624,7 @@ func (e *knownEntry) Weight() int64 { if e.state != psNotConnected || !e.known || e.delayedRetry { return 0 } - return int64(1000000000 * e.connectStats.recentAvg() * (e.qualityStats.recentAvg() + 0.001) * math.Exp(-float64(e.lastConnected.fails)*failDropLn)) + return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow)) } // poolEntryAddress is a separate object because currently it is necessary to remember @@ -544,18 +652,17 @@ func (a *poolEntryAddress) strKey() string { // pstatRecentAdjust with each update and also returned exponentially to the // average with the time constant pstatReturnToMeanTC type poolStats struct { - sum, avg, recent float64 - cnt uint - lastRecalc mclock.AbsTime + sum, weight, avg, recent float64 + lastRecalc mclock.AbsTime } // init initializes stats with a long term sum/update count pair retrieved from the database -func (s *poolStats) init(sum float64, cnt uint) { +func (s *poolStats) init(sum, weight float64) { s.sum = sum - s.cnt = cnt + s.weight = weight var avg float64 - if cnt > 0 { - avg = s.sum / float64(cnt) + if weight > 0 { + avg = s.sum / weight } s.avg = avg s.recent = avg @@ -566,16 +673,22 @@ func (s *poolStats) init(sum float64, cnt uint) { func (s *poolStats) recalc() { now := mclock.Now() s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC)) - if s.cnt > 0 { - s.avg = s.sum / float64(s.cnt) + if s.sum == 0 { + s.avg = 0 + } else { + if s.sum > s.weight*1e30 { + s.avg = 1e30 + } else { + s.avg = s.sum / s.weight + } } s.lastRecalc = now } // add updates the stats with a new value -func (s *poolStats) add(val float64) { - s.cnt++ - s.sum += val +func (s *poolStats) add(value, weight float64) { + s.weight += weight + s.sum += value * weight s.recalc() } @@ -586,18 +699,17 @@ func (s *poolStats) recentAvg() float64 { } func (s *poolStats) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), s.cnt}) + return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)}) } func (s *poolStats) DecodeRLP(st *rlp.Stream) error { var stats struct { - SumUint uint64 - Cnt uint + SumUint, WeightUint uint64 } if err := st.Decode(&stats); err != nil { return err } - s.init(math.Float64frombits(stats.SumUint), stats.Cnt) + s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint)) return nil } |