diff options
Diffstat (limited to 'lds/handler.go')
-rw-r--r-- | lds/handler.go | 61 |
1 files changed, 56 insertions, 5 deletions
diff --git a/lds/handler.go b/lds/handler.go index ea8aa40f6..ca27b5b6c 100644 --- a/lds/handler.go +++ b/lds/handler.go @@ -95,6 +95,10 @@ type txPool interface { Status(hashes []common.Hash) []core.TxStatus } +type governance interface { + GetRoundHeight(uint64) uint64 +} + type ProtocolManager struct { lightSync bool txpool txPool @@ -103,6 +107,7 @@ type ProtocolManager struct { chainConfig *params.ChainConfig iConfig *light.IndexerConfig blockchain BlockChain + gov governance chainDb ethdb.Database odr *LdsOdr server *LdsServer @@ -131,12 +136,13 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { +func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup, gov governance) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, eventMux: mux, blockchain: blockchain, + gov: gov, chainConfig: chainConfig, iConfig: indexerConfig, chainDb: chainDb, @@ -420,10 +426,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { first := true maxNonCanonical := uint64(100) + round := map[uint64]uint64{} + // Gather headers until the fetch or network limits is reached var ( bytes common.StorageSize - headers []*types.Header + headers []*types.HeaderWithGovState unknown bool ) for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit { @@ -445,7 +453,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if origin == nil { break } - headers = append(headers, origin) + headers = append(headers, &types.HeaderWithGovState{Header: origin}) + if round[origin.Round] == 0 { + round[origin.Round] = origin.Number.Uint64() + } bytes += estHeaderRlpSize // Advance to the next header of the query @@ -496,6 +507,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } + if query.WithGov && len(headers) > 0 { + last := headers[len(headers)-1] + currentHeader := pm.blockchain.CurrentHeader() + + // Do not reply if we don't have current gov state + if currentHeader.Number.Uint64() < last.Number.Uint64() { + log.Debug("Current header < last request", + "current", currentHeader.Number.Uint64(), + "last", last.Number.Uint64()) + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, query.Amount, rcost) + return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{}) + } + + snapshortHeight := map[uint64]struct{}{} + for r, height := range round { + if r == 0 { + continue + } + h := pm.gov.GetRoundHeight(r) + if h == 0 { + h = height + } + snapshortHeight[h] = struct{}{} + } + + for _, header := range headers { + if _, exist := snapshortHeight[header.Number.Uint64()]; exist { + s, err := pm.blockchain.GetGovStateByHash(header.Hash()) + if err != nil { + log.Warn("Get gov state by hash fail", "number", header.Number.Uint64()) + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, query.Amount, rcost) + return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{}) + } + header.GovState = s + } + } + } + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost) pm.server.fcCostStats.update(msg.Code, query.Amount, rcost) return p.SendBlockHeaders(req.ReqID, bv, headers) @@ -1090,7 +1141,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s peer := dp.(*peer) cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse, withGov) } }, } _, ok := <-pc.manager.reqDist.queue(rq) @@ -1114,7 +1165,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip peer := dp.(*peer) cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse, withGov) } }, } _, ok := <-pc.manager.reqDist.queue(rq) |