diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-01-09 22:58:23 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-09 22:58:23 +0800 |
commit | 681b51aac46fa11c235e9ac1af1228e0105a0720 (patch) | |
tree | ecbd5467a5f7356b2386189a67b8629663068cec /les/fetcher.go | |
parent | 4268cb8efecceaf506e1b0b71e06714a838a49a8 (diff) | |
parent | 66979aa468b6329aabf49542bd3db14e59010c20 (diff) | |
download | go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.gz go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.bz2 go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.lz go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.xz go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.zst go-tangerine-681b51aac46fa11c235e9ac1af1228e0105a0720.zip |
Merge pull request #3519 from zsfelfoldi/light-topic5
les: fixed selectPeer deadlock, improved request distribution
Diffstat (limited to 'les/fetcher.go')
-rw-r--r-- | les/fetcher.go | 46 |
1 files changed, 32 insertions, 14 deletions
diff --git a/les/fetcher.go b/les/fetcher.go index 4a0830a8a..de706de5e 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -125,7 +125,7 @@ func (f *lightFetcher) syncLoop() { f.pm.wg.Add(1) defer f.pm.wg.Done() - requestStarted := false + requesting := false for { select { case <-f.pm.quitSync: @@ -134,13 +134,13 @@ func (f *lightFetcher) syncLoop() { // no further requests are necessary or possible case newAnnounce := <-f.requestChn: f.lock.Lock() - s := requestStarted - requestStarted = false + s := requesting + requesting = false if !f.syncing && !(newAnnounce && s) { - if peer, node, amount := f.nextRequest(); node != nil { - requestStarted = true - reqID, started := f.request(peer, node, amount) - if started { + reqID := getNextReqID() + if peer, node, amount, retry := f.nextRequest(reqID); node != nil { + requesting = true + if reqID, ok := f.request(peer, reqID, node, amount); ok { go func() { time.Sleep(softRequestTimeout) f.reqMu.Lock() @@ -154,6 +154,14 @@ func (f *lightFetcher) syncLoop() { f.requestChn <- false }() } + } else { + if retry { + requesting = true + go func() { + time.Sleep(time.Millisecond * 100) + f.requestChn <- false + }() + } } } f.lock.Unlock() @@ -344,10 +352,11 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo } // request initiates a header download request from a certain peer -func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) { +func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) { fp := f.peers[p] if fp == nil { glog.V(logger.Debug).Infof("request: unknown peer") + p.fcServer.DeassignRequest(reqID) return 0, false } if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) { @@ -357,10 +366,10 @@ func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint f.pm.synchronise(p) f.syncDone <- p }() + p.fcServer.DeassignRequest(reqID) return 0, false } - reqID := getNextReqID() n.requested = true cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) p.fcServer.SendRequest(reqID, cost) @@ -400,7 +409,7 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { // nextRequest selects the peer and announced head to be requested next, amount // to be downloaded starting from the head backwards is also returned -func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) { +func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) { var ( bestHash common.Hash bestAmount uint64 @@ -420,21 +429,24 @@ func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) { } } if bestTd == f.maxConfirmedTd { - return nil, nil, 0 + return nil, nil, 0, false } - peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) { + peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) { fp := f.peers[p] if fp == nil || fp.nodeByHash[bestHash] == nil { return false, 0 } return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))) }) + if !locked { + return nil, nil, 0, true + } var node *fetcherTreeNode if peer != nil { node = f.peers[peer].nodeByHash[bestHash] } - return peer, node, bestAmount + return peer, node, bestAmount, false } // deliverHeaders delivers header download request responses for processing @@ -442,9 +454,10 @@ func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} } -// processResponse processes header download request responses +// processResponse processes header download request responses, returns true if successful func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { + glog.V(logger.Debug).Infof("response mismatch %v %016x != %v %016x", len(resp.headers), resp.headers[0].Hash().Bytes()[:8], req.amount, req.hash[:8]) return false } headers := make([]*types.Header, req.amount) @@ -452,12 +465,17 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo headers[int(req.amount)-1-i] = header } if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { + if err == core.BlockFutureErr { + return true + } + glog.V(logger.Debug).Infof("InsertHeaderChain error: %v", err) return false } tds := make([]*big.Int, len(headers)) for i, header := range headers { td := f.chain.GetTd(header.Hash(), header.Number.Uint64()) if td == nil { + glog.V(logger.Debug).Infof("TD not found for header %v of %v", i+1, len(headers)) return false } tds[i] = td |