From 31f99d2c7da15226627ea71acd8bd55093c6e7f8 Mon Sep 17 00:00:00 2001
From: Sonic <sonic@dexon.org>
Date: Thu, 9 May 2019 16:51:02 +0800
Subject: lds: implement request headers with gov state

---
 lds/backend.go  |  9 ++++++++-
 lds/fetcher.go  |  2 +-
 lds/handler.go  | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 lds/peer.go     | 14 ++++++-------
 lds/protocol.go |  2 ++
 lds/server.go   |  2 +-
 6 files changed, 75 insertions(+), 15 deletions(-)

(limited to 'lds')

diff --git a/lds/backend.go b/lds/backend.go
index 6289f2233..5a2fea920 100644
--- a/lds/backend.go
+++ b/lds/backend.go
@@ -78,6 +78,13 @@ type LightDexon struct {
 	wg sync.WaitGroup
 }
 
+// TODO: implement this
+type dummyGov struct{}
+
+func (d *dummyGov) GetRoundHeight(round uint64) uint64 {
+	return 0
+}
+
 func New(ctx *node.ServiceContext, config *dex.Config) (*LightDexon, error) {
 	chainDb, err := dex.CreateDB(ctx, config, "lightchaindata")
 	if err != nil {
@@ -137,7 +144,7 @@ func New(ctx *node.ServiceContext, config *dex.Config) (*LightDexon, error) {
 	}
 
 	ldex.txPool = light.NewTxPool(ldex.chainConfig, ldex.blockchain, ldex.relay)
-	if ldex.protocolManager, err = NewProtocolManager(ldex.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, ldex.eventMux, ldex.engine, ldex.peers, ldex.blockchain, nil, chainDb, ldex.odr, ldex.relay, ldex.serverPool, quitSync, &ldex.wg); err != nil {
+	if ldex.protocolManager, err = NewProtocolManager(ldex.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, ldex.eventMux, ldex.engine, ldex.peers, ldex.blockchain, nil, chainDb, ldex.odr, ldex.relay, ldex.serverPool, quitSync, &ldex.wg, &dummyGov{}); err != nil {
 		return nil, err
 	}
 	ldex.ApiBackend = &LdsApiBackend{ldex, nil}
diff --git a/lds/fetcher.go b/lds/fetcher.go
index b58918a08..f1a260abe 100644
--- a/lds/fetcher.go
+++ b/lds/fetcher.go
@@ -498,7 +498,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) {
 					time.Sleep(hardRequestTimeout)
 					f.timeoutChn <- reqID
 				}()
-				return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
+				return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true, true) }
 			},
 		}
 	}
diff --git a/lds/handler.go b/lds/handler.go
index ea8aa40f6..ca27b5b6c 100644
--- a/lds/handler.go
+++ b/lds/handler.go
@@ -95,6 +95,10 @@ type txPool interface {
 	Status(hashes []common.Hash) []core.TxStatus
 }
 
+type governance interface {
+	GetRoundHeight(uint64) uint64
+}
+
 type ProtocolManager struct {
 	lightSync   bool
 	txpool      txPool
@@ -103,6 +107,7 @@ type ProtocolManager struct {
 	chainConfig *params.ChainConfig
 	iConfig     *light.IndexerConfig
 	blockchain  BlockChain
+	gov         governance
 	chainDb     ethdb.Database
 	odr         *LdsOdr
 	server      *LdsServer
@@ -131,12 +136,13 @@ type ProtocolManager struct {
 
 // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
 // with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup, gov governance) (*ProtocolManager, error) {
 	// Create the protocol manager with the base fields
 	manager := &ProtocolManager{
 		lightSync:   lightSync,
 		eventMux:    mux,
 		blockchain:  blockchain,
+		gov:         gov,
 		chainConfig: chainConfig,
 		iConfig:     indexerConfig,
 		chainDb:     chainDb,
@@ -420,10 +426,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		first := true
 		maxNonCanonical := uint64(100)
 
+		round := map[uint64]uint64{}
+
 		// Gather headers until the fetch or network limits is reached
 		var (
 			bytes   common.StorageSize
-			headers []*types.Header
+			headers []*types.HeaderWithGovState
 			unknown bool
 		)
 		for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
@@ -445,7 +453,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			if origin == nil {
 				break
 			}
-			headers = append(headers, origin)
+			headers = append(headers, &types.HeaderWithGovState{Header: origin})
+			if round[origin.Round] == 0 {
+				round[origin.Round] = origin.Number.Uint64()
+			}
 			bytes += estHeaderRlpSize
 
 			// Advance to the next header of the query
@@ -496,6 +507,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			}
 		}
 
+		if query.WithGov && len(headers) > 0 {
+			last := headers[len(headers)-1]
+			currentHeader := pm.blockchain.CurrentHeader()
+
+			// Do not reply if we don't have current gov state
+			if currentHeader.Number.Uint64() < last.Number.Uint64() {
+				log.Debug("Current header < last request",
+					"current", currentHeader.Number.Uint64(),
+					"last", last.Number.Uint64())
+				bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
+				pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
+				return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{})
+			}
+
+			snapshortHeight := map[uint64]struct{}{}
+			for r, height := range round {
+				if r == 0 {
+					continue
+				}
+				h := pm.gov.GetRoundHeight(r)
+				if h == 0 {
+					h = height
+				}
+				snapshortHeight[h] = struct{}{}
+			}
+
+			for _, header := range headers {
+				if _, exist := snapshortHeight[header.Number.Uint64()]; exist {
+					s, err := pm.blockchain.GetGovStateByHash(header.Hash())
+					if err != nil {
+						log.Warn("Get gov state by hash fail", "number", header.Number.Uint64())
+						bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
+						pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
+						return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{})
+					}
+					header.GovState = s
+				}
+			}
+		}
+
 		bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
 		pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
 		return p.SendBlockHeaders(req.ReqID, bv, headers)
@@ -1090,7 +1141,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s
 			peer := dp.(*peer)
 			cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
 			peer.fcServer.QueueRequest(reqID, cost)
-			return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+			return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse, withGov) }
 		},
 	}
 	_, ok := <-pc.manager.reqDist.queue(rq)
@@ -1114,7 +1165,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
 			peer := dp.(*peer)
 			cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
 			peer.fcServer.QueueRequest(reqID, cost)
-			return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+			return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse, withGov) }
 		},
 	}
 	_, ok := <-pc.manager.reqDist.queue(rq)
diff --git a/lds/peer.go b/lds/peer.go
index a5dcf0421..5cd529c4a 100644
--- a/lds/peer.go
+++ b/lds/peer.go
@@ -185,7 +185,7 @@ func (p *peer) SendAnnounce(request announceData) error {
 }
 
 // SendBlockHeaders sends a batch of block headers to the remote peer.
-func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error {
+func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.HeaderWithGovState) error {
 	return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers)
 }
 
@@ -228,16 +228,16 @@ func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error {
 
 // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
 // specified header query, based on the hash of an origin block.
-func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error {
-	p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
-	return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool, withGov bool) error {
+	p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse, "withGov", withGov)
+	return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov})
 }
 
 // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
 // specified header query, based on the number of an origin block.
-func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error {
-	p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
-	return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool, withGov bool) error {
+	p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse, "withGov", withGov)
+	return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov})
 }
 
 // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
diff --git a/lds/protocol.go b/lds/protocol.go
index 2aa03466c..0127a455c 100644
--- a/lds/protocol.go
+++ b/lds/protocol.go
@@ -172,6 +172,8 @@ type getBlockHeadersData struct {
 	Amount  uint64       // Maximum number of headers to retrieve
 	Skip    uint64       // Blocks to skip between consecutive headers
 	Reverse bool         // Query direction (false = rising towards latest, true = falling towards genesis)
+
+	WithGov bool
 }
 
 // hashOrNumber is a combined field for specifying an origin block.
diff --git a/lds/server.go b/lds/server.go
index 815ea6b09..971bcfdb0 100644
--- a/lds/server.go
+++ b/lds/server.go
@@ -51,7 +51,7 @@ type LdsServer struct {
 
 func NewLdsServer(dex *dex.Dexon, config *dex.Config) (*LdsServer, error) {
 	quitSync := make(chan struct{})
-	pm, err := NewProtocolManager(dex.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, dex.EventMux(), dex.Engine(), newPeerSet(), dex.BlockChain(), dex.TxPool(), dex.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
+	pm, err := NewProtocolManager(dex.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, dex.EventMux(), dex.Engine(), newPeerSet(), dex.BlockChain(), dex.TxPool(), dex.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup), &dummyGov{})
 	if err != nil {
 		return nil, err
 	}
-- 
cgit v1.2.3