diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-10-24 21:19:09 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-10-24 21:19:09 +0800 |
commit | ca376ead88a5a26626a90abdb62f4de7f6313822 (patch) | |
tree | 71d11e3b6cd40d2bf29033b7e23d30d04e086558 /les/handler.go | |
parent | 6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff) | |
download | go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.bz2 go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.lz go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.xz go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.zip |
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions:
* new and more efficient Merkle proofs reply format (when replying to
a multiple Merkle proofs request, we just send a single set of trie
nodes containing all necessary nodes)
* BBT (BloomBitsTrie) works similarly to the existing CHT and contains
the bloombits search data to speed up log searches
* GetTxStatusMsg returns the inclusion position or the
pending/queued/unknown state of a transaction referenced by hash
* an optional signature of new block data (number/hash/td) can be
included in AnnounceMsg to provide an option for "very light
clients" (mobile/embedded devices) to skip expensive Ethash check
and accept multiple signatures of somewhat trusted servers (still a
lot better than trusting a single server completely and retrieving
everything through RPC). The new client mode is not implemented in
this PR, just the protocol extension.
Diffstat (limited to 'les/handler.go')
-rw-r--r-- | les/handler.go | 354 |
1 files changed, 326 insertions, 28 deletions
diff --git a/les/handler.go b/les/handler.go index df7eb6af5..de07b7244 100644 --- a/les/handler.go +++ b/les/handler.go @@ -18,6 +18,7 @@ package les import ( + "bytes" "encoding/binary" "errors" "fmt" @@ -35,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" @@ -50,13 +52,14 @@ const ( ethVersion = 63 // equivalent eth version for the downloader - MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request - MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request - MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request - MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request - MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request - MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request - MaxTxSend = 64 // Amount of transactions to be send per request + MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request + MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request + MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request + MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request + MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request + MaxHelperTrieProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request + MaxTxSend = 64 // Amount of transactions to be send per request + MaxTxStatus = 256 // Amount of transactions to queried per request disableClientRemovePeer = false ) @@ -86,8 +89,7 @@ type BlockChain interface { } type txPool interface { - // AddRemotes should add the given transactions to the pool. - AddRemotes([]*types.Transaction) error + AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []core.TxStatusData } type ProtocolManager struct { @@ -125,7 +127,7 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { +func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protocolVersions []uint, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, @@ -147,15 +149,16 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.retriever = odr.retriever manager.reqDist = odr.retriever.dist } + // Initiate a sub-protocol for every implemented version we can handle - manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) - for i, version := range ProtocolVersions { + manager.SubProtocols = make([]p2p.Protocol, 0, len(protocolVersions)) + for _, version := range protocolVersions { // Compatible, initialize the sub-protocol version := version // Closure for the run manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: "les", Version: version, - Length: ProtocolLengths[i], + Length: ProtocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { var entry *poolEntry peer := manager.newPeer(int(version), networkId, p, rw) @@ -315,7 +318,7 @@ func (pm *ProtocolManager) handle(p *peer) error { } } -var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg} +var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg} // handleMsg is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. @@ -362,11 +365,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Block header query, collect the requested headers and reply case AnnounceMsg: p.Log().Trace("Received announce message") + if p.requestAnnounceType == announceTypeNone { + return errResp(ErrUnexpectedResponse, "") + } var req announceData if err := msg.Decode(&req); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } + + if p.requestAnnounceType == announceTypeSigned { + if err := req.checkSignature(p.pubKey); err != nil { + p.Log().Trace("Invalid announcement signature", "err", err) + return err + } + p.Log().Trace("Valid announcement signature") + } + p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth) if pm.fetcher != nil { pm.fetcher.announce(p, &req) @@ -655,7 +670,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { Obj: resp.Receipts, } - case GetProofsMsg: + case GetProofsV1Msg: p.Log().Trace("Received proofs request") // Decode the retrieval message var req struct { @@ -690,9 +705,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } if tr != nil { - proof := tr.Prove(req.Key) + var proof light.NodeList + tr.Prove(req.Key, 0, &proof) proofs = append(proofs, proof) - bytes += len(proof) + bytes += proof.DataSize() } } } @@ -701,7 +717,67 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) return p.SendProofs(req.ReqID, bv, proofs) - case ProofsMsg: + case GetProofsV2Msg: + p.Log().Trace("Received les/2 proofs request") + // Decode the retrieval message + var req struct { + ReqID uint64 + Reqs []ProofReq + } + if err := msg.Decode(&req); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Gather state data until the fetch or network limits is reached + var ( + lastBHash common.Hash + lastAccKey []byte + tr, str *trie.Trie + ) + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxProofsFetch) { + return errResp(ErrRequestRejected, "") + } + + nodes := light.NewNodeSet() + + for _, req := range req.Reqs { + if nodes.DataSize() >= softResponseLimit { + break + } + if tr == nil || req.BHash != lastBHash { + if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil { + tr, _ = trie.New(header.Root, pm.chainDb) + } else { + tr = nil + } + lastBHash = req.BHash + str = nil + } + if tr != nil { + if len(req.AccKey) > 0 { + if str == nil || !bytes.Equal(req.AccKey, lastAccKey) { + sdata := tr.Get(req.AccKey) + str = nil + var acc state.Account + if err := rlp.DecodeBytes(sdata, &acc); err == nil { + str, _ = trie.New(acc.Root, pm.chainDb) + } + lastAccKey = common.CopyBytes(req.AccKey) + } + if str != nil { + str.Prove(req.Key, req.FromLevel, nodes) + } + } else { + tr.Prove(req.Key, req.FromLevel, nodes) + } + } + } + proofs := nodes.NodeList() + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) + return p.SendProofsV2(req.ReqID, bv, proofs) + + case ProofsV1Msg: if pm.odr == nil { return errResp(ErrUnexpectedResponse, "") } @@ -710,14 +786,35 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // A batch of merkle proofs arrived to one of our previous requests var resp struct { ReqID, BV uint64 - Data [][]rlp.RawValue + Data []light.NodeList } if err := msg.Decode(&resp); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.GotReply(resp.ReqID, resp.BV) deliverMsg = &Msg{ - MsgType: MsgProofs, + MsgType: MsgProofsV1, + ReqID: resp.ReqID, + Obj: resp.Data, + } + + case ProofsV2Msg: + if pm.odr == nil { + return errResp(ErrUnexpectedResponse, "") + } + + p.Log().Trace("Received les/2 proofs response") + // A batch of merkle proofs arrived to one of our previous requests + var resp struct { + ReqID, BV uint64 + Data light.NodeList + } + if err := msg.Decode(&resp); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + p.fcServer.GotReply(resp.ReqID, resp.BV) + deliverMsg = &Msg{ + MsgType: MsgProofsV2, ReqID: resp.ReqID, Obj: resp.Data, } @@ -738,22 +835,25 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { proofs []ChtResp ) reqCnt := len(req.Reqs) - if reject(uint64(reqCnt), MaxHeaderProofsFetch) { + if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) { return errResp(ErrRequestRejected, "") } + trieDb := ethdb.NewTable(pm.chainDb, light.ChtTablePrefix) for _, req := range req.Reqs { if bytes >= softResponseLimit { break } if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil { - if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) { - if tr, _ := trie.New(root, pm.chainDb); tr != nil { + sectionHead := core.GetCanonicalHash(pm.chainDb, (req.ChtNum+1)*light.ChtV1Frequency-1) + if root := light.GetChtRoot(pm.chainDb, req.ChtNum, sectionHead); root != (common.Hash{}) { + if tr, _ := trie.New(root, trieDb); tr != nil { var encNumber [8]byte binary.BigEndian.PutUint64(encNumber[:], req.BlockNum) - proof := tr.Prove(encNumber[:]) + var proof light.NodeList + tr.Prove(encNumber[:], 0, &proof) proofs = append(proofs, ChtResp{Header: header, Proof: proof}) - bytes += len(proof) + estHeaderRlpSize + bytes += proof.DataSize() + estHeaderRlpSize } } } @@ -762,6 +862,73 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) return p.SendHeaderProofs(req.ReqID, bv, proofs) + case GetHelperTrieProofsMsg: + p.Log().Trace("Received helper trie proof request") + // Decode the retrieval message + var req struct { + ReqID uint64 + Reqs []HelperTrieReq + } + if err := msg.Decode(&req); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Gather state data until the fetch or network limits is reached + var ( + auxBytes int + auxData [][]byte + ) + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) { + return errResp(ErrRequestRejected, "") + } + + var ( + lastIdx uint64 + lastType uint + root common.Hash + tr *trie.Trie + ) + + nodes := light.NewNodeSet() + + for _, req := range req.Reqs { + if nodes.DataSize()+auxBytes >= softResponseLimit { + break + } + if tr == nil || req.HelperTrieType != lastType || req.TrieIdx != lastIdx { + var prefix string + root, prefix = pm.getHelperTrie(req.HelperTrieType, req.TrieIdx) + if root != (common.Hash{}) { + if t, err := trie.New(root, ethdb.NewTable(pm.chainDb, prefix)); err == nil { + tr = t + } + } + lastType = req.HelperTrieType + lastIdx = req.TrieIdx + } + if req.AuxReq == auxRoot { + var data []byte + if root != (common.Hash{}) { + data = root[:] + } + auxData = append(auxData, data) + auxBytes += len(data) + } else { + if tr != nil { + tr.Prove(req.Key, req.FromLevel, nodes) + } + if req.AuxReq != 0 { + data := pm.getHelperTrieAuxData(req) + auxData = append(auxData, data) + auxBytes += len(data) + } + } + } + proofs := nodes.NodeList() + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) + return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: proofs, AuxData: auxData}) + case HeaderProofsMsg: if pm.odr == nil { return errResp(ErrUnexpectedResponse, "") @@ -782,9 +949,30 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { Obj: resp.Data, } + case HelperTrieProofsMsg: + if pm.odr == nil { + return errResp(ErrUnexpectedResponse, "") + } + + p.Log().Trace("Received helper trie proof response") + var resp struct { + ReqID, BV uint64 + Data HelperTrieResps + } + if err := msg.Decode(&resp); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + + p.fcServer.GotReply(resp.ReqID, resp.BV) + deliverMsg = &Msg{ + MsgType: MsgHelperTrieProofs, + ReqID: resp.ReqID, + Obj: resp.Data, + } + case SendTxMsg: if pm.txpool == nil { - return errResp(ErrUnexpectedResponse, "") + return errResp(ErrRequestRejected, "") } // Transactions arrived, parse all of them and deliver to the pool var txs []*types.Transaction @@ -796,13 +984,82 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrRequestRejected, "") } - if err := pm.txpool.AddRemotes(txs); err != nil { - return errResp(ErrUnexpectedResponse, "msg: %v", err) + txHashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + txHashes[i] = tx.Hash() } + pm.addOrGetTxStatus(txs, txHashes) _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) + case SendTxV2Msg: + if pm.txpool == nil { + return errResp(ErrRequestRejected, "") + } + // Transactions arrived, parse all of them and deliver to the pool + var req struct { + ReqID uint64 + Txs []*types.Transaction + } + if err := msg.Decode(&req); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + reqCnt := len(req.Txs) + if reject(uint64(reqCnt), MaxTxSend) { + return errResp(ErrRequestRejected, "") + } + + txHashes := make([]common.Hash, len(req.Txs)) + for i, tx := range req.Txs { + txHashes[i] = tx.Hash() + } + + res := pm.addOrGetTxStatus(req.Txs, txHashes) + + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) + return p.SendTxStatus(req.ReqID, bv, res) + + case GetTxStatusMsg: + if pm.txpool == nil { + return errResp(ErrUnexpectedResponse, "") + } + // Transactions arrived, parse all of them and deliver to the pool + var req struct { + ReqID uint64 + TxHashes []common.Hash + } + if err := msg.Decode(&req); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + reqCnt := len(req.TxHashes) + if reject(uint64(reqCnt), MaxTxStatus) { + return errResp(ErrRequestRejected, "") + } + + res := pm.addOrGetTxStatus(nil, req.TxHashes) + + bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) + pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) + return p.SendTxStatus(req.ReqID, bv, res) + + case TxStatusMsg: + if pm.odr == nil { + return errResp(ErrUnexpectedResponse, "") + } + + p.Log().Trace("Received tx status response") + var resp struct { + ReqID, BV uint64 + Status []core.TxStatusData + } + if err := msg.Decode(&resp); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + + p.fcServer.GotReply(resp.ReqID, resp.BV) + default: p.Log().Trace("Received unknown message", "code", msg.Code) return errResp(ErrInvalidMsgCode, "%v", msg.Code) @@ -820,6 +1077,47 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return nil } +// getHelperTrie returns the post-processed trie root for the given trie ID and section index +func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) { + switch id { + case htCanonical: + sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.ChtFrequency-1) + return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix + case htBloomBits: + sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1) + return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix + } + return common.Hash{}, "" +} + +// getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request +func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte { + if req.HelperTrieType == htCanonical && req.AuxReq == auxHeader { + if len(req.Key) != 8 { + return nil + } + blockNum := binary.BigEndian.Uint64(req.Key) + hash := core.GetCanonicalHash(pm.chainDb, blockNum) + return core.GetHeaderRLP(pm.chainDb, hash, blockNum) + } + return nil +} + +func (pm *ProtocolManager) addOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []core.TxStatusData { + status := pm.txpool.AddOrGetTxStatus(txs, txHashes) + for i, _ := range status { + blockHash, blockNum, txIndex := core.GetTxLookupEntry(pm.chainDb, txHashes[i]) + if blockHash != (common.Hash{}) { + enc, err := rlp.EncodeToBytes(core.TxLookupEntry{BlockHash: blockHash, BlockIndex: blockNum, Index: txIndex}) + if err != nil { + panic(err) + } + status[i] = core.TxStatusData{Status: core.TxStatusIncluded, Data: enc} + } + } + return status +} + // NodeInfo retrieves some protocol metadata about the running host node. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo { return ð.EthNodeInfo{ |