aboutsummaryrefslogtreecommitdiffstats
path: root/les/serverpool.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-03-23 03:44:22 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-03-23 03:44:22 +0800
commit525116dbff916825463931361f75e75e955c12e2 (patch)
treeb272801c420ad9a591f227919567c7952b0bd512 /les/serverpool.go
parent1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff)
downloadgo-tangerine-525116dbff916825463931361f75e75e955c12e2.tar
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst
go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues * core: moved header validation before chain mutex lock
Diffstat (limited to 'les/serverpool.go')
-rw-r--r--les/serverpool.go76
1 files changed, 0 insertions, 76 deletions
diff --git a/les/serverpool.go b/les/serverpool.go
index 55d481dbf..64fe991c6 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -268,82 +268,6 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration,
}
}
-type selectPeerItem struct {
- peer *peer
- weight int64
- wait time.Duration
-}
-
-func (sp selectPeerItem) Weight() int64 {
- return sp.weight
-}
-
-// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request
-// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed
-// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time.
-func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
- pool.lock.Lock()
- type selectPeer struct {
- peer *peer
- rstat, tstat float64
- }
- var list []selectPeer
- sel := newWeightedRandomSelect()
- for _, entry := range pool.entries {
- if entry.state == psRegistered {
- if !entry.peer.fcServer.IsAssigned() {
- list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
- }
- }
- }
- pool.lock.Unlock()
-
- for _, sp := range list {
- ok, wait := canSend(sp.peer)
- if ok {
- w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow)))
- sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait})
- }
- }
- choice := sel.choose()
- if choice == nil {
- return nil, 0, false
- }
- peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait
- locked := false
- if wait < time.Millisecond*100 {
- if peer.fcServer.AssignRequest(reqID) {
- ok, w := canSend(peer)
- wait = time.Duration(w)
- if ok && wait < time.Millisecond*100 {
- locked = true
- } else {
- peer.fcServer.DeassignRequest(reqID)
- wait = time.Millisecond * 100
- }
- }
- } else {
- wait = time.Millisecond * 100
- }
- return peer, wait, locked
-}
-
-// selectPeer selects a suitable peer for a request, waiting until an assignment to
-// the request is guaranteed or the process is aborted.
-func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
- for {
- peer, wait, locked := pool.selectPeer(reqID, canSend)
- if locked {
- return peer
- }
- select {
- case <-abort:
- return nil
- case <-time.After(wait):
- }
- }
-}
-
// eventLoop handles pool events and mutex locking for all internal functions
func (pool *serverPool) eventLoop() {
lookupCnt := 0