From 769657060e888612e7d585c6b6eae16a64c4ad19 Mon Sep 17 00:00:00 2001 From: b00ris Date: Thu, 24 Jan 2019 14:18:26 +0300 Subject: les: implement ultralight client (#16904) For more information about this light client mode, read https://hackmd.io/s/HJy7jjZpm --- les/fetcher.go | 300 ++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 211 insertions(+), 89 deletions(-) (limited to 'les/fetcher.go') diff --git a/les/fetcher.go b/les/fetcher.go index 2615f69df..aa3101af7 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -43,7 +43,7 @@ const ( type lightFetcher struct { pm *ProtocolManager odr *LesOdr - chain *light.LightChain + chain lightChain lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests maxConfirmedTd *big.Int @@ -52,11 +52,19 @@ type lightFetcher struct { syncing bool syncDone chan *peer - reqMu sync.RWMutex // reqMu protects access to sent header fetch requests - requested map[uint64]fetchRequest - deliverChn chan fetchResponse - timeoutChn chan uint64 - requestChn chan bool // true if initiated from outside + reqMu sync.RWMutex // reqMu protects access to sent header fetch requests + requested map[uint64]fetchRequest + deliverChn chan fetchResponse + timeoutChn chan uint64 + requestChn chan bool // true if initiated from outside + lastTrustedHeader *types.Header +} + +// lightChain extends the BlockChain interface by locking. +type lightChain interface { + BlockChain + LockChain() + UnlockChain() } // fetcherPeerInfo holds fetcher-specific information about each active peer @@ -145,6 +153,7 @@ func (f *lightFetcher) syncLoop() { reqID uint64 syncing bool ) + if !f.syncing && !(newAnnounce && s) { rq, reqID, syncing = f.nextRequest() } @@ -227,7 +236,6 @@ func (f *lightFetcher) registerPeer(p *peer) { f.lock.Lock() defer f.lock.Unlock() - f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} } @@ -280,8 +288,10 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { fp.nodeCnt = 0 fp.nodeByHash = make(map[common.Hash]*fetcherTreeNode) } + // check if the node count is too high to add new nodes, discard oldest ones if necessary if n != nil { - // check if the node count is too high to add new nodes, discard oldest ones if necessary + // n is now the reorg common ancestor, add a new branch of nodes + // check if the node count is too high to add new nodes locked := false for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil { if !locked { @@ -325,6 +335,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { fp.nodeByHash[n.hash] = n } } + if n == nil { // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed if fp.root != nil { @@ -411,25 +422,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { // to be downloaded starting from the head backwards is also returned func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) { var ( - bestHash common.Hash - bestAmount uint64 + bestHash common.Hash + bestAmount uint64 + bestTd *big.Int + bestSyncing bool ) - bestTd := f.maxConfirmedTd - bestSyncing := false + bestHash, bestAmount, bestTd, bestSyncing = f.findBestRequest() - for p, fp := range f.peers { - for hash, n := range fp.nodeByHash { - if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) { - amount := f.requestAmount(p, n) - if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount { - bestHash = hash - bestAmount = amount - bestTd = n.td - bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) - } - } - } - } if bestTd == f.maxConfirmedTd { return nil, 0, false } @@ -437,72 +436,140 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) { var rq *distReq reqID := genReqID() if bestSyncing { - rq = &distReq{ - getCost: func(dp distPeer) uint64 { - return 0 - }, - canSend: func(dp distPeer) bool { - p := dp.(*peer) - f.lock.Lock() - defer f.lock.Unlock() - - fp := f.peers[p] - return fp != nil && fp.nodeByHash[bestHash] != nil - }, - request: func(dp distPeer) func() { - go func() { - p := dp.(*peer) - p.Log().Debug("Synchronisation started") - f.pm.synchronise(p) - f.syncDone <- p - }() - return nil - }, - } + rq = f.newFetcherDistReqForSync(bestHash) } else { - rq = &distReq{ - getCost: func(dp distPeer) uint64 { - p := dp.(*peer) - return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) - }, - canSend: func(dp distPeer) bool { + rq = f.newFetcherDistReq(bestHash, reqID, bestAmount) + } + return rq, reqID, bestSyncing +} + +// findBestRequest finds the best head to request that has been announced by but not yet requested from a known peer. +// It also returns the announced Td (which should be verified after fetching the head), +// the necessary amount to request and whether a downloader sync is necessary instead of a normal header request. +func (f *lightFetcher) findBestRequest() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) { + bestTd = f.maxConfirmedTd + bestSyncing = false + + for p, fp := range f.peers { + for hash, n := range fp.nodeByHash { + if f.checkKnownNode(p, n) || n.requested { + continue + } + + //if ulc mode is disabled, isTrustedHash returns true + amount := f.requestAmount(p, n) + if (bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount) && (f.isTrustedHash(hash) || f.maxConfirmedTd.Int64() == 0) { + bestHash = hash + bestTd = n.td + bestAmount = amount + bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) + } + } + } + return +} + +// isTrustedHash checks if the block can be trusted by the minimum trusted fraction. +func (f *lightFetcher) isTrustedHash(hash common.Hash) bool { + if !f.pm.isULCEnabled() { + return true + } + + var numAgreed int + for p, fp := range f.peers { + if !p.isTrusted { + continue + } + if _, ok := fp.nodeByHash[hash]; !ok { + continue + } + + numAgreed++ + } + + return 100*numAgreed/len(f.pm.ulc.trustedKeys) >= f.pm.ulc.minTrustedFraction +} + +func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq { + return &distReq{ + getCost: func(dp distPeer) uint64 { + return 0 + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + f.lock.Lock() + defer f.lock.Unlock() + + if p.isOnlyAnnounce { + return false + } + + fp := f.peers[p] + return fp != nil && fp.nodeByHash[bestHash] != nil + }, + request: func(dp distPeer) func() { + if f.pm.isULCEnabled() { + //keep last trusted header before sync + f.setLastTrustedHeader(f.chain.CurrentHeader()) + } + go func() { p := dp.(*peer) - f.lock.Lock() - defer f.lock.Unlock() + p.Log().Debug("Synchronisation started") + f.pm.synchronise(p) + f.syncDone <- p + }() + return nil + }, + } +} - fp := f.peers[p] - if fp == nil { - return false - } +// newFetcherDistReq creates a new request for the distributor. +func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bestAmount uint64) *distReq { + return &distReq{ + getCost: func(dp distPeer) uint64 { + p := dp.(*peer) + return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + f.lock.Lock() + defer f.lock.Unlock() + + if p.isOnlyAnnounce { + return false + } + + fp := f.peers[p] + if fp == nil { + return false + } + n := fp.nodeByHash[bestHash] + return n != nil && !n.requested + }, + request: func(dp distPeer) func() { + p := dp.(*peer) + f.lock.Lock() + fp := f.peers[p] + if fp != nil { n := fp.nodeByHash[bestHash] - return n != nil && !n.requested - }, - request: func(dp distPeer) func() { - p := dp.(*peer) - f.lock.Lock() - fp := f.peers[p] - if fp != nil { - n := fp.nodeByHash[bestHash] - if n != nil { - n.requested = true - } + if n != nil { + n.requested = true } - f.lock.Unlock() - - cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) - p.fcServer.QueueRequest(reqID, cost) - f.reqMu.Lock() - f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} - f.reqMu.Unlock() - go func() { - time.Sleep(hardRequestTimeout) - f.timeoutChn <- reqID - }() - return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } - }, - } + } + f.lock.Unlock() + + cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + p.fcServer.QueueRequest(reqID, cost) + f.reqMu.Lock() + f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} + f.reqMu.Unlock() + go func() { + time.Sleep(hardRequestTimeout) + f.timeoutChn <- reqID + }() + return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } + }, } - return rq, reqID, bestSyncing } // deliverHeaders delivers header download request responses for processing @@ -520,6 +587,7 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo for i, header := range resp.headers { headers[int(req.amount)-1-i] = header } + if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { if err == consensus.ErrFutureBlock { return true @@ -544,6 +612,7 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo // downloaded and validated batch or headers func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { var maxTd *big.Int + for p, fp := range f.peers { if !f.checkAnnouncedHeaders(fp, headers, tds) { p.Log().Debug("Inconsistent announcement") @@ -553,6 +622,7 @@ func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { maxTd = fp.confirmedTd } } + if maxTd != nil { f.updateMaxConfirmedTd(maxTd) } @@ -640,22 +710,72 @@ func (f *lightFetcher) checkSyncedHeaders(p *peer) { p.Log().Debug("Unknown peer to check sync headers") return } + n := fp.lastAnnounced var td *big.Int + + var h *types.Header + if f.pm.isULCEnabled() { + var unapprovedHashes []common.Hash + // Overwrite last announced for ULC mode + h, unapprovedHashes = f.lastTrustedTreeNode(p) + //rollback untrusted blocks + f.chain.Rollback(unapprovedHashes) + //overwrite to last trusted + n = fp.nodeByHash[h.Hash()] + } + + //find last valid block for n != nil { if td = f.chain.GetTd(n.hash, n.number); td != nil { break } n = n.parent } - // now n is the latest downloaded header after syncing + + // Now n is the latest downloaded/approved header after syncing if n == nil { p.Log().Debug("Synchronisation failed") go f.pm.removePeer(p.id) - } else { - header := f.chain.GetHeader(n.hash, n.number) - f.newHeaders([]*types.Header{header}, []*big.Int{td}) + return } + header := f.chain.GetHeader(n.hash, n.number) + f.newHeaders([]*types.Header{header}, []*big.Int{td}) +} + +// lastTrustedTreeNode return last approved treeNode and a list of unapproved hashes +func (f *lightFetcher) lastTrustedTreeNode(p *peer) (*types.Header, []common.Hash) { + unapprovedHashes := make([]common.Hash, 0) + current := f.chain.CurrentHeader() + + if f.lastTrustedHeader == nil { + return current, unapprovedHashes + } + + canonical := f.chain.CurrentHeader() + if canonical.Number.Uint64() > f.lastTrustedHeader.Number.Uint64() { + canonical = f.chain.GetHeaderByNumber(f.lastTrustedHeader.Number.Uint64()) + } + commonAncestor := rawdb.FindCommonAncestor(f.pm.chainDb, canonical, f.lastTrustedHeader) + if commonAncestor == nil { + log.Error("Common ancestor of last trusted header and canonical header is nil", "canonical hash", canonical.Hash(), "trusted hash", f.lastTrustedHeader.Hash()) + return current, unapprovedHashes + } + + for current.Hash() == commonAncestor.Hash() { + if f.isTrustedHash(current.Hash()) { + break + } + unapprovedHashes = append(unapprovedHashes, current.Hash()) + current = f.chain.GetHeader(current.ParentHash, current.Number.Uint64()-1) + } + return current, unapprovedHashes +} + +func (f *lightFetcher) setLastTrustedHeader(h *types.Header) { + f.lock.Lock() + defer f.lock.Unlock() + f.lastTrustedHeader = h } // checkKnownNode checks if a block tree node is known (downloaded and validated) @@ -747,6 +867,7 @@ func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) { if f.lastUpdateStats != nil { f.lastUpdateStats.next = newEntry } + f.lastUpdateStats = newEntry for p := range f.peers { f.checkUpdateStats(p, newEntry) @@ -769,6 +890,7 @@ func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) { p.Log().Debug("Unknown peer to check update stats") return } + if newEntry != nil && fp.firstUpdateStats == nil { fp.firstUpdateStats = newEntry } -- cgit v1.2.3