diff options
author | Sonic <sonic@dexon.org> | 2019-05-09 16:51:02 +0800 |
---|---|---|
committer | Sonic <sonic@dexon.org> | 2019-05-09 16:51:02 +0800 |
commit | 31f99d2c7da15226627ea71acd8bd55093c6e7f8 (patch) | |
tree | 4781a6017fd3d40a4f7abe9e066081ea663c45f2 | |
parent | 5709a57dcf41cc3cb1c09575dd02081e3f4ad496 (diff) | |
download | dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.gz dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.bz2 dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.lz dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.xz dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.tar.zst dexon-31f99d2c7da15226627ea71acd8bd55093c6e7f8.zip |
lds: implement request headers with gov state
-rw-r--r-- | lds/backend.go | 9 | ||||
-rw-r--r-- | lds/fetcher.go | 2 | ||||
-rw-r--r-- | lds/handler.go | 61 | ||||
-rw-r--r-- | lds/peer.go | 14 | ||||
-rw-r--r-- | lds/protocol.go | 2 | ||||
-rw-r--r-- | lds/server.go | 2 |
6 files changed, 75 insertions, 15 deletions
diff --git a/lds/backend.go b/lds/backend.go index 6289f2233..5a2fea920 100644 --- a/lds/backend.go +++ b/lds/backend.go @@ -78,6 +78,13 @@ type LightDexon struct { wg sync.WaitGroup } +// TODO: implement this +type dummyGov struct{} + +func (d *dummyGov) GetRoundHeight(round uint64) uint64 { + return 0 +} + func New(ctx *node.ServiceContext, config *dex.Config) (*LightDexon, error) { chainDb, err := dex.CreateDB(ctx, config, "lightchaindata") if err != nil { @@ -137,7 +144,7 @@ func New(ctx *node.ServiceContext, config *dex.Config) (*LightDexon, error) { } ldex.txPool = light.NewTxPool(ldex.chainConfig, ldex.blockchain, ldex.relay) - if ldex.protocolManager, err = NewProtocolManager(ldex.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, ldex.eventMux, ldex.engine, ldex.peers, ldex.blockchain, nil, chainDb, ldex.odr, ldex.relay, ldex.serverPool, quitSync, &ldex.wg); err != nil { + if ldex.protocolManager, err = NewProtocolManager(ldex.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, ldex.eventMux, ldex.engine, ldex.peers, ldex.blockchain, nil, chainDb, ldex.odr, ldex.relay, ldex.serverPool, quitSync, &ldex.wg, &dummyGov{}); err != nil { return nil, err } ldex.ApiBackend = &LdsApiBackend{ldex, nil} diff --git a/lds/fetcher.go b/lds/fetcher.go index b58918a08..f1a260abe 100644 --- a/lds/fetcher.go +++ b/lds/fetcher.go @@ -498,7 +498,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) { time.Sleep(hardRequestTimeout) f.timeoutChn <- reqID }() - return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } + return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true, true) } }, } } diff --git a/lds/handler.go b/lds/handler.go index ea8aa40f6..ca27b5b6c 100644 --- a/lds/handler.go +++ b/lds/handler.go @@ -95,6 +95,10 @@ type txPool interface { Status(hashes []common.Hash) []core.TxStatus } +type governance interface { + GetRoundHeight(uint64) uint64 +} + type ProtocolManager struct { lightSync bool txpool txPool @@ -103,6 +107,7 @@ type ProtocolManager struct { chainConfig *params.ChainConfig iConfig *light.IndexerConfig blockchain BlockChain + gov governance chainDb ethdb.Database odr *LdsOdr server *LdsServer @@ -131,12 +136,13 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { +func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LdsOdr, txrelay *LdsTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup, gov governance) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, eventMux: mux, blockchain: blockchain, + gov: gov, chainConfig: chainConfig, iConfig: indexerConfig, chainDb: chainDb, @@ -420,10 +426,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { first := true maxNonCanonical := uint64(100) + round := map[uint64]uint64{} + // Gather headers until the fetch or network limits is reached var ( bytes common.StorageSize - headers []*types.Header + headers []*types.HeaderWithGovState unknown bool ) for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit { @@ -445,7 +453,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if origin == nil { break } - headers = append(headers, origin) + headers = append(headers, &types.HeaderWithGovState{Header: origin}) + if round[origin.Round] == 0 { + round[origin.Round] = origin.Number.Uint64() + } bytes += estHeaderRlpSize // Advance to the next header of the query @@ -496,6 +507,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } + if query.WithGov && len(headers) > 0 { + last := headers[len(headers)-1] + currentHeader := pm.blockchain.CurrentHeader() + + // Do not reply if we don't have current gov state + if currentHeader.Number.Uint64() < last.Number.Uint64() { + log.Debug("Current header < last request", + "current", currentHeader.Number.Uint64(), + "last", last.Number.Uint64()) + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, query.Amount, rcost) + return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{}) + } + + snapshortHeight := map[uint64]struct{}{} + for r, height := range round { + if r == 0 { + continue + } + h := pm.gov.GetRoundHeight(r) + if h == 0 { + h = height + } + snapshortHeight[h] = struct{}{} + } + + for _, header := range headers { + if _, exist := snapshortHeight[header.Number.Uint64()]; exist { + s, err := pm.blockchain.GetGovStateByHash(header.Hash()) + if err != nil { + log.Warn("Get gov state by hash fail", "number", header.Number.Uint64()) + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, query.Amount, rcost) + return p.SendBlockHeaders(req.ReqID, bv, []*types.HeaderWithGovState{}) + } + header.GovState = s + } + } + } + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost) pm.server.fcCostStats.update(msg.Code, query.Amount, rcost) return p.SendBlockHeaders(req.ReqID, bv, headers) @@ -1090,7 +1141,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s peer := dp.(*peer) cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse, withGov) } }, } _, ok := <-pc.manager.reqDist.queue(rq) @@ -1114,7 +1165,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip peer := dp.(*peer) cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse, withGov) } }, } _, ok := <-pc.manager.reqDist.queue(rq) diff --git a/lds/peer.go b/lds/peer.go index a5dcf0421..5cd529c4a 100644 --- a/lds/peer.go +++ b/lds/peer.go @@ -185,7 +185,7 @@ func (p *peer) SendAnnounce(request announceData) error { } // SendBlockHeaders sends a batch of block headers to the remote peer. -func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error { +func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.HeaderWithGovState) error { return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers) } @@ -228,16 +228,16 @@ func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error { // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. -func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool, withGov bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse, "withGov", withGov) + return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov}) } // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. -func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool, withGov bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse, "withGov", withGov) + return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov}) } // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes diff --git a/lds/protocol.go b/lds/protocol.go index 2aa03466c..0127a455c 100644 --- a/lds/protocol.go +++ b/lds/protocol.go @@ -172,6 +172,8 @@ type getBlockHeadersData struct { Amount uint64 // Maximum number of headers to retrieve Skip uint64 // Blocks to skip between consecutive headers Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) + + WithGov bool } // hashOrNumber is a combined field for specifying an origin block. diff --git a/lds/server.go b/lds/server.go index 815ea6b09..971bcfdb0 100644 --- a/lds/server.go +++ b/lds/server.go @@ -51,7 +51,7 @@ type LdsServer struct { func NewLdsServer(dex *dex.Dexon, config *dex.Config) (*LdsServer, error) { quitSync := make(chan struct{}) - pm, err := NewProtocolManager(dex.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, dex.EventMux(), dex.Engine(), newPeerSet(), dex.BlockChain(), dex.TxPool(), dex.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup)) + pm, err := NewProtocolManager(dex.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, dex.EventMux(), dex.Engine(), newPeerSet(), dex.BlockChain(), dex.TxPool(), dex.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup), &dummyGov{}) if err != nil { return nil, err } |