diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2019-02-26 19:32:48 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2019-02-26 19:32:48 +0800 |
commit | c2003ed63b975c6318e4dd7e65b69c60777b0ddf (patch) | |
tree | 0045c39070c4d2b9fcaa93cbcbdd8d5ea4f4a29f /les/peer.go | |
parent | c2b33a117f686069e36d58d937e1c75d72d7b94c (diff) | |
download | go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.gz go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.bz2 go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.lz go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.xz go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.zst go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.zip |
les, les/flowcontrol: improved request serving and flow control (#18230)
This change
- implements concurrent LES request serving even for a single peer.
- replaces the request cost estimation method with a cost table based on
benchmarks which gives much more consistent results. Until now the
allowed number of light peers was just a guess which probably contributed
a lot to the fluctuating quality of available service. Everything related
to request cost is implemented in a single object, the 'cost tracker'. It
uses a fixed cost table with a global 'correction factor'. Benchmark code
is included and can be run at any time to adapt costs to low-level
implementation changes.
- reimplements flowcontrol.ClientManager in a cleaner and more efficient
way, with added capabilities: There is now control over bandwidth, which
allows using the flow control parameters for client prioritization.
Target utilization over 100 percent is now supported to model concurrent
request processing. Total serving bandwidth is reduced during block
processing to prevent database contention.
- implements an RPC API for the LES servers allowing server operators to
assign priority bandwidth to certain clients and change prioritized
status even while the client is connected. The new API is meant for
cases where server operators charge for LES using an off-protocol mechanism.
- adds a unit test for the new client manager.
- adds an end-to-end test using the network simulator that tests bandwidth
control functions through the new API.
Diffstat (limited to 'les/peer.go')
-rw-r--r-- | les/peer.go | 254 |
1 files changed, 195 insertions, 59 deletions
diff --git a/les/peer.go b/les/peer.go index 9ae94b20f..8b506de62 100644 --- a/les/peer.go +++ b/les/peer.go @@ -14,7 +14,6 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. -// Package les implements the Light Ethereum Subprotocol. package les import ( @@ -25,6 +24,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" @@ -42,6 +42,17 @@ var ( const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) +// capacity limitation for parameter updates +const ( + allowedUpdateBytes = 100000 // initial/maximum allowed update size + allowedUpdateRate = time.Millisecond * 10 // time constant for recharging one byte of allowance +) + +// if the total encoded size of a sent transaction batch is over txSizeCostLimit +// per transaction then the request cost is calculated as proportional to the +// encoded size instead of the transaction count +const txSizeCostLimit = 0x10000 + const ( announceTypeNone = iota announceTypeSimple @@ -63,17 +74,24 @@ type peer struct { headInfo *announceData lock sync.RWMutex - announceChn chan announceData - sendQueue *execQueue + sendQueue *execQueue + + errCh chan error + // responseLock ensures that responses are queued in the same order as + // RequestProcessed is called + responseLock sync.Mutex + responseCount uint64 poolEntry *poolEntry hasBlock func(common.Hash, uint64, bool) bool responseErrors int + updateCounter uint64 + updateTime mclock.AbsTime - fcClient *flowcontrol.ClientNode // nil if the peer is server only - fcServer *flowcontrol.ServerNode // nil if the peer is client only - fcServerParams *flowcontrol.ServerParams - fcCosts requestCostTable + fcClient *flowcontrol.ClientNode // nil if the peer is server only + fcServer *flowcontrol.ServerNode // nil if the peer is client only + fcParams flowcontrol.ServerParams + fcCosts requestCostTable isTrusted bool isOnlyAnnounce bool @@ -83,14 +101,34 @@ func newPeer(version int, network uint64, isTrusted bool, p *p2p.Peer, rw p2p.Ms id := p.ID() return &peer{ - Peer: p, - rw: rw, - version: version, - network: network, - id: fmt.Sprintf("%x", id[:8]), - announceChn: make(chan announceData, 20), - isTrusted: isTrusted, + Peer: p, + rw: rw, + version: version, + network: network, + id: fmt.Sprintf("%x", id), + isTrusted: isTrusted, + } +} + +// rejectUpdate returns true if a parameter update has to be rejected because +// the size and/or rate of updates exceed the capacity limitation +func (p *peer) rejectUpdate(size uint64) bool { + now := mclock.Now() + if p.updateCounter == 0 { + p.updateTime = now + } else { + dt := now - p.updateTime + r := uint64(dt / mclock.AbsTime(allowedUpdateRate)) + if p.updateCounter > r { + p.updateCounter -= r + p.updateTime += mclock.AbsTime(allowedUpdateRate * time.Duration(r)) + } else { + p.updateCounter = 0 + p.updateTime = now + } } + p.updateCounter += size + return p.updateCounter > allowedUpdateBytes } func (p *peer) canQueue() bool { @@ -147,6 +185,20 @@ func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) { return p.fcServer.CanSend(maxCost) } +// updateCapacity updates the request serving capacity assigned to a given client +// and also sends an announcement about the updated flow control parameters +func (p *peer) updateCapacity(cap uint64) { + p.responseLock.Lock() + defer p.responseLock.Unlock() + + p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio} + p.fcClient.UpdateParams(p.fcParams) + var kvList keyValueList + kvList = kvList.add("flowControl/MRR", cap) + kvList = kvList.add("flowControl/BL", cap*bufLimitRatio) + p.queueSend(func() { p.SendAnnounce(announceData{Update: kvList}) }) +} + func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error { type req struct { ReqID uint64 @@ -155,12 +207,27 @@ func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) return p2p.Send(w, msgcode, req{reqID, data}) } -func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{}) error { +// reply struct represents a reply with the actual data already RLP encoded and +// only the bv (buffer value) missing. This allows the serving mechanism to +// calculate the bv value which depends on the data size before sending the reply. +type reply struct { + w p2p.MsgWriter + msgcode, reqID uint64 + data rlp.RawValue +} + +// send sends the reply with the calculated buffer value +func (r *reply) send(bv uint64) error { type resp struct { ReqID, BV uint64 - Data interface{} + Data rlp.RawValue } - return p2p.Send(w, msgcode, resp{reqID, bv, data}) + return p2p.Send(r.w, r.msgcode, resp{r.reqID, bv, r.data}) +} + +// size returns the RLP encoded size of the message data +func (r *reply) size() uint32 { + return uint32(len(r.data)) } func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { @@ -168,8 +235,34 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { defer p.lock.RUnlock() cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) - if cost > p.fcServerParams.BufLimit { - cost = p.fcServerParams.BufLimit + if cost > p.fcParams.BufLimit { + cost = p.fcParams.BufLimit + } + return cost +} + +func (p *peer) GetTxRelayCost(amount, size int) uint64 { + p.lock.RLock() + defer p.lock.RUnlock() + + var msgcode uint64 + switch p.version { + case lpv1: + msgcode = SendTxMsg + case lpv2: + msgcode = SendTxV2Msg + default: + panic(nil) + } + + cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) + sizeCost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(size)/txSizeCostLimit + if sizeCost > cost { + cost = sizeCost + } + + if cost > p.fcParams.BufLimit { + cost = p.fcParams.BufLimit } return cost } @@ -188,52 +281,61 @@ func (p *peer) SendAnnounce(request announceData) error { return p2p.Send(p.rw, AnnounceMsg, request) } -// SendBlockHeaders sends a batch of block headers to the remote peer. -func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error { - return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers) +// ReplyBlockHeaders creates a reply with a batch of block headers +func (p *peer) ReplyBlockHeaders(reqID uint64, headers []*types.Header) *reply { + data, _ := rlp.EncodeToBytes(headers) + return &reply{p.rw, BlockHeadersMsg, reqID, data} } -// SendBlockBodiesRLP sends a batch of block contents to the remote peer from +// ReplyBlockBodiesRLP creates a reply with a batch of block contents from // an already RLP encoded format. -func (p *peer) SendBlockBodiesRLP(reqID, bv uint64, bodies []rlp.RawValue) error { - return sendResponse(p.rw, BlockBodiesMsg, reqID, bv, bodies) +func (p *peer) ReplyBlockBodiesRLP(reqID uint64, bodies []rlp.RawValue) *reply { + data, _ := rlp.EncodeToBytes(bodies) + return &reply{p.rw, BlockBodiesMsg, reqID, data} } -// SendCodeRLP sends a batch of arbitrary internal data, corresponding to the +// ReplyCode creates a reply with a batch of arbitrary internal data, corresponding to the // hashes requested. -func (p *peer) SendCode(reqID, bv uint64, data [][]byte) error { - return sendResponse(p.rw, CodeMsg, reqID, bv, data) +func (p *peer) ReplyCode(reqID uint64, codes [][]byte) *reply { + data, _ := rlp.EncodeToBytes(codes) + return &reply{p.rw, CodeMsg, reqID, data} } -// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the +// ReplyReceiptsRLP creates a reply with a batch of transaction receipts, corresponding to the // ones requested from an already RLP encoded format. -func (p *peer) SendReceiptsRLP(reqID, bv uint64, receipts []rlp.RawValue) error { - return sendResponse(p.rw, ReceiptsMsg, reqID, bv, receipts) +func (p *peer) ReplyReceiptsRLP(reqID uint64, receipts []rlp.RawValue) *reply { + data, _ := rlp.EncodeToBytes(receipts) + return &reply{p.rw, ReceiptsMsg, reqID, data} } -// SendProofs sends a batch of legacy LES/1 merkle proofs, corresponding to the ones requested. -func (p *peer) SendProofs(reqID, bv uint64, proofs proofsData) error { - return sendResponse(p.rw, ProofsV1Msg, reqID, bv, proofs) +// ReplyProofs creates a reply with a batch of legacy LES/1 merkle proofs, corresponding to the ones requested. +func (p *peer) ReplyProofs(reqID uint64, proofs proofsData) *reply { + data, _ := rlp.EncodeToBytes(proofs) + return &reply{p.rw, ProofsV1Msg, reqID, data} } -// SendProofsV2 sends a batch of merkle proofs, corresponding to the ones requested. -func (p *peer) SendProofsV2(reqID, bv uint64, proofs light.NodeList) error { - return sendResponse(p.rw, ProofsV2Msg, reqID, bv, proofs) +// ReplyProofsV2 creates a reply with a batch of merkle proofs, corresponding to the ones requested. +func (p *peer) ReplyProofsV2(reqID uint64, proofs light.NodeList) *reply { + data, _ := rlp.EncodeToBytes(proofs) + return &reply{p.rw, ProofsV2Msg, reqID, data} } -// SendHeaderProofs sends a batch of legacy LES/1 header proofs, corresponding to the ones requested. -func (p *peer) SendHeaderProofs(reqID, bv uint64, proofs []ChtResp) error { - return sendResponse(p.rw, HeaderProofsMsg, reqID, bv, proofs) +// ReplyHeaderProofs creates a reply with a batch of legacy LES/1 header proofs, corresponding to the ones requested. +func (p *peer) ReplyHeaderProofs(reqID uint64, proofs []ChtResp) *reply { + data, _ := rlp.EncodeToBytes(proofs) + return &reply{p.rw, HeaderProofsMsg, reqID, data} } -// SendHelperTrieProofs sends a batch of HelperTrie proofs, corresponding to the ones requested. -func (p *peer) SendHelperTrieProofs(reqID, bv uint64, resp HelperTrieResps) error { - return sendResponse(p.rw, HelperTrieProofsMsg, reqID, bv, resp) +// ReplyHelperTrieProofs creates a reply with a batch of HelperTrie proofs, corresponding to the ones requested. +func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply { + data, _ := rlp.EncodeToBytes(resp) + return &reply{p.rw, HelperTrieProofsMsg, reqID, data} } -// SendTxStatus sends a batch of transaction status records, corresponding to the ones requested. -func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error { - return sendResponse(p.rw, TxStatusMsg, reqID, bv, stats) +// ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested. +func (p *peer) ReplyTxStatus(reqID uint64, stats []txStatus) *reply { + data, _ := rlp.EncodeToBytes(stats) + return &reply{p.rw, TxStatusMsg, reqID, data} } // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the @@ -311,9 +413,9 @@ func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes) } -// SendTxStatus sends a batch of transactions to be added to the remote transaction pool. -func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error { - p.Log().Debug("Fetching batch of transactions", "count", len(txs)) +// SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool. +func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error { + p.Log().Debug("Sending batch of transactions", "size", len(txs)) switch p.version { case lpv1: return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID @@ -344,12 +446,14 @@ func (l keyValueList) add(key string, val interface{}) keyValueList { return append(l, entry) } -func (l keyValueList) decode() keyValueMap { +func (l keyValueList) decode() (keyValueMap, uint64) { m := make(keyValueMap) + var size uint64 for _, entry := range l { m[entry.Key] = entry.Value + size += uint64(len(entry.Key)) + uint64(len(entry.Value)) + 8 } - return m + return m, size } func (m keyValueMap) get(key string, val interface{}) error { @@ -414,9 +518,15 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis } send = send.add("flowControl/BL", server.defParams.BufLimit) send = send.add("flowControl/MRR", server.defParams.MinRecharge) - list := server.fcCostStats.getCurrentList() - send = send.add("flowControl/MRC", list) - p.fcCosts = list.decode() + var costList RequestCostList + if server.costTracker != nil { + costList = server.costTracker.makeCostList() + } else { + costList = testCostList() + } + send = send.add("flowControl/MRC", costList) + p.fcCosts = costList.decode() + p.fcParams = server.defParams } else { //on client node p.announceType = announceTypeSimple @@ -430,8 +540,10 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis if err != nil { return err } - - recv := recvList.decode() + recv, size := recvList.decode() + if p.rejectUpdate(size) { + return errResp(ErrRequestRejected, "") + } var rGenesis, rHash common.Hash var rVersion, rNetwork, rNum uint64 @@ -492,7 +604,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis return errResp(ErrUselessPeer, "peer cannot serve requests") } - params := &flowcontrol.ServerParams{} + var params flowcontrol.ServerParams if err := recv.get("flowControl/BL", ¶ms.BufLimit); err != nil { return err } @@ -503,14 +615,38 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis if err := recv.get("flowControl/MRC", &MRC); err != nil { return err } - p.fcServerParams = params - p.fcServer = flowcontrol.NewServerNode(params) + p.fcParams = params + p.fcServer = flowcontrol.NewServerNode(params, &mclock.System{}) p.fcCosts = MRC.decode() } p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} return nil } +// updateFlowControl updates the flow control parameters belonging to the server +// node if the announced key/value set contains relevant fields +func (p *peer) updateFlowControl(update keyValueMap) { + if p.fcServer == nil { + return + } + params := p.fcParams + updateParams := false + if update.get("flowControl/BL", ¶ms.BufLimit) == nil { + updateParams = true + } + if update.get("flowControl/MRR", ¶ms.MinRecharge) == nil { + updateParams = true + } + if updateParams { + p.fcParams = params + p.fcServer.UpdateParams(params) + } + var MRC RequestCostList + if update.get("flowControl/MRC", &MRC) == nil { + p.fcCosts = MRC.decode() + } +} + // String implements fmt.Stringer. func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, |