diff options
author | Zsolt Felfoldi <zsfelfoldi@gmail.com> | 2016-12-15 18:13:52 +0800 |
---|---|---|
committer | Zsolt Felfoldi <zsfelfoldi@gmail.com> | 2017-01-06 11:34:31 +0800 |
commit | 93f9c023ccda2256079484d6c2a3159818ba6691 (patch) | |
tree | f2ed341904459184587ca1cc8667d8169761f752 /les/fetcher.go | |
parent | e0ee0cc66a4416edd47232649f4d4bca4a5e3c07 (diff) | |
download | go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.gz go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.bz2 go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.lz go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.xz go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.zst go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.zip |
les: fixed selectPeer deadlock, improved request distribution
les/flowcontrol: using proper types for relative and absolute times
Diffstat (limited to 'les/fetcher.go')
-rw-r--r-- | les/fetcher.go | 46 |
1 files changed, 32 insertions, 14 deletions
diff --git a/les/fetcher.go b/les/fetcher.go index c23af8da3..d0958870f 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -125,7 +125,7 @@ func (f *lightFetcher) syncLoop() { f.pm.wg.Add(1) defer f.pm.wg.Done() - requestStarted := false + requesting := false for { select { case <-f.pm.quitSync: @@ -134,13 +134,13 @@ func (f *lightFetcher) syncLoop() { // no further requests are necessary or possible case newAnnounce := <-f.requestChn: f.lock.Lock() - s := requestStarted - requestStarted = false + s := requesting + requesting = false if !f.syncing && !(newAnnounce && s) { - if peer, node, amount := f.nextRequest(); node != nil { - requestStarted = true - reqID, started := f.request(peer, node, amount) - if started { + reqID := getNextReqID() + if peer, node, amount, retry := f.nextRequest(reqID); node != nil { + requesting = true + if reqID, ok := f.request(peer, reqID, node, amount); ok { go func() { time.Sleep(softRequestTimeout) f.reqMu.Lock() @@ -154,6 +154,14 @@ func (f *lightFetcher) syncLoop() { f.requestChn <- false }() } + } else { + if retry { + requesting = true + go func() { + time.Sleep(time.Millisecond * 100) + f.requestChn <- false + }() + } } } f.lock.Unlock() @@ -344,10 +352,11 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo } // request initiates a header download request from a certain peer -func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) { +func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) { fp := f.peers[p] if fp == nil { glog.V(logger.Debug).Infof("request: unknown peer") + p.fcServer.DeassignRequest(reqID) return 0, false } if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) { @@ -357,10 +366,10 @@ func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint f.pm.synchronise(p) f.syncDone <- p }() + p.fcServer.DeassignRequest(reqID) return 0, false } - reqID := getNextReqID() n.requested = true cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) p.fcServer.SendRequest(reqID, cost) @@ -400,7 +409,7 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { // nextRequest selects the peer and announced head to be requested next, amount // to be downloaded starting from the head backwards is also returned -func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) { +func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) { var ( bestHash common.Hash bestAmount uint64 @@ -420,21 +429,24 @@ func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) { } } if bestTd == f.maxConfirmedTd { - return nil, nil, 0 + return nil, nil, 0, false } - peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) { + peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) { fp := f.peers[p] if fp == nil || fp.nodeByHash[bestHash] == nil { return false, 0 } return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))) }) + if !locked { + return nil, nil, 0, true + } var node *fetcherTreeNode if peer != nil { node = f.peers[peer].nodeByHash[bestHash] } - return peer, node, bestAmount + return peer, node, bestAmount, false } // deliverHeaders delivers header download request responses for processing @@ -442,9 +454,10 @@ func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} } -// processResponse processes header download request responses +// processResponse processes header download request responses, returns true if successful func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { + glog.V(logger.Debug).Infof("response mismatch %v %016x != %v %016x", len(resp.headers), resp.headers[0].Hash().Bytes()[:8], req.amount, req.hash[:8]) return false } headers := make([]*types.Header, req.amount) @@ -452,12 +465,17 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo headers[int(req.amount)-1-i] = header } if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { + if err == core.BlockFutureErr { + return true + } + glog.V(logger.Debug).Infof("InsertHeaderChain error: %v", err) return false } tds := make([]*big.Int, len(headers)) for i, header := range headers { td := f.chain.GetTd(header.Hash(), header.Number.Uint64()) if td == nil { + glog.V(logger.Debug).Infof("TD not found for header %v of %v", i+1, len(headers)) return false } tds[i] = td |