aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-05-09 16:51:02 +0800
committerSonic <sonic@dexon.org>2019-05-09 16:51:02 +0800
commit31f99d2c7da15226627ea71acd8bd55093c6e7f8 (patch)
tree4781a6017fd3d40a4f7abe9e066081ea663c45f2
parent5709a57dcf41cc3cb1c09575dd02081e3f4ad496 (diff)
downloaddexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar
dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.gz
dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.bz2
dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.lz
dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.xz
dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.zst
dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.zip
lds: implement request headers with gov state
-rw-r--r--lds/backend.go9
-rw-r--r--lds/fetcher.go2
-rw-r--r--lds/handler.go61
-rw-r--r--lds/peer.go14
-rw-r--r--lds/protocol.go2
-rw-r--r--lds/server.go2
6 files changed, 75 insertions, 15 deletions
diff --git a/lds/backend.go b/lds/backend.go
index 6289f2233..5a2fea920 100644
--- a/lds/backend.go
+++ b/lds/backend.go
@@ -78,6 +78,13 @@ type LightDexon struct {
wg sync.WaitGroup
}
+// TODO: implement this
+type dummyGov struct{}
+
+func (d *dummyGov) GetRoundHeight(round uint64) uint64 {
+ return 0
+}
+
func New(ctx *node.ServiceContext, config *dex.Config) (*LightDexon, error) {
chainDb, err := dex.CreateDB(ctx, config, "lightchaindata")
if err != nil {
@@ -137,7 +144,7 @@ func New(ctx *node.ServiceContext, config *dex.Config) (*LightDexon, error) {
}
ldex.txPool = light.NewTxPool(ldex.chainConfig, ldex.blockchain, ldex.relay)
- if ldex.protocolManager, err = NewProtocolManager(ldex.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, ldex.eventMux, ldex.engine, ldex.peers, ldex.blockchain, nil, chainDb, ldex.odr, ldex.relay, ldex.serverPool, quitSync, &ldex.wg); err != nil {
+ if ldex.protocolManager, err = NewProtocolManager(ldex.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, ldex.eventMux, ldex.engine, ldex.peers, ldex.blockchain, nil, chainDb, ldex.odr, ldex.relay, ldex.serverPool, quitSync, &ldex.wg, &dummyGov{}); err != nil {
return nil, err
}
ldex.ApiBackend = &LdsApiBackend{ldex, nil}
diff --git a/lds/fetcher.go b/lds/fetcher.go
index b58918a08..f1a260abe 100644
--- a/lds/fetcher.go
+++ b/lds/fetcher.go
@@ -498,7 +498,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) {
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
- return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
+ return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true, true) }
},
}
}
diff --git a/lds/handler.go b/lds/handler.go
index ea8aa40f6..ca27b5b6c 100644
--- a/lds/handler.go
+++ b/lds/handler.go
@@ -95,6 +95,10 @@ type txPool interface {
Status(hashes []common.Hash) []core.TxStatus
}
+type governance interface {
+ GetRoundHeight(uint64) uint64
+}
+
type ProtocolManager struct {
lightSync bool
txpool txPool
@@ -103,6 +107,7 @@ type ProtocolManager struct {
chainConfig *params.ChainConfig
iConfig *light.IndexerConfig
blockchain BlockChain
+ gov governance
chainDb ethdb.Database
odr *LdsOdr
server *LdsServer
@@ -131,12 +136,13 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup, gov governance) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
eventMux: mux,
blockchain: blockchain,
+ gov: gov,
chainConfig: chainConfig,
iConfig: indexerConfig,
chainDb: chainDb,
@@ -420,10 +426,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
first := true
maxNonCanonical := uint64(100)
+ round := map[uint64]uint64{}
+
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
- headers []*types.Header
+ headers []*types.HeaderWithGovState
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
@@ -445,7 +453,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if origin == nil {
break
}
- headers = append(headers, origin)
+ headers = append(headers, &types.HeaderWithGovState{Header: origin})
+ if round[origin.Round] == 0 {
+ round[origin.Round] = origin.Number.Uint64()
+ }
bytes += estHeaderRlpSize
// Advance to the next header of the query
@@ -496,6 +507,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
+ if query.WithGov && len(headers) > 0 {
+ last := headers[len(headers)-1]
+ currentHeader := pm.blockchain.CurrentHeader()
+
+ // Do not reply if we don't have current gov state
+ if currentHeader.Number.Uint64() < last.Number.Uint64() {
+ log.Debug("Current header < last request",
+ "current", currentHeader.Number.Uint64(),
+ "last", last.Number.Uint64())
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
+ return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{})
+ }
+
+ snapshortHeight := map[uint64]struct{}{}
+ for r, height := range round {
+ if r == 0 {
+ continue
+ }
+ h := pm.gov.GetRoundHeight(r)
+ if h == 0 {
+ h = height
+ }
+ snapshortHeight[h] = struct{}{}
+ }
+
+ for _, header := range headers {
+ if _, exist := snapshortHeight[header.Number.Uint64()]; exist {
+ s, err := pm.blockchain.GetGovStateByHash(header.Hash())
+ if err != nil {
+ log.Warn("Get gov state by hash fail", "number", header.Number.Uint64())
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
+ return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{})
+ }
+ header.GovState = s
+ }
+ }
+ }
+
bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
return p.SendBlockHeaders(req.ReqID, bv, headers)
@@ -1090,7 +1141,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+ return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse, withGov) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)
@@ -1114,7 +1165,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+ return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse, withGov) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)
diff --git a/lds/peer.go b/lds/peer.go
index a5dcf0421..5cd529c4a 100644
--- a/lds/peer.go
+++ b/lds/peer.go
@@ -185,7 +185,7 @@ func (p *peer) SendAnnounce(request announceData) error {
}
// SendBlockHeaders sends a batch of block headers to the remote peer.
-func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error {
+func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.HeaderWithGovState) error {
return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers)
}
@@ -228,16 +228,16 @@ func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error {
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
-func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error {
- p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
- return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool, withGov bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse, "withGov", withGov)
+ return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov})
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
-func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error {
- p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
- return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool, withGov bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse, "withGov", withGov)
+ return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov})
}
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
diff --git a/lds/protocol.go b/lds/protocol.go
index 2aa03466c..0127a455c 100644
--- a/lds/protocol.go
+++ b/lds/protocol.go
@@ -172,6 +172,8 @@ type getBlockHeadersData struct {
Amount uint64 // Maximum number of headers to retrieve
Skip uint64 // Blocks to skip between consecutive headers
Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis)
+
+ WithGov bool
}
// hashOrNumber is a combined field for specifying an origin block.
diff --git a/lds/server.go b/lds/server.go
index 815ea6b09..971bcfdb0 100644
--- a/lds/server.go
+++ b/lds/server.go
@@ -51,7 +51,7 @@ type LdsServer struct {
func NewLdsServer(dex *dex.Dexon, config *dex.Config) (*LdsServer, error) {
quitSync := make(chan struct{})
- pm, err := NewProtocolManager(dex.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, dex.EventMux(), dex.Engine(), newPeerSet(), dex.BlockChain(), dex.TxPool(), dex.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
+ pm, err := NewProtocolManager(dex.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, dex.EventMux(), dex.Engine(), newPeerSet(), dex.BlockChain(), dex.TxPool(), dex.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup), &dummyGov{})
if err != nil {
return nil, err
}