aboutsummaryrefslogtreecommitdiffstats
path: root/les/serverpool.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-01-09 22:58:23 +0800
committerGitHub <noreply@github.com>2017-01-09 22:58:23 +0800
commit681b51aac46fa11c235e9ac1af1228e0105a0720 (patch)
treeecbd5467a5f7356b2386189a67b8629663068cec /les/serverpool.go
parent4268cb8efecceaf506e1b0b71e06714a838a49a8 (diff)
parent66979aa468b6329aabf49542bd3db14e59010c20 (diff)
downloaddexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.gz
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.bz2
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.lz
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.xz
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.zst
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.zip
Merge pull request #3519 from zsfelfoldi/light-topic5
les: fixed selectPeer deadlock, improved request distribution
Diffstat (limited to 'les/serverpool.go')
-rw-r--r--les/serverpool.go66
1 files changed, 55 insertions, 11 deletions
diff --git a/les/serverpool.go b/les/serverpool.go
index 80c446eef..e3b7cf620 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -265,33 +265,77 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration,
type selectPeerItem struct {
peer *peer
weight int64
+ wait time.Duration
}
func (sp selectPeerItem) Weight() int64 {
return sp.weight
}
-// selectPeer selects a suitable peer for a request
-func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer {
+// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request
+// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed
+// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time.
+func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
pool.lock.Lock()
- defer pool.lock.Unlock()
-
+ type selectPeer struct {
+ peer *peer
+ rstat, tstat float64
+ }
+ var list []selectPeer
sel := newWeightedRandomSelect()
for _, entry := range pool.entries {
if entry.state == psRegistered {
- p := entry.peer
- ok, cost := canSend(p)
- if ok {
- w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow)))
- sel.update(selectPeerItem{peer: p, weight: w})
+ if !entry.peer.fcServer.IsAssigned() {
+ list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
}
}
}
+ pool.lock.Unlock()
+
+ for _, sp := range list {
+ ok, wait := canSend(sp.peer)
+ if ok {
+ w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow)))
+ sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait})
+ }
+ }
choice := sel.choose()
if choice == nil {
- return nil
+ return nil, 0, false
+ }
+ peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait
+ locked := false
+ if wait < time.Millisecond*100 {
+ if peer.fcServer.AssignRequest(reqID) {
+ ok, w := canSend(peer)
+ wait = time.Duration(w)
+ if ok && wait < time.Millisecond*100 {
+ locked = true
+ } else {
+ peer.fcServer.DeassignRequest(reqID)
+ wait = time.Millisecond * 100
+ }
+ }
+ } else {
+ wait = time.Millisecond * 100
+ }
+ return peer, wait, locked
+}
+
+// selectPeer selects a suitable peer for a request, waiting until an assignment to
+// the request is guaranteed or the process is aborted.
+func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
+ for {
+ peer, wait, locked := pool.selectPeer(reqID, canSend)
+ if locked {
+ return peer
+ }
+ select {
+ case <-abort:
+ return nil
+ case <-time.After(wait):
+ }
}
- return choice.(selectPeerItem).peer
}
// eventLoop handles pool events and mutex locking for all internal functions