aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2019-05-31 02:51:13 +0800
committerGitHub <noreply@github.com>2019-05-31 02:51:13 +0800
commit58497f46bd0bdd105828c30500e863e826e598cd (patch)
tree7c658530edfebb6e47ed5f993753f4b48fd1747e
parent3d58268bba92c6c8f7f035bafcd1608bc22cee51 (diff)
downloadgo-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.gz
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.bz2
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.lz
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.xz
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.zst
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.zip
les, les/flowcontrol: implement LES/3 (#19329)
les, les/flowcontrol: implement LES/3
-rw-r--r--cmd/utils/flags.go8
-rw-r--r--core/blockchain.go12
-rw-r--r--core/blockchain_test.go22
-rw-r--r--les/api.go32
-rw-r--r--les/costtracker.go188
-rw-r--r--les/csvlogger/csvlogger.go227
-rw-r--r--les/distributor.go33
-rw-r--r--les/execqueue.go9
-rw-r--r--les/flowcontrol/control.go111
-rw-r--r--les/flowcontrol/manager.go193
-rw-r--r--les/flowcontrol/manager_test.go12
-rw-r--r--les/freeclient.go57
-rw-r--r--les/freeclient_test.go4
-rw-r--r--les/handler.go649
-rw-r--r--les/handler_test.go51
-rw-r--r--les/helper_test.go29
-rw-r--r--les/peer.go110
-rw-r--r--les/peer_test.go4
-rw-r--r--les/protocol.go10
-rw-r--r--les/retrieve.go70
-rw-r--r--les/server.go91
-rw-r--r--les/servingqueue.go219
22 files changed, 1533 insertions, 608 deletions
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index ddeb44f34..93d162370 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -205,13 +205,13 @@ var (
}
LightBandwidthInFlag = cli.IntFlag{
Name: "lightbwin",
- Usage: "Incoming bandwidth limit for light server (1000 bytes/sec, 0 = unlimited)",
- Value: 1000,
+ Usage: "Incoming bandwidth limit for light server (kilobytes/sec, 0 = unlimited)",
+ Value: 0,
}
LightBandwidthOutFlag = cli.IntFlag{
Name: "lightbwout",
- Usage: "Outgoing bandwidth limit for light server (1000 bytes/sec, 0 = unlimited)",
- Value: 5000,
+ Usage: "Outgoing bandwidth limit for light server (kilobytes/sec, 0 = unlimited)",
+ Value: 0,
}
LightPeersFlag = cli.IntFlag{
Name: "lightpeers",
diff --git a/core/blockchain.go b/core/blockchain.go
index 711959692..2355f0ea3 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -74,7 +74,7 @@ const (
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
badBlockLimit = 10
- triesInMemory = 128
+ TriesInMemory = 128
// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
@@ -799,7 +799,7 @@ func (bc *BlockChain) Stop() {
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()
- for _, offset := range []uint64{0, 1, triesInMemory - 1} {
+ for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
@@ -1224,7 +1224,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))
- if current := block.NumberU64(); current > triesInMemory {
+ if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
@@ -1234,7 +1234,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
- chosen := current - triesInMemory
+ chosen := current - TriesInMemory
// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
@@ -1246,8 +1246,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
- if chosen < lastWrite+triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
- log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory)
+ if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
+ log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true)
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 8dfcda6d4..27115af52 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -1534,7 +1534,7 @@ func TestTrieForkGC(t *testing.T) {
db := rawdb.NewMemoryDatabase()
genesis := new(Genesis).MustCommit(db)
- blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) })
+ blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*TriesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) })
// Generate a bunch of fork blocks, each side forking from the canonical chain
forks := make([]*types.Block, len(blocks))
@@ -1563,7 +1563,7 @@ func TestTrieForkGC(t *testing.T) {
}
}
// Dereference all the recent tries and ensure no past trie is left in
- for i := 0; i < triesInMemory; i++ {
+ for i := 0; i < TriesInMemory; i++ {
chain.stateCache.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
chain.stateCache.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
}
@@ -1582,8 +1582,8 @@ func TestLargeReorgTrieGC(t *testing.T) {
genesis := new(Genesis).MustCommit(db)
shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 64, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) })
- original, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) })
- competitor, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) })
+ original, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*TriesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) })
+ competitor, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*TriesInMemory+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) })
// Import the shared chain and the original canonical one
diskdb := rawdb.NewMemoryDatabase()
@@ -1618,7 +1618,7 @@ func TestLargeReorgTrieGC(t *testing.T) {
if _, err := chain.InsertChain(competitor[len(competitor)-2:]); err != nil {
t.Fatalf("failed to finalize competitor chain: %v", err)
}
- for i, block := range competitor[:len(competitor)-triesInMemory] {
+ for i, block := range competitor[:len(competitor)-TriesInMemory] {
if node, _ := chain.stateCache.TrieDB().Node(block.Root()); node != nil {
t.Fatalf("competitor %d: competing chain state missing", i)
}
@@ -1753,7 +1753,7 @@ func TestLowDiffLongChain(t *testing.T) {
// We must use a pretty long chain to ensure that the fork doesn't overtake us
// until after at least 128 blocks post tip
- blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 6*triesInMemory, func(i int, b *BlockGen) {
+ blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 6*TriesInMemory, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{1})
b.OffsetTime(-9)
})
@@ -1771,7 +1771,7 @@ func TestLowDiffLongChain(t *testing.T) {
}
// Generate fork chain, starting from an early block
parent := blocks[10]
- fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 8*triesInMemory, func(i int, b *BlockGen) {
+ fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 8*TriesInMemory, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{2})
})
@@ -1806,7 +1806,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon
genesis := new(Genesis).MustCommit(db)
// Generate and import the canonical chain
- blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*triesInMemory, nil)
+ blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*TriesInMemory, nil)
diskdb := rawdb.NewMemoryDatabase()
new(Genesis).MustCommit(diskdb)
chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
@@ -1817,9 +1817,9 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
}
- lastPrunedIndex := len(blocks) - triesInMemory - 1
+ lastPrunedIndex := len(blocks) - TriesInMemory - 1
lastPrunedBlock := blocks[lastPrunedIndex]
- firstNonPrunedBlock := blocks[len(blocks)-triesInMemory]
+ firstNonPrunedBlock := blocks[len(blocks)-TriesInMemory]
// Verify pruning of lastPrunedBlock
if chain.HasBlockAndState(lastPrunedBlock.Hash(), lastPrunedBlock.NumberU64()) {
@@ -1836,7 +1836,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon
// Generate fork chain, make it longer than canon
parentIndex := lastPrunedIndex + blocksBetweenCommonAncestorAndPruneblock
parent := blocks[parentIndex]
- fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 2*triesInMemory, func(i int, b *BlockGen) {
+ fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 2*TriesInMemory, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{2})
})
// Prepend the parent(s)
diff --git a/les/api.go b/les/api.go
index a933cbd06..3a8d49ca5 100644
--- a/les/api.go
+++ b/les/api.go
@@ -19,11 +19,13 @@ package les
import (
"context"
"errors"
+ "fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -99,7 +101,7 @@ func (s tcSubs) send(tc uint64, underrun bool) {
// MinimumCapacity queries minimum assignable capacity for a single client
func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 {
- return hexutil.Uint64(minCapacity)
+ return hexutil.Uint64(api.server.minCapacity)
}
// FreeClientCapacity queries the capacity provided for free clients
@@ -115,7 +117,7 @@ func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 {
// Note: assigned capacity can be changed while the client is connected with
// immediate effect.
func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error {
- if cap != 0 && cap < minCapacity {
+ if cap != 0 && cap < api.server.minCapacity {
return ErrMinCap
}
return api.server.priorityClientPool.setClientCapacity(id, cap)
@@ -144,6 +146,8 @@ type priorityClientPool struct {
totalCap, totalCapAnnounced uint64
totalConnectedCap, freeClientCap uint64
maxPeers, priorityCount int
+ logger *csvlogger.Logger
+ logTotalPriConn *csvlogger.Channel
subs tcSubs
updateSchedule []scheduledUpdate
@@ -164,12 +168,14 @@ type priorityClientInfo struct {
}
// newPriorityClientPool creates a new priority client pool
-func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool) *priorityClientPool {
+func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, metricsLogger, eventLogger *csvlogger.Logger) *priorityClientPool {
return &priorityClientPool{
- clients: make(map[enode.ID]priorityClientInfo),
- freeClientCap: freeClientCap,
- ps: ps,
- child: child,
+ clients: make(map[enode.ID]priorityClientInfo),
+ freeClientCap: freeClientCap,
+ ps: ps,
+ child: child,
+ logger: eventLogger,
+ logTotalPriConn: metricsLogger.NewChannel("totalPriConn", 0),
}
}
@@ -185,6 +191,7 @@ func (v *priorityClientPool) registerPeer(p *peer) {
id := p.ID()
c := v.clients[id]
+ v.logger.Event(fmt.Sprintf("priorityClientPool: registerPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes()))
if c.connected {
return
}
@@ -192,6 +199,7 @@ func (v *priorityClientPool) registerPeer(p *peer) {
v.child.registerPeer(p)
}
if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap {
+ v.logger.Event(fmt.Sprintf("priorityClientPool: rejected, %x", id.Bytes()))
go v.ps.Unregister(p.id)
return
}
@@ -202,6 +210,8 @@ func (v *priorityClientPool) registerPeer(p *peer) {
if c.cap != 0 {
v.priorityCount++
v.totalConnectedCap += c.cap
+ v.logger.Event(fmt.Sprintf("priorityClientPool: accepted with %d capacity, %x", c.cap, id.Bytes()))
+ v.logTotalPriConn.Update(float64(v.totalConnectedCap))
if v.child != nil {
v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
}
@@ -217,6 +227,7 @@ func (v *priorityClientPool) unregisterPeer(p *peer) {
id := p.ID()
c := v.clients[id]
+ v.logger.Event(fmt.Sprintf("priorityClientPool: unregisterPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes()))
if !c.connected {
return
}
@@ -225,6 +236,7 @@ func (v *priorityClientPool) unregisterPeer(p *peer) {
v.clients[id] = c
v.priorityCount--
v.totalConnectedCap -= c.cap
+ v.logTotalPriConn.Update(float64(v.totalConnectedCap))
if v.child != nil {
v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
}
@@ -299,8 +311,10 @@ func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) {
if v.priorityCount > count || v.totalConnectedCap > totalCap {
for id, c := range v.clients {
if c.connected {
+ v.logger.Event(fmt.Sprintf("priorityClientPool: setLimitsNow kicked out, %x", id.Bytes()))
c.connected = false
v.totalConnectedCap -= c.cap
+ v.logTotalPriConn.Update(float64(v.totalConnectedCap))
v.priorityCount--
v.clients[id] = c
go v.ps.Unregister(c.peer.id)
@@ -356,6 +370,7 @@ func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error {
v.priorityCount--
}
v.totalConnectedCap += cap - c.cap
+ v.logTotalPriConn.Update(float64(v.totalConnectedCap))
if v.child != nil {
v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
}
@@ -374,6 +389,9 @@ func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error {
} else {
delete(v.clients, id)
}
+ if c.connected {
+ v.logger.Event(fmt.Sprintf("priorityClientPool: changed capacity to %d, %x", cap, id.Bytes()))
+ }
return nil
}
diff --git a/les/costtracker.go b/les/costtracker.go
index 332d73343..e463c9f8b 100644
--- a/les/costtracker.go
+++ b/les/costtracker.go
@@ -18,6 +18,7 @@ package les
import (
"encoding/binary"
+ "fmt"
"math"
"sync"
"sync/atomic"
@@ -26,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/log"
)
@@ -52,7 +54,7 @@ var (
GetCodeMsg: {0, 80},
GetProofsV2Msg: {0, 80},
GetHelperTrieProofsMsg: {0, 20},
- SendTxV2Msg: {0, 66000},
+ SendTxV2Msg: {0, 16500},
GetTxStatusMsg: {0, 50},
}
// maximum outgoing message size estimates
@@ -66,17 +68,27 @@ var (
SendTxV2Msg: {0, 100},
GetTxStatusMsg: {0, 100},
}
- minBufLimit = uint64(50000000 * maxCostFactor) // minimum buffer limit allowed for a client
- minCapacity = (minBufLimit-1)/bufLimitRatio + 1 // minimum capacity allowed for a client
+ // request amounts that have to fit into the minimum buffer size minBufferMultiplier times
+ minBufferReqAmount = map[uint64]uint64{
+ GetBlockHeadersMsg: 192,
+ GetBlockBodiesMsg: 1,
+ GetReceiptsMsg: 1,
+ GetCodeMsg: 1,
+ GetProofsV2Msg: 1,
+ GetHelperTrieProofsMsg: 16,
+ SendTxV2Msg: 8,
+ GetTxStatusMsg: 64,
+ }
+ minBufferMultiplier = 3
)
const (
maxCostFactor = 2 // ratio of maximum and average cost estimates
- gfInitWeight = time.Second * 10
- gfMaxWeight = time.Hour
gfUsageThreshold = 0.5
gfUsageTC = time.Second
- gfDbKey = "_globalCostFactor"
+ gfRaiseTC = time.Second * 200
+ gfDropTC = time.Second * 50
+ gfDbKey = "_globalCostFactorV3"
)
// costTracker is responsible for calculating costs and cost estimates on the
@@ -94,21 +106,30 @@ type costTracker struct {
inSizeFactor, outSizeFactor float64
gf, utilTarget float64
+ minBufLimit uint64
gfUpdateCh chan gfUpdate
gfLock sync.RWMutex
totalRechargeCh chan uint64
- stats map[uint64][]uint64
+ stats map[uint64][]uint64
+ logger *csvlogger.Logger
+ logRecentTime, logRecentAvg, logTotalRecharge, logRelCost *csvlogger.Channel
}
-// newCostTracker creates a cost tracker and loads the cost factor statistics from the database
-func newCostTracker(db ethdb.Database, config *eth.Config) *costTracker {
+// newCostTracker creates a cost tracker and loads the cost factor statistics from the database.
+// It also returns the minimum capacity that can be assigned to any peer.
+func newCostTracker(db ethdb.Database, config *eth.Config, logger *csvlogger.Logger) (*costTracker, uint64) {
utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100
ct := &costTracker{
- db: db,
- stopCh: make(chan chan struct{}),
- utilTarget: utilTarget,
+ db: db,
+ stopCh: make(chan chan struct{}),
+ utilTarget: utilTarget,
+ logger: logger,
+ logRelCost: logger.NewMinMaxChannel("relativeCost", true),
+ logRecentTime: logger.NewMinMaxChannel("recentTime", true),
+ logRecentAvg: logger.NewMinMaxChannel("recentAvg", true),
+ logTotalRecharge: logger.NewChannel("totalRecharge", 0.01),
}
if config.LightBandwidthIn > 0 {
ct.inSizeFactor = utilTarget / float64(config.LightBandwidthIn)
@@ -123,7 +144,16 @@ func newCostTracker(db ethdb.Database, config *eth.Config) *costTracker {
}
}
ct.gfLoop()
- return ct
+ costList := ct.makeCostList(ct.globalFactor() * 1.25)
+ for _, c := range costList {
+ amount := minBufferReqAmount[c.MsgCode]
+ cost := c.BaseCost + amount*c.ReqCost
+ if cost > ct.minBufLimit {
+ ct.minBufLimit = cost
+ }
+ }
+ ct.minBufLimit *= uint64(minBufferMultiplier)
+ return ct, (ct.minBufLimit-1)/bufLimitRatio + 1
}
// stop stops the cost tracker and saves the cost factor statistics to the database
@@ -138,16 +168,14 @@ func (ct *costTracker) stop() {
// makeCostList returns upper cost estimates based on the hardcoded cost estimate
// tables and the optionally specified incoming/outgoing bandwidth limits
-func (ct *costTracker) makeCostList() RequestCostList {
- maxCost := func(avgTime, inSize, outSize uint64) uint64 {
- globalFactor := ct.globalFactor()
-
- cost := avgTime * maxCostFactor
- inSizeCost := uint64(float64(inSize) * ct.inSizeFactor * globalFactor * maxCostFactor)
+func (ct *costTracker) makeCostList(globalFactor float64) RequestCostList {
+ maxCost := func(avgTimeCost, inSize, outSize uint64) uint64 {
+ cost := avgTimeCost * maxCostFactor
+ inSizeCost := uint64(float64(inSize) * ct.inSizeFactor * globalFactor)
if inSizeCost > cost {
cost = inSizeCost
}
- outSizeCost := uint64(float64(outSize) * ct.outSizeFactor * globalFactor * maxCostFactor)
+ outSizeCost := uint64(float64(outSize) * ct.outSizeFactor * globalFactor)
if outSizeCost > cost {
cost = outSizeCost
}
@@ -155,17 +183,29 @@ func (ct *costTracker) makeCostList() RequestCostList {
}
var list RequestCostList
for code, data := range reqAvgTimeCost {
+ baseCost := maxCost(data.baseCost, reqMaxInSize[code].baseCost, reqMaxOutSize[code].baseCost)
+ reqCost := maxCost(data.reqCost, reqMaxInSize[code].reqCost, reqMaxOutSize[code].reqCost)
+ if ct.minBufLimit != 0 {
+ // if minBufLimit is set then always enforce maximum request cost <= minBufLimit
+ maxCost := baseCost + reqCost*minBufferReqAmount[code]
+ if maxCost > ct.minBufLimit {
+ mul := 0.999 * float64(ct.minBufLimit) / float64(maxCost)
+ baseCost = uint64(float64(baseCost) * mul)
+ reqCost = uint64(float64(reqCost) * mul)
+ }
+ }
+
list = append(list, requestCostListItem{
MsgCode: code,
- BaseCost: maxCost(data.baseCost, reqMaxInSize[code].baseCost, reqMaxOutSize[code].baseCost),
- ReqCost: maxCost(data.reqCost, reqMaxInSize[code].reqCost, reqMaxOutSize[code].reqCost),
+ BaseCost: baseCost,
+ ReqCost: reqCost,
})
}
return list
}
type gfUpdate struct {
- avgTime, servingTime float64
+ avgTimeCost, servingTime float64
}
// gfLoop starts an event loop which updates the global cost factor which is
@@ -178,45 +218,74 @@ type gfUpdate struct {
// total allowed serving time per second but nominated in cost units, should
// also be scaled with the cost factor and is also updated by this loop.
func (ct *costTracker) gfLoop() {
- var gfUsage, gfSum, gfWeight float64
+ var gfLog, recentTime, recentAvg float64
lastUpdate := mclock.Now()
expUpdate := lastUpdate
data, _ := ct.db.Get([]byte(gfDbKey))
- if len(data) == 16 {
- gfSum = math.Float64frombits(binary.BigEndian.Uint64(data[0:8]))
- gfWeight = math.Float64frombits(binary.BigEndian.Uint64(data[8:16]))
+ if len(data) == 8 {
+ gfLog = math.Float64frombits(binary.BigEndian.Uint64(data[:]))
}
- if gfWeight < float64(gfInitWeight) {
- gfSum = float64(gfInitWeight)
- gfWeight = float64(gfInitWeight)
- }
- gf := gfSum / gfWeight
+ gf := math.Exp(gfLog)
ct.gf = gf
+ totalRecharge := ct.utilTarget * gf
ct.gfUpdateCh = make(chan gfUpdate, 100)
+ threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / 1000000
go func() {
+ saveCostFactor := func() {
+ var data [8]byte
+ binary.BigEndian.PutUint64(data[:], math.Float64bits(gfLog))
+ ct.db.Put([]byte(gfDbKey), data[:])
+ log.Debug("global cost factor saved", "value", gf)
+ }
+ saveTicker := time.NewTicker(time.Minute * 10)
+
for {
select {
case r := <-ct.gfUpdateCh:
now := mclock.Now()
- max := r.servingTime * gf
- if r.avgTime > max {
- max = r.avgTime
+ if ct.logRelCost != nil && r.avgTimeCost > 1e-20 {
+ ct.logRelCost.Update(r.servingTime * gf / r.avgTimeCost)
+ }
+ if r.servingTime > 1000000000 {
+ ct.logger.Event(fmt.Sprintf("Very long servingTime = %f avgTimeCost = %f costFactor = %f", r.servingTime, r.avgTimeCost, gf))
}
dt := float64(now - expUpdate)
expUpdate = now
- gfUsage = gfUsage*math.Exp(-dt/float64(gfUsageTC)) + max*1000000/float64(gfUsageTC)
+ exp := math.Exp(-dt / float64(gfUsageTC))
+ // calculate gf correction until now, based on previous values
+ var gfCorr float64
+ max := recentTime
+ if recentAvg > max {
+ max = recentAvg
+ }
+ // we apply continuous correction when MAX(recentTime, recentAvg) > threshold
+ if max > threshold {
+ // calculate correction time between last expUpdate and now
+ if max*exp >= threshold {
+ gfCorr = dt
+ } else {
+ gfCorr = math.Log(max/threshold) * float64(gfUsageTC)
+ }
+ // calculate log(gf) correction with the right direction and time constant
+ if recentTime > recentAvg {
+ // drop gf if actual serving times are larger than average estimates
+ gfCorr /= -float64(gfDropTC)
+ } else {
+ // raise gf if actual serving times are smaller than average estimates
+ gfCorr /= float64(gfRaiseTC)
+ }
+ }
+ // update recent cost values with current request
+ recentTime = recentTime*exp + r.servingTime
+ recentAvg = recentAvg*exp + r.avgTimeCost/gf
- if gfUsage >= gfUsageThreshold*ct.utilTarget*gf {
- gfSum += r.avgTime
- gfWeight += r.servingTime
+ if gfCorr != 0 {
+ gfLog += gfCorr
+ gf = math.Exp(gfLog)
if time.Duration(now-lastUpdate) > time.Second {
- gf = gfSum / gfWeight
- if gfWeight >= float64(gfMaxWeight) {
- gfSum = gf * float64(gfMaxWeight)
- gfWeight = float64(gfMaxWeight)
- }
+ totalRecharge = ct.utilTarget * gf
lastUpdate = now
ct.gfLock.Lock()
ct.gf = gf
@@ -224,19 +293,22 @@ func (ct *costTracker) gfLoop() {
ct.gfLock.Unlock()
if ch != nil {
select {
- case ct.totalRechargeCh <- uint64(ct.utilTarget * gf):
+ case ct.totalRechargeCh <- uint64(totalRecharge):
default:
}
}
- log.Debug("global cost factor updated", "gf", gf, "weight", time.Duration(gfWeight))
+ log.Debug("global cost factor updated", "gf", gf)
}
}
+ ct.logRecentTime.Update(recentTime)
+ ct.logRecentAvg.Update(recentAvg)
+ ct.logTotalRecharge.Update(totalRecharge)
+
+ case <-saveTicker.C:
+ saveCostFactor()
+
case stopCh := <-ct.stopCh:
- var data [16]byte
- binary.BigEndian.PutUint64(data[0:8], math.Float64bits(gfSum))
- binary.BigEndian.PutUint64(data[8:16], math.Float64bits(gfWeight))
- ct.db.Put([]byte(gfDbKey), data[:])
- log.Debug("global cost factor saved", "sum", time.Duration(gfSum), "weight", time.Duration(gfWeight))
+ saveCostFactor()
close(stopCh)
return
}
@@ -275,15 +347,15 @@ func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 {
// average estimate statistics
func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
avg := reqAvgTimeCost[code]
- avgTime := avg.baseCost + amount*avg.reqCost
+ avgTimeCost := avg.baseCost + amount*avg.reqCost
select {
- case ct.gfUpdateCh <- gfUpdate{float64(avgTime), float64(servingTime)}:
+ case ct.gfUpdateCh <- gfUpdate{float64(avgTimeCost), float64(servingTime)}:
default:
}
if makeCostStats {
realCost <<= 4
l := 0
- for l < 9 && realCost > avgTime {
+ for l < 9 && realCost > avgTimeCost {
l++
realCost >>= 1
}
@@ -339,8 +411,8 @@ type (
}
)
-// getCost calculates the estimated cost for a given request type and amount
-func (table requestCostTable) getCost(code, amount uint64) uint64 {
+// getMaxCost calculates the estimated cost for a given request type and amount
+func (table requestCostTable) getMaxCost(code, amount uint64) uint64 {
costs := table[code]
return costs.baseCost + amount*costs.reqCost
}
@@ -360,7 +432,7 @@ func (list RequestCostList) decode(protocolLength uint64) requestCostTable {
}
// testCostList returns a dummy request cost list used by tests
-func testCostList() RequestCostList {
+func testCostList(testCost uint64) RequestCostList {
cl := make(RequestCostList, len(reqAvgTimeCost))
var max uint64
for code := range reqAvgTimeCost {
@@ -372,7 +444,7 @@ func testCostList() RequestCostList {
for code := uint64(0); code <= max; code++ {
if _, ok := reqAvgTimeCost[code]; ok {
cl[i].MsgCode = code
- cl[i].BaseCost = 0
+ cl[i].BaseCost = testCost
cl[i].ReqCost = 0
i++
}
diff --git a/les/csvlogger/csvlogger.go b/les/csvlogger/csvlogger.go
new file mode 100644
index 000000000..9a4093cb9
--- /dev/null
+++ b/les/csvlogger/csvlogger.go
@@ -0,0 +1,227 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package csvlogger
+
+import (
+ "fmt"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// Logger is a metrics/events logger that writes logged values and events into a comma separated file
+type Logger struct {
+ file *os.File
+ started mclock.AbsTime
+ channels []*Channel
+ period time.Duration
+ stopCh, stopped chan struct{}
+ storeCh chan string
+ eventHeader string
+}
+
+// NewLogger creates a new Logger
+func NewLogger(fileName string, updatePeriod time.Duration, eventHeader string) *Logger {
+ if fileName == "" {
+ return nil
+ }
+ f, err := os.Create(fileName)
+ if err != nil {
+ log.Error("Error creating log file", "name", fileName, "error", err)
+ return nil
+ }
+ return &Logger{
+ file: f,
+ period: updatePeriod,
+ stopCh: make(chan struct{}),
+ storeCh: make(chan string, 1),
+ eventHeader: eventHeader,
+ }
+}
+
+// NewChannel creates a new value logger channel that writes values in a single
+// column. If the relative change of the value is bigger than the given threshold
+// then a new line is added immediately (threshold can also be 0).
+func (l *Logger) NewChannel(name string, threshold float64) *Channel {
+ if l == nil {
+ return nil
+ }
+ c := &Channel{
+ logger: l,
+ name: name,
+ threshold: threshold,
+ }
+ l.channels = append(l.channels, c)
+ return c
+}
+
+// NewMinMaxChannel creates a new value logger channel that writes the minimum and
+// maximum of the tracked value in two columns. It never triggers adding a new line.
+// If zeroDefault is true then 0 is written to both min and max columns if no update
+// was given during the last period. If it is false then the last update will appear
+// in both columns.
+func (l *Logger) NewMinMaxChannel(name string, zeroDefault bool) *Channel {
+ if l == nil {
+ return nil
+ }
+ c := &Channel{
+ logger: l,
+ name: name,
+ minmax: true,
+ mmZeroDefault: zeroDefault,
+ }
+ l.channels = append(l.channels, c)
+ return c
+}
+
+func (l *Logger) store(event string) {
+ s := fmt.Sprintf("%g", float64(mclock.Now()-l.started)/1000000000)
+ for _, ch := range l.channels {
+ s += ", " + ch.store()
+ }
+ if event != "" {
+ s += ", " + event
+ }
+ l.file.WriteString(s + "\n")
+}
+
+// Start writes the header line and starts the logger
+func (l *Logger) Start() {
+ if l == nil {
+ return
+ }
+ l.started = mclock.Now()
+ s := "Time"
+ for _, ch := range l.channels {
+ s += ", " + ch.header()
+ }
+ if l.eventHeader != "" {
+ s += ", " + l.eventHeader
+ }
+ l.file.WriteString(s + "\n")
+ go func() {
+ timer := time.NewTimer(l.period)
+ for {
+ select {
+ case <-timer.C:
+ l.store("")
+ timer.Reset(l.period)
+ case event := <-l.storeCh:
+ l.store(event)
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timer.Reset(l.period)
+ case <-l.stopCh:
+ close(l.stopped)
+ return
+ }
+ }
+ }()
+}
+
+// Stop stops the logger and closes the file
+func (l *Logger) Stop() {
+ if l == nil {
+ return
+ }
+ l.stopped = make(chan struct{})
+ close(l.stopCh)
+ <-l.stopped
+ l.file.Close()
+}
+
+// Event immediately adds a new line and adds the given event string in the last column
+func (l *Logger) Event(event string) {
+ if l == nil {
+ return
+ }
+ select {
+ case l.storeCh <- event:
+ case <-l.stopCh:
+ }
+}
+
+// Channel represents a logger channel tracking a single value
+type Channel struct {
+ logger *Logger
+ lock sync.Mutex
+ name string
+ threshold, storeMin, storeMax, lastValue, min, max float64
+ minmax, mmSet, mmZeroDefault bool
+}
+
+// Update updates the tracked value
+func (lc *Channel) Update(value float64) {
+ if lc == nil {
+ return
+ }
+ lc.lock.Lock()
+ defer lc.lock.Unlock()
+
+ lc.lastValue = value
+ if lc.minmax {
+ if value > lc.max || !lc.mmSet {
+ lc.max = value
+ }
+ if value < lc.min || !lc.mmSet {
+ lc.min = value
+ }
+ lc.mmSet = true
+ } else {
+ if value < lc.storeMin || value > lc.storeMax {
+ select {
+ case lc.logger.storeCh <- "":
+ default:
+ }
+ }
+ }
+}
+
+func (lc *Channel) store() (s string) {
+ lc.lock.Lock()
+ defer lc.lock.Unlock()
+
+ if lc.minmax {
+ s = fmt.Sprintf("%g, %g", lc.min, lc.max)
+ lc.mmSet = false
+ if lc.mmZeroDefault {
+ lc.min = 0
+ } else {
+ lc.min = lc.lastValue
+ }
+ lc.max = lc.min
+ } else {
+ s = fmt.Sprintf("%g", lc.lastValue)
+ lc.storeMin = lc.lastValue * (1 - lc.threshold)
+ lc.storeMax = lc.lastValue * (1 + lc.threshold)
+ if lc.lastValue < 0 {
+ lc.storeMin, lc.storeMax = lc.storeMax, lc.storeMin
+ }
+ }
+ return
+}
+
+func (lc *Channel) header() string {
+ if lc.minmax {
+ return lc.name + " (min), " + lc.name + " (max)"
+ }
+ return lc.name
+}
diff --git a/les/distributor.go b/les/distributor.go
index 1de267f27..9235adc03 100644
--- a/les/distributor.go
+++ b/les/distributor.go
@@ -62,9 +62,10 @@ type distReq struct {
canSend func(distPeer) bool
request func(distPeer) func()
- reqOrder uint64
- sentChn chan distPeer
- element *list.Element
+ reqOrder uint64
+ sentChn chan distPeer
+ element *list.Element
+ waitForPeers mclock.AbsTime
}
// newRequestDistributor creates a new request distributor
@@ -106,7 +107,11 @@ func (d *requestDistributor) registerTestPeer(p distPeer) {
// distMaxWait is the maximum waiting time after which further necessary waiting
// times are recalculated based on new feedback from the servers
-const distMaxWait = time.Millisecond * 10
+const distMaxWait = time.Millisecond * 50
+
+// waitForPeers is the time window in which a request does not fail even if it
+// has no suitable peers to send to at the moment
+const waitForPeers = time.Second * 3
// main event loop
func (d *requestDistributor) loop() {
@@ -179,8 +184,6 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
checkedPeers := make(map[distPeer]struct{})
elem := d.reqQueue.Front()
var (
- bestPeer distPeer
- bestReq *distReq
bestWait time.Duration
sel *weightedRandomSelect
)
@@ -188,9 +191,18 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
d.peerLock.RLock()
defer d.peerLock.RUnlock()
- for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+ peerCount := len(d.peers)
+ for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil {
req := elem.Value.(*distReq)
canSend := false
+ now := d.clock.Now()
+ if req.waitForPeers > now {
+ canSend = true
+ wait := time.Duration(req.waitForPeers - now)
+ if bestWait == 0 || wait < bestWait {
+ bestWait = wait
+ }
+ }
for peer := range d.peers {
if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
canSend = true
@@ -202,9 +214,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
}
sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
} else {
- if bestReq == nil || wait < bestWait {
- bestPeer = peer
- bestReq = req
+ if bestWait == 0 || wait < bestWait {
bestWait = wait
}
}
@@ -223,7 +233,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
c := sel.choose().(selectPeerItem)
return c.peer, c.req, 0
}
- return bestPeer, bestReq, bestWait
+ return nil, nil, bestWait
}
// queue adds a request to the distribution queue, returns a channel where the
@@ -237,6 +247,7 @@ func (d *requestDistributor) queue(r *distReq) chan distPeer {
if r.reqOrder == 0 {
d.lastReqOrder++
r.reqOrder = d.lastReqOrder
+ r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers)
}
back := d.reqQueue.Back()
diff --git a/les/execqueue.go b/les/execqueue.go
index 614721bf0..e0c88a990 100644
--- a/les/execqueue.go
+++ b/les/execqueue.go
@@ -44,7 +44,7 @@ func (q *execQueue) loop() {
func (q *execQueue) waitNext(drop bool) (f func()) {
q.mu.Lock()
- if drop {
+ if drop && len(q.funcs) > 0 {
// Remove the function that just executed. We do this here instead of when
// dequeuing so len(q.funcs) includes the function that is running.
q.funcs = append(q.funcs[:0], q.funcs[1:]...)
@@ -84,6 +84,13 @@ func (q *execQueue) queue(f func()) bool {
return ok
}
+// clear drops all queued functions
+func (q *execQueue) clear() {
+ q.mu.Lock()
+ q.funcs = q.funcs[:0]
+ q.mu.Unlock()
+}
+
// quit stops the exec queue.
// quit waits for the current execution to finish before returning.
func (q *execQueue) quit() {
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go
index c03f673b2..490013677 100644
--- a/les/flowcontrol/control.go
+++ b/les/flowcontrol/control.go
@@ -56,11 +56,12 @@ type scheduledUpdate struct {
// (used in server mode only)
type ClientNode struct {
params ServerParams
- bufValue uint64
+ bufValue int64
lastTime mclock.AbsTime
updateSchedule []scheduledUpdate
sumCost uint64 // sum of req costs received from this client
accepted map[uint64]uint64 // value = sumCost after accepting the given req
+ connected bool
lock sync.Mutex
cm *ClientManager
log *logger
@@ -70,11 +71,12 @@ type ClientNode struct {
// NewClientNode returns a new ClientNode
func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode {
node := &ClientNode{
- cm: cm,
- params: params,
- bufValue: params.BufLimit,
- lastTime: cm.clock.Now(),
- accepted: make(map[uint64]uint64),
+ cm: cm,
+ params: params,
+ bufValue: int64(params.BufLimit),
+ lastTime: cm.clock.Now(),
+ accepted: make(map[uint64]uint64),
+ connected: true,
}
if keepLogs > 0 {
node.log = newLogger(keepLogs)
@@ -85,9 +87,55 @@ func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode {
// Disconnect should be called when a client is disconnected
func (node *ClientNode) Disconnect() {
+ node.lock.Lock()
+ defer node.lock.Unlock()
+
+ node.connected = false
node.cm.disconnect(node)
}
+// BufferStatus returns the current buffer value and limit
+func (node *ClientNode) BufferStatus() (uint64, uint64) {
+ node.lock.Lock()
+ defer node.lock.Unlock()
+
+ if !node.connected {
+ return 0, 0
+ }
+ now := node.cm.clock.Now()
+ node.update(now)
+ node.cm.updateBuffer(node, 0, now)
+ bv := node.bufValue
+ if bv < 0 {
+ bv = 0
+ }
+ return uint64(bv), node.params.BufLimit
+}
+
+// OneTimeCost subtracts the given amount from the node's buffer.
+//
+// Note: this call can take the buffer into the negative region internally.
+// In this case zero buffer value is returned by exported calls and no requests
+// are accepted.
+func (node *ClientNode) OneTimeCost(cost uint64) {
+ node.lock.Lock()
+ defer node.lock.Unlock()
+
+ now := node.cm.clock.Now()
+ node.update(now)
+ node.bufValue -= int64(cost)
+ node.cm.updateBuffer(node, -int64(cost), now)
+}
+
+// Freeze notifies the client manager about a client freeze event in which case
+// the total capacity allowance is slightly reduced.
+func (node *ClientNode) Freeze() {
+ node.lock.Lock()
+ frozenCap := node.params.MinRecharge
+ node.lock.Unlock()
+ node.cm.reduceTotalCapacity(frozenCap)
+}
+
// update recalculates the buffer value at a specified time while also performing
// scheduled flow control parameter updates if necessary
func (node *ClientNode) update(now mclock.AbsTime) {
@@ -105,9 +153,9 @@ func (node *ClientNode) recalcBV(now mclock.AbsTime) {
if now < node.lastTime {
dt = 0
}
- node.bufValue += node.params.MinRecharge * dt / uint64(fcTimeConst)
- if node.bufValue > node.params.BufLimit {
- node.bufValue = node.params.BufLimit
+ node.bufValue += int64(node.params.MinRecharge * dt / uint64(fcTimeConst))
+ if node.bufValue > int64(node.params.BufLimit) {
+ node.bufValue = int64(node.params.BufLimit)
}
if node.log != nil {
node.log.add(now, fmt.Sprintf("updated bv=%d MRR=%d BufLimit=%d", node.bufValue, node.params.MinRecharge, node.params.BufLimit))
@@ -139,11 +187,11 @@ func (node *ClientNode) UpdateParams(params ServerParams) {
// updateParams updates the flow control parameters of the node
func (node *ClientNode) updateParams(params ServerParams, now mclock.AbsTime) {
- diff := params.BufLimit - node.params.BufLimit
- if int64(diff) > 0 {
+ diff := int64(params.BufLimit - node.params.BufLimit)
+ if diff > 0 {
node.bufValue += diff
- } else if node.bufValue > params.BufLimit {
- node.bufValue = params.BufLimit
+ } else if node.bufValue > int64(params.BufLimit) {
+ node.bufValue = int64(params.BufLimit)
}
node.cm.updateParams(node, params, now)
}
@@ -157,14 +205,14 @@ func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bo
now := node.cm.clock.Now()
node.update(now)
- if maxCost > node.bufValue {
+ if int64(maxCost) > node.bufValue {
if node.log != nil {
node.log.add(now, fmt.Sprintf("rejected reqID=%d bv=%d maxCost=%d", reqID, node.bufValue, maxCost))
node.log.dump(now)
}
- return false, maxCost - node.bufValue, 0
+ return false, maxCost - uint64(node.bufValue), 0
}
- node.bufValue -= maxCost
+ node.bufValue -= int64(maxCost)
node.sumCost += maxCost
if node.log != nil {
node.log.add(now, fmt.Sprintf("accepted reqID=%d bv=%d maxCost=%d sumCost=%d", reqID, node.bufValue, maxCost, node.sumCost))
@@ -174,19 +222,22 @@ func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bo
}
// RequestProcessed should be called when the request has been processed
-func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) (bv uint64) {
+func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64 {
node.lock.Lock()
defer node.lock.Unlock()
now := node.cm.clock.Now()
node.update(now)
node.cm.processed(node, maxCost, realCost, now)
- bv = node.bufValue + node.sumCost - node.accepted[index]
+ bv := node.bufValue + int64(node.sumCost-node.accepted[index])
if node.log != nil {
node.log.add(now, fmt.Sprintf("processed reqID=%d bv=%d maxCost=%d realCost=%d sumCost=%d oldSumCost=%d reportedBV=%d", reqID, node.bufValue, maxCost, realCost, node.sumCost, node.accepted[index], bv))
}
delete(node.accepted, index)
- return
+ if bv < 0 {
+ return 0
+ }
+ return uint64(bv)
}
// ServerNode is the flow control system's representation of a server
@@ -345,6 +396,28 @@ func (node *ServerNode) ReceivedReply(reqID, bv uint64) {
}
}
+// ResumeFreeze cleans all pending requests and sets the buffer estimate to the
+// reported value after resuming from a frozen state
+func (node *ServerNode) ResumeFreeze(bv uint64) {
+ node.lock.Lock()
+ defer node.lock.Unlock()
+
+ for reqID := range node.pending {
+ delete(node.pending, reqID)
+ }
+ now := node.clock.Now()
+ node.recalcBLE(now)
+ if bv > node.params.BufLimit {
+ bv = node.params.BufLimit
+ }
+ node.bufEstimate = bv
+ node.bufRecharge = node.bufEstimate < node.params.BufLimit
+ node.lastTime = now
+ if node.log != nil {
+ node.log.add(now, fmt.Sprintf("unfreeze bv=%d sumCost=%d", bv, node.sumCost))
+ }
+}
+
// DumpLogs dumps the event log if logging is used
func (node *ServerNode) DumpLogs() {
node.lock.Lock()
diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go
index 532e6a405..68f1a47c9 100644
--- a/les/flowcontrol/manager.go
+++ b/les/flowcontrol/manager.go
@@ -47,9 +47,9 @@ type cmNodeFields struct {
const FixedPointMultiplier = 1000000
var (
- capFactorDropTC = 1 / float64(time.Second*10) // time constant for dropping the capacity factor
- capFactorRaiseTC = 1 / float64(time.Hour) // time constant for raising the capacity factor
- capFactorRaiseThreshold = 0.75 // connected / total capacity ratio threshold for raising the capacity factor
+ capacityDropFactor = 0.1
+ capacityRaiseTC = 1 / (3 * float64(time.Hour)) // time constant for raising the capacity factor
+ capacityRaiseThresholdRatio = 1.125 // total/connected capacity ratio threshold for raising the capacity factor
)
// ClientManager controls the capacity assigned to the clients of a server.
@@ -61,10 +61,14 @@ type ClientManager struct {
clock mclock.Clock
lock sync.Mutex
enabledCh chan struct{}
+ stop chan chan struct{}
curve PieceWiseLinear
sumRecharge, totalRecharge, totalConnected uint64
- capLogFactor, totalCapacity float64
+ logTotalCap, totalCapacity float64
+ logTotalCapRaiseLimit float64
+ minLogTotalCap, maxLogTotalCap float64
+ capacityRaiseThreshold uint64
capLastUpdate mclock.AbsTime
totalCapacityCh chan uint64
@@ -106,13 +110,35 @@ func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager
clock: clock,
rcQueue: prque.New(func(a interface{}, i int) { a.(*ClientNode).queueIndex = i }),
capLastUpdate: clock.Now(),
+ stop: make(chan chan struct{}),
}
if curve != nil {
cm.SetRechargeCurve(curve)
}
+ go func() {
+ // regularly recalculate and update total capacity
+ for {
+ select {
+ case <-time.After(time.Minute):
+ cm.lock.Lock()
+ cm.updateTotalCapacity(cm.clock.Now(), true)
+ cm.lock.Unlock()
+ case stop := <-cm.stop:
+ close(stop)
+ return
+ }
+ }
+ }()
return cm
}
+// Stop stops the client manager
+func (cm *ClientManager) Stop() {
+ stop := make(chan struct{})
+ cm.stop <- stop
+ <-stop
+}
+
// SetRechargeCurve updates the recharge curve
func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear) {
cm.lock.Lock()
@@ -120,13 +146,29 @@ func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear) {
now := cm.clock.Now()
cm.updateRecharge(now)
- cm.updateCapFactor(now, false)
cm.curve = curve
if len(curve) > 0 {
cm.totalRecharge = curve[len(curve)-1].Y
} else {
cm.totalRecharge = 0
}
+}
+
+// SetCapacityRaiseThreshold sets a threshold value used for raising capFactor.
+// Either if the difference between total allowed and connected capacity is less
+// than this threshold or if their ratio is less than capacityRaiseThresholdRatio
+// then capFactor is allowed to slowly raise.
+func (cm *ClientManager) SetCapacityLimits(min, max, raiseThreshold uint64) {
+ if min < 1 {
+ min = 1
+ }
+ cm.minLogTotalCap = math.Log(float64(min))
+ if max < 1 {
+ max = 1
+ }
+ cm.maxLogTotalCap = math.Log(float64(max))
+ cm.logTotalCap = cm.maxLogTotalCap
+ cm.capacityRaiseThreshold = raiseThreshold
cm.refreshCapacity()
}
@@ -141,8 +183,9 @@ func (cm *ClientManager) connect(node *ClientNode) {
node.corrBufValue = int64(node.params.BufLimit)
node.rcLastIntValue = cm.rcLastIntValue
node.queueIndex = -1
- cm.updateCapFactor(now, true)
+ cm.updateTotalCapacity(now, true)
cm.totalConnected += node.params.MinRecharge
+ cm.updateRaiseLimit()
}
// disconnect should be called when a client is disconnected
@@ -152,8 +195,9 @@ func (cm *ClientManager) disconnect(node *ClientNode) {
now := cm.clock.Now()
cm.updateRecharge(cm.clock.Now())
- cm.updateCapFactor(now, true)
+ cm.updateTotalCapacity(now, true)
cm.totalConnected -= node.params.MinRecharge
+ cm.updateRaiseLimit()
}
// accepted is called when a request with given maximum cost is accepted.
@@ -174,18 +218,24 @@ func (cm *ClientManager) accepted(node *ClientNode, maxCost uint64, now mclock.A
//
// Note: processed should always be called for all accepted requests
func (cm *ClientManager) processed(node *ClientNode, maxCost, realCost uint64, now mclock.AbsTime) {
- cm.lock.Lock()
- defer cm.lock.Unlock()
-
if realCost > maxCost {
realCost = maxCost
}
- cm.updateNodeRc(node, int64(maxCost-realCost), &node.params, now)
- if uint64(node.corrBufValue) > node.bufValue {
+ cm.updateBuffer(node, int64(maxCost-realCost), now)
+}
+
+// updateBuffer recalulates the corrected buffer value, adds the given value to it
+// and updates the node's actual buffer value if possible
+func (cm *ClientManager) updateBuffer(node *ClientNode, add int64, now mclock.AbsTime) {
+ cm.lock.Lock()
+ defer cm.lock.Unlock()
+
+ cm.updateNodeRc(node, add, &node.params, now)
+ if node.corrBufValue > node.bufValue {
if node.log != nil {
node.log.add(now, fmt.Sprintf("corrected bv=%d oldBv=%d", node.corrBufValue, node.bufValue))
}
- node.bufValue = uint64(node.corrBufValue)
+ node.bufValue = node.corrBufValue
}
}
@@ -195,11 +245,30 @@ func (cm *ClientManager) updateParams(node *ClientNode, params ServerParams, now
defer cm.lock.Unlock()
cm.updateRecharge(now)
- cm.updateCapFactor(now, true)
+ cm.updateTotalCapacity(now, true)
cm.totalConnected += params.MinRecharge - node.params.MinRecharge
+ cm.updateRaiseLimit()
cm.updateNodeRc(node, 0, &params, now)
}
+// updateRaiseLimit recalculates the limiting value until which logTotalCap
+// can be raised when no client freeze events occur
+func (cm *ClientManager) updateRaiseLimit() {
+ if cm.capacityRaiseThreshold == 0 {
+ cm.logTotalCapRaiseLimit = 0
+ return
+ }
+ limit := float64(cm.totalConnected + cm.capacityRaiseThreshold)
+ limit2 := float64(cm.totalConnected) * capacityRaiseThresholdRatio
+ if limit2 > limit {
+ limit = limit2
+ }
+ if limit < 1 {
+ limit = 1
+ }
+ cm.logTotalCapRaiseLimit = math.Log(limit)
+}
+
// updateRecharge updates the recharge integrator and checks the recharge queue
// for nodes with recently filled buffers
func (cm *ClientManager) updateRecharge(now mclock.AbsTime) {
@@ -208,9 +277,15 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) {
// updating is done in multiple steps if node buffers are filled and sumRecharge
// is decreased before the given target time
for cm.sumRecharge > 0 {
- bonusRatio := cm.curve.ValueAt(cm.sumRecharge) / float64(cm.sumRecharge)
- if bonusRatio < 1 {
- bonusRatio = 1
+ sumRecharge := cm.sumRecharge
+ if sumRecharge > cm.totalRecharge {
+ sumRecharge = cm.totalRecharge
+ }
+ bonusRatio := float64(1)
+ v := cm.curve.ValueAt(sumRecharge)
+ s := float64(sumRecharge)
+ if v > s && s > 0 {
+ bonusRatio = v / s
}
dt := now - lastUpdate
// fetch the client that finishes first
@@ -228,7 +303,6 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) {
// finished recharging, update corrBufValue and sumRecharge if necessary and do next step
if rcqNode.corrBufValue < int64(rcqNode.params.BufLimit) {
rcqNode.corrBufValue = int64(rcqNode.params.BufLimit)
- cm.updateCapFactor(lastUpdate, true)
cm.sumRecharge -= rcqNode.params.MinRecharge
}
cm.rcLastIntValue = rcqNode.rcFullIntValue
@@ -249,9 +323,6 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve
node.rcLastIntValue = cm.rcLastIntValue
}
node.corrBufValue += bvc
- if node.corrBufValue < 0 {
- node.corrBufValue = 0
- }
diff := int64(params.BufLimit - node.params.BufLimit)
if diff > 0 {
node.corrBufValue += diff
@@ -261,15 +332,14 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve
node.corrBufValue = int64(params.BufLimit)
isFull = true
}
- sumRecharge := cm.sumRecharge
if !wasFull {
- sumRecharge -= node.params.MinRecharge
+ cm.sumRecharge -= node.params.MinRecharge
}
if params != &node.params {
node.params = *params
}
if !isFull {
- sumRecharge += node.params.MinRecharge
+ cm.sumRecharge += node.params.MinRecharge
if node.queueIndex != -1 {
cm.rcQueue.Remove(node.queueIndex)
}
@@ -277,63 +347,54 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve
node.rcFullIntValue = cm.rcLastIntValue + (int64(node.params.BufLimit)-node.corrBufValue)*FixedPointMultiplier/int64(node.params.MinRecharge)
cm.rcQueue.Push(node, -node.rcFullIntValue)
}
- if sumRecharge != cm.sumRecharge {
- cm.updateCapFactor(now, true)
- cm.sumRecharge = sumRecharge
- }
+}
+// reduceTotalCapacity reduces the total capacity allowance in case of a client freeze event
+func (cm *ClientManager) reduceTotalCapacity(frozenCap uint64) {
+ cm.lock.Lock()
+ defer cm.lock.Unlock()
+
+ ratio := float64(1)
+ if frozenCap < cm.totalConnected {
+ ratio = float64(frozenCap) / float64(cm.totalConnected)
+ }
+ now := cm.clock.Now()
+ cm.updateTotalCapacity(now, false)
+ cm.logTotalCap -= capacityDropFactor * ratio
+ if cm.logTotalCap < cm.minLogTotalCap {
+ cm.logTotalCap = cm.minLogTotalCap
+ }
+ cm.updateTotalCapacity(now, true)
}
-// updateCapFactor updates the total capacity factor. The capacity factor allows
+// updateTotalCapacity updates the total capacity factor. The capacity factor allows
// the total capacity of the system to go over the allowed total recharge value
-// if the sum of momentarily recharging clients only exceeds the total recharge
-// allowance in a very small fraction of time.
-// The capacity factor is dropped quickly (with a small time constant) if sumRecharge
-// exceeds totalRecharge. It is raised slowly (with a large time constant) if most
-// of the total capacity is used by connected clients (totalConnected is larger than
-// totalCapacity*capFactorRaiseThreshold) and sumRecharge stays under
-// totalRecharge*totalConnected/totalCapacity.
-func (cm *ClientManager) updateCapFactor(now mclock.AbsTime, refresh bool) {
- if cm.totalRecharge == 0 {
- return
- }
+// if clients go to frozen state sufficiently rarely.
+// The capacity factor is dropped instantly by a small amount if a clients is frozen.
+// It is raised slowly (with a large time constant) if the total connected capacity
+// is close to the total allowed amount and no clients are frozen.
+func (cm *ClientManager) updateTotalCapacity(now mclock.AbsTime, refresh bool) {
dt := now - cm.capLastUpdate
cm.capLastUpdate = now
- var d float64
- if cm.sumRecharge > cm.totalRecharge {
- d = (1 - float64(cm.sumRecharge)/float64(cm.totalRecharge)) * capFactorDropTC
- } else {
- totalConnected := float64(cm.totalConnected)
- var connRatio float64
- if totalConnected < cm.totalCapacity {
- connRatio = totalConnected / cm.totalCapacity
- } else {
- connRatio = 1
- }
- if connRatio > capFactorRaiseThreshold {
- sumRecharge := float64(cm.sumRecharge)
- limit := float64(cm.totalRecharge) * connRatio
- if sumRecharge < limit {
- d = (1 - sumRecharge/limit) * (connRatio - capFactorRaiseThreshold) * (1 / (1 - capFactorRaiseThreshold)) * capFactorRaiseTC
- }
+ if cm.logTotalCap < cm.logTotalCapRaiseLimit {
+ cm.logTotalCap += capacityRaiseTC * float64(dt)
+ if cm.logTotalCap > cm.logTotalCapRaiseLimit {
+ cm.logTotalCap = cm.logTotalCapRaiseLimit
}
}
- if d != 0 {
- cm.capLogFactor += d * float64(dt)
- if cm.capLogFactor < 0 {
- cm.capLogFactor = 0
- }
- if refresh {
- cm.refreshCapacity()
- }
+ if cm.logTotalCap > cm.maxLogTotalCap {
+ cm.logTotalCap = cm.maxLogTotalCap
+ }
+ if refresh {
+ cm.refreshCapacity()
}
}
// refreshCapacity recalculates the total capacity value and sends an update to the subscription
// channel if the relative change of the value since the last update is more than 0.1 percent
func (cm *ClientManager) refreshCapacity() {
- totalCapacity := float64(cm.totalRecharge) * math.Exp(cm.capLogFactor)
+ totalCapacity := math.Exp(cm.logTotalCap)
if totalCapacity >= cm.totalCapacity*0.999 && totalCapacity <= cm.totalCapacity*1.001 {
return
}
diff --git a/les/flowcontrol/manager_test.go b/les/flowcontrol/manager_test.go
index 4e7746d40..b32ec5599 100644
--- a/les/flowcontrol/manager_test.go
+++ b/les/flowcontrol/manager_test.go
@@ -63,7 +63,7 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random
}
m := NewClientManager(PieceWiseLinear{{0, totalCapacity}}, clock)
for _, n := range nodes {
- n.bufLimit = n.capacity * 6000 //uint64(2000+rand.Intn(10000))
+ n.bufLimit = n.capacity * 6000
n.node = NewClientNode(m, ServerParams{BufLimit: n.bufLimit, MinRecharge: n.capacity})
}
maxNodes := make([]int, maxCapacityNodes)
@@ -73,6 +73,7 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random
maxNodes[i] = rand.Intn(nodeCount)
}
+ var sendCount int
for i := 0; i < testLength; i++ {
now := clock.Now()
for _, idx := range maxNodes {
@@ -83,13 +84,15 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random
maxNodes[rand.Intn(maxCapacityNodes)] = rand.Intn(nodeCount)
}
- sendCount := randomSend
- for sendCount > 0 {
+ sendCount += randomSend
+ failCount := randomSend * 10
+ for sendCount > 0 && failCount > 0 {
if nodes[rand.Intn(nodeCount)].send(t, now) {
sendCount--
+ } else {
+ failCount--
}
}
-
clock.Run(time.Millisecond)
}
@@ -117,7 +120,6 @@ func (n *testNode) send(t *testing.T, now mclock.AbsTime) bool {
if bv < testMaxCost {
n.waitUntil = now + mclock.AbsTime((testMaxCost-bv)*1001000/n.capacity)
}
- //n.waitUntil = now + mclock.AbsTime(float64(testMaxCost)*1001000/float64(n.capacity)*(1-float64(bv)/float64(n.bufLimit)))
n.totalCost += rcost
return true
}
diff --git a/les/freeclient.go b/les/freeclient.go
index d859337c2..958b7daa7 100644
--- a/les/freeclient.go
+++ b/les/freeclient.go
@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -52,6 +53,8 @@ type freeClientPool struct {
connectedLimit, totalLimit int
freeClientCap uint64
+ logger *csvlogger.Logger
+ logTotalFreeConn *csvlogger.Channel
addressMap map[string]*freeClientPoolEntry
connPool, disconnPool *prque.Prque
@@ -66,16 +69,18 @@ const (
)
// newFreeClientPool creates a new free client pool
-func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool {
+func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string), metricsLogger, eventLogger *csvlogger.Logger) *freeClientPool {
pool := &freeClientPool{
- db: db,
- clock: clock,
- addressMap: make(map[string]*freeClientPoolEntry),
- connPool: prque.New(poolSetIndex),
- disconnPool: prque.New(poolSetIndex),
- freeClientCap: freeClientCap,
- totalLimit: totalLimit,
- removePeer: removePeer,
+ db: db,
+ clock: clock,
+ addressMap: make(map[string]*freeClientPoolEntry),
+ connPool: prque.New(poolSetIndex),
+ disconnPool: prque.New(poolSetIndex),
+ freeClientCap: freeClientCap,
+ totalLimit: totalLimit,
+ logger: eventLogger,
+ logTotalFreeConn: metricsLogger.NewChannel("totalFreeConn", 0),
+ removePeer: removePeer,
}
pool.loadFromDb()
return pool
@@ -88,10 +93,25 @@ func (f *freeClientPool) stop() {
f.lock.Unlock()
}
+// freeClientId returns a string identifier for the peer. Multiple peers with the
+// same identifier can not be in the free client pool simultaneously.
+func freeClientId(p *peer) string {
+ if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
+ if addr.IP.IsLoopback() {
+ // using peer id instead of loopback ip address allows multiple free
+ // connections from local machine to own server
+ return p.id
+ } else {
+ return addr.IP.String()
+ }
+ }
+ return ""
+}
+
// registerPeer implements clientPool
func (f *freeClientPool) registerPeer(p *peer) {
- if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
- if !f.connect(addr.IP.String(), p.id) {
+ if freeId := freeClientId(p); freeId != "" {
+ if !f.connect(freeId, p.id) {
f.removePeer(p.id)
}
}
@@ -107,7 +127,9 @@ func (f *freeClientPool) connect(address, id string) bool {
return false
}
+ f.logger.Event("freeClientPool: connecting from " + address + ", " + id)
if f.connectedLimit == 0 {
+ f.logger.Event("freeClientPool: rejected, " + id)
log.Debug("Client rejected", "address", address)
return false
}
@@ -119,6 +141,7 @@ func (f *freeClientPool) connect(address, id string) bool {
f.addressMap[address] = e
} else {
if e.connected {
+ f.logger.Event("freeClientPool: already connected, " + id)
log.Debug("Client already connected", "address", address)
return false
}
@@ -131,9 +154,11 @@ func (f *freeClientPool) connect(address, id string) bool {
if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
// kick it out and accept the new client
f.dropClient(i, now)
+ f.logger.Event("freeClientPool: kicked out, " + i.id)
} else {
// keep the old client and reject the new one
f.connPool.Push(i, i.linUsage)
+ f.logger.Event("freeClientPool: rejected, " + id)
log.Debug("Client rejected", "address", address)
return false
}
@@ -142,17 +167,19 @@ func (f *freeClientPool) connect(address, id string) bool {
e.connected = true
e.id = id
f.connPool.Push(e, e.linUsage)
+ f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
f.disconnPool.Pop()
}
+ f.logger.Event("freeClientPool: accepted, " + id)
log.Debug("Client accepted", "address", address)
return true
}
// unregisterPeer implements clientPool
func (f *freeClientPool) unregisterPeer(p *peer) {
- if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
- f.disconnect(addr.IP.String())
+ if freeId := freeClientId(p); freeId != "" {
+ f.disconnect(freeId)
}
}
@@ -174,9 +201,11 @@ func (f *freeClientPool) disconnect(address string) {
}
f.connPool.Remove(e.index)
+ f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
f.calcLogUsage(e, now)
e.connected = false
f.disconnPool.Push(e, -e.logUsage)
+ f.logger.Event("freeClientPool: disconnected, " + e.id)
log.Debug("Client disconnected", "address", address)
}
@@ -194,6 +223,7 @@ func (f *freeClientPool) setLimits(count int, totalCap uint64) {
for f.connPool.Size() > f.connectedLimit {
i := f.connPool.PopItem().(*freeClientPoolEntry)
f.dropClient(i, now)
+ f.logger.Event("freeClientPool: setLimits kicked out, " + i.id)
}
}
@@ -201,6 +231,7 @@ func (f *freeClientPool) setLimits(count int, totalCap uint64) {
// disconnected pool
func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) {
f.connPool.Remove(i.index)
+ f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
f.calcLogUsage(i, now)
i.connected = false
f.disconnPool.Push(i, -i.logUsage)
diff --git a/les/freeclient_test.go b/les/freeclient_test.go
index 191822264..5a58a6c1c 100644
--- a/les/freeclient_test.go
+++ b/les/freeclient_test.go
@@ -61,7 +61,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
}
disconnCh <- i
}
- pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
+ pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil)
)
pool.setLimits(connLimit, uint64(connLimit))
@@ -130,7 +130,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
// close and restart pool
pool.stop()
- pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
+ pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil)
pool.setLimits(connLimit, uint64(connLimit))
// try connecting all known peers (connLimit should be filled up)
diff --git a/les/handler.go b/les/handler.go
index f53a4722f..59bfd81cd 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
@@ -118,6 +119,7 @@ type ProtocolManager struct {
wg *sync.WaitGroup
eventMux *event.TypeMux
+ logger *csvlogger.Logger
// Callbacks
synced func() bool
@@ -165,8 +167,6 @@ func NewProtocolManager(
if odr != nil {
manager.retriever = odr.retriever
manager.reqDist = odr.retriever.dist
- } else {
- manager.servingQueue = newServingQueue(int64(time.Millisecond * 10))
}
if ulcConfig != nil {
@@ -272,6 +272,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Ignore maxPeers if this is a trusted peer
// In server mode we try to check into the client pool after handshake
if pm.client && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
+ pm.logger.Event("Rejected (too many peers), " + p.id)
return p2p.DiscTooManyPeers
}
// Reject light clients if server is not synced.
@@ -290,6 +291,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
)
if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
+ pm.logger.Event("Handshake error: " + err.Error() + ", " + p.id)
return err
}
if p.fcClient != nil {
@@ -303,9 +305,12 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Light Ethereum peer registration failed", "err", err)
+ pm.logger.Event("Peer registration error: " + err.Error() + ", " + p.id)
return err
}
+ pm.logger.Event("Connection established, " + p.id)
defer func() {
+ pm.logger.Event("Closed connection, " + p.id)
pm.removePeer(p.id)
}()
@@ -326,6 +331,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
+ pm.logger.Event("Message handling error: " + err.Error() + ", " + p.id)
p.Log().Debug("Light Ethereum message handling failed", "err", err)
if p.fcServer != nil {
p.fcServer.DumpLogs()
@@ -358,23 +364,40 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
)
accept := func(reqID, reqCnt, maxCnt uint64) bool {
- if reqCnt == 0 {
- return false
+ inSizeCost := func() uint64 {
+ if pm.server.costTracker != nil {
+ return pm.server.costTracker.realCost(0, msg.Size, 0)
+ }
+ return 0
}
- if p.fcClient == nil || reqCnt > maxCnt {
+ if p.isFrozen() || reqCnt == 0 || p.fcClient == nil || reqCnt > maxCnt {
+ p.fcClient.OneTimeCost(inSizeCost())
return false
}
- maxCost = p.fcCosts.getCost(msg.Code, reqCnt)
+ maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
+ gf := float64(1)
+ if pm.server.costTracker != nil {
+ gf = pm.server.costTracker.globalFactor()
+ if gf < 0.001 {
+ p.Log().Error("Invalid global cost factor", "globalFactor", gf)
+ gf = 1
+ }
+ }
+ maxTime := uint64(float64(maxCost) / gf)
if accepted, bufShort, servingPriority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost); !accepted {
- if bufShort > 0 {
- p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
- }
+ p.freezeClient()
+ p.Log().Warn("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
+ p.fcClient.OneTimeCost(inSizeCost())
return false
} else {
- task = pm.servingQueue.newTask(servingPriority)
+ task = pm.servingQueue.newTask(p, maxTime, servingPriority)
+ }
+ if task.start() {
+ return true
}
- return task.start()
+ p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost())
+ return false
}
if msg.Size > ProtocolMaxMsgSize {
@@ -388,6 +411,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.responseLock.Lock()
defer p.responseLock.Unlock()
+ if p.isFrozen() {
+ amount = 0
+ reply = nil
+ }
var replySize uint32
if reply != nil {
replySize = reply.size()
@@ -395,7 +422,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
var realCost uint64
if pm.server.costTracker != nil {
realCost = pm.server.costTracker.realCost(servingTime, msg.Size, replySize)
- pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
+ if amount != 0 {
+ pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
+ }
} else {
realCost = maxCost
}
@@ -463,94 +492,94 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
query := req.Query
- if !accept(req.ReqID, query.Amount, MaxHeaderFetch) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
- hashMode := query.Origin.Hash != (common.Hash{})
- first := true
- maxNonCanonical := uint64(100)
-
- // Gather headers until the fetch or network limits is reached
- var (
- bytes common.StorageSize
- headers []*types.Header
- unknown bool
- )
- for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
- if !first && !task.waitOrStop() {
- return
- }
- // Retrieve the next header satisfying the query
- var origin *types.Header
- if hashMode {
- if first {
- origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
- if origin != nil {
- query.Origin.Number = origin.Number.Uint64()
+ if accept(req.ReqID, query.Amount, MaxHeaderFetch) {
+ go func() {
+ hashMode := query.Origin.Hash != (common.Hash{})
+ first := true
+ maxNonCanonical := uint64(100)
+
+ // Gather headers until the fetch or network limits is reached
+ var (
+ bytes common.StorageSize
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
+ if !first && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
+ }
+ // Retrieve the next header satisfying the query
+ var origin *types.Header
+ if hashMode {
+ if first {
+ origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
+ if origin != nil {
+ query.Origin.Number = origin.Number.Uint64()
+ }
+ } else {
+ origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
}
} else {
- origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
+ origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
- } else {
- origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
- }
- if origin == nil {
- break
- }
- headers = append(headers, origin)
- bytes += estHeaderRlpSize
-
- // Advance to the next header of the query
- switch {
- case hashMode && query.Reverse:
- // Hash based traversal towards the genesis block
- ancestor := query.Skip + 1
- if ancestor == 0 {
- unknown = true
- } else {
- query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
- unknown = (query.Origin.Hash == common.Hash{})
+ if origin == nil {
+ break
}
- case hashMode && !query.Reverse:
- // Hash based traversal towards the leaf block
- var (
- current = origin.Number.Uint64()
- next = current + query.Skip + 1
- )
- if next <= current {
- infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
- p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
- unknown = true
- } else {
- if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
- nextHash := header.Hash()
- expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
- if expOldHash == query.Origin.Hash {
- query.Origin.Hash, query.Origin.Number = nextHash, next
+ headers = append(headers, origin)
+ bytes += estHeaderRlpSize
+
+ // Advance to the next header of the query
+ switch {
+ case hashMode && query.Reverse:
+ // Hash based traversal towards the genesis block
+ ancestor := query.Skip + 1
+ if ancestor == 0 {
+ unknown = true
+ } else {
+ query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
+ unknown = (query.Origin.Hash == common.Hash{})
+ }
+ case hashMode && !query.Reverse:
+ // Hash based traversal towards the leaf block
+ var (
+ current = origin.Number.Uint64()
+ next = current + query.Skip + 1
+ )
+ if next <= current {
+ infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
+ p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
+ unknown = true
+ } else {
+ if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
+ nextHash := header.Hash()
+ expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
+ if expOldHash == query.Origin.Hash {
+ query.Origin.Hash, query.Origin.Number = nextHash, next
+ } else {
+ unknown = true
+ }
} else {
unknown = true
}
+ }
+ case query.Reverse:
+ // Number based traversal towards the genesis block
+ if query.Origin.Number >= query.Skip+1 {
+ query.Origin.Number -= query.Skip + 1
} else {
unknown = true
}
- }
- case query.Reverse:
- // Number based traversal towards the genesis block
- if query.Origin.Number >= query.Skip+1 {
- query.Origin.Number -= query.Skip + 1
- } else {
- unknown = true
- }
- case !query.Reverse:
- // Number based traversal towards the leaf block
- query.Origin.Number += query.Skip + 1
+ case !query.Reverse:
+ // Number based traversal towards the leaf block
+ query.Origin.Number += query.Skip + 1
+ }
+ first = false
}
- first = false
- }
- sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done())
- }()
+ sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done())
+ }()
+ }
case BlockHeadersMsg:
if pm.downloader == nil {
@@ -592,27 +621,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bodies []rlp.RawValue
)
reqCnt := len(req.Hashes)
- if !accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
- for i, hash := range req.Hashes {
- if i != 0 && !task.waitOrStop() {
- return
- }
- if bytes >= softResponseLimit {
- break
- }
- // Retrieve the requested block body, stopping if enough was found
- if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
- if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 {
- bodies = append(bodies, data)
- bytes += len(data)
+ if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
+ go func() {
+ for i, hash := range req.Hashes {
+ if i != 0 && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
+ }
+ if bytes >= softResponseLimit {
+ break
+ }
+ // Retrieve the requested block body, stopping if enough was found
+ if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
+ if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 {
+ bodies = append(bodies, data)
+ bytes += len(data)
+ }
}
}
- }
- sendResponse(req.ReqID, uint64(reqCnt), p.ReplyBlockBodiesRLP(req.ReqID, bodies), task.done())
- }()
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyBlockBodiesRLP(req.ReqID, bodies), task.done())
+ }()
+ }
case BlockBodiesMsg:
if pm.odr == nil {
@@ -651,45 +680,45 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
data [][]byte
)
reqCnt := len(req.Reqs)
- if !accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
- for i, req := range req.Reqs {
- if i != 0 && !task.waitOrStop() {
- return
- }
- // Look up the root hash belonging to the request
- number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash)
- if number == nil {
- p.Log().Warn("Failed to retrieve block num for code", "hash", req.BHash)
- continue
- }
- header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number)
- if header == nil {
- p.Log().Warn("Failed to retrieve header for code", "block", *number, "hash", req.BHash)
- continue
- }
- triedb := pm.blockchain.StateCache().TrieDB()
+ if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
+ go func() {
+ for i, request := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
+ }
+ // Look up the root hash belonging to the request
+ number := rawdb.ReadHeaderNumber(pm.chainDb, request.BHash)
+ if number == nil {
+ p.Log().Warn("Failed to retrieve block num for code", "hash", request.BHash)
+ continue
+ }
+ header := rawdb.ReadHeader(pm.chainDb, request.BHash, *number)
+ if header == nil {
+ p.Log().Warn("Failed to retrieve header for code", "block", *number, "hash", request.BHash)
+ continue
+ }
+ triedb := pm.blockchain.StateCache().TrieDB()
- account, err := pm.getAccount(triedb, header.Root, common.BytesToHash(req.AccKey))
- if err != nil {
- p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "err", err)
- continue
- }
- code, err := triedb.Node(common.BytesToHash(account.CodeHash))
- if err != nil {
- p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err)
- continue
- }
- // Accumulate the code and abort if enough data was retrieved
- data = append(data, code)
- if bytes += len(code); bytes >= softResponseLimit {
- break
+ account, err := pm.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey))
+ if err != nil {
+ p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
+ continue
+ }
+ code, err := triedb.Node(common.BytesToHash(account.CodeHash))
+ if err != nil {
+ p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err)
+ continue
+ }
+ // Accumulate the code and abort if enough data was retrieved
+ data = append(data, code)
+ if bytes += len(code); bytes >= softResponseLimit {
+ break
+ }
}
- }
- sendResponse(req.ReqID, uint64(reqCnt), p.ReplyCode(req.ReqID, data), task.done())
- }()
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyCode(req.ReqID, data), task.done())
+ }()
+ }
case CodeMsg:
if pm.odr == nil {
@@ -728,37 +757,37 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
receipts []rlp.RawValue
)
reqCnt := len(req.Hashes)
- if !accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
- for i, hash := range req.Hashes {
- if i != 0 && !task.waitOrStop() {
- return
- }
- if bytes >= softResponseLimit {
- break
- }
- // Retrieve the requested block's receipts, skipping if unknown to us
- var results types.Receipts
- if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
- results = rawdb.ReadRawReceipts(pm.chainDb, hash, *number)
- }
- if results == nil {
- if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
- continue
+ if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
+ go func() {
+ for i, hash := range req.Hashes {
+ if i != 0 && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
+ }
+ if bytes >= softResponseLimit {
+ break
+ }
+ // Retrieve the requested block's receipts, skipping if unknown to us
+ var results types.Receipts
+ if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
+ results = rawdb.ReadRawReceipts(pm.chainDb, hash, *number)
+ }
+ if results == nil {
+ if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
+ continue
+ }
+ }
+ // If known, encode and queue for response packet
+ if encoded, err := rlp.EncodeToBytes(results); err != nil {
+ log.Error("Failed to encode receipt", "err", err)
+ } else {
+ receipts = append(receipts, encoded)
+ bytes += len(encoded)
}
}
- // If known, encode and queue for response packet
- if encoded, err := rlp.EncodeToBytes(results); err != nil {
- log.Error("Failed to encode receipt", "err", err)
- } else {
- receipts = append(receipts, encoded)
- bytes += len(encoded)
- }
- }
- sendResponse(req.ReqID, uint64(reqCnt), p.ReplyReceiptsRLP(req.ReqID, receipts), task.done())
- }()
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyReceiptsRLP(req.ReqID, receipts), task.done())
+ }()
+ }
case ReceiptsMsg:
if pm.odr == nil {
@@ -797,70 +826,70 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
root common.Hash
)
reqCnt := len(req.Reqs)
- if !accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
- nodes := light.NewNodeSet()
-
- for i, req := range req.Reqs {
- if i != 0 && !task.waitOrStop() {
- return
- }
- // Look up the root hash belonging to the request
- var (
- number *uint64
- header *types.Header
- trie state.Trie
- )
- if req.BHash != lastBHash {
- root, lastBHash = common.Hash{}, req.BHash
-
- if number = rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number == nil {
- p.Log().Warn("Failed to retrieve block num for proof", "hash", req.BHash)
- continue
+ if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
+ go func() {
+ nodes := light.NewNodeSet()
+
+ for i, request := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
}
- if header = rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header == nil {
- p.Log().Warn("Failed to retrieve header for proof", "block", *number, "hash", req.BHash)
- continue
+ // Look up the root hash belonging to the request
+ var (
+ number *uint64
+ header *types.Header
+ trie state.Trie
+ )
+ if request.BHash != lastBHash {
+ root, lastBHash = common.Hash{}, request.BHash
+
+ if number = rawdb.ReadHeaderNumber(pm.chainDb, request.BHash); number == nil {
+ p.Log().Warn("Failed to retrieve block num for proof", "hash", request.BHash)
+ continue
+ }
+ if header = rawdb.ReadHeader(pm.chainDb, request.BHash, *number); header == nil {
+ p.Log().Warn("Failed to retrieve header for proof", "block", *number, "hash", request.BHash)
+ continue
+ }
+ root = header.Root
}
- root = header.Root
- }
- // Open the account or storage trie for the request
- statedb := pm.blockchain.StateCache()
-
- switch len(req.AccKey) {
- case 0:
- // No account key specified, open an account trie
- trie, err = statedb.OpenTrie(root)
- if trie == nil || err != nil {
- p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err)
- continue
+ // Open the account or storage trie for the request
+ statedb := pm.blockchain.StateCache()
+
+ switch len(request.AccKey) {
+ case 0:
+ // No account key specified, open an account trie
+ trie, err = statedb.OpenTrie(root)
+ if trie == nil || err != nil {
+ p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err)
+ continue
+ }
+ default:
+ // Account key specified, open a storage trie
+ account, err := pm.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey))
+ if err != nil {
+ p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
+ continue
+ }
+ trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root)
+ if trie == nil || err != nil {
+ p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err)
+ continue
+ }
}
- default:
- // Account key specified, open a storage trie
- account, err := pm.getAccount(statedb.TrieDB(), root, common.BytesToHash(req.AccKey))
- if err != nil {
- p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "err", err)
+ // Prove the user's request from the account or stroage trie
+ if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil {
+ p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err)
continue
}
- trie, err = statedb.OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
- if trie == nil || err != nil {
- p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "root", account.Root, "err", err)
- continue
+ if nodes.DataSize() >= softResponseLimit {
+ break
}
}
- // Prove the user's request from the account or stroage trie
- if err := trie.Prove(req.Key, req.FromLevel, nodes); err != nil {
- p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err)
- continue
- }
- if nodes.DataSize() >= softResponseLimit {
- break
- }
- }
- sendResponse(req.ReqID, uint64(reqCnt), p.ReplyProofsV2(req.ReqID, nodes.NodeList()), task.done())
- }()
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyProofsV2(req.ReqID, nodes.NodeList()), task.done())
+ }()
+ }
case ProofsV2Msg:
if pm.odr == nil {
@@ -899,53 +928,53 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
auxData [][]byte
)
reqCnt := len(req.Reqs)
- if !accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
-
- var (
- lastIdx uint64
- lastType uint
- root common.Hash
- auxTrie *trie.Trie
- )
- nodes := light.NewNodeSet()
- for i, req := range req.Reqs {
- if i != 0 && !task.waitOrStop() {
- return
- }
- if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx {
- auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx
+ if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
+ go func() {
- var prefix string
- if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) {
- auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(pm.chainDb, prefix)))
- }
- }
- if req.AuxReq == auxRoot {
- var data []byte
- if root != (common.Hash{}) {
- data = root[:]
+ var (
+ lastIdx uint64
+ lastType uint
+ root common.Hash
+ auxTrie *trie.Trie
+ )
+ nodes := light.NewNodeSet()
+ for i, request := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
}
- auxData = append(auxData, data)
- auxBytes += len(data)
- } else {
- if auxTrie != nil {
- auxTrie.Prove(req.Key, req.FromLevel, nodes)
+ if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx {
+ auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx
+
+ var prefix string
+ if root, prefix = pm.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) {
+ auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(pm.chainDb, prefix)))
+ }
}
- if req.AuxReq != 0 {
- data := pm.getHelperTrieAuxData(req)
+ if request.AuxReq == auxRoot {
+ var data []byte
+ if root != (common.Hash{}) {
+ data = root[:]
+ }
auxData = append(auxData, data)
auxBytes += len(data)
+ } else {
+ if auxTrie != nil {
+ auxTrie.Prove(request.Key, request.FromLevel, nodes)
+ }
+ if request.AuxReq != 0 {
+ data := pm.getHelperTrieAuxData(request)
+ auxData = append(auxData, data)
+ auxBytes += len(data)
+ }
+ }
+ if nodes.DataSize()+auxBytes >= softResponseLimit {
+ break
}
}
- if nodes.DataSize()+auxBytes >= softResponseLimit {
- break
- }
- }
- sendResponse(req.ReqID, uint64(reqCnt), p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}), task.done())
- }()
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}), task.done())
+ }()
+ }
case HelperTrieProofsMsg:
if pm.odr == nil {
@@ -981,27 +1010,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
reqCnt := len(req.Txs)
- if !accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
- stats := make([]light.TxStatus, len(req.Txs))
- for i, tx := range req.Txs {
- if i != 0 && !task.waitOrStop() {
- return
- }
- hash := tx.Hash()
- stats[i] = pm.txStatus(hash)
- if stats[i].Status == core.TxStatusUnknown {
- if errs := pm.txpool.AddRemotes([]*types.Transaction{tx}); errs[0] != nil {
- stats[i].Error = errs[0].Error()
- continue
+ if accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
+ go func() {
+ stats := make([]light.TxStatus, len(req.Txs))
+ for i, tx := range req.Txs {
+ if i != 0 && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
}
+ hash := tx.Hash()
stats[i] = pm.txStatus(hash)
+ if stats[i].Status == core.TxStatusUnknown {
+ if errs := pm.txpool.AddRemotes([]*types.Transaction{tx}); errs[0] != nil {
+ stats[i].Error = errs[0].Error()
+ continue
+ }
+ stats[i] = pm.txStatus(hash)
+ }
}
- }
- sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
- }()
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
+ }()
+ }
case GetTxStatusMsg:
if pm.txpool == nil {
@@ -1016,19 +1045,19 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
reqCnt := len(req.Hashes)
- if !accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
- return errResp(ErrRequestRejected, "")
- }
- go func() {
- stats := make([]light.TxStatus, len(req.Hashes))
- for i, hash := range req.Hashes {
- if i != 0 && !task.waitOrStop() {
- return
+ if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
+ go func() {
+ stats := make([]light.TxStatus, len(req.Hashes))
+ for i, hash := range req.Hashes {
+ if i != 0 && !task.waitOrStop() {
+ sendResponse(req.ReqID, 0, nil, task.servingTime)
+ return
+ }
+ stats[i] = pm.txStatus(hash)
}
- stats[i] = pm.txStatus(hash)
- }
- sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
- }()
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
+ }()
+ }
case TxStatusMsg:
if pm.odr == nil {
@@ -1053,6 +1082,26 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
Obj: resp.Status,
}
+ case StopMsg:
+ if pm.odr == nil {
+ return errResp(ErrUnexpectedResponse, "")
+ }
+ p.freezeServer(true)
+ pm.retriever.frozen(p)
+ p.Log().Warn("Service stopped")
+
+ case ResumeMsg:
+ if pm.odr == nil {
+ return errResp(ErrUnexpectedResponse, "")
+ }
+ var bv uint64
+ if err := msg.Decode(&bv); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ p.fcServer.ResumeFreeze(bv)
+ p.freezeServer(false)
+ p.Log().Warn("Service resumed")
+
default:
p.Log().Trace("Received unknown message", "code", msg.Code)
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
diff --git a/les/handler_test.go b/les/handler_test.go
index d8051c145..6e24165cf 100644
--- a/les/handler_test.go
+++ b/les/handler_test.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -438,7 +439,7 @@ func TestTransactionStatusLes2(t *testing.T) {
config.Journal = ""
txpool := core.NewTxPool(config, params.TestChainConfig, chain)
pm.txpool = txpool
- peer, _ := newTestPeer(t, "peer", 2, pm, true)
+ peer, _ := newTestPeer(t, "peer", 2, pm, true, 0)
defer peer.close()
var reqID uint64
@@ -519,3 +520,51 @@ func TestTransactionStatusLes2(t *testing.T) {
test(tx1, false, light.TxStatus{Status: core.TxStatusPending})
test(tx2, false, light.TxStatus{Status: core.TxStatusPending})
}
+
+func TestStopResumeLes3(t *testing.T) {
+ db := rawdb.NewMemoryDatabase()
+ clock := &mclock.Simulated{}
+ testCost := testBufLimit / 10
+ pm, err := newTestProtocolManager(false, 0, nil, nil, nil, db, nil, testCost, clock)
+ if err != nil {
+ t.Fatalf("Failed to create protocol manager: %v", err)
+ }
+ peer, _ := newTestPeer(t, "peer", 3, pm, true, testCost)
+ defer peer.close()
+
+ expBuf := testBufLimit
+ var reqID uint64
+
+ req := func() {
+ reqID++
+ sendRequest(peer.app, GetBlockHeadersMsg, reqID, testCost, &getBlockHeadersData{Origin: hashOrNumber{Hash: common.Hash{1}}, Amount: 1})
+ }
+
+ for i := 1; i <= 5; i++ {
+ // send requests while we still have enough buffer and expect a response
+ for expBuf >= testCost {
+ req()
+ expBuf -= testCost
+ if err := expectResponse(peer.app, BlockHeadersMsg, reqID, expBuf, nil); err != nil {
+ t.Errorf("expected response and failed: %v", err)
+ }
+ }
+ // send some more requests in excess and expect a single StopMsg
+ c := i
+ for c > 0 {
+ req()
+ c--
+ }
+ if err := p2p.ExpectMsg(peer.app, StopMsg, nil); err != nil {
+ t.Errorf("expected StopMsg and failed: %v", err)
+ }
+ // wait until the buffer is recharged by half of the limit
+ wait := testBufLimit / testBufRecharge / 2
+ clock.Run(time.Millisecond * time.Duration(wait))
+ // expect a ResumeMsg with the partially recharged buffer value
+ expBuf += testBufRecharge * wait
+ if err := p2p.ExpectMsg(peer.app, ResumeMsg, expBuf); err != nil {
+ t.Errorf("expected ResumeMsg and failed: %v", err)
+ }
+ }
+}
diff --git a/les/helper_test.go b/les/helper_test.go
index 878e44404..dbb081344 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -62,7 +62,8 @@ var (
testEventEmitterCode = common.Hex2Bytes("60606040523415600e57600080fd5b7f57050ab73f6b9ebdd9f76b8d4997793f48cf956e965ee070551b9ca0bb71584e60405160405180910390a160358060476000396000f3006060604052600080fd00a165627a7a723058203f727efcad8b5811f8cb1fc2620ce5e8c63570d697aef968172de296ea3994140029")
testEventEmitterAddr common.Address
- testBufLimit = uint64(100)
+ testBufLimit = uint64(1000000)
+ testBufRecharge = uint64(1000)
)
/*
@@ -138,7 +139,7 @@ func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.Indexe
// newTestProtocolManager creates a new protocol manager for testing purposes,
// with the given number of blocks already known, potential notification
// channels for different events and relative chain indexers array.
-func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig) (*ProtocolManager, error) {
+func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig, testCost uint64, clock mclock.Clock) (*ProtocolManager, error) {
var (
evmux = new(event.TypeMux)
engine = ethash.NewFaker()
@@ -177,14 +178,15 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
if !lightSync {
srv := &LesServer{lesCommons: lesCommons{protocolManager: pm}}
pm.server = srv
+ pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1, nil)
pm.servingQueue.setThreads(4)
srv.defParams = flowcontrol.ServerParams{
BufLimit: testBufLimit,
- MinRecharge: 1,
+ MinRecharge: testBufRecharge,
}
-
- srv.fcManager = flowcontrol.NewClientManager(nil, &mclock.System{})
+ srv.testCost = testCost
+ srv.fcManager = flowcontrol.NewClientManager(nil, clock)
}
pm.Start(1000)
return pm, nil
@@ -195,7 +197,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
// channels for different events and relative chain indexers array. In case of an error, the constructor force-
// fails the test.
func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig) *ProtocolManager {
- pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db, ulcConfig)
+ pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db, ulcConfig, 0, &mclock.System{})
if err != nil {
t.Fatalf("Failed to create protocol manager: %v", err)
}
@@ -210,7 +212,7 @@ type testPeer struct {
}
// newTestPeer creates a new peer registered at the given protocol manager.
-func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, shake bool) (*testPeer, <-chan error) {
+func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, shake bool, testCost uint64) (*testPeer, <-chan error) {
// Create a message pipe to communicate through
app, net := p2p.MsgPipe()
@@ -242,7 +244,7 @@ func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, sh
head = pm.blockchain.CurrentHeader()
td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64())
)
- tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash())
+ tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), testCost)
}
return tp, errc
}
@@ -282,7 +284,7 @@ func newTestPeerPair(name string, version int, pm, pm2 *ProtocolManager) (*peer,
// handshake simulates a trivial handshake that expects the same state from the
// remote side as we are simulating locally.
-func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash) {
+func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, testCost uint64) {
var expList keyValueList
expList = expList.add("protocolVersion", uint64(p.version))
expList = expList.add("networkId", uint64(NetworkId))
@@ -295,10 +297,11 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
expList = expList.add("serveHeaders", nil)
expList = expList.add("serveChainSince", uint64(0))
expList = expList.add("serveStateSince", uint64(0))
+ expList = expList.add("serveRecentState", uint64(core.TriesInMemory-4))
expList = expList.add("txRelay", nil)
expList = expList.add("flowControl/BL", testBufLimit)
- expList = expList.add("flowControl/MRR", uint64(1))
- expList = expList.add("flowControl/MRC", testCostList())
+ expList = expList.add("flowControl/MRR", testBufRecharge)
+ expList = expList.add("flowControl/MRC", testCostList(testCost))
if err := p2p.ExpectMsg(p.app, StatusMsg, expList); err != nil {
t.Fatalf("status recv: %v", err)
@@ -309,7 +312,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
p.fcParams = flowcontrol.ServerParams{
BufLimit: testBufLimit,
- MinRecharge: 1,
+ MinRecharge: testBufRecharge,
}
}
@@ -338,7 +341,7 @@ func newServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*cor
cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig)
pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db, nil)
- peer, _ := newTestPeer(t, "peer", protocol, pm, true)
+ peer, _ := newTestPeer(t, "peer", protocol, pm, true, 0)
cIndexer.Start(pm.blockchain.(*core.BlockChain))
bIndexer.Start(pm.blockchain.(*core.BlockChain))
diff --git a/les/peer.go b/les/peer.go
index 42c13ab7d..6792d0611 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -20,11 +20,14 @@ import (
"errors"
"fmt"
"math/big"
+ "math/rand"
"sync"
+ "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les/flowcontrol"
@@ -47,10 +50,16 @@ const (
allowedUpdateRate = time.Millisecond * 10 // time constant for recharging one byte of allowance
)
+const (
+ freezeTimeBase = time.Millisecond * 700 // fixed component of client freeze time
+ freezeTimeRandom = time.Millisecond * 600 // random component of client freeze time
+ freezeCheckPeriod = time.Millisecond * 100 // buffer value recheck period after initial freeze time has elapsed
+)
+
// if the total encoded size of a sent transaction batch is over txSizeCostLimit
// per transaction then the request cost is calculated as proportional to the
// encoded size instead of the transaction count
-const txSizeCostLimit = 0x10000
+const txSizeCostLimit = 0x4000
const (
announceTypeNone = iota
@@ -86,14 +95,17 @@ type peer struct {
responseErrors int
updateCounter uint64
updateTime mclock.AbsTime
+ frozen uint32 // 1 if client is in frozen state
fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
fcParams flowcontrol.ServerParams
fcCosts requestCostTable
- isTrusted bool
- isOnlyAnnounce bool
+ isTrusted bool
+ isOnlyAnnounce bool
+ chainSince, chainRecent uint64
+ stateSince, stateRecent uint64
}
func newPeer(version int, network uint64, isTrusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
@@ -129,8 +141,59 @@ func (p *peer) rejectUpdate(size uint64) bool {
return p.updateCounter > allowedUpdateBytes
}
+// freezeClient temporarily puts the client in a frozen state which means all
+// unprocessed and subsequent requests are dropped. Unfreezing happens automatically
+// after a short time if the client's buffer value is at least in the slightly positive
+// region. The client is also notified about being frozen/unfrozen with a Stop/Resume
+// message.
+func (p *peer) freezeClient() {
+ if p.version < lpv3 {
+ // if Stop/Resume is not supported then just drop the peer after setting
+ // its frozen status permanently
+ atomic.StoreUint32(&p.frozen, 1)
+ p.Peer.Disconnect(p2p.DiscUselessPeer)
+ return
+ }
+ if atomic.SwapUint32(&p.frozen, 1) == 0 {
+ go func() {
+ p.SendStop()
+ time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom))))
+ for {
+ bufValue, bufLimit := p.fcClient.BufferStatus()
+ if bufLimit == 0 {
+ return
+ }
+ if bufValue <= bufLimit/8 {
+ time.Sleep(freezeCheckPeriod)
+ } else {
+ atomic.StoreUint32(&p.frozen, 0)
+ p.SendResume(bufValue)
+ break
+ }
+ }
+ }()
+ }
+}
+
+// freezeServer processes Stop/Resume messages from the given server
+func (p *peer) freezeServer(frozen bool) {
+ var f uint32
+ if frozen {
+ f = 1
+ }
+ if atomic.SwapUint32(&p.frozen, f) != f && frozen {
+ p.sendQueue.clear()
+ }
+}
+
+// isFrozen returns true if the client is frozen or the server has put our
+// client in frozen state
+func (p *peer) isFrozen() bool {
+ return atomic.LoadUint32(&p.frozen) != 0
+}
+
func (p *peer) canQueue() bool {
- return p.sendQueue.canQueue()
+ return p.sendQueue.canQueue() && !p.isFrozen()
}
func (p *peer) queueSend(f func()) {
@@ -265,10 +328,21 @@ func (p *peer) GetTxRelayCost(amount, size int) uint64 {
// HasBlock checks if the peer has a given block
func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool {
+ var head, since, recent uint64
p.lock.RLock()
+ if p.headInfo != nil {
+ head = p.headInfo.Number
+ }
+ if hasState {
+ since = p.stateSince
+ recent = p.stateRecent
+ } else {
+ since = p.chainSince
+ recent = p.chainRecent
+ }
hasBlock := p.hasBlock
p.lock.RUnlock()
- return hasBlock != nil && hasBlock(hash, number, hasState)
+ return head >= number && number >= since && (recent == 0 || number+recent+4 > head) && hasBlock != nil && hasBlock(hash, number, hasState)
}
// SendAnnounce announces the availability of a number of blocks through
@@ -277,6 +351,16 @@ func (p *peer) SendAnnounce(request announceData) error {
return p2p.Send(p.rw, AnnounceMsg, request)
}
+// SendStop notifies the client about being in frozen state
+func (p *peer) SendStop() error {
+ return p2p.Send(p.rw, StopMsg, struct{}{})
+}
+
+// SendResume notifies the client about getting out of frozen state
+func (p *peer) SendResume(bv uint64) error {
+ return p2p.Send(p.rw, ResumeMsg, bv)
+}
+
// ReplyBlockHeaders creates a reply with a batch of block headers
func (p *peer) ReplyBlockHeaders(reqID uint64, headers []*types.Header) *reply {
data, _ := rlp.EncodeToBytes(headers)
@@ -464,19 +548,19 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
send = send.add("genesisHash", genesis)
if server != nil {
if !server.onlyAnnounce {
- //only announce server. It sends only announse requests
send = send.add("serveHeaders", nil)
send = send.add("serveChainSince", uint64(0))
send = send.add("serveStateSince", uint64(0))
+ send = send.add("serveRecentState", uint64(core.TriesInMemory-4))
send = send.add("txRelay", nil)
}
send = send.add("flowControl/BL", server.defParams.BufLimit)
send = send.add("flowControl/MRR", server.defParams.MinRecharge)
var costList RequestCostList
if server.costTracker != nil {
- costList = server.costTracker.makeCostList()
+ costList = server.costTracker.makeCostList(server.costTracker.globalFactor())
} else {
- costList = testCostList()
+ costList = testCostList(server.testCost)
}
send = send.add("flowControl/MRC", costList)
p.fcCosts = costList.decode(ProtocolLengths[uint(p.version)])
@@ -544,12 +628,18 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams)
} else {
//mark OnlyAnnounce server if "serveHeaders", "serveChainSince", "serveStateSince" or "txRelay" fields don't exist
- if recv.get("serveChainSince", nil) != nil {
+ if recv.get("serveChainSince", &p.chainSince) != nil {
p.isOnlyAnnounce = true
}
- if recv.get("serveStateSince", nil) != nil {
+ if recv.get("serveRecentChain", &p.chainRecent) != nil {
+ p.chainRecent = 0
+ }
+ if recv.get("serveStateSince", &p.stateSince) != nil {
p.isOnlyAnnounce = true
}
+ if recv.get("serveRecentState", &p.stateRecent) != nil {
+ p.stateRecent = 0
+ }
if recv.get("txRelay", nil) != nil {
p.isOnlyAnnounce = true
}
diff --git a/les/peer_test.go b/les/peer_test.go
index 62adf90fe..d5ce16694 100644
--- a/les/peer_test.go
+++ b/les/peer_test.go
@@ -54,7 +54,7 @@ func TestPeerHandshakeSetAnnounceTypeToAnnounceTypeSignedForTrustedPeer(t *testi
l = l.add("txRelay", nil)
l = l.add("flowControl/BL", uint64(0))
l = l.add("flowControl/MRR", uint64(0))
- l = l.add("flowControl/MRC", testCostList())
+ l = l.add("flowControl/MRC", testCostList(0))
return l
},
@@ -99,7 +99,7 @@ func TestPeerHandshakeAnnounceTypeSignedForTrustedPeersPeerNotInTrusted(t *testi
l = l.add("txRelay", nil)
l = l.add("flowControl/BL", uint64(0))
l = l.add("flowControl/MRR", uint64(0))
- l = l.add("flowControl/MRC", testCostList())
+ l = l.add("flowControl/MRC", testCostList(0))
return l
},
diff --git a/les/protocol.go b/les/protocol.go
index 24dc9bdea..ebf581473 100644
--- a/les/protocol.go
+++ b/les/protocol.go
@@ -32,17 +32,18 @@ import (
// Constants to match up protocol versions and messages
const (
lpv2 = 2
+ lpv3 = 3
)
// Supported versions of the les protocol (first is primary)
var (
- ClientProtocolVersions = []uint{lpv2}
- ServerProtocolVersions = []uint{lpv2}
+ ClientProtocolVersions = []uint{lpv2, lpv3}
+ ServerProtocolVersions = []uint{lpv2, lpv3}
AdvertiseProtocolVersions = []uint{lpv2} // clients are searching for the first advertised protocol in the list
)
// Number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = map[uint]uint64{lpv2: 22}
+var ProtocolLengths = map[uint]uint64{lpv2: 22, lpv3: 24}
const (
NetworkId = 1
@@ -70,6 +71,9 @@ const (
SendTxV2Msg = 0x13
GetTxStatusMsg = 0x14
TxStatusMsg = 0x15
+ // Protocol messages introduced in LPV3
+ StopMsg = 0x16
+ ResumeMsg = 0x17
)
type requestInfo struct {
diff --git a/les/retrieve.go b/les/retrieve.go
index dd9d14598..d17a02e1a 100644
--- a/les/retrieve.go
+++ b/les/retrieve.go
@@ -78,8 +78,8 @@ type sentReq struct {
// after which delivered is set to true, the validity of the response is sent on the
// valid channel and no more responses are accepted.
type sentReqToPeer struct {
- delivered bool
- valid chan bool
+ delivered, frozen bool
+ event chan int
}
// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
@@ -95,6 +95,7 @@ const (
rpHardTimeout
rpDeliveredValid
rpDeliveredInvalid
+ rpNotDelivered
)
// newRetrieveManager creates the retrieve manager
@@ -149,7 +150,7 @@ func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc
req.request = func(p distPeer) func() {
// before actually sending the request, put an entry into the sentTo map
r.lock.Lock()
- r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)}
+ r.sentTo[p] = sentReqToPeer{delivered: false, frozen: false, event: make(chan int, 1)}
r.lock.Unlock()
return request(p)
}
@@ -173,6 +174,17 @@ func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
}
+// frozen is called by the LES protocol manager when a server has suspended its service and we
+// should not expect an answer for the requests already sent there
+func (rm *retrieveManager) frozen(peer distPeer) {
+ rm.lock.RLock()
+ defer rm.lock.RUnlock()
+
+ for _, req := range rm.sentReqs {
+ req.frozen(peer)
+ }
+}
+
// reqStateFn represents a state of the retrieve loop state machine
type reqStateFn func() reqStateFn
@@ -215,7 +227,7 @@ func (r *sentReq) stateRequesting() reqStateFn {
go r.tryRequest()
r.lastReqQueued = true
return r.stateRequesting
- case rpDeliveredInvalid:
+ case rpDeliveredInvalid, rpNotDelivered:
// if it was the last sent request (set to nil by update) then start a new one
if !r.lastReqQueued && r.lastReqSentTo == nil {
go r.tryRequest()
@@ -277,7 +289,7 @@ func (r *sentReq) update(ev reqPeerEvent) {
r.reqSrtoCount++
case rpHardTimeout:
r.reqSrtoCount--
- case rpDeliveredValid, rpDeliveredInvalid:
+ case rpDeliveredValid, rpDeliveredInvalid, rpNotDelivered:
if ev.peer == r.lastReqSentTo {
r.lastReqSentTo = nil
} else {
@@ -343,12 +355,13 @@ func (r *sentReq) tryRequest() {
}()
select {
- case ok := <-s.valid:
- if ok {
- r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
- } else {
- r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ case event := <-s.event:
+ if event == rpNotDelivered {
+ r.lock.Lock()
+ delete(r.sentTo, p)
+ r.lock.Unlock()
}
+ r.eventsCh <- reqPeerEvent{event, p}
return
case <-time.After(softRequestTimeout):
srto = true
@@ -356,12 +369,13 @@ func (r *sentReq) tryRequest() {
}
select {
- case ok := <-s.valid:
- if ok {
- r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
- } else {
- r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ case event := <-s.event:
+ if event == rpNotDelivered {
+ r.lock.Lock()
+ delete(r.sentTo, p)
+ r.lock.Unlock()
}
+ r.eventsCh <- reqPeerEvent{event, p}
case <-time.After(hardRequestTimeout):
hrto = true
r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
@@ -377,15 +391,37 @@ func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
if !ok || s.delivered {
return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
}
+ if s.frozen {
+ return nil
+ }
valid := r.validate(peer, msg) == nil
- r.sentTo[peer] = sentReqToPeer{true, s.valid}
- s.valid <- valid
+ r.sentTo[peer] = sentReqToPeer{delivered: true, frozen: false, event: s.event}
+ if valid {
+ s.event <- rpDeliveredValid
+ } else {
+ s.event <- rpDeliveredInvalid
+ }
if !valid {
return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
}
return nil
}
+// frozen sends a "not delivered" event to the peer event channel belonging to the
+// given peer if the request has been sent there, causing the state machine to not
+// expect an answer and potentially even send the request to the same peer again
+// when canSend allows it.
+func (r *sentReq) frozen(peer distPeer) {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+
+ s, ok := r.sentTo[peer]
+ if ok && !s.delivered && !s.frozen {
+ r.sentTo[peer] = sentReqToPeer{delivered: false, frozen: true, event: s.event}
+ s.event <- rpNotDelivered
+ }
+}
+
// stop stops the retrieval process and sets an error code that will be returned
// by getError
func (r *sentReq) stop(err error) {
diff --git a/les/server.go b/les/server.go
index 6b93b846d..836fa0d55 100644
--- a/les/server.go
+++ b/les/server.go
@@ -19,6 +19,7 @@ package les
import (
"crypto/ecdsa"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
@@ -26,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
+ "github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
@@ -37,26 +39,43 @@ import (
const bufLimitRatio = 6000 // fixed bufLimit/MRR ratio
+const (
+ logFileName = "" // csv log file name (disabled if empty)
+ logClientPoolMetrics = true // log client pool metrics
+ logClientPoolEvents = false // detailed client pool event logging
+ logRequestServing = true // log request serving metrics and events
+ logBlockProcEvents = true // log block processing events
+ logProtocolHandler = true // log protocol handler events
+)
+
type LesServer struct {
lesCommons
fcManager *flowcontrol.ClientManager // nil if our node is client only
costTracker *costTracker
+ testCost uint64
defParams flowcontrol.ServerParams
lesTopics []discv5.Topic
privateKey *ecdsa.PrivateKey
quitSync chan struct{}
onlyAnnounce bool
+ csvLogger *csvlogger.Logger
+ logTotalCap *csvlogger.Channel
thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode
- maxPeers int
- freeClientCap uint64
- freeClientPool *freeClientPool
- priorityClientPool *priorityClientPool
+ maxPeers int
+ minCapacity, freeClientCap uint64
+ freeClientPool *freeClientPool
+ priorityClientPool *priorityClientPool
}
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
+ var csvLogger *csvlogger.Logger
+ if logFileName != "" {
+ csvLogger = csvlogger.NewLogger(logFileName, time.Second*10, "event, peerId")
+ }
+
quitSync := make(chan struct{})
pm, err := NewProtocolManager(
eth.BlockChain().Config(),
@@ -78,6 +97,14 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
if err != nil {
return nil, err
}
+ if logProtocolHandler {
+ pm.logger = csvLogger
+ }
+ requestLogger := csvLogger
+ if !logRequestServing {
+ requestLogger = nil
+ }
+ pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100, requestLogger)
lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
for i, pv := range AdvertiseProtocolVersions {
@@ -93,11 +120,13 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency),
protocolManager: pm,
},
- costTracker: newCostTracker(eth.ChainDb(), config),
quitSync: quitSync,
lesTopics: lesTopics,
onlyAnnounce: config.OnlyAnnounce,
+ csvLogger: csvLogger,
+ logTotalCap: requestLogger.NewChannel("totalCapacity", 0.01),
}
+ srv.costTracker, srv.minCapacity = newCostTracker(eth.ChainDb(), config, requestLogger)
logger := log.New()
pm.server = srv
@@ -144,7 +173,11 @@ func (s *LesServer) APIs() []rpc.API {
func (s *LesServer) startEventLoop() {
s.protocolManager.wg.Add(1)
- var processing bool
+ blockProcLogger := s.csvLogger
+ if !logBlockProcEvents {
+ blockProcLogger = nil
+ }
+ var processing, procLast bool
blockProcFeed := make(chan bool, 100)
s.protocolManager.blockchain.(*core.BlockChain).SubscribeBlockProcessingEvent(blockProcFeed)
totalRechargeCh := make(chan uint64, 100)
@@ -152,17 +185,25 @@ func (s *LesServer) startEventLoop() {
totalCapacityCh := make(chan uint64, 100)
updateRecharge := func() {
if processing {
+ if !procLast {
+ blockProcLogger.Event("block processing started")
+ }
s.protocolManager.servingQueue.setThreads(s.thcBlockProcessing)
s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
} else {
+ if procLast {
+ blockProcLogger.Event("block processing finished")
+ }
s.protocolManager.servingQueue.setThreads(s.thcNormal)
- s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 10, totalRecharge}, {totalRecharge, totalRecharge}})
+ s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 16, totalRecharge / 2}, {totalRecharge / 2, totalRecharge / 2}, {totalRecharge, totalRecharge}})
}
+ procLast = processing
}
updateRecharge()
totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
+ var maxFreePeers uint64
go func() {
for {
select {
@@ -171,6 +212,12 @@ func (s *LesServer) startEventLoop() {
case totalRecharge = <-totalRechargeCh:
updateRecharge()
case totalCapacity = <-totalCapacityCh:
+ s.logTotalCap.Update(float64(totalCapacity))
+ newFreePeers := totalCapacity / s.freeClientCap
+ if newFreePeers < maxFreePeers && newFreePeers < uint64(s.maxPeers) {
+ log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers)
+ }
+ maxFreePeers = newFreePeers
s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
case <-s.protocolManager.quitSync:
s.protocolManager.wg.Done()
@@ -189,9 +236,9 @@ func (s *LesServer) Start(srvr *p2p.Server) {
s.maxPeers = s.config.LightPeers
totalRecharge := s.costTracker.totalRecharge()
if s.maxPeers > 0 {
- s.freeClientCap = minCapacity //totalRecharge / uint64(s.maxPeers)
- if s.freeClientCap < minCapacity {
- s.freeClientCap = minCapacity
+ s.freeClientCap = s.minCapacity //totalRecharge / uint64(s.maxPeers)
+ if s.freeClientCap < s.minCapacity {
+ s.freeClientCap = s.minCapacity
}
if s.freeClientCap > 0 {
s.defParams = flowcontrol.ServerParams{
@@ -200,15 +247,25 @@ func (s *LesServer) Start(srvr *p2p.Server) {
}
}
}
- freePeers := int(totalRecharge / s.freeClientCap)
- if freePeers < s.maxPeers {
- log.Warn("Light peer count limited", "specified", s.maxPeers, "allowed", freePeers)
- }
- s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) })
- s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool)
+ maxCapacity := s.freeClientCap * uint64(s.maxPeers)
+ if totalRecharge > maxCapacity {
+ maxCapacity = totalRecharge
+ }
+ s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2)
+ poolMetricsLogger := s.csvLogger
+ if !logClientPoolMetrics {
+ poolMetricsLogger = nil
+ }
+ poolEventLogger := s.csvLogger
+ if !logClientPoolEvents {
+ poolEventLogger = nil
+ }
+ s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }, poolMetricsLogger, poolEventLogger)
+ s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool, poolMetricsLogger, poolEventLogger)
s.protocolManager.peers.notify(s.priorityClientPool)
+ s.csvLogger.Start()
s.startEventLoop()
s.protocolManager.Start(s.config.LightPeers)
if srvr.DiscV5 != nil {
@@ -233,6 +290,7 @@ func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
// Stop stops the LES service
func (s *LesServer) Stop() {
+ s.fcManager.Stop()
s.chtIndexer.Close()
// bloom trie indexer is closed by parent bloombits indexer
go func() {
@@ -241,6 +299,7 @@ func (s *LesServer) Stop() {
s.freeClientPool.stop()
s.costTracker.stop()
s.protocolManager.Stop()
+ s.csvLogger.Stop()
}
// todo(rjl493456442) separate client and server implementation.
diff --git a/les/servingqueue.go b/les/servingqueue.go
index 2438fdfe3..26656ec01 100644
--- a/les/servingqueue.go
+++ b/les/servingqueue.go
@@ -17,16 +17,24 @@
package les
import (
+ "fmt"
+ "sort"
"sync"
+ "sync/atomic"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/les/csvlogger"
)
// servingQueue allows running tasks in a limited number of threads and puts the
// waiting tasks in a priority queue
type servingQueue struct {
- tokenCh chan runToken
+ recentTime, queuedTime, servingTimeDiff uint64
+ burstLimit, burstDropLimit uint64
+ burstDecRate float64
+ lastUpdate mclock.AbsTime
+
queueAddCh, queueBestCh chan *servingTask
stopThreadCh, quit chan struct{}
setThreadsCh chan int
@@ -36,6 +44,10 @@ type servingQueue struct {
queue *prque.Prque // priority queue for waiting or suspended tasks
best *servingTask // the highest priority task (not included in the queue)
suspendBias int64 // priority bias against suspending an already running task
+
+ logger *csvlogger.Logger
+ logRecentTime *csvlogger.Channel
+ logQueuedTime *csvlogger.Channel
}
// servingTask represents a request serving task. Tasks can be implemented to
@@ -47,12 +59,13 @@ type servingQueue struct {
// - run: execute a single step; return true if finished
// - after: executed after run finishes or returns an error, receives the total serving time
type servingTask struct {
- sq *servingQueue
- servingTime uint64
- priority int64
- biasAdded bool
- token runToken
- tokenCh chan runToken
+ sq *servingQueue
+ servingTime, timeAdded, maxTime, expTime uint64
+ peer *peer
+ priority int64
+ biasAdded bool
+ token runToken
+ tokenCh chan runToken
}
// runToken received by servingTask.start allows the task to run. Closing the
@@ -63,20 +76,19 @@ type runToken chan struct{}
// start blocks until the task can start and returns true if it is allowed to run.
// Returning false means that the task should be cancelled.
func (t *servingTask) start() bool {
+ if t.peer.isFrozen() {
+ return false
+ }
+ t.tokenCh = make(chan runToken, 1)
select {
- case t.token = <-t.sq.tokenCh:
- default:
- t.tokenCh = make(chan runToken, 1)
- select {
- case t.sq.queueAddCh <- t:
- case <-t.sq.quit:
- return false
- }
- select {
- case t.token = <-t.tokenCh:
- case <-t.sq.quit:
- return false
- }
+ case t.sq.queueAddCh <- t:
+ case <-t.sq.quit:
+ return false
+ }
+ select {
+ case t.token = <-t.tokenCh:
+ case <-t.sq.quit:
+ return false
}
if t.token == nil {
return false
@@ -90,6 +102,14 @@ func (t *servingTask) start() bool {
func (t *servingTask) done() uint64 {
t.servingTime += uint64(mclock.Now())
close(t.token)
+ diff := t.servingTime - t.timeAdded
+ t.timeAdded = t.servingTime
+ if t.expTime > diff {
+ t.expTime -= diff
+ atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime)
+ } else {
+ t.expTime = 0
+ }
return t.servingTime
}
@@ -107,16 +127,22 @@ func (t *servingTask) waitOrStop() bool {
}
// newServingQueue returns a new servingQueue
-func newServingQueue(suspendBias int64) *servingQueue {
+func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue {
sq := &servingQueue{
- queue: prque.New(nil),
- suspendBias: suspendBias,
- tokenCh: make(chan runToken),
- queueAddCh: make(chan *servingTask, 100),
- queueBestCh: make(chan *servingTask),
- stopThreadCh: make(chan struct{}),
- quit: make(chan struct{}),
- setThreadsCh: make(chan int, 10),
+ queue: prque.New(nil),
+ suspendBias: suspendBias,
+ queueAddCh: make(chan *servingTask, 100),
+ queueBestCh: make(chan *servingTask),
+ stopThreadCh: make(chan struct{}),
+ quit: make(chan struct{}),
+ setThreadsCh: make(chan int, 10),
+ burstLimit: uint64(utilTarget * bufLimitRatio * 1200000),
+ burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
+ burstDecRate: utilTarget,
+ lastUpdate: mclock.Now(),
+ logger: logger,
+ logRecentTime: logger.NewMinMaxChannel("recentTime", false),
+ logQueuedTime: logger.NewMinMaxChannel("queuedTime", false),
}
sq.wg.Add(2)
go sq.queueLoop()
@@ -125,9 +151,12 @@ func newServingQueue(suspendBias int64) *servingQueue {
}
// newTask creates a new task with the given priority
-func (sq *servingQueue) newTask(priority int64) *servingTask {
+func (sq *servingQueue) newTask(peer *peer, maxTime uint64, priority int64) *servingTask {
return &servingTask{
sq: sq,
+ peer: peer,
+ maxTime: maxTime,
+ expTime: maxTime,
priority: priority,
}
}
@@ -144,18 +173,12 @@ func (sq *servingQueue) threadController() {
select {
case best := <-sq.queueBestCh:
best.tokenCh <- token
- default:
- select {
- case best := <-sq.queueBestCh:
- best.tokenCh <- token
- case sq.tokenCh <- token:
- case <-sq.stopThreadCh:
- sq.wg.Done()
- return
- case <-sq.quit:
- sq.wg.Done()
- return
- }
+ case <-sq.stopThreadCh:
+ sq.wg.Done()
+ return
+ case <-sq.quit:
+ sq.wg.Done()
+ return
}
<-token
select {
@@ -170,6 +193,100 @@ func (sq *servingQueue) threadController() {
}
}
+type (
+ // peerTasks lists the tasks received from a given peer when selecting peers to freeze
+ peerTasks struct {
+ peer *peer
+ list []*servingTask
+ sumTime uint64
+ priority float64
+ }
+ // peerList is a sortable list of peerTasks
+ peerList []*peerTasks
+)
+
+func (l peerList) Len() int {
+ return len(l)
+}
+
+func (l peerList) Less(i, j int) bool {
+ return l[i].priority < l[j].priority
+}
+
+func (l peerList) Swap(i, j int) {
+ l[i], l[j] = l[j], l[i]
+}
+
+// freezePeers selects the peers with the worst priority queued tasks and freezes
+// them until burstTime goes under burstDropLimit or all peers are frozen
+func (sq *servingQueue) freezePeers() {
+ peerMap := make(map[*peer]*peerTasks)
+ var peerList peerList
+ if sq.best != nil {
+ sq.queue.Push(sq.best, sq.best.priority)
+ }
+ sq.best = nil
+ for sq.queue.Size() > 0 {
+ task := sq.queue.PopItem().(*servingTask)
+ tasks := peerMap[task.peer]
+ if tasks == nil {
+ bufValue, bufLimit := task.peer.fcClient.BufferStatus()
+ if bufLimit < 1 {
+ bufLimit = 1
+ }
+ tasks = &peerTasks{
+ peer: task.peer,
+ priority: float64(bufValue) / float64(bufLimit), // lower value comes first
+ }
+ peerMap[task.peer] = tasks
+ peerList = append(peerList, tasks)
+ }
+ tasks.list = append(tasks.list, task)
+ tasks.sumTime += task.expTime
+ }
+ sort.Sort(peerList)
+ drop := true
+ sq.logger.Event("freezing peers")
+ for _, tasks := range peerList {
+ if drop {
+ tasks.peer.freezeClient()
+ tasks.peer.fcClient.Freeze()
+ sq.queuedTime -= tasks.sumTime
+ if sq.logQueuedTime != nil {
+ sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
+ }
+ sq.logger.Event(fmt.Sprintf("frozen peer sumTime=%d, %v", tasks.sumTime, tasks.peer.id))
+ drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
+ for _, task := range tasks.list {
+ task.tokenCh <- nil
+ }
+ } else {
+ for _, task := range tasks.list {
+ sq.queue.Push(task, task.priority)
+ }
+ }
+ }
+ if sq.queue.Size() > 0 {
+ sq.best = sq.queue.PopItem().(*servingTask)
+ }
+}
+
+// updateRecentTime recalculates the recent serving time value
+func (sq *servingQueue) updateRecentTime() {
+ subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0)
+ now := mclock.Now()
+ dt := now - sq.lastUpdate
+ sq.lastUpdate = now
+ if dt > 0 {
+ subTime += uint64(float64(dt) * sq.burstDecRate)
+ }
+ if sq.recentTime > subTime {
+ sq.recentTime -= subTime
+ } else {
+ sq.recentTime = 0
+ }
+}
+
// addTask inserts a task into the priority queue
func (sq *servingQueue) addTask(task *servingTask) {
if sq.best == nil {
@@ -177,10 +294,18 @@ func (sq *servingQueue) addTask(task *servingTask) {
} else if task.priority > sq.best.priority {
sq.queue.Push(sq.best, sq.best.priority)
sq.best = task
- return
} else {
sq.queue.Push(task, task.priority)
}
+ sq.updateRecentTime()
+ sq.queuedTime += task.expTime
+ if sq.logQueuedTime != nil {
+ sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
+ sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
+ }
+ if sq.recentTime+sq.queuedTime > sq.burstLimit {
+ sq.freezePeers()
+ }
}
// queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh
@@ -189,10 +314,18 @@ func (sq *servingQueue) addTask(task *servingTask) {
func (sq *servingQueue) queueLoop() {
for {
if sq.best != nil {
+ expTime := sq.best.expTime
select {
case task := <-sq.queueAddCh:
sq.addTask(task)
case sq.queueBestCh <- sq.best:
+ sq.updateRecentTime()
+ sq.queuedTime -= expTime
+ sq.recentTime += expTime
+ if sq.logQueuedTime != nil {
+ sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
+ sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
+ }
if sq.queue.Size() == 0 {
sq.best = nil
} else {