aboutsummaryrefslogtreecommitdiffstats
path: root/les/fetcher.go
diff options
context:
space:
mode:
authorgary rong <garyrong0905@gmail.com>2019-08-21 17:29:34 +0800
committerFelföldi Zsolt <zsfelfoldi@gmail.com>2019-08-21 17:29:34 +0800
commit2ed729d38e90154d1f23ebdf5a9f2808212276d8 (patch)
tree04aad0e43f8a0b191809a86a2b665535f1f87a93 /les/fetcher.go
parent4aee0d1994ad286c18aa3a586ae950e96979a7f1 (diff)
downloadgo-tangerine-2ed729d38e90154d1f23ebdf5a9f2808212276d8.tar
go-tangerine-2ed729d38e90154d1f23ebdf5a9f2808212276d8.tar.gz
go-tangerine-2ed729d38e90154d1f23ebdf5a9f2808212276d8.tar.bz2
go-tangerine-2ed729d38e90154d1f23ebdf5a9f2808212276d8.tar.lz
go-tangerine-2ed729d38e90154d1f23ebdf5a9f2808212276d8.tar.xz
go-tangerine-2ed729d38e90154d1f23ebdf5a9f2808212276d8.tar.zst
go-tangerine-2ed729d38e90154d1f23ebdf5a9f2808212276d8.zip
les: handler separation (#19639)
les: handler separation
Diffstat (limited to 'les/fetcher.go')
-rw-r--r--les/fetcher.go75
1 files changed, 37 insertions, 38 deletions
diff --git a/les/fetcher.go b/les/fetcher.go
index 76e4f076a..df76c56d7 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -40,9 +40,8 @@ const (
// ODR system to ensure that we only request data related to a certain block from peers who have already processed
// and announced that block.
type lightFetcher struct {
- pm *ProtocolManager
- odr *LesOdr
- chain lightChain
+ handler *clientHandler
+ chain *light.LightChain
lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests
maxConfirmedTd *big.Int
@@ -58,13 +57,9 @@ type lightFetcher struct {
requestTriggered bool
requestTrigger chan struct{}
lastTrustedHeader *types.Header
-}
-// lightChain extends the BlockChain interface by locking.
-type lightChain interface {
- BlockChain
- LockChain()
- UnlockChain()
+ closeCh chan struct{}
+ wg sync.WaitGroup
}
// fetcherPeerInfo holds fetcher-specific information about each active peer
@@ -114,32 +109,37 @@ type fetchResponse struct {
}
// newLightFetcher creates a new light fetcher
-func newLightFetcher(pm *ProtocolManager) *lightFetcher {
+func newLightFetcher(h *clientHandler) *lightFetcher {
f := &lightFetcher{
- pm: pm,
- chain: pm.blockchain.(*light.LightChain),
- odr: pm.odr,
+ handler: h,
+ chain: h.backend.blockchain,
peers: make(map[*peer]*fetcherPeerInfo),
deliverChn: make(chan fetchResponse, 100),
requested: make(map[uint64]fetchRequest),
timeoutChn: make(chan uint64),
requestTrigger: make(chan struct{}, 1),
syncDone: make(chan *peer),
+ closeCh: make(chan struct{}),
maxConfirmedTd: big.NewInt(0),
}
- pm.peers.notify(f)
+ h.backend.peers.notify(f)
- f.pm.wg.Add(1)
+ f.wg.Add(1)
go f.syncLoop()
return f
}
+func (f *lightFetcher) close() {
+ close(f.closeCh)
+ f.wg.Wait()
+}
+
// syncLoop is the main event loop of the light fetcher
func (f *lightFetcher) syncLoop() {
- defer f.pm.wg.Done()
+ defer f.wg.Done()
for {
select {
- case <-f.pm.quitSync:
+ case <-f.closeCh:
return
// request loop keeps running until no further requests are necessary or possible
case <-f.requestTrigger:
@@ -156,7 +156,7 @@ func (f *lightFetcher) syncLoop() {
f.lock.Unlock()
if rq != nil {
- if _, ok := <-f.pm.reqDist.queue(rq); ok {
+ if _, ok := <-f.handler.backend.reqDist.queue(rq); ok {
if syncing {
f.lock.Lock()
f.syncing = true
@@ -187,9 +187,9 @@ func (f *lightFetcher) syncLoop() {
}
f.reqMu.Unlock()
if ok {
- f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
+ f.handler.backend.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
req.peer.Log().Debug("Fetching data timed out hard")
- go f.pm.removePeer(req.peer.id)
+ go f.handler.removePeer(req.peer.id)
}
case resp := <-f.deliverChn:
f.reqMu.Lock()
@@ -202,12 +202,12 @@ func (f *lightFetcher) syncLoop() {
}
f.reqMu.Unlock()
if ok {
- f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
+ f.handler.backend.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
}
f.lock.Lock()
if !ok || !(f.syncing || f.processResponse(req, resp)) {
resp.peer.Log().Debug("Failed processing response")
- go f.pm.removePeer(resp.peer.id)
+ go f.handler.removePeer(resp.peer.id)
}
f.lock.Unlock()
case p := <-f.syncDone:
@@ -264,7 +264,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) {
if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
// announced tds should be strictly monotonic
p.Log().Debug("Received non-monotonic td", "current", head.Td, "previous", fp.lastAnnounced.td)
- go f.pm.removePeer(p.id)
+ go f.handler.removePeer(p.id)
return
}
@@ -297,7 +297,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) {
// if one of root's children is canonical, keep it, delete other branches and root itself
var newRoot *fetcherTreeNode
for i, nn := range fp.root.children {
- if rawdb.ReadCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
+ if rawdb.ReadCanonicalHash(f.handler.backend.chainDb, nn.number) == nn.hash {
fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
nn.parent = nil
newRoot = nn
@@ -390,7 +390,7 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64, ha
//
// when syncing, just check if it is part of the known chain, there is nothing better we
// can do since we do not know the most recent block hash yet
- return rawdb.ReadCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.pm.chainDb, number) == hash
+ return rawdb.ReadCanonicalHash(f.handler.backend.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.handler.backend.chainDb, number) == hash
}
// requestAmount calculates the amount of headers to be downloaded starting
@@ -453,8 +453,7 @@ func (f *lightFetcher) findBestRequest() (bestHash common.Hash, bestAmount uint6
if f.checkKnownNode(p, n) || n.requested {
continue
}
-
- //if ulc mode is disabled, isTrustedHash returns true
+ // if ulc mode is disabled, isTrustedHash returns true
amount := f.requestAmount(p, n)
if (bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount) && (f.isTrustedHash(hash) || f.maxConfirmedTd.Int64() == 0) {
bestHash = hash
@@ -470,7 +469,7 @@ func (f *lightFetcher) findBestRequest() (bestHash common.Hash, bestAmount uint6
// isTrustedHash checks if the block can be trusted by the minimum trusted fraction.
func (f *lightFetcher) isTrustedHash(hash common.Hash) bool {
// If ultra light cliet mode is disabled, trust all hashes
- if f.pm.ulc == nil {
+ if f.handler.ulc == nil {
return true
}
// Ultra light enabled, only trust after enough confirmations
@@ -480,7 +479,7 @@ func (f *lightFetcher) isTrustedHash(hash common.Hash) bool {
agreed++
}
}
- return 100*agreed/len(f.pm.ulc.keys) >= f.pm.ulc.fraction
+ return 100*agreed/len(f.handler.ulc.keys) >= f.handler.ulc.fraction
}
func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq {
@@ -500,14 +499,14 @@ func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq {
return fp != nil && fp.nodeByHash[bestHash] != nil
},
request: func(dp distPeer) func() {
- if f.pm.ulc != nil {
+ if f.handler.ulc != nil {
// Keep last trusted header before sync
f.setLastTrustedHeader(f.chain.CurrentHeader())
}
go func() {
p := dp.(*peer)
p.Log().Debug("Synchronisation started")
- f.pm.synchronise(p)
+ f.handler.synchronise(p)
f.syncDone <- p
}()
return nil
@@ -607,7 +606,7 @@ func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
for p, fp := range f.peers {
if !f.checkAnnouncedHeaders(fp, headers, tds) {
p.Log().Debug("Inconsistent announcement")
- go f.pm.removePeer(p.id)
+ go f.handler.removePeer(p.id)
}
if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
maxTd = fp.confirmedTd
@@ -705,7 +704,7 @@ func (f *lightFetcher) checkSyncedHeaders(p *peer) {
node = fp.lastAnnounced
td *big.Int
)
- if f.pm.ulc != nil {
+ if f.handler.ulc != nil {
// Roll back untrusted blocks
h, unapproved := f.lastTrustedTreeNode(p)
f.chain.Rollback(unapproved)
@@ -721,7 +720,7 @@ func (f *lightFetcher) checkSyncedHeaders(p *peer) {
// Now node is the latest downloaded/approved header after syncing
if node == nil {
p.Log().Debug("Synchronisation failed")
- go f.pm.removePeer(p.id)
+ go f.handler.removePeer(p.id)
return
}
header := f.chain.GetHeader(node.hash, node.number)
@@ -741,7 +740,7 @@ func (f *lightFetcher) lastTrustedTreeNode(p *peer) (*types.Header, []common.Has
if canonical.Number.Uint64() > f.lastTrustedHeader.Number.Uint64() {
canonical = f.chain.GetHeaderByNumber(f.lastTrustedHeader.Number.Uint64())
}
- commonAncestor := rawdb.FindCommonAncestor(f.pm.chainDb, canonical, f.lastTrustedHeader)
+ commonAncestor := rawdb.FindCommonAncestor(f.handler.backend.chainDb, canonical, f.lastTrustedHeader)
if commonAncestor == nil {
log.Error("Common ancestor of last trusted header and canonical header is nil", "canonical hash", canonical.Hash(), "trusted hash", f.lastTrustedHeader.Hash())
return current, unapprovedHashes
@@ -787,7 +786,7 @@ func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
}
if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
p.Log().Debug("Inconsistent announcement")
- go f.pm.removePeer(p.id)
+ go f.handler.removePeer(p.id)
}
if fp.confirmedTd != nil {
f.updateMaxConfirmedTd(fp.confirmedTd)
@@ -880,12 +879,12 @@ func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
fp.firstUpdateStats = newEntry
}
for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
- f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
+ f.handler.backend.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
fp.firstUpdateStats = fp.firstUpdateStats.next
}
if fp.confirmedTd != nil {
for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
- f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
+ f.handler.backend.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
fp.firstUpdateStats = fp.firstUpdateStats.next
}
}