aboutsummaryrefslogtreecommitdiffstats
path: root/les/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/peer.go')
-rw-r--r--les/peer.go133
1 files changed, 108 insertions, 25 deletions
diff --git a/les/peer.go b/les/peer.go
index 3ba2df3fe..104afb6dc 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -18,6 +18,8 @@
package les
import (
+ "crypto/ecdsa"
+ "encoding/binary"
"errors"
"fmt"
"math/big"
@@ -25,9 +27,11 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les/flowcontrol"
+ "github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -40,14 +44,23 @@ var (
const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
+const (
+ announceTypeNone = iota
+ announceTypeSimple
+ announceTypeSigned
+)
+
type peer struct {
*p2p.Peer
+ pubKey *ecdsa.PublicKey
rw p2p.MsgReadWriter
version int // Protocol version negotiated
network uint64 // Network ID being on
+ announceType, requestAnnounceType uint64
+
id string
headInfo *announceData
@@ -68,9 +81,11 @@ type peer struct {
func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
id := p.ID()
+ pubKey, _ := id.Pubkey()
return &peer{
Peer: p,
+ pubKey: pubKey,
rw: rw,
version: version,
network: network,
@@ -197,16 +212,31 @@ func (p *peer) SendReceiptsRLP(reqID, bv uint64, receipts []rlp.RawValue) error
return sendResponse(p.rw, ReceiptsMsg, reqID, bv, receipts)
}
-// SendProofs sends a batch of merkle proofs, corresponding to the ones requested.
+// SendProofs sends a batch of legacy LES/1 merkle proofs, corresponding to the ones requested.
func (p *peer) SendProofs(reqID, bv uint64, proofs proofsData) error {
- return sendResponse(p.rw, ProofsMsg, reqID, bv, proofs)
+ return sendResponse(p.rw, ProofsV1Msg, reqID, bv, proofs)
}
-// SendHeaderProofs sends a batch of header proofs, corresponding to the ones requested.
+// SendProofsV2 sends a batch of merkle proofs, corresponding to the ones requested.
+func (p *peer) SendProofsV2(reqID, bv uint64, proofs light.NodeList) error {
+ return sendResponse(p.rw, ProofsV2Msg, reqID, bv, proofs)
+}
+
+// SendHeaderProofs sends a batch of legacy LES/1 header proofs, corresponding to the ones requested.
func (p *peer) SendHeaderProofs(reqID, bv uint64, proofs []ChtResp) error {
return sendResponse(p.rw, HeaderProofsMsg, reqID, bv, proofs)
}
+// SendHelperTrieProofs sends a batch of HelperTrie proofs, corresponding to the ones requested.
+func (p *peer) SendHelperTrieProofs(reqID, bv uint64, resp HelperTrieResps) error {
+ return sendResponse(p.rw, HelperTrieProofsMsg, reqID, bv, resp)
+}
+
+// SendTxStatus sends a batch of transaction status records, corresponding to the ones requested.
+func (p *peer) SendTxStatus(reqID, bv uint64, status []core.TxStatusData) error {
+ return sendResponse(p.rw, TxStatusMsg, reqID, bv, status)
+}
+
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error {
@@ -230,7 +260,7 @@ func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error {
// RequestCode fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
-func (p *peer) RequestCode(reqID, cost uint64, reqs []*CodeReq) error {
+func (p *peer) RequestCode(reqID, cost uint64, reqs []CodeReq) error {
p.Log().Debug("Fetching batch of codes", "count", len(reqs))
return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs)
}
@@ -242,20 +272,58 @@ func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error {
}
// RequestProofs fetches a batch of merkle proofs from a remote node.
-func (p *peer) RequestProofs(reqID, cost uint64, reqs []*ProofReq) error {
+func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error {
p.Log().Debug("Fetching batch of proofs", "count", len(reqs))
- return sendRequest(p.rw, GetProofsMsg, reqID, cost, reqs)
+ switch p.version {
+ case lpv1:
+ return sendRequest(p.rw, GetProofsV1Msg, reqID, cost, reqs)
+ case lpv2:
+ return sendRequest(p.rw, GetProofsV2Msg, reqID, cost, reqs)
+ default:
+ panic(nil)
+ }
+
+}
+
+// RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
+func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error {
+ p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
+ switch p.version {
+ case lpv1:
+ reqsV1 := make([]ChtReq, len(reqs))
+ for i, req := range reqs {
+ if req.HelperTrieType != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 {
+ return fmt.Errorf("Request invalid in LES/1 mode")
+ }
+ blockNum := binary.BigEndian.Uint64(req.Key)
+ // convert HelperTrie request to old CHT request
+ reqsV1[i] = ChtReq{ChtNum: (req.TrieIdx+1)*(light.ChtFrequency/light.ChtV1Frequency) - 1, BlockNum: blockNum, FromLevel: req.FromLevel}
+ }
+ return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqsV1)
+ case lpv2:
+ return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs)
+ default:
+ panic(nil)
+ }
}
-// RequestHeaderProofs fetches a batch of header merkle proofs from a remote node.
-func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error {
- p.Log().Debug("Fetching batch of header proofs", "count", len(reqs))
- return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
+// RequestTxStatus fetches a batch of transaction status records from a remote node.
+func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error {
+ p.Log().Debug("Requesting transaction status", "count", len(txHashes))
+ return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes)
}
+// SendTxStatus sends a batch of transactions to be added to the remote transaction pool.
func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
p.Log().Debug("Fetching batch of transactions", "count", len(txs))
- return p2p.Send(p.rw, SendTxMsg, txs)
+ switch p.version {
+ case lpv1:
+ return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID
+ case lpv2:
+ return sendRequest(p.rw, SendTxV2Msg, reqID, cost, txs)
+ default:
+ panic(nil)
+ }
}
type keyValueEntry struct {
@@ -289,7 +357,7 @@ func (l keyValueList) decode() keyValueMap {
func (m keyValueMap) get(key string, val interface{}) error {
enc, ok := m[key]
if !ok {
- return errResp(ErrHandshakeMissingKey, "%s", key)
+ return errResp(ErrMissingKey, "%s", key)
}
if val == nil {
return nil
@@ -348,6 +416,9 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
list := server.fcCostStats.getCurrentList()
send = send.add("flowControl/MRC", list)
p.fcCosts = list.decode()
+ } else {
+ p.requestAnnounceType = announceTypeSimple // set to default until "very light" client mode is implemented
+ send = send.add("announceType", p.requestAnnounceType)
}
recvList, err := p.sendReceiveHandshake(send)
if err != nil {
@@ -392,6 +463,9 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
/*if recv.get("serveStateSince", nil) == nil {
return errResp(ErrUselessPeer, "wanted client, got server")
}*/
+ if recv.get("announceType", &p.announceType) != nil {
+ p.announceType = announceTypeSimple
+ }
p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams)
} else {
if recv.get("serveChainSince", nil) != nil {
@@ -456,11 +530,15 @@ func newPeerSet() *peerSet {
// notify adds a service to be notified about added or removed peers
func (ps *peerSet) notify(n peerSetNotify) {
ps.lock.Lock()
- defer ps.lock.Unlock()
-
ps.notifyList = append(ps.notifyList, n)
+ peers := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- go n.registerPeer(p)
+ peers = append(peers, p)
+ }
+ ps.lock.Unlock()
+
+ for _, p := range peers {
+ n.registerPeer(p)
}
}
@@ -468,8 +546,6 @@ func (ps *peerSet) notify(n peerSetNotify) {
// peer is already known.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
- defer ps.lock.Unlock()
-
if ps.closed {
return errClosed
}
@@ -478,8 +554,12 @@ func (ps *peerSet) Register(p *peer) error {
}
ps.peers[p.id] = p
p.sendQueue = newExecQueue(100)
- for _, n := range ps.notifyList {
- go n.registerPeer(p)
+ peers := make([]peerSetNotify, len(ps.notifyList))
+ copy(peers, ps.notifyList)
+ ps.lock.Unlock()
+
+ for _, n := range peers {
+ n.registerPeer(p)
}
return nil
}
@@ -488,19 +568,22 @@ func (ps *peerSet) Register(p *peer) error {
// actions to/from that particular entity. It also initiates disconnection at the networking layer.
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
- defer ps.lock.Unlock()
-
if p, ok := ps.peers[id]; !ok {
+ ps.lock.Unlock()
return errNotRegistered
} else {
- for _, n := range ps.notifyList {
- go n.unregisterPeer(p)
+ delete(ps.peers, id)
+ peers := make([]peerSetNotify, len(ps.notifyList))
+ copy(peers, ps.notifyList)
+ ps.lock.Unlock()
+
+ for _, n := range peers {
+ n.unregisterPeer(p)
}
p.sendQueue.quit()
p.Peer.Disconnect(p2p.DiscUselessPeer)
+ return nil
}
- delete(ps.peers, id)
- return nil
}
// AllPeerIDs returns a list of all registered peer IDs