aboutsummaryrefslogtreecommitdiffstats
path: root/lds/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'lds/handler.go')
-rw-r--r--lds/handler.go61
1 files changed, 56 insertions, 5 deletions
diff --git a/lds/handler.go b/lds/handler.go
index ea8aa40f6..ca27b5b6c 100644
--- a/lds/handler.go
+++ b/lds/handler.go
@@ -95,6 +95,10 @@ type txPool interface {
Status(hashes []common.Hash) []core.TxStatus
}
+type governance interface {
+ GetRoundHeight(uint64) uint64
+}
+
type ProtocolManager struct {
lightSync bool
txpool txPool
@@ -103,6 +107,7 @@ type ProtocolManager struct {
chainConfig *params.ChainConfig
iConfig *light.IndexerConfig
blockchain BlockChain
+ gov governance
chainDb ethdb.Database
odr *LdsOdr
server *LdsServer
@@ -131,12 +136,13 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup, gov governance) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
eventMux: mux,
blockchain: blockchain,
+ gov: gov,
chainConfig: chainConfig,
iConfig: indexerConfig,
chainDb: chainDb,
@@ -420,10 +426,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
first := true
maxNonCanonical := uint64(100)
+ round := map[uint64]uint64{}
+
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
- headers []*types.Header
+ headers []*types.HeaderWithGovState
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
@@ -445,7 +453,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if origin == nil {
break
}
- headers = append(headers, origin)
+ headers = append(headers, &types.HeaderWithGovState{Header: origin})
+ if round[origin.Round] == 0 {
+ round[origin.Round] = origin.Number.Uint64()
+ }
bytes += estHeaderRlpSize
// Advance to the next header of the query
@@ -496,6 +507,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
+ if query.WithGov && len(headers) > 0 {
+ last := headers[len(headers)-1]
+ currentHeader := pm.blockchain.CurrentHeader()
+
+ // Do not reply if we don't have current gov state
+ if currentHeader.Number.Uint64() < last.Number.Uint64() {
+ log.Debug("Current header < last request",
+ "current", currentHeader.Number.Uint64(),
+ "last", last.Number.Uint64())
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
+ return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{})
+ }
+
+ snapshortHeight := map[uint64]struct{}{}
+ for r, height := range round {
+ if r == 0 {
+ continue
+ }
+ h := pm.gov.GetRoundHeight(r)
+ if h == 0 {
+ h = height
+ }
+ snapshortHeight[h] = struct{}{}
+ }
+
+ for _, header := range headers {
+ if _, exist := snapshortHeight[header.Number.Uint64()]; exist {
+ s, err := pm.blockchain.GetGovStateByHash(header.Hash())
+ if err != nil {
+ log.Warn("Get gov state by hash fail", "number", header.Number.Uint64())
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
+ return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{})
+ }
+ header.GovState = s
+ }
+ }
+ }
+
bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
return p.SendBlockHeaders(req.ReqID, bv, headers)
@@ -1090,7 +1141,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+ return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse, withGov) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)
@@ -1114,7 +1165,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+ return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse, withGov) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)