diff options
Diffstat (limited to 'les/peer.go')
-rw-r--r-- | les/peer.go | 133 |
1 files changed, 108 insertions, 25 deletions
diff --git a/les/peer.go b/les/peer.go index 3ba2df3fe..104afb6dc 100644 --- a/les/peer.go +++ b/les/peer.go @@ -18,6 +18,8 @@ package les import ( + "crypto/ecdsa" + "encoding/binary" "errors" "fmt" "math/big" @@ -25,9 +27,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" + "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" ) @@ -40,14 +44,23 @@ var ( const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) +const ( + announceTypeNone = iota + announceTypeSimple + announceTypeSigned +) + type peer struct { *p2p.Peer + pubKey *ecdsa.PublicKey rw p2p.MsgReadWriter version int // Protocol version negotiated network uint64 // Network ID being on + announceType, requestAnnounceType uint64 + id string headInfo *announceData @@ -68,9 +81,11 @@ type peer struct { func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id := p.ID() + pubKey, _ := id.Pubkey() return &peer{ Peer: p, + pubKey: pubKey, rw: rw, version: version, network: network, @@ -197,16 +212,31 @@ func (p *peer) SendReceiptsRLP(reqID, bv uint64, receipts []rlp.RawValue) error return sendResponse(p.rw, ReceiptsMsg, reqID, bv, receipts) } -// SendProofs sends a batch of merkle proofs, corresponding to the ones requested. +// SendProofs sends a batch of legacy LES/1 merkle proofs, corresponding to the ones requested. func (p *peer) SendProofs(reqID, bv uint64, proofs proofsData) error { - return sendResponse(p.rw, ProofsMsg, reqID, bv, proofs) + return sendResponse(p.rw, ProofsV1Msg, reqID, bv, proofs) } -// SendHeaderProofs sends a batch of header proofs, corresponding to the ones requested. +// SendProofsV2 sends a batch of merkle proofs, corresponding to the ones requested. +func (p *peer) SendProofsV2(reqID, bv uint64, proofs light.NodeList) error { + return sendResponse(p.rw, ProofsV2Msg, reqID, bv, proofs) +} + +// SendHeaderProofs sends a batch of legacy LES/1 header proofs, corresponding to the ones requested. func (p *peer) SendHeaderProofs(reqID, bv uint64, proofs []ChtResp) error { return sendResponse(p.rw, HeaderProofsMsg, reqID, bv, proofs) } +// SendHelperTrieProofs sends a batch of HelperTrie proofs, corresponding to the ones requested. +func (p *peer) SendHelperTrieProofs(reqID, bv uint64, resp HelperTrieResps) error { + return sendResponse(p.rw, HelperTrieProofsMsg, reqID, bv, resp) +} + +// SendTxStatus sends a batch of transaction status records, corresponding to the ones requested. +func (p *peer) SendTxStatus(reqID, bv uint64, status []core.TxStatusData) error { + return sendResponse(p.rw, TxStatusMsg, reqID, bv, status) +} + // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error { @@ -230,7 +260,7 @@ func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error { // RequestCode fetches a batch of arbitrary data from a node's known state // data, corresponding to the specified hashes. -func (p *peer) RequestCode(reqID, cost uint64, reqs []*CodeReq) error { +func (p *peer) RequestCode(reqID, cost uint64, reqs []CodeReq) error { p.Log().Debug("Fetching batch of codes", "count", len(reqs)) return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs) } @@ -242,20 +272,58 @@ func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error { } // RequestProofs fetches a batch of merkle proofs from a remote node. -func (p *peer) RequestProofs(reqID, cost uint64, reqs []*ProofReq) error { +func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error { p.Log().Debug("Fetching batch of proofs", "count", len(reqs)) - return sendRequest(p.rw, GetProofsMsg, reqID, cost, reqs) + switch p.version { + case lpv1: + return sendRequest(p.rw, GetProofsV1Msg, reqID, cost, reqs) + case lpv2: + return sendRequest(p.rw, GetProofsV2Msg, reqID, cost, reqs) + default: + panic(nil) + } + +} + +// RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node. +func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error { + p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs)) + switch p.version { + case lpv1: + reqsV1 := make([]ChtReq, len(reqs)) + for i, req := range reqs { + if req.HelperTrieType != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 { + return fmt.Errorf("Request invalid in LES/1 mode") + } + blockNum := binary.BigEndian.Uint64(req.Key) + // convert HelperTrie request to old CHT request + reqsV1[i] = ChtReq{ChtNum: (req.TrieIdx+1)*(light.ChtFrequency/light.ChtV1Frequency) - 1, BlockNum: blockNum, FromLevel: req.FromLevel} + } + return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqsV1) + case lpv2: + return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs) + default: + panic(nil) + } } -// RequestHeaderProofs fetches a batch of header merkle proofs from a remote node. -func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error { - p.Log().Debug("Fetching batch of header proofs", "count", len(reqs)) - return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs) +// RequestTxStatus fetches a batch of transaction status records from a remote node. +func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error { + p.Log().Debug("Requesting transaction status", "count", len(txHashes)) + return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes) } +// SendTxStatus sends a batch of transactions to be added to the remote transaction pool. func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error { p.Log().Debug("Fetching batch of transactions", "count", len(txs)) - return p2p.Send(p.rw, SendTxMsg, txs) + switch p.version { + case lpv1: + return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID + case lpv2: + return sendRequest(p.rw, SendTxV2Msg, reqID, cost, txs) + default: + panic(nil) + } } type keyValueEntry struct { @@ -289,7 +357,7 @@ func (l keyValueList) decode() keyValueMap { func (m keyValueMap) get(key string, val interface{}) error { enc, ok := m[key] if !ok { - return errResp(ErrHandshakeMissingKey, "%s", key) + return errResp(ErrMissingKey, "%s", key) } if val == nil { return nil @@ -348,6 +416,9 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis list := server.fcCostStats.getCurrentList() send = send.add("flowControl/MRC", list) p.fcCosts = list.decode() + } else { + p.requestAnnounceType = announceTypeSimple // set to default until "very light" client mode is implemented + send = send.add("announceType", p.requestAnnounceType) } recvList, err := p.sendReceiveHandshake(send) if err != nil { @@ -392,6 +463,9 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis /*if recv.get("serveStateSince", nil) == nil { return errResp(ErrUselessPeer, "wanted client, got server") }*/ + if recv.get("announceType", &p.announceType) != nil { + p.announceType = announceTypeSimple + } p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams) } else { if recv.get("serveChainSince", nil) != nil { @@ -456,11 +530,15 @@ func newPeerSet() *peerSet { // notify adds a service to be notified about added or removed peers func (ps *peerSet) notify(n peerSetNotify) { ps.lock.Lock() - defer ps.lock.Unlock() - ps.notifyList = append(ps.notifyList, n) + peers := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - go n.registerPeer(p) + peers = append(peers, p) + } + ps.lock.Unlock() + + for _, p := range peers { + n.registerPeer(p) } } @@ -468,8 +546,6 @@ func (ps *peerSet) notify(n peerSetNotify) { // peer is already known. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() - defer ps.lock.Unlock() - if ps.closed { return errClosed } @@ -478,8 +554,12 @@ func (ps *peerSet) Register(p *peer) error { } ps.peers[p.id] = p p.sendQueue = newExecQueue(100) - for _, n := range ps.notifyList { - go n.registerPeer(p) + peers := make([]peerSetNotify, len(ps.notifyList)) + copy(peers, ps.notifyList) + ps.lock.Unlock() + + for _, n := range peers { + n.registerPeer(p) } return nil } @@ -488,19 +568,22 @@ func (ps *peerSet) Register(p *peer) error { // actions to/from that particular entity. It also initiates disconnection at the networking layer. func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() - defer ps.lock.Unlock() - if p, ok := ps.peers[id]; !ok { + ps.lock.Unlock() return errNotRegistered } else { - for _, n := range ps.notifyList { - go n.unregisterPeer(p) + delete(ps.peers, id) + peers := make([]peerSetNotify, len(ps.notifyList)) + copy(peers, ps.notifyList) + ps.lock.Unlock() + + for _, n := range peers { + n.unregisterPeer(p) } p.sendQueue.quit() p.Peer.Disconnect(p2p.DiscUselessPeer) + return nil } - delete(ps.peers, id) - return nil } // AllPeerIDs returns a list of all registered peer IDs |