aboutsummaryrefslogtreecommitdiffstats
path: root/les/fetcher.go
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2016-12-15 18:13:52 +0800
committerZsolt Felfoldi <zsfelfoldi@gmail.com>2017-01-06 11:34:31 +0800
commit93f9c023ccda2256079484d6c2a3159818ba6691 (patch)
treef2ed341904459184587ca1cc8667d8169761f752 /les/fetcher.go
parente0ee0cc66a4416edd47232649f4d4bca4a5e3c07 (diff)
downloaddexon-93f9c023ccda2256079484d6c2a3159818ba6691.tar
dexon-93f9c023ccda2256079484d6c2a3159818ba6691.tar.gz
dexon-93f9c023ccda2256079484d6c2a3159818ba6691.tar.bz2
dexon-93f9c023ccda2256079484d6c2a3159818ba6691.tar.lz
dexon-93f9c023ccda2256079484d6c2a3159818ba6691.tar.xz
dexon-93f9c023ccda2256079484d6c2a3159818ba6691.tar.zst
dexon-93f9c023ccda2256079484d6c2a3159818ba6691.zip
les: fixed selectPeer deadlock, improved request distribution
les/flowcontrol: using proper types for relative and absolute times
Diffstat (limited to 'les/fetcher.go')
-rw-r--r--les/fetcher.go46
1 files changed, 32 insertions, 14 deletions
diff --git a/les/fetcher.go b/les/fetcher.go
index c23af8da3..d0958870f 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -125,7 +125,7 @@ func (f *lightFetcher) syncLoop() {
f.pm.wg.Add(1)
defer f.pm.wg.Done()
- requestStarted := false
+ requesting := false
for {
select {
case <-f.pm.quitSync:
@@ -134,13 +134,13 @@ func (f *lightFetcher) syncLoop() {
// no further requests are necessary or possible
case newAnnounce := <-f.requestChn:
f.lock.Lock()
- s := requestStarted
- requestStarted = false
+ s := requesting
+ requesting = false
if !f.syncing && !(newAnnounce && s) {
- if peer, node, amount := f.nextRequest(); node != nil {
- requestStarted = true
- reqID, started := f.request(peer, node, amount)
- if started {
+ reqID := getNextReqID()
+ if peer, node, amount, retry := f.nextRequest(reqID); node != nil {
+ requesting = true
+ if reqID, ok := f.request(peer, reqID, node, amount); ok {
go func() {
time.Sleep(softRequestTimeout)
f.reqMu.Lock()
@@ -154,6 +154,14 @@ func (f *lightFetcher) syncLoop() {
f.requestChn <- false
}()
}
+ } else {
+ if retry {
+ requesting = true
+ go func() {
+ time.Sleep(time.Millisecond * 100)
+ f.requestChn <- false
+ }()
+ }
}
}
f.lock.Unlock()
@@ -344,10 +352,11 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
}
// request initiates a header download request from a certain peer
-func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) {
+func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
fp := f.peers[p]
if fp == nil {
glog.V(logger.Debug).Infof("request: unknown peer")
+ p.fcServer.DeassignRequest(reqID)
return 0, false
}
if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
@@ -357,10 +366,10 @@ func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint
f.pm.synchronise(p)
f.syncDone <- p
}()
+ p.fcServer.DeassignRequest(reqID)
return 0, false
}
- reqID := getNextReqID()
n.requested = true
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
p.fcServer.SendRequest(reqID, cost)
@@ -400,7 +409,7 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
// nextRequest selects the peer and announced head to be requested next, amount
// to be downloaded starting from the head backwards is also returned
-func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
+func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) {
var (
bestHash common.Hash
bestAmount uint64
@@ -420,21 +429,24 @@ func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
}
}
if bestTd == f.maxConfirmedTd {
- return nil, nil, 0
+ return nil, nil, 0, false
}
- peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) {
fp := f.peers[p]
if fp == nil || fp.nodeByHash[bestHash] == nil {
return false, 0
}
return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
})
+ if !locked {
+ return nil, nil, 0, true
+ }
var node *fetcherTreeNode
if peer != nil {
node = f.peers[peer].nodeByHash[bestHash]
}
- return peer, node, bestAmount
+ return peer, node, bestAmount, false
}
// deliverHeaders delivers header download request responses for processing
@@ -442,9 +454,10 @@ func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types
f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
}
-// processResponse processes header download request responses
+// processResponse processes header download request responses, returns true if successful
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
+ glog.V(logger.Debug).Infof("response mismatch %v %016x != %v %016x", len(resp.headers), resp.headers[0].Hash().Bytes()[:8], req.amount, req.hash[:8])
return false
}
headers := make([]*types.Header, req.amount)
@@ -452,12 +465,17 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo
headers[int(req.amount)-1-i] = header
}
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
+ if err == core.BlockFutureErr {
+ return true
+ }
+ glog.V(logger.Debug).Infof("InsertHeaderChain error: %v", err)
return false
}
tds := make([]*big.Int, len(headers))
for i, header := range headers {
td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
if td == nil {
+ glog.V(logger.Debug).Infof("TD not found for header %v of %v", i+1, len(headers))
return false
}
tds[i] = td