diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-01-09 22:58:23 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-09 22:58:23 +0800 |
commit | 681b51aac46fa11c235e9ac1af1228e0105a0720 (patch) | |
tree | ecbd5467a5f7356b2386189a67b8629663068cec /les/serverpool.go | |
parent | 4268cb8efecceaf506e1b0b71e06714a838a49a8 (diff) | |
parent | 66979aa468b6329aabf49542bd3db14e59010c20 (diff) | |
download | go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.gz go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.bz2 go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.lz go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.xz go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.zst go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.zip |
Merge pull request #3519 from zsfelfoldi/light-topic5
les: fixed selectPeer deadlock, improved request distribution
Diffstat (limited to 'les/serverpool.go')
-rw-r--r-- | les/serverpool.go | 66 |
1 files changed, 55 insertions, 11 deletions
diff --git a/les/serverpool.go b/les/serverpool.go index 80c446eef..e3b7cf620 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -265,33 +265,77 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, type selectPeerItem struct { peer *peer weight int64 + wait time.Duration } func (sp selectPeerItem) Weight() int64 { return sp.weight } -// selectPeer selects a suitable peer for a request -func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer { +// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request +// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed +// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time. +func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) { pool.lock.Lock() - defer pool.lock.Unlock() - + type selectPeer struct { + peer *peer + rstat, tstat float64 + } + var list []selectPeer sel := newWeightedRandomSelect() for _, entry := range pool.entries { if entry.state == psRegistered { - p := entry.peer - ok, cost := canSend(p) - if ok { - w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow))) - sel.update(selectPeerItem{peer: p, weight: w}) + if !entry.peer.fcServer.IsAssigned() { + list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()}) } } } + pool.lock.Unlock() + + for _, sp := range list { + ok, wait := canSend(sp.peer) + if ok { + w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow))) + sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait}) + } + } choice := sel.choose() if choice == nil { - return nil + return nil, 0, false + } + peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait + locked := false + if wait < time.Millisecond*100 { + if peer.fcServer.AssignRequest(reqID) { + ok, w := canSend(peer) + wait = time.Duration(w) + if ok && wait < time.Millisecond*100 { + locked = true + } else { + peer.fcServer.DeassignRequest(reqID) + wait = time.Millisecond * 100 + } + } + } else { + wait = time.Millisecond * 100 + } + return peer, wait, locked +} + +// selectPeer selects a suitable peer for a request, waiting until an assignment to +// the request is guaranteed or the process is aborted. +func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer { + for { + peer, wait, locked := pool.selectPeer(reqID, canSend) + if locked { + return peer + } + select { + case <-abort: + return nil + case <-time.After(wait): + } } - return choice.(selectPeerItem).peer } // eventLoop handles pool events and mutex locking for all internal functions |