diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-03-23 03:44:22 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-03-23 03:44:22 +0800 |
commit | 525116dbff916825463931361f75e75e955c12e2 (patch) | |
tree | b272801c420ad9a591f227919567c7952b0bd512 /les/fetcher.go | |
parent | 1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff) | |
download | go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2 go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip |
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
Diffstat (limited to 'les/fetcher.go')
-rw-r--r-- | les/fetcher.go | 188 |
1 files changed, 111 insertions, 77 deletions
diff --git a/les/fetcher.go b/les/fetcher.go index f9e517d25..353e91932 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -135,35 +135,38 @@ func (f *lightFetcher) syncLoop() { f.lock.Lock() s := requesting requesting = false + var ( + rq *distReq + reqID uint64 + ) if !f.syncing && !(newAnnounce && s) { - reqID := getNextReqID() - if peer, node, amount, retry := f.nextRequest(reqID); node != nil { - requesting = true - if reqID, ok := f.request(peer, reqID, node, amount); ok { - go func() { - time.Sleep(softRequestTimeout) - f.reqMu.Lock() - req, ok := f.requested[reqID] - if ok { - req.timeout = true - f.requested[reqID] = req - } - f.reqMu.Unlock() - // keep starting new requests while possible - f.requestChn <- false - }() - } - } else { - if retry { - requesting = true - go func() { - time.Sleep(time.Millisecond * 100) - f.requestChn <- false - }() - } - } + rq, reqID = f.nextRequest() } + syncing := f.syncing f.lock.Unlock() + + if rq != nil { + requesting = true + _, ok := <-f.pm.reqDist.queue(rq) + if !ok { + f.requestChn <- false + } + + if !syncing { + go func() { + time.Sleep(softRequestTimeout) + f.reqMu.Lock() + req, ok := f.requested[reqID] + if ok { + req.timeout = true + f.requested[reqID] = req + } + f.reqMu.Unlock() + // keep starting new requests while possible + f.requestChn <- false + }() + } + } case reqID := <-f.timeoutChn: f.reqMu.Lock() req, ok := f.requested[reqID] @@ -334,6 +337,12 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo f.lock.Lock() defer f.lock.Unlock() + if f.syncing { + // always return true when syncing + // false positives are acceptable, a more sophisticated condition can be implemented later + return true + } + fp := f.peers[p] if fp == nil || fp.root == nil { return false @@ -346,43 +355,13 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo f.chain.LockChain() defer f.chain.UnlockChain() // if it's older than the peer's block tree root but it's in the same canonical chain - // than the root, we can still be sure the peer knows it + // as the root, we can still be sure the peer knows it + // + // when syncing, just check if it is part of the known chain, there is nothing better we + // can do since we do not know the most recent block hash yet return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash } -// request initiates a header download request from a certain peer -func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) { - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Requesting from unknown peer") - p.fcServer.DeassignRequest(reqID) - return 0, false - } - if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) { - f.syncing = true - go func() { - p.Log().Debug("Synchronisation started") - f.pm.synchronise(p) - f.syncDone <- p - }() - p.fcServer.DeassignRequest(reqID) - return 0, false - } - - n.requested = true - cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) - p.fcServer.SendRequest(reqID, cost) - f.reqMu.Lock() - f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()} - f.reqMu.Unlock() - go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true) - go func() { - time.Sleep(hardRequestTimeout) - f.timeoutChn <- reqID - }() - return reqID, true -} - // requestAmount calculates the amount of headers to be downloaded starting // from a certain head backwards func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 { @@ -408,12 +387,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { // nextRequest selects the peer and announced head to be requested next, amount // to be downloaded starting from the head backwards is also returned -func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) { +func (f *lightFetcher) nextRequest() (*distReq, uint64) { var ( bestHash common.Hash bestAmount uint64 ) bestTd := f.maxConfirmedTd + bestSyncing := false for p, fp := range f.peers { for hash, n := range fp.nodeByHash { @@ -423,29 +403,83 @@ func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint6 bestHash = hash bestAmount = amount bestTd = n.td + bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) } } } } if bestTd == f.maxConfirmedTd { - return nil, nil, 0, false - } - - peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) { - fp := f.peers[p] - if fp == nil || fp.nodeByHash[bestHash] == nil { - return false, 0 + return nil, 0 + } + + f.syncing = bestSyncing + + var rq *distReq + reqID := getNextReqID() + if f.syncing { + rq = &distReq{ + getCost: func(dp distPeer) uint64 { + return 0 + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + fp := f.peers[p] + return fp != nil && fp.nodeByHash[bestHash] != nil + }, + request: func(dp distPeer) func() { + go func() { + p := dp.(*peer) + p.Log().Debug("Synchronisation started") + f.pm.synchronise(p) + f.syncDone <- p + }() + return nil + }, + } + } else { + rq = &distReq{ + getCost: func(dp distPeer) uint64 { + p := dp.(*peer) + return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + f.lock.Lock() + defer f.lock.Unlock() + + fp := f.peers[p] + if fp == nil { + return false + } + n := fp.nodeByHash[bestHash] + return n != nil && !n.requested + }, + request: func(dp distPeer) func() { + p := dp.(*peer) + f.lock.Lock() + fp := f.peers[p] + if fp != nil { + n := fp.nodeByHash[bestHash] + if n != nil { + n.requested = true + } + } + f.lock.Unlock() + + cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + p.fcServer.QueueRequest(reqID, cost) + f.reqMu.Lock() + f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} + f.reqMu.Unlock() + go func() { + time.Sleep(hardRequestTimeout) + f.timeoutChn <- reqID + }() + return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } + }, } - return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))) - }) - if !locked { - return nil, nil, 0, true - } - var node *fetcherTreeNode - if peer != nil { - node = f.peers[peer].nodeByHash[bestHash] } - return peer, node, bestAmount, false + return rq, reqID } // deliverHeaders delivers header download request responses for processing |