aboutsummaryrefslogtreecommitdiffstats
path: root/les/fetcher.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-03-23 03:44:22 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-03-23 03:44:22 +0800
commit525116dbff916825463931361f75e75e955c12e2 (patch)
treeb272801c420ad9a591f227919567c7952b0bd512 /les/fetcher.go
parent1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff)
downloadgo-tangerine-525116dbff916825463931361f75e75e955c12e2.tar
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst
go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues * core: moved header validation before chain mutex lock
Diffstat (limited to 'les/fetcher.go')
-rw-r--r--les/fetcher.go188
1 files changed, 111 insertions, 77 deletions
diff --git a/les/fetcher.go b/les/fetcher.go
index f9e517d25..353e91932 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -135,35 +135,38 @@ func (f *lightFetcher) syncLoop() {
f.lock.Lock()
s := requesting
requesting = false
+ var (
+ rq *distReq
+ reqID uint64
+ )
if !f.syncing && !(newAnnounce && s) {
- reqID := getNextReqID()
- if peer, node, amount, retry := f.nextRequest(reqID); node != nil {
- requesting = true
- if reqID, ok := f.request(peer, reqID, node, amount); ok {
- go func() {
- time.Sleep(softRequestTimeout)
- f.reqMu.Lock()
- req, ok := f.requested[reqID]
- if ok {
- req.timeout = true
- f.requested[reqID] = req
- }
- f.reqMu.Unlock()
- // keep starting new requests while possible
- f.requestChn <- false
- }()
- }
- } else {
- if retry {
- requesting = true
- go func() {
- time.Sleep(time.Millisecond * 100)
- f.requestChn <- false
- }()
- }
- }
+ rq, reqID = f.nextRequest()
}
+ syncing := f.syncing
f.lock.Unlock()
+
+ if rq != nil {
+ requesting = true
+ _, ok := <-f.pm.reqDist.queue(rq)
+ if !ok {
+ f.requestChn <- false
+ }
+
+ if !syncing {
+ go func() {
+ time.Sleep(softRequestTimeout)
+ f.reqMu.Lock()
+ req, ok := f.requested[reqID]
+ if ok {
+ req.timeout = true
+ f.requested[reqID] = req
+ }
+ f.reqMu.Unlock()
+ // keep starting new requests while possible
+ f.requestChn <- false
+ }()
+ }
+ }
case reqID := <-f.timeoutChn:
f.reqMu.Lock()
req, ok := f.requested[reqID]
@@ -334,6 +337,12 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
f.lock.Lock()
defer f.lock.Unlock()
+ if f.syncing {
+ // always return true when syncing
+ // false positives are acceptable, a more sophisticated condition can be implemented later
+ return true
+ }
+
fp := f.peers[p]
if fp == nil || fp.root == nil {
return false
@@ -346,43 +355,13 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
f.chain.LockChain()
defer f.chain.UnlockChain()
// if it's older than the peer's block tree root but it's in the same canonical chain
- // than the root, we can still be sure the peer knows it
+ // as the root, we can still be sure the peer knows it
+ //
+ // when syncing, just check if it is part of the known chain, there is nothing better we
+ // can do since we do not know the most recent block hash yet
return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
}
-// request initiates a header download request from a certain peer
-func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
- fp := f.peers[p]
- if fp == nil {
- p.Log().Debug("Requesting from unknown peer")
- p.fcServer.DeassignRequest(reqID)
- return 0, false
- }
- if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
- f.syncing = true
- go func() {
- p.Log().Debug("Synchronisation started")
- f.pm.synchronise(p)
- f.syncDone <- p
- }()
- p.fcServer.DeassignRequest(reqID)
- return 0, false
- }
-
- n.requested = true
- cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
- p.fcServer.SendRequest(reqID, cost)
- f.reqMu.Lock()
- f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
- f.reqMu.Unlock()
- go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
- go func() {
- time.Sleep(hardRequestTimeout)
- f.timeoutChn <- reqID
- }()
- return reqID, true
-}
-
// requestAmount calculates the amount of headers to be downloaded starting
// from a certain head backwards
func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
@@ -408,12 +387,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
// nextRequest selects the peer and announced head to be requested next, amount
// to be downloaded starting from the head backwards is also returned
-func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) {
+func (f *lightFetcher) nextRequest() (*distReq, uint64) {
var (
bestHash common.Hash
bestAmount uint64
)
bestTd := f.maxConfirmedTd
+ bestSyncing := false
for p, fp := range f.peers {
for hash, n := range fp.nodeByHash {
@@ -423,29 +403,83 @@ func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint6
bestHash = hash
bestAmount = amount
bestTd = n.td
+ bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root)
}
}
}
}
if bestTd == f.maxConfirmedTd {
- return nil, nil, 0, false
- }
-
- peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) {
- fp := f.peers[p]
- if fp == nil || fp.nodeByHash[bestHash] == nil {
- return false, 0
+ return nil, 0
+ }
+
+ f.syncing = bestSyncing
+
+ var rq *distReq
+ reqID := getNextReqID()
+ if f.syncing {
+ rq = &distReq{
+ getCost: func(dp distPeer) uint64 {
+ return 0
+ },
+ canSend: func(dp distPeer) bool {
+ p := dp.(*peer)
+ fp := f.peers[p]
+ return fp != nil && fp.nodeByHash[bestHash] != nil
+ },
+ request: func(dp distPeer) func() {
+ go func() {
+ p := dp.(*peer)
+ p.Log().Debug("Synchronisation started")
+ f.pm.synchronise(p)
+ f.syncDone <- p
+ }()
+ return nil
+ },
+ }
+ } else {
+ rq = &distReq{
+ getCost: func(dp distPeer) uint64 {
+ p := dp.(*peer)
+ return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
+ },
+ canSend: func(dp distPeer) bool {
+ p := dp.(*peer)
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ fp := f.peers[p]
+ if fp == nil {
+ return false
+ }
+ n := fp.nodeByHash[bestHash]
+ return n != nil && !n.requested
+ },
+ request: func(dp distPeer) func() {
+ p := dp.(*peer)
+ f.lock.Lock()
+ fp := f.peers[p]
+ if fp != nil {
+ n := fp.nodeByHash[bestHash]
+ if n != nil {
+ n.requested = true
+ }
+ }
+ f.lock.Unlock()
+
+ cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
+ p.fcServer.QueueRequest(reqID, cost)
+ f.reqMu.Lock()
+ f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
+ f.reqMu.Unlock()
+ go func() {
+ time.Sleep(hardRequestTimeout)
+ f.timeoutChn <- reqID
+ }()
+ return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
+ },
}
- return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
- })
- if !locked {
- return nil, nil, 0, true
- }
- var node *fetcherTreeNode
- if peer != nil {
- node = f.peers[peer].nodeByHash[bestHash]
}
- return peer, node, bestAmount, false
+ return rq, reqID
}
// deliverHeaders delivers header download request responses for processing