From 525116dbff916825463931361f75e75e955c12e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Wed, 22 Mar 2017 20:44:22 +0100 Subject: les: implement request distributor, fix blocking issues (#3660) * les: implement request distributor, fix blocking issues * core: moved header validation before chain mutex lock --- les/serverpool.go | 76 ------------------------------------------------------- 1 file changed, 76 deletions(-) (limited to 'les/serverpool.go') diff --git a/les/serverpool.go b/les/serverpool.go index 55d481dbf..64fe991c6 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -268,82 +268,6 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, } } -type selectPeerItem struct { - peer *peer - weight int64 - wait time.Duration -} - -func (sp selectPeerItem) Weight() int64 { - return sp.weight -} - -// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request -// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed -// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time. -func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) { - pool.lock.Lock() - type selectPeer struct { - peer *peer - rstat, tstat float64 - } - var list []selectPeer - sel := newWeightedRandomSelect() - for _, entry := range pool.entries { - if entry.state == psRegistered { - if !entry.peer.fcServer.IsAssigned() { - list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()}) - } - } - } - pool.lock.Unlock() - - for _, sp := range list { - ok, wait := canSend(sp.peer) - if ok { - w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow))) - sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait}) - } - } - choice := sel.choose() - if choice == nil { - return nil, 0, false - } - peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait - locked := false - if wait < time.Millisecond*100 { - if peer.fcServer.AssignRequest(reqID) { - ok, w := canSend(peer) - wait = time.Duration(w) - if ok && wait < time.Millisecond*100 { - locked = true - } else { - peer.fcServer.DeassignRequest(reqID) - wait = time.Millisecond * 100 - } - } - } else { - wait = time.Millisecond * 100 - } - return peer, wait, locked -} - -// selectPeer selects a suitable peer for a request, waiting until an assignment to -// the request is guaranteed or the process is aborted. -func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer { - for { - peer, wait, locked := pool.selectPeer(reqID, canSend) - if locked { - return peer - } - select { - case <-abort: - return nil - case <-time.After(wait): - } - } -} - // eventLoop handles pool events and mutex locking for all internal functions func (pool *serverPool) eventLoop() { lookupCnt := 0 -- cgit v1.2.3