aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-10-24 21:19:09 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-10-24 21:19:09 +0800
commitca376ead88a5a26626a90abdb62f4de7f6313822 (patch)
tree71d11e3b6cd40d2bf29033b7e23d30d04e086558 /les/handler.go
parent6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff)
downloadgo-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.bz2
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.lz
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.xz
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.zip
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions: * new and more efficient Merkle proofs reply format (when replying to a multiple Merkle proofs request, we just send a single set of trie nodes containing all necessary nodes) * BBT (BloomBitsTrie) works similarly to the existing CHT and contains the bloombits search data to speed up log searches * GetTxStatusMsg returns the inclusion position or the pending/queued/unknown state of a transaction referenced by hash * an optional signature of new block data (number/hash/td) can be included in AnnounceMsg to provide an option for "very light clients" (mobile/embedded devices) to skip expensive Ethash check and accept multiple signatures of somewhat trusted servers (still a lot better than trusting a single server completely and retrieving everything through RPC). The new client mode is not implemented in this PR, just the protocol extension.
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go354
1 files changed, 326 insertions, 28 deletions
diff --git a/les/handler.go b/les/handler.go
index df7eb6af5..de07b7244 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -18,6 +18,7 @@
package les
import (
+ "bytes"
"encoding/binary"
"errors"
"fmt"
@@ -35,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -50,13 +52,14 @@ const (
ethVersion = 63 // equivalent eth version for the downloader
- MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
- MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
- MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
- MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
- MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
- MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
- MaxTxSend = 64 // Amount of transactions to be send per request
+ MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
+ MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
+ MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
+ MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
+ MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
+ MaxHelperTrieProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
+ MaxTxSend = 64 // Amount of transactions to be send per request
+ MaxTxStatus = 256 // Amount of transactions to queried per request
disableClientRemovePeer = false
)
@@ -86,8 +89,7 @@ type BlockChain interface {
}
type txPool interface {
- // AddRemotes should add the given transactions to the pool.
- AddRemotes([]*types.Transaction) error
+ AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []core.TxStatusData
}
type ProtocolManager struct {
@@ -125,7 +127,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protocolVersions []uint, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
@@ -147,15 +149,16 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.retriever = odr.retriever
manager.reqDist = odr.retriever.dist
}
+
// Initiate a sub-protocol for every implemented version we can handle
- manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
- for i, version := range ProtocolVersions {
+ manager.SubProtocols = make([]p2p.Protocol, 0, len(protocolVersions))
+ for _, version := range protocolVersions {
// Compatible, initialize the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: "les",
Version: version,
- Length: ProtocolLengths[i],
+ Length: ProtocolLengths[version],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
var entry *poolEntry
peer := manager.newPeer(int(version), networkId, p, rw)
@@ -315,7 +318,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
}
}
-var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
+var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
@@ -362,11 +365,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Block header query, collect the requested headers and reply
case AnnounceMsg:
p.Log().Trace("Received announce message")
+ if p.requestAnnounceType == announceTypeNone {
+ return errResp(ErrUnexpectedResponse, "")
+ }
var req announceData
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
+
+ if p.requestAnnounceType == announceTypeSigned {
+ if err := req.checkSignature(p.pubKey); err != nil {
+ p.Log().Trace("Invalid announcement signature", "err", err)
+ return err
+ }
+ p.Log().Trace("Valid announcement signature")
+ }
+
p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
if pm.fetcher != nil {
pm.fetcher.announce(p, &req)
@@ -655,7 +670,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
Obj: resp.Receipts,
}
- case GetProofsMsg:
+ case GetProofsV1Msg:
p.Log().Trace("Received proofs request")
// Decode the retrieval message
var req struct {
@@ -690,9 +705,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
if tr != nil {
- proof := tr.Prove(req.Key)
+ var proof light.NodeList
+ tr.Prove(req.Key, 0, &proof)
proofs = append(proofs, proof)
- bytes += len(proof)
+ bytes += proof.DataSize()
}
}
}
@@ -701,7 +717,67 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
return p.SendProofs(req.ReqID, bv, proofs)
- case ProofsMsg:
+ case GetProofsV2Msg:
+ p.Log().Trace("Received les/2 proofs request")
+ // Decode the retrieval message
+ var req struct {
+ ReqID uint64
+ Reqs []ProofReq
+ }
+ if err := msg.Decode(&req); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ lastBHash common.Hash
+ lastAccKey []byte
+ tr, str *trie.Trie
+ )
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxProofsFetch) {
+ return errResp(ErrRequestRejected, "")
+ }
+
+ nodes := light.NewNodeSet()
+
+ for _, req := range req.Reqs {
+ if nodes.DataSize() >= softResponseLimit {
+ break
+ }
+ if tr == nil || req.BHash != lastBHash {
+ if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
+ tr, _ = trie.New(header.Root, pm.chainDb)
+ } else {
+ tr = nil
+ }
+ lastBHash = req.BHash
+ str = nil
+ }
+ if tr != nil {
+ if len(req.AccKey) > 0 {
+ if str == nil || !bytes.Equal(req.AccKey, lastAccKey) {
+ sdata := tr.Get(req.AccKey)
+ str = nil
+ var acc state.Account
+ if err := rlp.DecodeBytes(sdata, &acc); err == nil {
+ str, _ = trie.New(acc.Root, pm.chainDb)
+ }
+ lastAccKey = common.CopyBytes(req.AccKey)
+ }
+ if str != nil {
+ str.Prove(req.Key, req.FromLevel, nodes)
+ }
+ } else {
+ tr.Prove(req.Key, req.FromLevel, nodes)
+ }
+ }
+ }
+ proofs := nodes.NodeList()
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
+ return p.SendProofsV2(req.ReqID, bv, proofs)
+
+ case ProofsV1Msg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
@@ -710,14 +786,35 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// A batch of merkle proofs arrived to one of our previous requests
var resp struct {
ReqID, BV uint64
- Data [][]rlp.RawValue
+ Data []light.NodeList
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.GotReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
- MsgType: MsgProofs,
+ MsgType: MsgProofsV1,
+ ReqID: resp.ReqID,
+ Obj: resp.Data,
+ }
+
+ case ProofsV2Msg:
+ if pm.odr == nil {
+ return errResp(ErrUnexpectedResponse, "")
+ }
+
+ p.Log().Trace("Received les/2 proofs response")
+ // A batch of merkle proofs arrived to one of our previous requests
+ var resp struct {
+ ReqID, BV uint64
+ Data light.NodeList
+ }
+ if err := msg.Decode(&resp); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ p.fcServer.GotReply(resp.ReqID, resp.BV)
+ deliverMsg = &Msg{
+ MsgType: MsgProofsV2,
ReqID: resp.ReqID,
Obj: resp.Data,
}
@@ -738,22 +835,25 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
proofs []ChtResp
)
reqCnt := len(req.Reqs)
- if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
+ if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
return errResp(ErrRequestRejected, "")
}
+ trieDb := ethdb.NewTable(pm.chainDb, light.ChtTablePrefix)
for _, req := range req.Reqs {
if bytes >= softResponseLimit {
break
}
if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
- if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
- if tr, _ := trie.New(root, pm.chainDb); tr != nil {
+ sectionHead := core.GetCanonicalHash(pm.chainDb, (req.ChtNum+1)*light.ChtV1Frequency-1)
+ if root := light.GetChtRoot(pm.chainDb, req.ChtNum, sectionHead); root != (common.Hash{}) {
+ if tr, _ := trie.New(root, trieDb); tr != nil {
var encNumber [8]byte
binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
- proof := tr.Prove(encNumber[:])
+ var proof light.NodeList
+ tr.Prove(encNumber[:], 0, &proof)
proofs = append(proofs, ChtResp{Header: header, Proof: proof})
- bytes += len(proof) + estHeaderRlpSize
+ bytes += proof.DataSize() + estHeaderRlpSize
}
}
}
@@ -762,6 +862,73 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
return p.SendHeaderProofs(req.ReqID, bv, proofs)
+ case GetHelperTrieProofsMsg:
+ p.Log().Trace("Received helper trie proof request")
+ // Decode the retrieval message
+ var req struct {
+ ReqID uint64
+ Reqs []HelperTrieReq
+ }
+ if err := msg.Decode(&req); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ auxBytes int
+ auxData [][]byte
+ )
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
+ return errResp(ErrRequestRejected, "")
+ }
+
+ var (
+ lastIdx uint64
+ lastType uint
+ root common.Hash
+ tr *trie.Trie
+ )
+
+ nodes := light.NewNodeSet()
+
+ for _, req := range req.Reqs {
+ if nodes.DataSize()+auxBytes >= softResponseLimit {
+ break
+ }
+ if tr == nil || req.HelperTrieType != lastType || req.TrieIdx != lastIdx {
+ var prefix string
+ root, prefix = pm.getHelperTrie(req.HelperTrieType, req.TrieIdx)
+ if root != (common.Hash{}) {
+ if t, err := trie.New(root, ethdb.NewTable(pm.chainDb, prefix)); err == nil {
+ tr = t
+ }
+ }
+ lastType = req.HelperTrieType
+ lastIdx = req.TrieIdx
+ }
+ if req.AuxReq == auxRoot {
+ var data []byte
+ if root != (common.Hash{}) {
+ data = root[:]
+ }
+ auxData = append(auxData, data)
+ auxBytes += len(data)
+ } else {
+ if tr != nil {
+ tr.Prove(req.Key, req.FromLevel, nodes)
+ }
+ if req.AuxReq != 0 {
+ data := pm.getHelperTrieAuxData(req)
+ auxData = append(auxData, data)
+ auxBytes += len(data)
+ }
+ }
+ }
+ proofs := nodes.NodeList()
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
+ return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: proofs, AuxData: auxData})
+
case HeaderProofsMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
@@ -782,9 +949,30 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
Obj: resp.Data,
}
+ case HelperTrieProofsMsg:
+ if pm.odr == nil {
+ return errResp(ErrUnexpectedResponse, "")
+ }
+
+ p.Log().Trace("Received helper trie proof response")
+ var resp struct {
+ ReqID, BV uint64
+ Data HelperTrieResps
+ }
+ if err := msg.Decode(&resp); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+
+ p.fcServer.GotReply(resp.ReqID, resp.BV)
+ deliverMsg = &Msg{
+ MsgType: MsgHelperTrieProofs,
+ ReqID: resp.ReqID,
+ Obj: resp.Data,
+ }
+
case SendTxMsg:
if pm.txpool == nil {
- return errResp(ErrUnexpectedResponse, "")
+ return errResp(ErrRequestRejected, "")
}
// Transactions arrived, parse all of them and deliver to the pool
var txs []*types.Transaction
@@ -796,13 +984,82 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrRequestRejected, "")
}
- if err := pm.txpool.AddRemotes(txs); err != nil {
- return errResp(ErrUnexpectedResponse, "msg: %v", err)
+ txHashes := make([]common.Hash, len(txs))
+ for i, tx := range txs {
+ txHashes[i] = tx.Hash()
}
+ pm.addOrGetTxStatus(txs, txHashes)
_, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
+ case SendTxV2Msg:
+ if pm.txpool == nil {
+ return errResp(ErrRequestRejected, "")
+ }
+ // Transactions arrived, parse all of them and deliver to the pool
+ var req struct {
+ ReqID uint64
+ Txs []*types.Transaction
+ }
+ if err := msg.Decode(&req); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ reqCnt := len(req.Txs)
+ if reject(uint64(reqCnt), MaxTxSend) {
+ return errResp(ErrRequestRejected, "")
+ }
+
+ txHashes := make([]common.Hash, len(req.Txs))
+ for i, tx := range req.Txs {
+ txHashes[i] = tx.Hash()
+ }
+
+ res := pm.addOrGetTxStatus(req.Txs, txHashes)
+
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
+ return p.SendTxStatus(req.ReqID, bv, res)
+
+ case GetTxStatusMsg:
+ if pm.txpool == nil {
+ return errResp(ErrUnexpectedResponse, "")
+ }
+ // Transactions arrived, parse all of them and deliver to the pool
+ var req struct {
+ ReqID uint64
+ TxHashes []common.Hash
+ }
+ if err := msg.Decode(&req); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ reqCnt := len(req.TxHashes)
+ if reject(uint64(reqCnt), MaxTxStatus) {
+ return errResp(ErrRequestRejected, "")
+ }
+
+ res := pm.addOrGetTxStatus(nil, req.TxHashes)
+
+ bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
+ pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
+ return p.SendTxStatus(req.ReqID, bv, res)
+
+ case TxStatusMsg:
+ if pm.odr == nil {
+ return errResp(ErrUnexpectedResponse, "")
+ }
+
+ p.Log().Trace("Received tx status response")
+ var resp struct {
+ ReqID, BV uint64
+ Status []core.TxStatusData
+ }
+ if err := msg.Decode(&resp); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+
+ p.fcServer.GotReply(resp.ReqID, resp.BV)
+
default:
p.Log().Trace("Received unknown message", "code", msg.Code)
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@@ -820,6 +1077,47 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return nil
}
+// getHelperTrie returns the post-processed trie root for the given trie ID and section index
+func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
+ switch id {
+ case htCanonical:
+ sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.ChtFrequency-1)
+ return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix
+ case htBloomBits:
+ sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1)
+ return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
+ }
+ return common.Hash{}, ""
+}
+
+// getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request
+func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
+ if req.HelperTrieType == htCanonical && req.AuxReq == auxHeader {
+ if len(req.Key) != 8 {
+ return nil
+ }
+ blockNum := binary.BigEndian.Uint64(req.Key)
+ hash := core.GetCanonicalHash(pm.chainDb, blockNum)
+ return core.GetHeaderRLP(pm.chainDb, hash, blockNum)
+ }
+ return nil
+}
+
+func (pm *ProtocolManager) addOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []core.TxStatusData {
+ status := pm.txpool.AddOrGetTxStatus(txs, txHashes)
+ for i, _ := range status {
+ blockHash, blockNum, txIndex := core.GetTxLookupEntry(pm.chainDb, txHashes[i])
+ if blockHash != (common.Hash{}) {
+ enc, err := rlp.EncodeToBytes(core.TxLookupEntry{BlockHash: blockHash, BlockIndex: blockNum, Index: txIndex})
+ if err != nil {
+ panic(err)
+ }
+ status[i] = core.TxStatusData{Status: core.TxStatusIncluded, Data: enc}
+ }
+ }
+ return status
+}
+
// NodeInfo retrieves some protocol metadata about the running host node.
func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
return &eth.EthNodeInfo{