aboutsummaryrefslogtreecommitdiffstats
path: root/les/peer.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2019-02-26 19:32:48 +0800
committerFelix Lange <fjl@users.noreply.github.com>2019-02-26 19:32:48 +0800
commitc2003ed63b975c6318e4dd7e65b69c60777b0ddf (patch)
tree0045c39070c4d2b9fcaa93cbcbdd8d5ea4f4a29f /les/peer.go
parentc2b33a117f686069e36d58d937e1c75d72d7b94c (diff)
downloadgo-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.gz
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.bz2
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.lz
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.xz
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.zst
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.zip
les, les/flowcontrol: improved request serving and flow control (#18230)
This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
Diffstat (limited to 'les/peer.go')
-rw-r--r--les/peer.go254
1 files changed, 195 insertions, 59 deletions
diff --git a/les/peer.go b/les/peer.go
index 9ae94b20f..8b506de62 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-// Package les implements the Light Ethereum Subprotocol.
package les
import (
@@ -25,6 +24,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les/flowcontrol"
@@ -42,6 +42,17 @@ var (
const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
+// capacity limitation for parameter updates
+const (
+ allowedUpdateBytes = 100000 // initial/maximum allowed update size
+ allowedUpdateRate = time.Millisecond * 10 // time constant for recharging one byte of allowance
+)
+
+// if the total encoded size of a sent transaction batch is over txSizeCostLimit
+// per transaction then the request cost is calculated as proportional to the
+// encoded size instead of the transaction count
+const txSizeCostLimit = 0x10000
+
const (
announceTypeNone = iota
announceTypeSimple
@@ -63,17 +74,24 @@ type peer struct {
headInfo *announceData
lock sync.RWMutex
- announceChn chan announceData
- sendQueue *execQueue
+ sendQueue *execQueue
+
+ errCh chan error
+ // responseLock ensures that responses are queued in the same order as
+ // RequestProcessed is called
+ responseLock sync.Mutex
+ responseCount uint64
poolEntry *poolEntry
hasBlock func(common.Hash, uint64, bool) bool
responseErrors int
+ updateCounter uint64
+ updateTime mclock.AbsTime
- fcClient *flowcontrol.ClientNode // nil if the peer is server only
- fcServer *flowcontrol.ServerNode // nil if the peer is client only
- fcServerParams *flowcontrol.ServerParams
- fcCosts requestCostTable
+ fcClient *flowcontrol.ClientNode // nil if the peer is server only
+ fcServer *flowcontrol.ServerNode // nil if the peer is client only
+ fcParams flowcontrol.ServerParams
+ fcCosts requestCostTable
isTrusted bool
isOnlyAnnounce bool
@@ -83,14 +101,34 @@ func newPeer(version int, network uint64, isTrusted bool, p *p2p.Peer, rw p2p.Ms
id := p.ID()
return &peer{
- Peer: p,
- rw: rw,
- version: version,
- network: network,
- id: fmt.Sprintf("%x", id[:8]),
- announceChn: make(chan announceData, 20),
- isTrusted: isTrusted,
+ Peer: p,
+ rw: rw,
+ version: version,
+ network: network,
+ id: fmt.Sprintf("%x", id),
+ isTrusted: isTrusted,
+ }
+}
+
+// rejectUpdate returns true if a parameter update has to be rejected because
+// the size and/or rate of updates exceed the capacity limitation
+func (p *peer) rejectUpdate(size uint64) bool {
+ now := mclock.Now()
+ if p.updateCounter == 0 {
+ p.updateTime = now
+ } else {
+ dt := now - p.updateTime
+ r := uint64(dt / mclock.AbsTime(allowedUpdateRate))
+ if p.updateCounter > r {
+ p.updateCounter -= r
+ p.updateTime += mclock.AbsTime(allowedUpdateRate * time.Duration(r))
+ } else {
+ p.updateCounter = 0
+ p.updateTime = now
+ }
}
+ p.updateCounter += size
+ return p.updateCounter > allowedUpdateBytes
}
func (p *peer) canQueue() bool {
@@ -147,6 +185,20 @@ func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
return p.fcServer.CanSend(maxCost)
}
+// updateCapacity updates the request serving capacity assigned to a given client
+// and also sends an announcement about the updated flow control parameters
+func (p *peer) updateCapacity(cap uint64) {
+ p.responseLock.Lock()
+ defer p.responseLock.Unlock()
+
+ p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio}
+ p.fcClient.UpdateParams(p.fcParams)
+ var kvList keyValueList
+ kvList = kvList.add("flowControl/MRR", cap)
+ kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
+ p.queueSend(func() { p.SendAnnounce(announceData{Update: kvList}) })
+}
+
func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
type req struct {
ReqID uint64
@@ -155,12 +207,27 @@ func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{})
return p2p.Send(w, msgcode, req{reqID, data})
}
-func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{}) error {
+// reply struct represents a reply with the actual data already RLP encoded and
+// only the bv (buffer value) missing. This allows the serving mechanism to
+// calculate the bv value which depends on the data size before sending the reply.
+type reply struct {
+ w p2p.MsgWriter
+ msgcode, reqID uint64
+ data rlp.RawValue
+}
+
+// send sends the reply with the calculated buffer value
+func (r *reply) send(bv uint64) error {
type resp struct {
ReqID, BV uint64
- Data interface{}
+ Data rlp.RawValue
}
- return p2p.Send(w, msgcode, resp{reqID, bv, data})
+ return p2p.Send(r.w, r.msgcode, resp{r.reqID, bv, r.data})
+}
+
+// size returns the RLP encoded size of the message data
+func (r *reply) size() uint32 {
+ return uint32(len(r.data))
}
func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
@@ -168,8 +235,34 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
defer p.lock.RUnlock()
cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
- if cost > p.fcServerParams.BufLimit {
- cost = p.fcServerParams.BufLimit
+ if cost > p.fcParams.BufLimit {
+ cost = p.fcParams.BufLimit
+ }
+ return cost
+}
+
+func (p *peer) GetTxRelayCost(amount, size int) uint64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ var msgcode uint64
+ switch p.version {
+ case lpv1:
+ msgcode = SendTxMsg
+ case lpv2:
+ msgcode = SendTxV2Msg
+ default:
+ panic(nil)
+ }
+
+ cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
+ sizeCost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(size)/txSizeCostLimit
+ if sizeCost > cost {
+ cost = sizeCost
+ }
+
+ if cost > p.fcParams.BufLimit {
+ cost = p.fcParams.BufLimit
}
return cost
}
@@ -188,52 +281,61 @@ func (p *peer) SendAnnounce(request announceData) error {
return p2p.Send(p.rw, AnnounceMsg, request)
}
-// SendBlockHeaders sends a batch of block headers to the remote peer.
-func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error {
- return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers)
+// ReplyBlockHeaders creates a reply with a batch of block headers
+func (p *peer) ReplyBlockHeaders(reqID uint64, headers []*types.Header) *reply {
+ data, _ := rlp.EncodeToBytes(headers)
+ return &reply{p.rw, BlockHeadersMsg, reqID, data}
}
-// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
+// ReplyBlockBodiesRLP creates a reply with a batch of block contents from
// an already RLP encoded format.
-func (p *peer) SendBlockBodiesRLP(reqID, bv uint64, bodies []rlp.RawValue) error {
- return sendResponse(p.rw, BlockBodiesMsg, reqID, bv, bodies)
+func (p *peer) ReplyBlockBodiesRLP(reqID uint64, bodies []rlp.RawValue) *reply {
+ data, _ := rlp.EncodeToBytes(bodies)
+ return &reply{p.rw, BlockBodiesMsg, reqID, data}
}
-// SendCodeRLP sends a batch of arbitrary internal data, corresponding to the
+// ReplyCode creates a reply with a batch of arbitrary internal data, corresponding to the
// hashes requested.
-func (p *peer) SendCode(reqID, bv uint64, data [][]byte) error {
- return sendResponse(p.rw, CodeMsg, reqID, bv, data)
+func (p *peer) ReplyCode(reqID uint64, codes [][]byte) *reply {
+ data, _ := rlp.EncodeToBytes(codes)
+ return &reply{p.rw, CodeMsg, reqID, data}
}
-// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
+// ReplyReceiptsRLP creates a reply with a batch of transaction receipts, corresponding to the
// ones requested from an already RLP encoded format.
-func (p *peer) SendReceiptsRLP(reqID, bv uint64, receipts []rlp.RawValue) error {
- return sendResponse(p.rw, ReceiptsMsg, reqID, bv, receipts)
+func (p *peer) ReplyReceiptsRLP(reqID uint64, receipts []rlp.RawValue) *reply {
+ data, _ := rlp.EncodeToBytes(receipts)
+ return &reply{p.rw, ReceiptsMsg, reqID, data}
}
-// SendProofs sends a batch of legacy LES/1 merkle proofs, corresponding to the ones requested.
-func (p *peer) SendProofs(reqID, bv uint64, proofs proofsData) error {
- return sendResponse(p.rw, ProofsV1Msg, reqID, bv, proofs)
+// ReplyProofs creates a reply with a batch of legacy LES/1 merkle proofs, corresponding to the ones requested.
+func (p *peer) ReplyProofs(reqID uint64, proofs proofsData) *reply {
+ data, _ := rlp.EncodeToBytes(proofs)
+ return &reply{p.rw, ProofsV1Msg, reqID, data}
}
-// SendProofsV2 sends a batch of merkle proofs, corresponding to the ones requested.
-func (p *peer) SendProofsV2(reqID, bv uint64, proofs light.NodeList) error {
- return sendResponse(p.rw, ProofsV2Msg, reqID, bv, proofs)
+// ReplyProofsV2 creates a reply with a batch of merkle proofs, corresponding to the ones requested.
+func (p *peer) ReplyProofsV2(reqID uint64, proofs light.NodeList) *reply {
+ data, _ := rlp.EncodeToBytes(proofs)
+ return &reply{p.rw, ProofsV2Msg, reqID, data}
}
-// SendHeaderProofs sends a batch of legacy LES/1 header proofs, corresponding to the ones requested.
-func (p *peer) SendHeaderProofs(reqID, bv uint64, proofs []ChtResp) error {
- return sendResponse(p.rw, HeaderProofsMsg, reqID, bv, proofs)
+// ReplyHeaderProofs creates a reply with a batch of legacy LES/1 header proofs, corresponding to the ones requested.
+func (p *peer) ReplyHeaderProofs(reqID uint64, proofs []ChtResp) *reply {
+ data, _ := rlp.EncodeToBytes(proofs)
+ return &reply{p.rw, HeaderProofsMsg, reqID, data}
}
-// SendHelperTrieProofs sends a batch of HelperTrie proofs, corresponding to the ones requested.
-func (p *peer) SendHelperTrieProofs(reqID, bv uint64, resp HelperTrieResps) error {
- return sendResponse(p.rw, HelperTrieProofsMsg, reqID, bv, resp)
+// ReplyHelperTrieProofs creates a reply with a batch of HelperTrie proofs, corresponding to the ones requested.
+func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply {
+ data, _ := rlp.EncodeToBytes(resp)
+ return &reply{p.rw, HelperTrieProofsMsg, reqID, data}
}
-// SendTxStatus sends a batch of transaction status records, corresponding to the ones requested.
-func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error {
- return sendResponse(p.rw, TxStatusMsg, reqID, bv, stats)
+// ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested.
+func (p *peer) ReplyTxStatus(reqID uint64, stats []txStatus) *reply {
+ data, _ := rlp.EncodeToBytes(stats)
+ return &reply{p.rw, TxStatusMsg, reqID, data}
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
@@ -311,9 +413,9 @@ func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error
return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes)
}
-// SendTxStatus sends a batch of transactions to be added to the remote transaction pool.
-func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
- p.Log().Debug("Fetching batch of transactions", "count", len(txs))
+// SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool.
+func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error {
+ p.Log().Debug("Sending batch of transactions", "size", len(txs))
switch p.version {
case lpv1:
return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID
@@ -344,12 +446,14 @@ func (l keyValueList) add(key string, val interface{}) keyValueList {
return append(l, entry)
}
-func (l keyValueList) decode() keyValueMap {
+func (l keyValueList) decode() (keyValueMap, uint64) {
m := make(keyValueMap)
+ var size uint64
for _, entry := range l {
m[entry.Key] = entry.Value
+ size += uint64(len(entry.Key)) + uint64(len(entry.Value)) + 8
}
- return m
+ return m, size
}
func (m keyValueMap) get(key string, val interface{}) error {
@@ -414,9 +518,15 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
}
send = send.add("flowControl/BL", server.defParams.BufLimit)
send = send.add("flowControl/MRR", server.defParams.MinRecharge)
- list := server.fcCostStats.getCurrentList()
- send = send.add("flowControl/MRC", list)
- p.fcCosts = list.decode()
+ var costList RequestCostList
+ if server.costTracker != nil {
+ costList = server.costTracker.makeCostList()
+ } else {
+ costList = testCostList()
+ }
+ send = send.add("flowControl/MRC", costList)
+ p.fcCosts = costList.decode()
+ p.fcParams = server.defParams
} else {
//on client node
p.announceType = announceTypeSimple
@@ -430,8 +540,10 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
if err != nil {
return err
}
-
- recv := recvList.decode()
+ recv, size := recvList.decode()
+ if p.rejectUpdate(size) {
+ return errResp(ErrRequestRejected, "")
+ }
var rGenesis, rHash common.Hash
var rVersion, rNetwork, rNum uint64
@@ -492,7 +604,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
return errResp(ErrUselessPeer, "peer cannot serve requests")
}
- params := &flowcontrol.ServerParams{}
+ var params flowcontrol.ServerParams
if err := recv.get("flowControl/BL", &params.BufLimit); err != nil {
return err
}
@@ -503,14 +615,38 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
if err := recv.get("flowControl/MRC", &MRC); err != nil {
return err
}
- p.fcServerParams = params
- p.fcServer = flowcontrol.NewServerNode(params)
+ p.fcParams = params
+ p.fcServer = flowcontrol.NewServerNode(params, &mclock.System{})
p.fcCosts = MRC.decode()
}
p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
return nil
}
+// updateFlowControl updates the flow control parameters belonging to the server
+// node if the announced key/value set contains relevant fields
+func (p *peer) updateFlowControl(update keyValueMap) {
+ if p.fcServer == nil {
+ return
+ }
+ params := p.fcParams
+ updateParams := false
+ if update.get("flowControl/BL", &params.BufLimit) == nil {
+ updateParams = true
+ }
+ if update.get("flowControl/MRR", &params.MinRecharge) == nil {
+ updateParams = true
+ }
+ if updateParams {
+ p.fcParams = params
+ p.fcServer.UpdateParams(params)
+ }
+ var MRC RequestCostList
+ if update.get("flowControl/MRC", &MRC) == nil {
+ p.fcCosts = MRC.decode()
+ }
+}
+
// String implements fmt.Stringer.
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,