diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2019-05-31 02:51:13 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-31 02:51:13 +0800 |
commit | 58497f46bd0bdd105828c30500e863e826e598cd (patch) | |
tree | 7c658530edfebb6e47ed5f993753f4b48fd1747e | |
parent | 3d58268bba92c6c8f7f035bafcd1608bc22cee51 (diff) | |
download | go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.gz go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.bz2 go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.lz go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.xz go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.zst go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.zip |
les, les/flowcontrol: implement LES/3 (#19329)
les, les/flowcontrol: implement LES/3
-rw-r--r-- | cmd/utils/flags.go | 8 | ||||
-rw-r--r-- | core/blockchain.go | 12 | ||||
-rw-r--r-- | core/blockchain_test.go | 22 | ||||
-rw-r--r-- | les/api.go | 32 | ||||
-rw-r--r-- | les/costtracker.go | 188 | ||||
-rw-r--r-- | les/csvlogger/csvlogger.go | 227 | ||||
-rw-r--r-- | les/distributor.go | 33 | ||||
-rw-r--r-- | les/execqueue.go | 9 | ||||
-rw-r--r-- | les/flowcontrol/control.go | 111 | ||||
-rw-r--r-- | les/flowcontrol/manager.go | 193 | ||||
-rw-r--r-- | les/flowcontrol/manager_test.go | 12 | ||||
-rw-r--r-- | les/freeclient.go | 57 | ||||
-rw-r--r-- | les/freeclient_test.go | 4 | ||||
-rw-r--r-- | les/handler.go | 649 | ||||
-rw-r--r-- | les/handler_test.go | 51 | ||||
-rw-r--r-- | les/helper_test.go | 29 | ||||
-rw-r--r-- | les/peer.go | 110 | ||||
-rw-r--r-- | les/peer_test.go | 4 | ||||
-rw-r--r-- | les/protocol.go | 10 | ||||
-rw-r--r-- | les/retrieve.go | 70 | ||||
-rw-r--r-- | les/server.go | 91 | ||||
-rw-r--r-- | les/servingqueue.go | 219 |
22 files changed, 1533 insertions, 608 deletions
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ddeb44f34..93d162370 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -205,13 +205,13 @@ var ( } LightBandwidthInFlag = cli.IntFlag{ Name: "lightbwin", - Usage: "Incoming bandwidth limit for light server (1000 bytes/sec, 0 = unlimited)", - Value: 1000, + Usage: "Incoming bandwidth limit for light server (kilobytes/sec, 0 = unlimited)", + Value: 0, } LightBandwidthOutFlag = cli.IntFlag{ Name: "lightbwout", - Usage: "Outgoing bandwidth limit for light server (1000 bytes/sec, 0 = unlimited)", - Value: 5000, + Usage: "Outgoing bandwidth limit for light server (kilobytes/sec, 0 = unlimited)", + Value: 0, } LightPeersFlag = cli.IntFlag{ Name: "lightpeers", diff --git a/core/blockchain.go b/core/blockchain.go index 711959692..2355f0ea3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -74,7 +74,7 @@ const ( maxFutureBlocks = 256 maxTimeFutureBlocks = 30 badBlockLimit = 10 - triesInMemory = 128 + TriesInMemory = 128 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // @@ -799,7 +799,7 @@ func (bc *BlockChain) Stop() { if !bc.cacheConfig.TrieDirtyDisabled { triedb := bc.stateCache.TrieDB() - for _, offset := range []uint64{0, 1, triesInMemory - 1} { + for _, offset := range []uint64{0, 1, TriesInMemory - 1} { if number := bc.CurrentBlock().NumberU64(); number > offset { recent := bc.GetBlockByNumber(number - offset) @@ -1224,7 +1224,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive bc.triegc.Push(root, -int64(block.NumberU64())) - if current := block.NumberU64(); current > triesInMemory { + if current := block.NumberU64(); current > TriesInMemory { // If we exceeded our memory allowance, flush matured singleton nodes to disk var ( nodes, imgs = triedb.Size() @@ -1234,7 +1234,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. triedb.Cap(limit - ethdb.IdealBatchSize) } // Find the next state trie we need to commit - chosen := current - triesInMemory + chosen := current - TriesInMemory // If we exceeded out time allowance, flush an entire trie to disk if bc.gcproc > bc.cacheConfig.TrieTimeLimit { @@ -1246,8 +1246,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } else { // If we're exceeding limits but haven't reached a large enough memory gap, // warn the user that the system is becoming unstable. - if chosen < lastWrite+triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { - log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory) + if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { + log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory) } // Flush an entire trie and restart the counters triedb.Commit(header.Root, true) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 8dfcda6d4..27115af52 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1534,7 +1534,7 @@ func TestTrieForkGC(t *testing.T) { db := rawdb.NewMemoryDatabase() genesis := new(Genesis).MustCommit(db) - blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) + blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*TriesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) // Generate a bunch of fork blocks, each side forking from the canonical chain forks := make([]*types.Block, len(blocks)) @@ -1563,7 +1563,7 @@ func TestTrieForkGC(t *testing.T) { } } // Dereference all the recent tries and ensure no past trie is left in - for i := 0; i < triesInMemory; i++ { + for i := 0; i < TriesInMemory; i++ { chain.stateCache.TrieDB().Dereference(blocks[len(blocks)-1-i].Root()) chain.stateCache.TrieDB().Dereference(forks[len(blocks)-1-i].Root()) } @@ -1582,8 +1582,8 @@ func TestLargeReorgTrieGC(t *testing.T) { genesis := new(Genesis).MustCommit(db) shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 64, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) - original, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) - competitor, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) }) + original, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*TriesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) + competitor, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*TriesInMemory+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) }) // Import the shared chain and the original canonical one diskdb := rawdb.NewMemoryDatabase() @@ -1618,7 +1618,7 @@ func TestLargeReorgTrieGC(t *testing.T) { if _, err := chain.InsertChain(competitor[len(competitor)-2:]); err != nil { t.Fatalf("failed to finalize competitor chain: %v", err) } - for i, block := range competitor[:len(competitor)-triesInMemory] { + for i, block := range competitor[:len(competitor)-TriesInMemory] { if node, _ := chain.stateCache.TrieDB().Node(block.Root()); node != nil { t.Fatalf("competitor %d: competing chain state missing", i) } @@ -1753,7 +1753,7 @@ func TestLowDiffLongChain(t *testing.T) { // We must use a pretty long chain to ensure that the fork doesn't overtake us // until after at least 128 blocks post tip - blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 6*triesInMemory, func(i int, b *BlockGen) { + blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 6*TriesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) b.OffsetTime(-9) }) @@ -1771,7 +1771,7 @@ func TestLowDiffLongChain(t *testing.T) { } // Generate fork chain, starting from an early block parent := blocks[10] - fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 8*triesInMemory, func(i int, b *BlockGen) { + fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 8*TriesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) @@ -1806,7 +1806,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon genesis := new(Genesis).MustCommit(db) // Generate and import the canonical chain - blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*triesInMemory, nil) + blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*TriesInMemory, nil) diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) @@ -1817,9 +1817,9 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon t.Fatalf("block %d: failed to insert into chain: %v", n, err) } - lastPrunedIndex := len(blocks) - triesInMemory - 1 + lastPrunedIndex := len(blocks) - TriesInMemory - 1 lastPrunedBlock := blocks[lastPrunedIndex] - firstNonPrunedBlock := blocks[len(blocks)-triesInMemory] + firstNonPrunedBlock := blocks[len(blocks)-TriesInMemory] // Verify pruning of lastPrunedBlock if chain.HasBlockAndState(lastPrunedBlock.Hash(), lastPrunedBlock.NumberU64()) { @@ -1836,7 +1836,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon // Generate fork chain, make it longer than canon parentIndex := lastPrunedIndex + blocksBetweenCommonAncestorAndPruneblock parent := blocks[parentIndex] - fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 2*triesInMemory, func(i int, b *BlockGen) { + fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 2*TriesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) // Prepend the parent(s) diff --git a/les/api.go b/les/api.go index a933cbd06..3a8d49ca5 100644 --- a/les/api.go +++ b/les/api.go @@ -19,11 +19,13 @@ package les import ( "context" "errors" + "fmt" "sync" "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" ) @@ -99,7 +101,7 @@ func (s tcSubs) send(tc uint64, underrun bool) { // MinimumCapacity queries minimum assignable capacity for a single client func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 { - return hexutil.Uint64(minCapacity) + return hexutil.Uint64(api.server.minCapacity) } // FreeClientCapacity queries the capacity provided for free clients @@ -115,7 +117,7 @@ func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 { // Note: assigned capacity can be changed while the client is connected with // immediate effect. func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error { - if cap != 0 && cap < minCapacity { + if cap != 0 && cap < api.server.minCapacity { return ErrMinCap } return api.server.priorityClientPool.setClientCapacity(id, cap) @@ -144,6 +146,8 @@ type priorityClientPool struct { totalCap, totalCapAnnounced uint64 totalConnectedCap, freeClientCap uint64 maxPeers, priorityCount int + logger *csvlogger.Logger + logTotalPriConn *csvlogger.Channel subs tcSubs updateSchedule []scheduledUpdate @@ -164,12 +168,14 @@ type priorityClientInfo struct { } // newPriorityClientPool creates a new priority client pool -func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool) *priorityClientPool { +func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, metricsLogger, eventLogger *csvlogger.Logger) *priorityClientPool { return &priorityClientPool{ - clients: make(map[enode.ID]priorityClientInfo), - freeClientCap: freeClientCap, - ps: ps, - child: child, + clients: make(map[enode.ID]priorityClientInfo), + freeClientCap: freeClientCap, + ps: ps, + child: child, + logger: eventLogger, + logTotalPriConn: metricsLogger.NewChannel("totalPriConn", 0), } } @@ -185,6 +191,7 @@ func (v *priorityClientPool) registerPeer(p *peer) { id := p.ID() c := v.clients[id] + v.logger.Event(fmt.Sprintf("priorityClientPool: registerPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes())) if c.connected { return } @@ -192,6 +199,7 @@ func (v *priorityClientPool) registerPeer(p *peer) { v.child.registerPeer(p) } if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap { + v.logger.Event(fmt.Sprintf("priorityClientPool: rejected, %x", id.Bytes())) go v.ps.Unregister(p.id) return } @@ -202,6 +210,8 @@ func (v *priorityClientPool) registerPeer(p *peer) { if c.cap != 0 { v.priorityCount++ v.totalConnectedCap += c.cap + v.logger.Event(fmt.Sprintf("priorityClientPool: accepted with %d capacity, %x", c.cap, id.Bytes())) + v.logTotalPriConn.Update(float64(v.totalConnectedCap)) if v.child != nil { v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap) } @@ -217,6 +227,7 @@ func (v *priorityClientPool) unregisterPeer(p *peer) { id := p.ID() c := v.clients[id] + v.logger.Event(fmt.Sprintf("priorityClientPool: unregisterPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes())) if !c.connected { return } @@ -225,6 +236,7 @@ func (v *priorityClientPool) unregisterPeer(p *peer) { v.clients[id] = c v.priorityCount-- v.totalConnectedCap -= c.cap + v.logTotalPriConn.Update(float64(v.totalConnectedCap)) if v.child != nil { v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap) } @@ -299,8 +311,10 @@ func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) { if v.priorityCount > count || v.totalConnectedCap > totalCap { for id, c := range v.clients { if c.connected { + v.logger.Event(fmt.Sprintf("priorityClientPool: setLimitsNow kicked out, %x", id.Bytes())) c.connected = false v.totalConnectedCap -= c.cap + v.logTotalPriConn.Update(float64(v.totalConnectedCap)) v.priorityCount-- v.clients[id] = c go v.ps.Unregister(c.peer.id) @@ -356,6 +370,7 @@ func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error { v.priorityCount-- } v.totalConnectedCap += cap - c.cap + v.logTotalPriConn.Update(float64(v.totalConnectedCap)) if v.child != nil { v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap) } @@ -374,6 +389,9 @@ func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error { } else { delete(v.clients, id) } + if c.connected { + v.logger.Event(fmt.Sprintf("priorityClientPool: changed capacity to %d, %x", cap, id.Bytes())) + } return nil } diff --git a/les/costtracker.go b/les/costtracker.go index 332d73343..e463c9f8b 100644 --- a/les/costtracker.go +++ b/les/costtracker.go @@ -18,6 +18,7 @@ package les import ( "encoding/binary" + "fmt" "math" "sync" "sync/atomic" @@ -26,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/log" ) @@ -52,7 +54,7 @@ var ( GetCodeMsg: {0, 80}, GetProofsV2Msg: {0, 80}, GetHelperTrieProofsMsg: {0, 20}, - SendTxV2Msg: {0, 66000}, + SendTxV2Msg: {0, 16500}, GetTxStatusMsg: {0, 50}, } // maximum outgoing message size estimates @@ -66,17 +68,27 @@ var ( SendTxV2Msg: {0, 100}, GetTxStatusMsg: {0, 100}, } - minBufLimit = uint64(50000000 * maxCostFactor) // minimum buffer limit allowed for a client - minCapacity = (minBufLimit-1)/bufLimitRatio + 1 // minimum capacity allowed for a client + // request amounts that have to fit into the minimum buffer size minBufferMultiplier times + minBufferReqAmount = map[uint64]uint64{ + GetBlockHeadersMsg: 192, + GetBlockBodiesMsg: 1, + GetReceiptsMsg: 1, + GetCodeMsg: 1, + GetProofsV2Msg: 1, + GetHelperTrieProofsMsg: 16, + SendTxV2Msg: 8, + GetTxStatusMsg: 64, + } + minBufferMultiplier = 3 ) const ( maxCostFactor = 2 // ratio of maximum and average cost estimates - gfInitWeight = time.Second * 10 - gfMaxWeight = time.Hour gfUsageThreshold = 0.5 gfUsageTC = time.Second - gfDbKey = "_globalCostFactor" + gfRaiseTC = time.Second * 200 + gfDropTC = time.Second * 50 + gfDbKey = "_globalCostFactorV3" ) // costTracker is responsible for calculating costs and cost estimates on the @@ -94,21 +106,30 @@ type costTracker struct { inSizeFactor, outSizeFactor float64 gf, utilTarget float64 + minBufLimit uint64 gfUpdateCh chan gfUpdate gfLock sync.RWMutex totalRechargeCh chan uint64 - stats map[uint64][]uint64 + stats map[uint64][]uint64 + logger *csvlogger.Logger + logRecentTime, logRecentAvg, logTotalRecharge, logRelCost *csvlogger.Channel } -// newCostTracker creates a cost tracker and loads the cost factor statistics from the database -func newCostTracker(db ethdb.Database, config *eth.Config) *costTracker { +// newCostTracker creates a cost tracker and loads the cost factor statistics from the database. +// It also returns the minimum capacity that can be assigned to any peer. +func newCostTracker(db ethdb.Database, config *eth.Config, logger *csvlogger.Logger) (*costTracker, uint64) { utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100 ct := &costTracker{ - db: db, - stopCh: make(chan chan struct{}), - utilTarget: utilTarget, + db: db, + stopCh: make(chan chan struct{}), + utilTarget: utilTarget, + logger: logger, + logRelCost: logger.NewMinMaxChannel("relativeCost", true), + logRecentTime: logger.NewMinMaxChannel("recentTime", true), + logRecentAvg: logger.NewMinMaxChannel("recentAvg", true), + logTotalRecharge: logger.NewChannel("totalRecharge", 0.01), } if config.LightBandwidthIn > 0 { ct.inSizeFactor = utilTarget / float64(config.LightBandwidthIn) @@ -123,7 +144,16 @@ func newCostTracker(db ethdb.Database, config *eth.Config) *costTracker { } } ct.gfLoop() - return ct + costList := ct.makeCostList(ct.globalFactor() * 1.25) + for _, c := range costList { + amount := minBufferReqAmount[c.MsgCode] + cost := c.BaseCost + amount*c.ReqCost + if cost > ct.minBufLimit { + ct.minBufLimit = cost + } + } + ct.minBufLimit *= uint64(minBufferMultiplier) + return ct, (ct.minBufLimit-1)/bufLimitRatio + 1 } // stop stops the cost tracker and saves the cost factor statistics to the database @@ -138,16 +168,14 @@ func (ct *costTracker) stop() { // makeCostList returns upper cost estimates based on the hardcoded cost estimate // tables and the optionally specified incoming/outgoing bandwidth limits -func (ct *costTracker) makeCostList() RequestCostList { - maxCost := func(avgTime, inSize, outSize uint64) uint64 { - globalFactor := ct.globalFactor() - - cost := avgTime * maxCostFactor - inSizeCost := uint64(float64(inSize) * ct.inSizeFactor * globalFactor * maxCostFactor) +func (ct *costTracker) makeCostList(globalFactor float64) RequestCostList { + maxCost := func(avgTimeCost, inSize, outSize uint64) uint64 { + cost := avgTimeCost * maxCostFactor + inSizeCost := uint64(float64(inSize) * ct.inSizeFactor * globalFactor) if inSizeCost > cost { cost = inSizeCost } - outSizeCost := uint64(float64(outSize) * ct.outSizeFactor * globalFactor * maxCostFactor) + outSizeCost := uint64(float64(outSize) * ct.outSizeFactor * globalFactor) if outSizeCost > cost { cost = outSizeCost } @@ -155,17 +183,29 @@ func (ct *costTracker) makeCostList() RequestCostList { } var list RequestCostList for code, data := range reqAvgTimeCost { + baseCost := maxCost(data.baseCost, reqMaxInSize[code].baseCost, reqMaxOutSize[code].baseCost) + reqCost := maxCost(data.reqCost, reqMaxInSize[code].reqCost, reqMaxOutSize[code].reqCost) + if ct.minBufLimit != 0 { + // if minBufLimit is set then always enforce maximum request cost <= minBufLimit + maxCost := baseCost + reqCost*minBufferReqAmount[code] + if maxCost > ct.minBufLimit { + mul := 0.999 * float64(ct.minBufLimit) / float64(maxCost) + baseCost = uint64(float64(baseCost) * mul) + reqCost = uint64(float64(reqCost) * mul) + } + } + list = append(list, requestCostListItem{ MsgCode: code, - BaseCost: maxCost(data.baseCost, reqMaxInSize[code].baseCost, reqMaxOutSize[code].baseCost), - ReqCost: maxCost(data.reqCost, reqMaxInSize[code].reqCost, reqMaxOutSize[code].reqCost), + BaseCost: baseCost, + ReqCost: reqCost, }) } return list } type gfUpdate struct { - avgTime, servingTime float64 + avgTimeCost, servingTime float64 } // gfLoop starts an event loop which updates the global cost factor which is @@ -178,45 +218,74 @@ type gfUpdate struct { // total allowed serving time per second but nominated in cost units, should // also be scaled with the cost factor and is also updated by this loop. func (ct *costTracker) gfLoop() { - var gfUsage, gfSum, gfWeight float64 + var gfLog, recentTime, recentAvg float64 lastUpdate := mclock.Now() expUpdate := lastUpdate data, _ := ct.db.Get([]byte(gfDbKey)) - if len(data) == 16 { - gfSum = math.Float64frombits(binary.BigEndian.Uint64(data[0:8])) - gfWeight = math.Float64frombits(binary.BigEndian.Uint64(data[8:16])) + if len(data) == 8 { + gfLog = math.Float64frombits(binary.BigEndian.Uint64(data[:])) } - if gfWeight < float64(gfInitWeight) { - gfSum = float64(gfInitWeight) - gfWeight = float64(gfInitWeight) - } - gf := gfSum / gfWeight + gf := math.Exp(gfLog) ct.gf = gf + totalRecharge := ct.utilTarget * gf ct.gfUpdateCh = make(chan gfUpdate, 100) + threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / 1000000 go func() { + saveCostFactor := func() { + var data [8]byte + binary.BigEndian.PutUint64(data[:], math.Float64bits(gfLog)) + ct.db.Put([]byte(gfDbKey), data[:]) + log.Debug("global cost factor saved", "value", gf) + } + saveTicker := time.NewTicker(time.Minute * 10) + for { select { case r := <-ct.gfUpdateCh: now := mclock.Now() - max := r.servingTime * gf - if r.avgTime > max { - max = r.avgTime + if ct.logRelCost != nil && r.avgTimeCost > 1e-20 { + ct.logRelCost.Update(r.servingTime * gf / r.avgTimeCost) + } + if r.servingTime > 1000000000 { + ct.logger.Event(fmt.Sprintf("Very long servingTime = %f avgTimeCost = %f costFactor = %f", r.servingTime, r.avgTimeCost, gf)) } dt := float64(now - expUpdate) expUpdate = now - gfUsage = gfUsage*math.Exp(-dt/float64(gfUsageTC)) + max*1000000/float64(gfUsageTC) + exp := math.Exp(-dt / float64(gfUsageTC)) + // calculate gf correction until now, based on previous values + var gfCorr float64 + max := recentTime + if recentAvg > max { + max = recentAvg + } + // we apply continuous correction when MAX(recentTime, recentAvg) > threshold + if max > threshold { + // calculate correction time between last expUpdate and now + if max*exp >= threshold { + gfCorr = dt + } else { + gfCorr = math.Log(max/threshold) * float64(gfUsageTC) + } + // calculate log(gf) correction with the right direction and time constant + if recentTime > recentAvg { + // drop gf if actual serving times are larger than average estimates + gfCorr /= -float64(gfDropTC) + } else { + // raise gf if actual serving times are smaller than average estimates + gfCorr /= float64(gfRaiseTC) + } + } + // update recent cost values with current request + recentTime = recentTime*exp + r.servingTime + recentAvg = recentAvg*exp + r.avgTimeCost/gf - if gfUsage >= gfUsageThreshold*ct.utilTarget*gf { - gfSum += r.avgTime - gfWeight += r.servingTime + if gfCorr != 0 { + gfLog += gfCorr + gf = math.Exp(gfLog) if time.Duration(now-lastUpdate) > time.Second { - gf = gfSum / gfWeight - if gfWeight >= float64(gfMaxWeight) { - gfSum = gf * float64(gfMaxWeight) - gfWeight = float64(gfMaxWeight) - } + totalRecharge = ct.utilTarget * gf lastUpdate = now ct.gfLock.Lock() ct.gf = gf @@ -224,19 +293,22 @@ func (ct *costTracker) gfLoop() { ct.gfLock.Unlock() if ch != nil { select { - case ct.totalRechargeCh <- uint64(ct.utilTarget * gf): + case ct.totalRechargeCh <- uint64(totalRecharge): default: } } - log.Debug("global cost factor updated", "gf", gf, "weight", time.Duration(gfWeight)) + log.Debug("global cost factor updated", "gf", gf) } } + ct.logRecentTime.Update(recentTime) + ct.logRecentAvg.Update(recentAvg) + ct.logTotalRecharge.Update(totalRecharge) + + case <-saveTicker.C: + saveCostFactor() + case stopCh := <-ct.stopCh: - var data [16]byte - binary.BigEndian.PutUint64(data[0:8], math.Float64bits(gfSum)) - binary.BigEndian.PutUint64(data[8:16], math.Float64bits(gfWeight)) - ct.db.Put([]byte(gfDbKey), data[:]) - log.Debug("global cost factor saved", "sum", time.Duration(gfSum), "weight", time.Duration(gfWeight)) + saveCostFactor() close(stopCh) return } @@ -275,15 +347,15 @@ func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 { // average estimate statistics func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) { avg := reqAvgTimeCost[code] - avgTime := avg.baseCost + amount*avg.reqCost + avgTimeCost := avg.baseCost + amount*avg.reqCost select { - case ct.gfUpdateCh <- gfUpdate{float64(avgTime), float64(servingTime)}: + case ct.gfUpdateCh <- gfUpdate{float64(avgTimeCost), float64(servingTime)}: default: } if makeCostStats { realCost <<= 4 l := 0 - for l < 9 && realCost > avgTime { + for l < 9 && realCost > avgTimeCost { l++ realCost >>= 1 } @@ -339,8 +411,8 @@ type ( } ) -// getCost calculates the estimated cost for a given request type and amount -func (table requestCostTable) getCost(code, amount uint64) uint64 { +// getMaxCost calculates the estimated cost for a given request type and amount +func (table requestCostTable) getMaxCost(code, amount uint64) uint64 { costs := table[code] return costs.baseCost + amount*costs.reqCost } @@ -360,7 +432,7 @@ func (list RequestCostList) decode(protocolLength uint64) requestCostTable { } // testCostList returns a dummy request cost list used by tests -func testCostList() RequestCostList { +func testCostList(testCost uint64) RequestCostList { cl := make(RequestCostList, len(reqAvgTimeCost)) var max uint64 for code := range reqAvgTimeCost { @@ -372,7 +444,7 @@ func testCostList() RequestCostList { for code := uint64(0); code <= max; code++ { if _, ok := reqAvgTimeCost[code]; ok { cl[i].MsgCode = code - cl[i].BaseCost = 0 + cl[i].BaseCost = testCost cl[i].ReqCost = 0 i++ } diff --git a/les/csvlogger/csvlogger.go b/les/csvlogger/csvlogger.go new file mode 100644 index 000000000..9a4093cb9 --- /dev/null +++ b/les/csvlogger/csvlogger.go @@ -0,0 +1,227 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package csvlogger + +import ( + "fmt" + "os" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/log" +) + +// Logger is a metrics/events logger that writes logged values and events into a comma separated file +type Logger struct { + file *os.File + started mclock.AbsTime + channels []*Channel + period time.Duration + stopCh, stopped chan struct{} + storeCh chan string + eventHeader string +} + +// NewLogger creates a new Logger +func NewLogger(fileName string, updatePeriod time.Duration, eventHeader string) *Logger { + if fileName == "" { + return nil + } + f, err := os.Create(fileName) + if err != nil { + log.Error("Error creating log file", "name", fileName, "error", err) + return nil + } + return &Logger{ + file: f, + period: updatePeriod, + stopCh: make(chan struct{}), + storeCh: make(chan string, 1), + eventHeader: eventHeader, + } +} + +// NewChannel creates a new value logger channel that writes values in a single +// column. If the relative change of the value is bigger than the given threshold +// then a new line is added immediately (threshold can also be 0). +func (l *Logger) NewChannel(name string, threshold float64) *Channel { + if l == nil { + return nil + } + c := &Channel{ + logger: l, + name: name, + threshold: threshold, + } + l.channels = append(l.channels, c) + return c +} + +// NewMinMaxChannel creates a new value logger channel that writes the minimum and +// maximum of the tracked value in two columns. It never triggers adding a new line. +// If zeroDefault is true then 0 is written to both min and max columns if no update +// was given during the last period. If it is false then the last update will appear +// in both columns. +func (l *Logger) NewMinMaxChannel(name string, zeroDefault bool) *Channel { + if l == nil { + return nil + } + c := &Channel{ + logger: l, + name: name, + minmax: true, + mmZeroDefault: zeroDefault, + } + l.channels = append(l.channels, c) + return c +} + +func (l *Logger) store(event string) { + s := fmt.Sprintf("%g", float64(mclock.Now()-l.started)/1000000000) + for _, ch := range l.channels { + s += ", " + ch.store() + } + if event != "" { + s += ", " + event + } + l.file.WriteString(s + "\n") +} + +// Start writes the header line and starts the logger +func (l *Logger) Start() { + if l == nil { + return + } + l.started = mclock.Now() + s := "Time" + for _, ch := range l.channels { + s += ", " + ch.header() + } + if l.eventHeader != "" { + s += ", " + l.eventHeader + } + l.file.WriteString(s + "\n") + go func() { + timer := time.NewTimer(l.period) + for { + select { + case <-timer.C: + l.store("") + timer.Reset(l.period) + case event := <-l.storeCh: + l.store(event) + if !timer.Stop() { + <-timer.C + } + timer.Reset(l.period) + case <-l.stopCh: + close(l.stopped) + return + } + } + }() +} + +// Stop stops the logger and closes the file +func (l *Logger) Stop() { + if l == nil { + return + } + l.stopped = make(chan struct{}) + close(l.stopCh) + <-l.stopped + l.file.Close() +} + +// Event immediately adds a new line and adds the given event string in the last column +func (l *Logger) Event(event string) { + if l == nil { + return + } + select { + case l.storeCh <- event: + case <-l.stopCh: + } +} + +// Channel represents a logger channel tracking a single value +type Channel struct { + logger *Logger + lock sync.Mutex + name string + threshold, storeMin, storeMax, lastValue, min, max float64 + minmax, mmSet, mmZeroDefault bool +} + +// Update updates the tracked value +func (lc *Channel) Update(value float64) { + if lc == nil { + return + } + lc.lock.Lock() + defer lc.lock.Unlock() + + lc.lastValue = value + if lc.minmax { + if value > lc.max || !lc.mmSet { + lc.max = value + } + if value < lc.min || !lc.mmSet { + lc.min = value + } + lc.mmSet = true + } else { + if value < lc.storeMin || value > lc.storeMax { + select { + case lc.logger.storeCh <- "": + default: + } + } + } +} + +func (lc *Channel) store() (s string) { + lc.lock.Lock() + defer lc.lock.Unlock() + + if lc.minmax { + s = fmt.Sprintf("%g, %g", lc.min, lc.max) + lc.mmSet = false + if lc.mmZeroDefault { + lc.min = 0 + } else { + lc.min = lc.lastValue + } + lc.max = lc.min + } else { + s = fmt.Sprintf("%g", lc.lastValue) + lc.storeMin = lc.lastValue * (1 - lc.threshold) + lc.storeMax = lc.lastValue * (1 + lc.threshold) + if lc.lastValue < 0 { + lc.storeMin, lc.storeMax = lc.storeMax, lc.storeMin + } + } + return +} + +func (lc *Channel) header() string { + if lc.minmax { + return lc.name + " (min), " + lc.name + " (max)" + } + return lc.name +} diff --git a/les/distributor.go b/les/distributor.go index 1de267f27..9235adc03 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -62,9 +62,10 @@ type distReq struct { canSend func(distPeer) bool request func(distPeer) func() - reqOrder uint64 - sentChn chan distPeer - element *list.Element + reqOrder uint64 + sentChn chan distPeer + element *list.Element + waitForPeers mclock.AbsTime } // newRequestDistributor creates a new request distributor @@ -106,7 +107,11 @@ func (d *requestDistributor) registerTestPeer(p distPeer) { // distMaxWait is the maximum waiting time after which further necessary waiting // times are recalculated based on new feedback from the servers -const distMaxWait = time.Millisecond * 10 +const distMaxWait = time.Millisecond * 50 + +// waitForPeers is the time window in which a request does not fail even if it +// has no suitable peers to send to at the moment +const waitForPeers = time.Second * 3 // main event loop func (d *requestDistributor) loop() { @@ -179,8 +184,6 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { checkedPeers := make(map[distPeer]struct{}) elem := d.reqQueue.Front() var ( - bestPeer distPeer - bestReq *distReq bestWait time.Duration sel *weightedRandomSelect ) @@ -188,9 +191,18 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { d.peerLock.RLock() defer d.peerLock.RUnlock() - for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { + peerCount := len(d.peers) + for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil { req := elem.Value.(*distReq) canSend := false + now := d.clock.Now() + if req.waitForPeers > now { + canSend = true + wait := time.Duration(req.waitForPeers - now) + if bestWait == 0 || wait < bestWait { + bestWait = wait + } + } for peer := range d.peers { if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) { canSend = true @@ -202,9 +214,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { } sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) } else { - if bestReq == nil || wait < bestWait { - bestPeer = peer - bestReq = req + if bestWait == 0 || wait < bestWait { bestWait = wait } } @@ -223,7 +233,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { c := sel.choose().(selectPeerItem) return c.peer, c.req, 0 } - return bestPeer, bestReq, bestWait + return nil, nil, bestWait } // queue adds a request to the distribution queue, returns a channel where the @@ -237,6 +247,7 @@ func (d *requestDistributor) queue(r *distReq) chan distPeer { if r.reqOrder == 0 { d.lastReqOrder++ r.reqOrder = d.lastReqOrder + r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers) } back := d.reqQueue.Back() diff --git a/les/execqueue.go b/les/execqueue.go index 614721bf0..e0c88a990 100644 --- a/les/execqueue.go +++ b/les/execqueue.go @@ -44,7 +44,7 @@ func (q *execQueue) loop() { func (q *execQueue) waitNext(drop bool) (f func()) { q.mu.Lock() - if drop { + if drop && len(q.funcs) > 0 { // Remove the function that just executed. We do this here instead of when // dequeuing so len(q.funcs) includes the function that is running. q.funcs = append(q.funcs[:0], q.funcs[1:]...) @@ -84,6 +84,13 @@ func (q *execQueue) queue(f func()) bool { return ok } +// clear drops all queued functions +func (q *execQueue) clear() { + q.mu.Lock() + q.funcs = q.funcs[:0] + q.mu.Unlock() +} + // quit stops the exec queue. // quit waits for the current execution to finish before returning. func (q *execQueue) quit() { diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go index c03f673b2..490013677 100644 --- a/les/flowcontrol/control.go +++ b/les/flowcontrol/control.go @@ -56,11 +56,12 @@ type scheduledUpdate struct { // (used in server mode only) type ClientNode struct { params ServerParams - bufValue uint64 + bufValue int64 lastTime mclock.AbsTime updateSchedule []scheduledUpdate sumCost uint64 // sum of req costs received from this client accepted map[uint64]uint64 // value = sumCost after accepting the given req + connected bool lock sync.Mutex cm *ClientManager log *logger @@ -70,11 +71,12 @@ type ClientNode struct { // NewClientNode returns a new ClientNode func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode { node := &ClientNode{ - cm: cm, - params: params, - bufValue: params.BufLimit, - lastTime: cm.clock.Now(), - accepted: make(map[uint64]uint64), + cm: cm, + params: params, + bufValue: int64(params.BufLimit), + lastTime: cm.clock.Now(), + accepted: make(map[uint64]uint64), + connected: true, } if keepLogs > 0 { node.log = newLogger(keepLogs) @@ -85,9 +87,55 @@ func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode { // Disconnect should be called when a client is disconnected func (node *ClientNode) Disconnect() { + node.lock.Lock() + defer node.lock.Unlock() + + node.connected = false node.cm.disconnect(node) } +// BufferStatus returns the current buffer value and limit +func (node *ClientNode) BufferStatus() (uint64, uint64) { + node.lock.Lock() + defer node.lock.Unlock() + + if !node.connected { + return 0, 0 + } + now := node.cm.clock.Now() + node.update(now) + node.cm.updateBuffer(node, 0, now) + bv := node.bufValue + if bv < 0 { + bv = 0 + } + return uint64(bv), node.params.BufLimit +} + +// OneTimeCost subtracts the given amount from the node's buffer. +// +// Note: this call can take the buffer into the negative region internally. +// In this case zero buffer value is returned by exported calls and no requests +// are accepted. +func (node *ClientNode) OneTimeCost(cost uint64) { + node.lock.Lock() + defer node.lock.Unlock() + + now := node.cm.clock.Now() + node.update(now) + node.bufValue -= int64(cost) + node.cm.updateBuffer(node, -int64(cost), now) +} + +// Freeze notifies the client manager about a client freeze event in which case +// the total capacity allowance is slightly reduced. +func (node *ClientNode) Freeze() { + node.lock.Lock() + frozenCap := node.params.MinRecharge + node.lock.Unlock() + node.cm.reduceTotalCapacity(frozenCap) +} + // update recalculates the buffer value at a specified time while also performing // scheduled flow control parameter updates if necessary func (node *ClientNode) update(now mclock.AbsTime) { @@ -105,9 +153,9 @@ func (node *ClientNode) recalcBV(now mclock.AbsTime) { if now < node.lastTime { dt = 0 } - node.bufValue += node.params.MinRecharge * dt / uint64(fcTimeConst) - if node.bufValue > node.params.BufLimit { - node.bufValue = node.params.BufLimit + node.bufValue += int64(node.params.MinRecharge * dt / uint64(fcTimeConst)) + if node.bufValue > int64(node.params.BufLimit) { + node.bufValue = int64(node.params.BufLimit) } if node.log != nil { node.log.add(now, fmt.Sprintf("updated bv=%d MRR=%d BufLimit=%d", node.bufValue, node.params.MinRecharge, node.params.BufLimit)) @@ -139,11 +187,11 @@ func (node *ClientNode) UpdateParams(params ServerParams) { // updateParams updates the flow control parameters of the node func (node *ClientNode) updateParams(params ServerParams, now mclock.AbsTime) { - diff := params.BufLimit - node.params.BufLimit - if int64(diff) > 0 { + diff := int64(params.BufLimit - node.params.BufLimit) + if diff > 0 { node.bufValue += diff - } else if node.bufValue > params.BufLimit { - node.bufValue = params.BufLimit + } else if node.bufValue > int64(params.BufLimit) { + node.bufValue = int64(params.BufLimit) } node.cm.updateParams(node, params, now) } @@ -157,14 +205,14 @@ func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bo now := node.cm.clock.Now() node.update(now) - if maxCost > node.bufValue { + if int64(maxCost) > node.bufValue { if node.log != nil { node.log.add(now, fmt.Sprintf("rejected reqID=%d bv=%d maxCost=%d", reqID, node.bufValue, maxCost)) node.log.dump(now) } - return false, maxCost - node.bufValue, 0 + return false, maxCost - uint64(node.bufValue), 0 } - node.bufValue -= maxCost + node.bufValue -= int64(maxCost) node.sumCost += maxCost if node.log != nil { node.log.add(now, fmt.Sprintf("accepted reqID=%d bv=%d maxCost=%d sumCost=%d", reqID, node.bufValue, maxCost, node.sumCost)) @@ -174,19 +222,22 @@ func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bo } // RequestProcessed should be called when the request has been processed -func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) (bv uint64) { +func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64 { node.lock.Lock() defer node.lock.Unlock() now := node.cm.clock.Now() node.update(now) node.cm.processed(node, maxCost, realCost, now) - bv = node.bufValue + node.sumCost - node.accepted[index] + bv := node.bufValue + int64(node.sumCost-node.accepted[index]) if node.log != nil { node.log.add(now, fmt.Sprintf("processed reqID=%d bv=%d maxCost=%d realCost=%d sumCost=%d oldSumCost=%d reportedBV=%d", reqID, node.bufValue, maxCost, realCost, node.sumCost, node.accepted[index], bv)) } delete(node.accepted, index) - return + if bv < 0 { + return 0 + } + return uint64(bv) } // ServerNode is the flow control system's representation of a server @@ -345,6 +396,28 @@ func (node *ServerNode) ReceivedReply(reqID, bv uint64) { } } +// ResumeFreeze cleans all pending requests and sets the buffer estimate to the +// reported value after resuming from a frozen state +func (node *ServerNode) ResumeFreeze(bv uint64) { + node.lock.Lock() + defer node.lock.Unlock() + + for reqID := range node.pending { + delete(node.pending, reqID) + } + now := node.clock.Now() + node.recalcBLE(now) + if bv > node.params.BufLimit { + bv = node.params.BufLimit + } + node.bufEstimate = bv + node.bufRecharge = node.bufEstimate < node.params.BufLimit + node.lastTime = now + if node.log != nil { + node.log.add(now, fmt.Sprintf("unfreeze bv=%d sumCost=%d", bv, node.sumCost)) + } +} + // DumpLogs dumps the event log if logging is used func (node *ServerNode) DumpLogs() { node.lock.Lock() diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go index 532e6a405..68f1a47c9 100644 --- a/les/flowcontrol/manager.go +++ b/les/flowcontrol/manager.go @@ -47,9 +47,9 @@ type cmNodeFields struct { const FixedPointMultiplier = 1000000 var ( - capFactorDropTC = 1 / float64(time.Second*10) // time constant for dropping the capacity factor - capFactorRaiseTC = 1 / float64(time.Hour) // time constant for raising the capacity factor - capFactorRaiseThreshold = 0.75 // connected / total capacity ratio threshold for raising the capacity factor + capacityDropFactor = 0.1 + capacityRaiseTC = 1 / (3 * float64(time.Hour)) // time constant for raising the capacity factor + capacityRaiseThresholdRatio = 1.125 // total/connected capacity ratio threshold for raising the capacity factor ) // ClientManager controls the capacity assigned to the clients of a server. @@ -61,10 +61,14 @@ type ClientManager struct { clock mclock.Clock lock sync.Mutex enabledCh chan struct{} + stop chan chan struct{} curve PieceWiseLinear sumRecharge, totalRecharge, totalConnected uint64 - capLogFactor, totalCapacity float64 + logTotalCap, totalCapacity float64 + logTotalCapRaiseLimit float64 + minLogTotalCap, maxLogTotalCap float64 + capacityRaiseThreshold uint64 capLastUpdate mclock.AbsTime totalCapacityCh chan uint64 @@ -106,13 +110,35 @@ func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager clock: clock, rcQueue: prque.New(func(a interface{}, i int) { a.(*ClientNode).queueIndex = i }), capLastUpdate: clock.Now(), + stop: make(chan chan struct{}), } if curve != nil { cm.SetRechargeCurve(curve) } + go func() { + // regularly recalculate and update total capacity + for { + select { + case <-time.After(time.Minute): + cm.lock.Lock() + cm.updateTotalCapacity(cm.clock.Now(), true) + cm.lock.Unlock() + case stop := <-cm.stop: + close(stop) + return + } + } + }() return cm } +// Stop stops the client manager +func (cm *ClientManager) Stop() { + stop := make(chan struct{}) + cm.stop <- stop + <-stop +} + // SetRechargeCurve updates the recharge curve func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear) { cm.lock.Lock() @@ -120,13 +146,29 @@ func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear) { now := cm.clock.Now() cm.updateRecharge(now) - cm.updateCapFactor(now, false) cm.curve = curve if len(curve) > 0 { cm.totalRecharge = curve[len(curve)-1].Y } else { cm.totalRecharge = 0 } +} + +// SetCapacityRaiseThreshold sets a threshold value used for raising capFactor. +// Either if the difference between total allowed and connected capacity is less +// than this threshold or if their ratio is less than capacityRaiseThresholdRatio +// then capFactor is allowed to slowly raise. +func (cm *ClientManager) SetCapacityLimits(min, max, raiseThreshold uint64) { + if min < 1 { + min = 1 + } + cm.minLogTotalCap = math.Log(float64(min)) + if max < 1 { + max = 1 + } + cm.maxLogTotalCap = math.Log(float64(max)) + cm.logTotalCap = cm.maxLogTotalCap + cm.capacityRaiseThreshold = raiseThreshold cm.refreshCapacity() } @@ -141,8 +183,9 @@ func (cm *ClientManager) connect(node *ClientNode) { node.corrBufValue = int64(node.params.BufLimit) node.rcLastIntValue = cm.rcLastIntValue node.queueIndex = -1 - cm.updateCapFactor(now, true) + cm.updateTotalCapacity(now, true) cm.totalConnected += node.params.MinRecharge + cm.updateRaiseLimit() } // disconnect should be called when a client is disconnected @@ -152,8 +195,9 @@ func (cm *ClientManager) disconnect(node *ClientNode) { now := cm.clock.Now() cm.updateRecharge(cm.clock.Now()) - cm.updateCapFactor(now, true) + cm.updateTotalCapacity(now, true) cm.totalConnected -= node.params.MinRecharge + cm.updateRaiseLimit() } // accepted is called when a request with given maximum cost is accepted. @@ -174,18 +218,24 @@ func (cm *ClientManager) accepted(node *ClientNode, maxCost uint64, now mclock.A // // Note: processed should always be called for all accepted requests func (cm *ClientManager) processed(node *ClientNode, maxCost, realCost uint64, now mclock.AbsTime) { - cm.lock.Lock() - defer cm.lock.Unlock() - if realCost > maxCost { realCost = maxCost } - cm.updateNodeRc(node, int64(maxCost-realCost), &node.params, now) - if uint64(node.corrBufValue) > node.bufValue { + cm.updateBuffer(node, int64(maxCost-realCost), now) +} + +// updateBuffer recalulates the corrected buffer value, adds the given value to it +// and updates the node's actual buffer value if possible +func (cm *ClientManager) updateBuffer(node *ClientNode, add int64, now mclock.AbsTime) { + cm.lock.Lock() + defer cm.lock.Unlock() + + cm.updateNodeRc(node, add, &node.params, now) + if node.corrBufValue > node.bufValue { if node.log != nil { node.log.add(now, fmt.Sprintf("corrected bv=%d oldBv=%d", node.corrBufValue, node.bufValue)) } - node.bufValue = uint64(node.corrBufValue) + node.bufValue = node.corrBufValue } } @@ -195,11 +245,30 @@ func (cm *ClientManager) updateParams(node *ClientNode, params ServerParams, now defer cm.lock.Unlock() cm.updateRecharge(now) - cm.updateCapFactor(now, true) + cm.updateTotalCapacity(now, true) cm.totalConnected += params.MinRecharge - node.params.MinRecharge + cm.updateRaiseLimit() cm.updateNodeRc(node, 0, ¶ms, now) } +// updateRaiseLimit recalculates the limiting value until which logTotalCap +// can be raised when no client freeze events occur +func (cm *ClientManager) updateRaiseLimit() { + if cm.capacityRaiseThreshold == 0 { + cm.logTotalCapRaiseLimit = 0 + return + } + limit := float64(cm.totalConnected + cm.capacityRaiseThreshold) + limit2 := float64(cm.totalConnected) * capacityRaiseThresholdRatio + if limit2 > limit { + limit = limit2 + } + if limit < 1 { + limit = 1 + } + cm.logTotalCapRaiseLimit = math.Log(limit) +} + // updateRecharge updates the recharge integrator and checks the recharge queue // for nodes with recently filled buffers func (cm *ClientManager) updateRecharge(now mclock.AbsTime) { @@ -208,9 +277,15 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) { // updating is done in multiple steps if node buffers are filled and sumRecharge // is decreased before the given target time for cm.sumRecharge > 0 { - bonusRatio := cm.curve.ValueAt(cm.sumRecharge) / float64(cm.sumRecharge) - if bonusRatio < 1 { - bonusRatio = 1 + sumRecharge := cm.sumRecharge + if sumRecharge > cm.totalRecharge { + sumRecharge = cm.totalRecharge + } + bonusRatio := float64(1) + v := cm.curve.ValueAt(sumRecharge) + s := float64(sumRecharge) + if v > s && s > 0 { + bonusRatio = v / s } dt := now - lastUpdate // fetch the client that finishes first @@ -228,7 +303,6 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) { // finished recharging, update corrBufValue and sumRecharge if necessary and do next step if rcqNode.corrBufValue < int64(rcqNode.params.BufLimit) { rcqNode.corrBufValue = int64(rcqNode.params.BufLimit) - cm.updateCapFactor(lastUpdate, true) cm.sumRecharge -= rcqNode.params.MinRecharge } cm.rcLastIntValue = rcqNode.rcFullIntValue @@ -249,9 +323,6 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve node.rcLastIntValue = cm.rcLastIntValue } node.corrBufValue += bvc - if node.corrBufValue < 0 { - node.corrBufValue = 0 - } diff := int64(params.BufLimit - node.params.BufLimit) if diff > 0 { node.corrBufValue += diff @@ -261,15 +332,14 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve node.corrBufValue = int64(params.BufLimit) isFull = true } - sumRecharge := cm.sumRecharge if !wasFull { - sumRecharge -= node.params.MinRecharge + cm.sumRecharge -= node.params.MinRecharge } if params != &node.params { node.params = *params } if !isFull { - sumRecharge += node.params.MinRecharge + cm.sumRecharge += node.params.MinRecharge if node.queueIndex != -1 { cm.rcQueue.Remove(node.queueIndex) } @@ -277,63 +347,54 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve node.rcFullIntValue = cm.rcLastIntValue + (int64(node.params.BufLimit)-node.corrBufValue)*FixedPointMultiplier/int64(node.params.MinRecharge) cm.rcQueue.Push(node, -node.rcFullIntValue) } - if sumRecharge != cm.sumRecharge { - cm.updateCapFactor(now, true) - cm.sumRecharge = sumRecharge - } +} +// reduceTotalCapacity reduces the total capacity allowance in case of a client freeze event +func (cm *ClientManager) reduceTotalCapacity(frozenCap uint64) { + cm.lock.Lock() + defer cm.lock.Unlock() + + ratio := float64(1) + if frozenCap < cm.totalConnected { + ratio = float64(frozenCap) / float64(cm.totalConnected) + } + now := cm.clock.Now() + cm.updateTotalCapacity(now, false) + cm.logTotalCap -= capacityDropFactor * ratio + if cm.logTotalCap < cm.minLogTotalCap { + cm.logTotalCap = cm.minLogTotalCap + } + cm.updateTotalCapacity(now, true) } -// updateCapFactor updates the total capacity factor. The capacity factor allows +// updateTotalCapacity updates the total capacity factor. The capacity factor allows // the total capacity of the system to go over the allowed total recharge value -// if the sum of momentarily recharging clients only exceeds the total recharge -// allowance in a very small fraction of time. -// The capacity factor is dropped quickly (with a small time constant) if sumRecharge -// exceeds totalRecharge. It is raised slowly (with a large time constant) if most -// of the total capacity is used by connected clients (totalConnected is larger than -// totalCapacity*capFactorRaiseThreshold) and sumRecharge stays under -// totalRecharge*totalConnected/totalCapacity. -func (cm *ClientManager) updateCapFactor(now mclock.AbsTime, refresh bool) { - if cm.totalRecharge == 0 { - return - } +// if clients go to frozen state sufficiently rarely. +// The capacity factor is dropped instantly by a small amount if a clients is frozen. +// It is raised slowly (with a large time constant) if the total connected capacity +// is close to the total allowed amount and no clients are frozen. +func (cm *ClientManager) updateTotalCapacity(now mclock.AbsTime, refresh bool) { dt := now - cm.capLastUpdate cm.capLastUpdate = now - var d float64 - if cm.sumRecharge > cm.totalRecharge { - d = (1 - float64(cm.sumRecharge)/float64(cm.totalRecharge)) * capFactorDropTC - } else { - totalConnected := float64(cm.totalConnected) - var connRatio float64 - if totalConnected < cm.totalCapacity { - connRatio = totalConnected / cm.totalCapacity - } else { - connRatio = 1 - } - if connRatio > capFactorRaiseThreshold { - sumRecharge := float64(cm.sumRecharge) - limit := float64(cm.totalRecharge) * connRatio - if sumRecharge < limit { - d = (1 - sumRecharge/limit) * (connRatio - capFactorRaiseThreshold) * (1 / (1 - capFactorRaiseThreshold)) * capFactorRaiseTC - } + if cm.logTotalCap < cm.logTotalCapRaiseLimit { + cm.logTotalCap += capacityRaiseTC * float64(dt) + if cm.logTotalCap > cm.logTotalCapRaiseLimit { + cm.logTotalCap = cm.logTotalCapRaiseLimit } } - if d != 0 { - cm.capLogFactor += d * float64(dt) - if cm.capLogFactor < 0 { - cm.capLogFactor = 0 - } - if refresh { - cm.refreshCapacity() - } + if cm.logTotalCap > cm.maxLogTotalCap { + cm.logTotalCap = cm.maxLogTotalCap + } + if refresh { + cm.refreshCapacity() } } // refreshCapacity recalculates the total capacity value and sends an update to the subscription // channel if the relative change of the value since the last update is more than 0.1 percent func (cm *ClientManager) refreshCapacity() { - totalCapacity := float64(cm.totalRecharge) * math.Exp(cm.capLogFactor) + totalCapacity := math.Exp(cm.logTotalCap) if totalCapacity >= cm.totalCapacity*0.999 && totalCapacity <= cm.totalCapacity*1.001 { return } diff --git a/les/flowcontrol/manager_test.go b/les/flowcontrol/manager_test.go index 4e7746d40..b32ec5599 100644 --- a/les/flowcontrol/manager_test.go +++ b/les/flowcontrol/manager_test.go @@ -63,7 +63,7 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random } m := NewClientManager(PieceWiseLinear{{0, totalCapacity}}, clock) for _, n := range nodes { - n.bufLimit = n.capacity * 6000 //uint64(2000+rand.Intn(10000)) + n.bufLimit = n.capacity * 6000 n.node = NewClientNode(m, ServerParams{BufLimit: n.bufLimit, MinRecharge: n.capacity}) } maxNodes := make([]int, maxCapacityNodes) @@ -73,6 +73,7 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random maxNodes[i] = rand.Intn(nodeCount) } + var sendCount int for i := 0; i < testLength; i++ { now := clock.Now() for _, idx := range maxNodes { @@ -83,13 +84,15 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random maxNodes[rand.Intn(maxCapacityNodes)] = rand.Intn(nodeCount) } - sendCount := randomSend - for sendCount > 0 { + sendCount += randomSend + failCount := randomSend * 10 + for sendCount > 0 && failCount > 0 { if nodes[rand.Intn(nodeCount)].send(t, now) { sendCount-- + } else { + failCount-- } } - clock.Run(time.Millisecond) } @@ -117,7 +120,6 @@ func (n *testNode) send(t *testing.T, now mclock.AbsTime) bool { if bv < testMaxCost { n.waitUntil = now + mclock.AbsTime((testMaxCost-bv)*1001000/n.capacity) } - //n.waitUntil = now + mclock.AbsTime(float64(testMaxCost)*1001000/float64(n.capacity)*(1-float64(bv)/float64(n.bufLimit))) n.totalCost += rcost return true } diff --git a/les/freeclient.go b/les/freeclient.go index d859337c2..958b7daa7 100644 --- a/les/freeclient.go +++ b/les/freeclient.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) @@ -52,6 +53,8 @@ type freeClientPool struct { connectedLimit, totalLimit int freeClientCap uint64 + logger *csvlogger.Logger + logTotalFreeConn *csvlogger.Channel addressMap map[string]*freeClientPoolEntry connPool, disconnPool *prque.Prque @@ -66,16 +69,18 @@ const ( ) // newFreeClientPool creates a new free client pool -func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool { +func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string), metricsLogger, eventLogger *csvlogger.Logger) *freeClientPool { pool := &freeClientPool{ - db: db, - clock: clock, - addressMap: make(map[string]*freeClientPoolEntry), - connPool: prque.New(poolSetIndex), - disconnPool: prque.New(poolSetIndex), - freeClientCap: freeClientCap, - totalLimit: totalLimit, - removePeer: removePeer, + db: db, + clock: clock, + addressMap: make(map[string]*freeClientPoolEntry), + connPool: prque.New(poolSetIndex), + disconnPool: prque.New(poolSetIndex), + freeClientCap: freeClientCap, + totalLimit: totalLimit, + logger: eventLogger, + logTotalFreeConn: metricsLogger.NewChannel("totalFreeConn", 0), + removePeer: removePeer, } pool.loadFromDb() return pool @@ -88,10 +93,25 @@ func (f *freeClientPool) stop() { f.lock.Unlock() } +// freeClientId returns a string identifier for the peer. Multiple peers with the +// same identifier can not be in the free client pool simultaneously. +func freeClientId(p *peer) string { + if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok { + if addr.IP.IsLoopback() { + // using peer id instead of loopback ip address allows multiple free + // connections from local machine to own server + return p.id + } else { + return addr.IP.String() + } + } + return "" +} + // registerPeer implements clientPool func (f *freeClientPool) registerPeer(p *peer) { - if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok { - if !f.connect(addr.IP.String(), p.id) { + if freeId := freeClientId(p); freeId != "" { + if !f.connect(freeId, p.id) { f.removePeer(p.id) } } @@ -107,7 +127,9 @@ func (f *freeClientPool) connect(address, id string) bool { return false } + f.logger.Event("freeClientPool: connecting from " + address + ", " + id) if f.connectedLimit == 0 { + f.logger.Event("freeClientPool: rejected, " + id) log.Debug("Client rejected", "address", address) return false } @@ -119,6 +141,7 @@ func (f *freeClientPool) connect(address, id string) bool { f.addressMap[address] = e } else { if e.connected { + f.logger.Event("freeClientPool: already connected, " + id) log.Debug("Client already connected", "address", address) return false } @@ -131,9 +154,11 @@ func (f *freeClientPool) connect(address, id string) bool { if e.linUsage+int64(connectedBias)-i.linUsage < 0 { // kick it out and accept the new client f.dropClient(i, now) + f.logger.Event("freeClientPool: kicked out, " + i.id) } else { // keep the old client and reject the new one f.connPool.Push(i, i.linUsage) + f.logger.Event("freeClientPool: rejected, " + id) log.Debug("Client rejected", "address", address) return false } @@ -142,17 +167,19 @@ func (f *freeClientPool) connect(address, id string) bool { e.connected = true e.id = id f.connPool.Push(e, e.linUsage) + f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap)) if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit { f.disconnPool.Pop() } + f.logger.Event("freeClientPool: accepted, " + id) log.Debug("Client accepted", "address", address) return true } // unregisterPeer implements clientPool func (f *freeClientPool) unregisterPeer(p *peer) { - if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok { - f.disconnect(addr.IP.String()) + if freeId := freeClientId(p); freeId != "" { + f.disconnect(freeId) } } @@ -174,9 +201,11 @@ func (f *freeClientPool) disconnect(address string) { } f.connPool.Remove(e.index) + f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap)) f.calcLogUsage(e, now) e.connected = false f.disconnPool.Push(e, -e.logUsage) + f.logger.Event("freeClientPool: disconnected, " + e.id) log.Debug("Client disconnected", "address", address) } @@ -194,6 +223,7 @@ func (f *freeClientPool) setLimits(count int, totalCap uint64) { for f.connPool.Size() > f.connectedLimit { i := f.connPool.PopItem().(*freeClientPoolEntry) f.dropClient(i, now) + f.logger.Event("freeClientPool: setLimits kicked out, " + i.id) } } @@ -201,6 +231,7 @@ func (f *freeClientPool) setLimits(count int, totalCap uint64) { // disconnected pool func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) { f.connPool.Remove(i.index) + f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap)) f.calcLogUsage(i, now) i.connected = false f.disconnPool.Push(i, -i.logUsage) diff --git a/les/freeclient_test.go b/les/freeclient_test.go index 191822264..5a58a6c1c 100644 --- a/les/freeclient_test.go +++ b/les/freeclient_test.go @@ -61,7 +61,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) { } disconnCh <- i } - pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn) + pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil) ) pool.setLimits(connLimit, uint64(connLimit)) @@ -130,7 +130,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) { // close and restart pool pool.stop() - pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn) + pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil) pool.setLimits(connLimit, uint64(connLimit)) // try connecting all known peers (connLimit should be filled up) diff --git a/les/handler.go b/les/handler.go index f53a4722f..59bfd81cd 100644 --- a/les/handler.go +++ b/les/handler.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -118,6 +119,7 @@ type ProtocolManager struct { wg *sync.WaitGroup eventMux *event.TypeMux + logger *csvlogger.Logger // Callbacks synced func() bool @@ -165,8 +167,6 @@ func NewProtocolManager( if odr != nil { manager.retriever = odr.retriever manager.reqDist = odr.retriever.dist - } else { - manager.servingQueue = newServingQueue(int64(time.Millisecond * 10)) } if ulcConfig != nil { @@ -272,6 +272,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Ignore maxPeers if this is a trusted peer // In server mode we try to check into the client pool after handshake if pm.client && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { + pm.logger.Event("Rejected (too many peers), " + p.id) return p2p.DiscTooManyPeers } // Reject light clients if server is not synced. @@ -290,6 +291,7 @@ func (pm *ProtocolManager) handle(p *peer) error { ) if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil { p.Log().Debug("Light Ethereum handshake failed", "err", err) + pm.logger.Event("Handshake error: " + err.Error() + ", " + p.id) return err } if p.fcClient != nil { @@ -303,9 +305,12 @@ func (pm *ProtocolManager) handle(p *peer) error { // Register the peer locally if err := pm.peers.Register(p); err != nil { p.Log().Error("Light Ethereum peer registration failed", "err", err) + pm.logger.Event("Peer registration error: " + err.Error() + ", " + p.id) return err } + pm.logger.Event("Connection established, " + p.id) defer func() { + pm.logger.Event("Closed connection, " + p.id) pm.removePeer(p.id) }() @@ -326,6 +331,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { + pm.logger.Event("Message handling error: " + err.Error() + ", " + p.id) p.Log().Debug("Light Ethereum message handling failed", "err", err) if p.fcServer != nil { p.fcServer.DumpLogs() @@ -358,23 +364,40 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ) accept := func(reqID, reqCnt, maxCnt uint64) bool { - if reqCnt == 0 { - return false + inSizeCost := func() uint64 { + if pm.server.costTracker != nil { + return pm.server.costTracker.realCost(0, msg.Size, 0) + } + return 0 } - if p.fcClient == nil || reqCnt > maxCnt { + if p.isFrozen() || reqCnt == 0 || p.fcClient == nil || reqCnt > maxCnt { + p.fcClient.OneTimeCost(inSizeCost()) return false } - maxCost = p.fcCosts.getCost(msg.Code, reqCnt) + maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt) + gf := float64(1) + if pm.server.costTracker != nil { + gf = pm.server.costTracker.globalFactor() + if gf < 0.001 { + p.Log().Error("Invalid global cost factor", "globalFactor", gf) + gf = 1 + } + } + maxTime := uint64(float64(maxCost) / gf) if accepted, bufShort, servingPriority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost); !accepted { - if bufShort > 0 { - p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge))) - } + p.freezeClient() + p.Log().Warn("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge))) + p.fcClient.OneTimeCost(inSizeCost()) return false } else { - task = pm.servingQueue.newTask(servingPriority) + task = pm.servingQueue.newTask(p, maxTime, servingPriority) + } + if task.start() { + return true } - return task.start() + p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost()) + return false } if msg.Size > ProtocolMaxMsgSize { @@ -388,6 +411,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.responseLock.Lock() defer p.responseLock.Unlock() + if p.isFrozen() { + amount = 0 + reply = nil + } var replySize uint32 if reply != nil { replySize = reply.size() @@ -395,7 +422,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { var realCost uint64 if pm.server.costTracker != nil { realCost = pm.server.costTracker.realCost(servingTime, msg.Size, replySize) - pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost) + if amount != 0 { + pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost) + } } else { realCost = maxCost } @@ -463,94 +492,94 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } query := req.Query - if !accept(req.ReqID, query.Amount, MaxHeaderFetch) { - return errResp(ErrRequestRejected, "") - } - go func() { - hashMode := query.Origin.Hash != (common.Hash{}) - first := true - maxNonCanonical := uint64(100) - - // Gather headers until the fetch or network limits is reached - var ( - bytes common.StorageSize - headers []*types.Header - unknown bool - ) - for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit { - if !first && !task.waitOrStop() { - return - } - // Retrieve the next header satisfying the query - var origin *types.Header - if hashMode { - if first { - origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) - if origin != nil { - query.Origin.Number = origin.Number.Uint64() + if accept(req.ReqID, query.Amount, MaxHeaderFetch) { + go func() { + hashMode := query.Origin.Hash != (common.Hash{}) + first := true + maxNonCanonical := uint64(100) + + // Gather headers until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit { + if !first && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return + } + // Retrieve the next header satisfying the query + var origin *types.Header + if hashMode { + if first { + origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) + if origin != nil { + query.Origin.Number = origin.Number.Uint64() + } + } else { + origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) } } else { - origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) + origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) } - } else { - origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) - } - if origin == nil { - break - } - headers = append(headers, origin) - bytes += estHeaderRlpSize - - // Advance to the next header of the query - switch { - case hashMode && query.Reverse: - // Hash based traversal towards the genesis block - ancestor := query.Skip + 1 - if ancestor == 0 { - unknown = true - } else { - query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) - unknown = (query.Origin.Hash == common.Hash{}) + if origin == nil { + break } - case hashMode && !query.Reverse: - // Hash based traversal towards the leaf block - var ( - current = origin.Number.Uint64() - next = current + query.Skip + 1 - ) - if next <= current { - infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") - p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) - unknown = true - } else { - if header := pm.blockchain.GetHeaderByNumber(next); header != nil { - nextHash := header.Hash() - expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) - if expOldHash == query.Origin.Hash { - query.Origin.Hash, query.Origin.Number = nextHash, next + headers = append(headers, origin) + bytes += estHeaderRlpSize + + // Advance to the next header of the query + switch { + case hashMode && query.Reverse: + // Hash based traversal towards the genesis block + ancestor := query.Skip + 1 + if ancestor == 0 { + unknown = true + } else { + query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) + unknown = (query.Origin.Hash == common.Hash{}) + } + case hashMode && !query.Reverse: + // Hash based traversal towards the leaf block + var ( + current = origin.Number.Uint64() + next = current + query.Skip + 1 + ) + if next <= current { + infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") + p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) + unknown = true + } else { + if header := pm.blockchain.GetHeaderByNumber(next); header != nil { + nextHash := header.Hash() + expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) + if expOldHash == query.Origin.Hash { + query.Origin.Hash, query.Origin.Number = nextHash, next + } else { + unknown = true + } } else { unknown = true } + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= query.Skip + 1 } else { unknown = true } - } - case query.Reverse: - // Number based traversal towards the genesis block - if query.Origin.Number >= query.Skip+1 { - query.Origin.Number -= query.Skip + 1 - } else { - unknown = true - } - case !query.Reverse: - // Number based traversal towards the leaf block - query.Origin.Number += query.Skip + 1 + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += query.Skip + 1 + } + first = false } - first = false - } - sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done()) - }() + sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done()) + }() + } case BlockHeadersMsg: if pm.downloader == nil { @@ -592,27 +621,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bodies []rlp.RawValue ) reqCnt := len(req.Hashes) - if !accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) { - return errResp(ErrRequestRejected, "") - } - go func() { - for i, hash := range req.Hashes { - if i != 0 && !task.waitOrStop() { - return - } - if bytes >= softResponseLimit { - break - } - // Retrieve the requested block body, stopping if enough was found - if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil { - if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 { - bodies = append(bodies, data) - bytes += len(data) + if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) { + go func() { + for i, hash := range req.Hashes { + if i != 0 && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return + } + if bytes >= softResponseLimit { + break + } + // Retrieve the requested block body, stopping if enough was found + if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil { + if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 { + bodies = append(bodies, data) + bytes += len(data) + } } } - } - sendResponse(req.ReqID, uint64(reqCnt), p.ReplyBlockBodiesRLP(req.ReqID, bodies), task.done()) - }() + sendResponse(req.ReqID, uint64(reqCnt), p.ReplyBlockBodiesRLP(req.ReqID, bodies), task.done()) + }() + } case BlockBodiesMsg: if pm.odr == nil { @@ -651,45 +680,45 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { data [][]byte ) reqCnt := len(req.Reqs) - if !accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) { - return errResp(ErrRequestRejected, "") - } - go func() { - for i, req := range req.Reqs { - if i != 0 && !task.waitOrStop() { - return - } - // Look up the root hash belonging to the request - number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash) - if number == nil { - p.Log().Warn("Failed to retrieve block num for code", "hash", req.BHash) - continue - } - header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number) - if header == nil { - p.Log().Warn("Failed to retrieve header for code", "block", *number, "hash", req.BHash) - continue - } - triedb := pm.blockchain.StateCache().TrieDB() + if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) { + go func() { + for i, request := range req.Reqs { + if i != 0 && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return + } + // Look up the root hash belonging to the request + number := rawdb.ReadHeaderNumber(pm.chainDb, request.BHash) + if number == nil { + p.Log().Warn("Failed to retrieve block num for code", "hash", request.BHash) + continue + } + header := rawdb.ReadHeader(pm.chainDb, request.BHash, *number) + if header == nil { + p.Log().Warn("Failed to retrieve header for code", "block", *number, "hash", request.BHash) + continue + } + triedb := pm.blockchain.StateCache().TrieDB() - account, err := pm.getAccount(triedb, header.Root, common.BytesToHash(req.AccKey)) - if err != nil { - p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "err", err) - continue - } - code, err := triedb.Node(common.BytesToHash(account.CodeHash)) - if err != nil { - p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err) - continue - } - // Accumulate the code and abort if enough data was retrieved - data = append(data, code) - if bytes += len(code); bytes >= softResponseLimit { - break + account, err := pm.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey)) + if err != nil { + p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) + continue + } + code, err := triedb.Node(common.BytesToHash(account.CodeHash)) + if err != nil { + p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err) + continue + } + // Accumulate the code and abort if enough data was retrieved + data = append(data, code) + if bytes += len(code); bytes >= softResponseLimit { + break + } } - } - sendResponse(req.ReqID, uint64(reqCnt), p.ReplyCode(req.ReqID, data), task.done()) - }() + sendResponse(req.ReqID, uint64(reqCnt), p.ReplyCode(req.ReqID, data), task.done()) + }() + } case CodeMsg: if pm.odr == nil { @@ -728,37 +757,37 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { receipts []rlp.RawValue ) reqCnt := len(req.Hashes) - if !accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) { - return errResp(ErrRequestRejected, "") - } - go func() { - for i, hash := range req.Hashes { - if i != 0 && !task.waitOrStop() { - return - } - if bytes >= softResponseLimit { - break - } - // Retrieve the requested block's receipts, skipping if unknown to us - var results types.Receipts - if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil { - results = rawdb.ReadRawReceipts(pm.chainDb, hash, *number) - } - if results == nil { - if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { - continue + if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) { + go func() { + for i, hash := range req.Hashes { + if i != 0 && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return + } + if bytes >= softResponseLimit { + break + } + // Retrieve the requested block's receipts, skipping if unknown to us + var results types.Receipts + if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil { + results = rawdb.ReadRawReceipts(pm.chainDb, hash, *number) + } + if results == nil { + if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + log.Error("Failed to encode receipt", "err", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) } } - // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(results); err != nil { - log.Error("Failed to encode receipt", "err", err) - } else { - receipts = append(receipts, encoded) - bytes += len(encoded) - } - } - sendResponse(req.ReqID, uint64(reqCnt), p.ReplyReceiptsRLP(req.ReqID, receipts), task.done()) - }() + sendResponse(req.ReqID, uint64(reqCnt), p.ReplyReceiptsRLP(req.ReqID, receipts), task.done()) + }() + } case ReceiptsMsg: if pm.odr == nil { @@ -797,70 +826,70 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { root common.Hash ) reqCnt := len(req.Reqs) - if !accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) { - return errResp(ErrRequestRejected, "") - } - go func() { - nodes := light.NewNodeSet() - - for i, req := range req.Reqs { - if i != 0 && !task.waitOrStop() { - return - } - // Look up the root hash belonging to the request - var ( - number *uint64 - header *types.Header - trie state.Trie - ) - if req.BHash != lastBHash { - root, lastBHash = common.Hash{}, req.BHash - - if number = rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number == nil { - p.Log().Warn("Failed to retrieve block num for proof", "hash", req.BHash) - continue + if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) { + go func() { + nodes := light.NewNodeSet() + + for i, request := range req.Reqs { + if i != 0 && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return } - if header = rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header == nil { - p.Log().Warn("Failed to retrieve header for proof", "block", *number, "hash", req.BHash) - continue + // Look up the root hash belonging to the request + var ( + number *uint64 + header *types.Header + trie state.Trie + ) + if request.BHash != lastBHash { + root, lastBHash = common.Hash{}, request.BHash + + if number = rawdb.ReadHeaderNumber(pm.chainDb, request.BHash); number == nil { + p.Log().Warn("Failed to retrieve block num for proof", "hash", request.BHash) + continue + } + if header = rawdb.ReadHeader(pm.chainDb, request.BHash, *number); header == nil { + p.Log().Warn("Failed to retrieve header for proof", "block", *number, "hash", request.BHash) + continue + } + root = header.Root } - root = header.Root - } - // Open the account or storage trie for the request - statedb := pm.blockchain.StateCache() - - switch len(req.AccKey) { - case 0: - // No account key specified, open an account trie - trie, err = statedb.OpenTrie(root) - if trie == nil || err != nil { - p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err) - continue + // Open the account or storage trie for the request + statedb := pm.blockchain.StateCache() + + switch len(request.AccKey) { + case 0: + // No account key specified, open an account trie + trie, err = statedb.OpenTrie(root) + if trie == nil || err != nil { + p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err) + continue + } + default: + // Account key specified, open a storage trie + account, err := pm.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey)) + if err != nil { + p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) + continue + } + trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root) + if trie == nil || err != nil { + p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err) + continue + } } - default: - // Account key specified, open a storage trie - account, err := pm.getAccount(statedb.TrieDB(), root, common.BytesToHash(req.AccKey)) - if err != nil { - p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "err", err) + // Prove the user's request from the account or stroage trie + if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil { + p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err) continue } - trie, err = statedb.OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root) - if trie == nil || err != nil { - p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(req.AccKey), "root", account.Root, "err", err) - continue + if nodes.DataSize() >= softResponseLimit { + break } } - // Prove the user's request from the account or stroage trie - if err := trie.Prove(req.Key, req.FromLevel, nodes); err != nil { - p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err) - continue - } - if nodes.DataSize() >= softResponseLimit { - break - } - } - sendResponse(req.ReqID, uint64(reqCnt), p.ReplyProofsV2(req.ReqID, nodes.NodeList()), task.done()) - }() + sendResponse(req.ReqID, uint64(reqCnt), p.ReplyProofsV2(req.ReqID, nodes.NodeList()), task.done()) + }() + } case ProofsV2Msg: if pm.odr == nil { @@ -899,53 +928,53 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { auxData [][]byte ) reqCnt := len(req.Reqs) - if !accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) { - return errResp(ErrRequestRejected, "") - } - go func() { - - var ( - lastIdx uint64 - lastType uint - root common.Hash - auxTrie *trie.Trie - ) - nodes := light.NewNodeSet() - for i, req := range req.Reqs { - if i != 0 && !task.waitOrStop() { - return - } - if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx { - auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx + if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) { + go func() { - var prefix string - if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) { - auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(pm.chainDb, prefix))) - } - } - if req.AuxReq == auxRoot { - var data []byte - if root != (common.Hash{}) { - data = root[:] + var ( + lastIdx uint64 + lastType uint + root common.Hash + auxTrie *trie.Trie + ) + nodes := light.NewNodeSet() + for i, request := range req.Reqs { + if i != 0 && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return } - auxData = append(auxData, data) - auxBytes += len(data) - } else { - if auxTrie != nil { - auxTrie.Prove(req.Key, req.FromLevel, nodes) + if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx { + auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx + + var prefix string + if root, prefix = pm.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) { + auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(pm.chainDb, prefix))) + } } - if req.AuxReq != 0 { - data := pm.getHelperTrieAuxData(req) + if request.AuxReq == auxRoot { + var data []byte + if root != (common.Hash{}) { + data = root[:] + } auxData = append(auxData, data) auxBytes += len(data) + } else { + if auxTrie != nil { + auxTrie.Prove(request.Key, request.FromLevel, nodes) + } + if request.AuxReq != 0 { + data := pm.getHelperTrieAuxData(request) + auxData = append(auxData, data) + auxBytes += len(data) + } + } + if nodes.DataSize()+auxBytes >= softResponseLimit { + break } } - if nodes.DataSize()+auxBytes >= softResponseLimit { - break - } - } - sendResponse(req.ReqID, uint64(reqCnt), p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}), task.done()) - }() + sendResponse(req.ReqID, uint64(reqCnt), p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}), task.done()) + }() + } case HelperTrieProofsMsg: if pm.odr == nil { @@ -981,27 +1010,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } reqCnt := len(req.Txs) - if !accept(req.ReqID, uint64(reqCnt), MaxTxSend) { - return errResp(ErrRequestRejected, "") - } - go func() { - stats := make([]light.TxStatus, len(req.Txs)) - for i, tx := range req.Txs { - if i != 0 && !task.waitOrStop() { - return - } - hash := tx.Hash() - stats[i] = pm.txStatus(hash) - if stats[i].Status == core.TxStatusUnknown { - if errs := pm.txpool.AddRemotes([]*types.Transaction{tx}); errs[0] != nil { - stats[i].Error = errs[0].Error() - continue + if accept(req.ReqID, uint64(reqCnt), MaxTxSend) { + go func() { + stats := make([]light.TxStatus, len(req.Txs)) + for i, tx := range req.Txs { + if i != 0 && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return } + hash := tx.Hash() stats[i] = pm.txStatus(hash) + if stats[i].Status == core.TxStatusUnknown { + if errs := pm.txpool.AddRemotes([]*types.Transaction{tx}); errs[0] != nil { + stats[i].Error = errs[0].Error() + continue + } + stats[i] = pm.txStatus(hash) + } } - } - sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done()) - }() + sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done()) + }() + } case GetTxStatusMsg: if pm.txpool == nil { @@ -1016,19 +1045,19 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } reqCnt := len(req.Hashes) - if !accept(req.ReqID, uint64(reqCnt), MaxTxStatus) { - return errResp(ErrRequestRejected, "") - } - go func() { - stats := make([]light.TxStatus, len(req.Hashes)) - for i, hash := range req.Hashes { - if i != 0 && !task.waitOrStop() { - return + if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) { + go func() { + stats := make([]light.TxStatus, len(req.Hashes)) + for i, hash := range req.Hashes { + if i != 0 && !task.waitOrStop() { + sendResponse(req.ReqID, 0, nil, task.servingTime) + return + } + stats[i] = pm.txStatus(hash) } - stats[i] = pm.txStatus(hash) - } - sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done()) - }() + sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done()) + }() + } case TxStatusMsg: if pm.odr == nil { @@ -1053,6 +1082,26 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { Obj: resp.Status, } + case StopMsg: + if pm.odr == nil { + return errResp(ErrUnexpectedResponse, "") + } + p.freezeServer(true) + pm.retriever.frozen(p) + p.Log().Warn("Service stopped") + + case ResumeMsg: + if pm.odr == nil { + return errResp(ErrUnexpectedResponse, "") + } + var bv uint64 + if err := msg.Decode(&bv); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + p.fcServer.ResumeFreeze(bv) + p.freezeServer(false) + p.Log().Warn("Service resumed") + default: p.Log().Trace("Received unknown message", "code", msg.Code) return errResp(ErrInvalidMsgCode, "%v", msg.Code) diff --git a/les/handler_test.go b/les/handler_test.go index d8051c145..6e24165cf 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" @@ -438,7 +439,7 @@ func TestTransactionStatusLes2(t *testing.T) { config.Journal = "" txpool := core.NewTxPool(config, params.TestChainConfig, chain) pm.txpool = txpool - peer, _ := newTestPeer(t, "peer", 2, pm, true) + peer, _ := newTestPeer(t, "peer", 2, pm, true, 0) defer peer.close() var reqID uint64 @@ -519,3 +520,51 @@ func TestTransactionStatusLes2(t *testing.T) { test(tx1, false, light.TxStatus{Status: core.TxStatusPending}) test(tx2, false, light.TxStatus{Status: core.TxStatusPending}) } + +func TestStopResumeLes3(t *testing.T) { + db := rawdb.NewMemoryDatabase() + clock := &mclock.Simulated{} + testCost := testBufLimit / 10 + pm, err := newTestProtocolManager(false, 0, nil, nil, nil, db, nil, testCost, clock) + if err != nil { + t.Fatalf("Failed to create protocol manager: %v", err) + } + peer, _ := newTestPeer(t, "peer", 3, pm, true, testCost) + defer peer.close() + + expBuf := testBufLimit + var reqID uint64 + + req := func() { + reqID++ + sendRequest(peer.app, GetBlockHeadersMsg, reqID, testCost, &getBlockHeadersData{Origin: hashOrNumber{Hash: common.Hash{1}}, Amount: 1}) + } + + for i := 1; i <= 5; i++ { + // send requests while we still have enough buffer and expect a response + for expBuf >= testCost { + req() + expBuf -= testCost + if err := expectResponse(peer.app, BlockHeadersMsg, reqID, expBuf, nil); err != nil { + t.Errorf("expected response and failed: %v", err) + } + } + // send some more requests in excess and expect a single StopMsg + c := i + for c > 0 { + req() + c-- + } + if err := p2p.ExpectMsg(peer.app, StopMsg, nil); err != nil { + t.Errorf("expected StopMsg and failed: %v", err) + } + // wait until the buffer is recharged by half of the limit + wait := testBufLimit / testBufRecharge / 2 + clock.Run(time.Millisecond * time.Duration(wait)) + // expect a ResumeMsg with the partially recharged buffer value + expBuf += testBufRecharge * wait + if err := p2p.ExpectMsg(peer.app, ResumeMsg, expBuf); err != nil { + t.Errorf("expected ResumeMsg and failed: %v", err) + } + } +} diff --git a/les/helper_test.go b/les/helper_test.go index 878e44404..dbb081344 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -62,7 +62,8 @@ var ( testEventEmitterCode = common.Hex2Bytes("60606040523415600e57600080fd5b7f57050ab73f6b9ebdd9f76b8d4997793f48cf956e965ee070551b9ca0bb71584e60405160405180910390a160358060476000396000f3006060604052600080fd00a165627a7a723058203f727efcad8b5811f8cb1fc2620ce5e8c63570d697aef968172de296ea3994140029") testEventEmitterAddr common.Address - testBufLimit = uint64(100) + testBufLimit = uint64(1000000) + testBufRecharge = uint64(1000) ) /* @@ -138,7 +139,7 @@ func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.Indexe // newTestProtocolManager creates a new protocol manager for testing purposes, // with the given number of blocks already known, potential notification // channels for different events and relative chain indexers array. -func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig) (*ProtocolManager, error) { +func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig, testCost uint64, clock mclock.Clock) (*ProtocolManager, error) { var ( evmux = new(event.TypeMux) engine = ethash.NewFaker() @@ -177,14 +178,15 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor if !lightSync { srv := &LesServer{lesCommons: lesCommons{protocolManager: pm}} pm.server = srv + pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1, nil) pm.servingQueue.setThreads(4) srv.defParams = flowcontrol.ServerParams{ BufLimit: testBufLimit, - MinRecharge: 1, + MinRecharge: testBufRecharge, } - - srv.fcManager = flowcontrol.NewClientManager(nil, &mclock.System{}) + srv.testCost = testCost + srv.fcManager = flowcontrol.NewClientManager(nil, clock) } pm.Start(1000) return pm, nil @@ -195,7 +197,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor // channels for different events and relative chain indexers array. In case of an error, the constructor force- // fails the test. func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig) *ProtocolManager { - pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db, ulcConfig) + pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db, ulcConfig, 0, &mclock.System{}) if err != nil { t.Fatalf("Failed to create protocol manager: %v", err) } @@ -210,7 +212,7 @@ type testPeer struct { } // newTestPeer creates a new peer registered at the given protocol manager. -func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, shake bool) (*testPeer, <-chan error) { +func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, shake bool, testCost uint64) (*testPeer, <-chan error) { // Create a message pipe to communicate through app, net := p2p.MsgPipe() @@ -242,7 +244,7 @@ func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, sh head = pm.blockchain.CurrentHeader() td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) ) - tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash()) + tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), testCost) } return tp, errc } @@ -282,7 +284,7 @@ func newTestPeerPair(name string, version int, pm, pm2 *ProtocolManager) (*peer, // handshake simulates a trivial handshake that expects the same state from the // remote side as we are simulating locally. -func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash) { +func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, testCost uint64) { var expList keyValueList expList = expList.add("protocolVersion", uint64(p.version)) expList = expList.add("networkId", uint64(NetworkId)) @@ -295,10 +297,11 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu expList = expList.add("serveHeaders", nil) expList = expList.add("serveChainSince", uint64(0)) expList = expList.add("serveStateSince", uint64(0)) + expList = expList.add("serveRecentState", uint64(core.TriesInMemory-4)) expList = expList.add("txRelay", nil) expList = expList.add("flowControl/BL", testBufLimit) - expList = expList.add("flowControl/MRR", uint64(1)) - expList = expList.add("flowControl/MRC", testCostList()) + expList = expList.add("flowControl/MRR", testBufRecharge) + expList = expList.add("flowControl/MRC", testCostList(testCost)) if err := p2p.ExpectMsg(p.app, StatusMsg, expList); err != nil { t.Fatalf("status recv: %v", err) @@ -309,7 +312,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu p.fcParams = flowcontrol.ServerParams{ BufLimit: testBufLimit, - MinRecharge: 1, + MinRecharge: testBufRecharge, } } @@ -338,7 +341,7 @@ func newServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*cor cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig) pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db, nil) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) + peer, _ := newTestPeer(t, "peer", protocol, pm, true, 0) cIndexer.Start(pm.blockchain.(*core.BlockChain)) bIndexer.Start(pm.blockchain.(*core.BlockChain)) diff --git a/les/peer.go b/les/peer.go index 42c13ab7d..6792d0611 100644 --- a/les/peer.go +++ b/les/peer.go @@ -20,11 +20,14 @@ import ( "errors" "fmt" "math/big" + "math/rand" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" @@ -47,10 +50,16 @@ const ( allowedUpdateRate = time.Millisecond * 10 // time constant for recharging one byte of allowance ) +const ( + freezeTimeBase = time.Millisecond * 700 // fixed component of client freeze time + freezeTimeRandom = time.Millisecond * 600 // random component of client freeze time + freezeCheckPeriod = time.Millisecond * 100 // buffer value recheck period after initial freeze time has elapsed +) + // if the total encoded size of a sent transaction batch is over txSizeCostLimit // per transaction then the request cost is calculated as proportional to the // encoded size instead of the transaction count -const txSizeCostLimit = 0x10000 +const txSizeCostLimit = 0x4000 const ( announceTypeNone = iota @@ -86,14 +95,17 @@ type peer struct { responseErrors int updateCounter uint64 updateTime mclock.AbsTime + frozen uint32 // 1 if client is in frozen state fcClient *flowcontrol.ClientNode // nil if the peer is server only fcServer *flowcontrol.ServerNode // nil if the peer is client only fcParams flowcontrol.ServerParams fcCosts requestCostTable - isTrusted bool - isOnlyAnnounce bool + isTrusted bool + isOnlyAnnounce bool + chainSince, chainRecent uint64 + stateSince, stateRecent uint64 } func newPeer(version int, network uint64, isTrusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { @@ -129,8 +141,59 @@ func (p *peer) rejectUpdate(size uint64) bool { return p.updateCounter > allowedUpdateBytes } +// freezeClient temporarily puts the client in a frozen state which means all +// unprocessed and subsequent requests are dropped. Unfreezing happens automatically +// after a short time if the client's buffer value is at least in the slightly positive +// region. The client is also notified about being frozen/unfrozen with a Stop/Resume +// message. +func (p *peer) freezeClient() { + if p.version < lpv3 { + // if Stop/Resume is not supported then just drop the peer after setting + // its frozen status permanently + atomic.StoreUint32(&p.frozen, 1) + p.Peer.Disconnect(p2p.DiscUselessPeer) + return + } + if atomic.SwapUint32(&p.frozen, 1) == 0 { + go func() { + p.SendStop() + time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom)))) + for { + bufValue, bufLimit := p.fcClient.BufferStatus() + if bufLimit == 0 { + return + } + if bufValue <= bufLimit/8 { + time.Sleep(freezeCheckPeriod) + } else { + atomic.StoreUint32(&p.frozen, 0) + p.SendResume(bufValue) + break + } + } + }() + } +} + +// freezeServer processes Stop/Resume messages from the given server +func (p *peer) freezeServer(frozen bool) { + var f uint32 + if frozen { + f = 1 + } + if atomic.SwapUint32(&p.frozen, f) != f && frozen { + p.sendQueue.clear() + } +} + +// isFrozen returns true if the client is frozen or the server has put our +// client in frozen state +func (p *peer) isFrozen() bool { + return atomic.LoadUint32(&p.frozen) != 0 +} + func (p *peer) canQueue() bool { - return p.sendQueue.canQueue() + return p.sendQueue.canQueue() && !p.isFrozen() } func (p *peer) queueSend(f func()) { @@ -265,10 +328,21 @@ func (p *peer) GetTxRelayCost(amount, size int) uint64 { // HasBlock checks if the peer has a given block func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool { + var head, since, recent uint64 p.lock.RLock() + if p.headInfo != nil { + head = p.headInfo.Number + } + if hasState { + since = p.stateSince + recent = p.stateRecent + } else { + since = p.chainSince + recent = p.chainRecent + } hasBlock := p.hasBlock p.lock.RUnlock() - return hasBlock != nil && hasBlock(hash, number, hasState) + return head >= number && number >= since && (recent == 0 || number+recent+4 > head) && hasBlock != nil && hasBlock(hash, number, hasState) } // SendAnnounce announces the availability of a number of blocks through @@ -277,6 +351,16 @@ func (p *peer) SendAnnounce(request announceData) error { return p2p.Send(p.rw, AnnounceMsg, request) } +// SendStop notifies the client about being in frozen state +func (p *peer) SendStop() error { + return p2p.Send(p.rw, StopMsg, struct{}{}) +} + +// SendResume notifies the client about getting out of frozen state +func (p *peer) SendResume(bv uint64) error { + return p2p.Send(p.rw, ResumeMsg, bv) +} + // ReplyBlockHeaders creates a reply with a batch of block headers func (p *peer) ReplyBlockHeaders(reqID uint64, headers []*types.Header) *reply { data, _ := rlp.EncodeToBytes(headers) @@ -464,19 +548,19 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis send = send.add("genesisHash", genesis) if server != nil { if !server.onlyAnnounce { - //only announce server. It sends only announse requests send = send.add("serveHeaders", nil) send = send.add("serveChainSince", uint64(0)) send = send.add("serveStateSince", uint64(0)) + send = send.add("serveRecentState", uint64(core.TriesInMemory-4)) send = send.add("txRelay", nil) } send = send.add("flowControl/BL", server.defParams.BufLimit) send = send.add("flowControl/MRR", server.defParams.MinRecharge) var costList RequestCostList if server.costTracker != nil { - costList = server.costTracker.makeCostList() + costList = server.costTracker.makeCostList(server.costTracker.globalFactor()) } else { - costList = testCostList() + costList = testCostList(server.testCost) } send = send.add("flowControl/MRC", costList) p.fcCosts = costList.decode(ProtocolLengths[uint(p.version)]) @@ -544,12 +628,18 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams) } else { //mark OnlyAnnounce server if "serveHeaders", "serveChainSince", "serveStateSince" or "txRelay" fields don't exist - if recv.get("serveChainSince", nil) != nil { + if recv.get("serveChainSince", &p.chainSince) != nil { p.isOnlyAnnounce = true } - if recv.get("serveStateSince", nil) != nil { + if recv.get("serveRecentChain", &p.chainRecent) != nil { + p.chainRecent = 0 + } + if recv.get("serveStateSince", &p.stateSince) != nil { p.isOnlyAnnounce = true } + if recv.get("serveRecentState", &p.stateRecent) != nil { + p.stateRecent = 0 + } if recv.get("txRelay", nil) != nil { p.isOnlyAnnounce = true } diff --git a/les/peer_test.go b/les/peer_test.go index 62adf90fe..d5ce16694 100644 --- a/les/peer_test.go +++ b/les/peer_test.go @@ -54,7 +54,7 @@ func TestPeerHandshakeSetAnnounceTypeToAnnounceTypeSignedForTrustedPeer(t *testi l = l.add("txRelay", nil) l = l.add("flowControl/BL", uint64(0)) l = l.add("flowControl/MRR", uint64(0)) - l = l.add("flowControl/MRC", testCostList()) + l = l.add("flowControl/MRC", testCostList(0)) return l }, @@ -99,7 +99,7 @@ func TestPeerHandshakeAnnounceTypeSignedForTrustedPeersPeerNotInTrusted(t *testi l = l.add("txRelay", nil) l = l.add("flowControl/BL", uint64(0)) l = l.add("flowControl/MRR", uint64(0)) - l = l.add("flowControl/MRC", testCostList()) + l = l.add("flowControl/MRC", testCostList(0)) return l }, diff --git a/les/protocol.go b/les/protocol.go index 24dc9bdea..ebf581473 100644 --- a/les/protocol.go +++ b/les/protocol.go @@ -32,17 +32,18 @@ import ( // Constants to match up protocol versions and messages const ( lpv2 = 2 + lpv3 = 3 ) // Supported versions of the les protocol (first is primary) var ( - ClientProtocolVersions = []uint{lpv2} - ServerProtocolVersions = []uint{lpv2} + ClientProtocolVersions = []uint{lpv2, lpv3} + ServerProtocolVersions = []uint{lpv2, lpv3} AdvertiseProtocolVersions = []uint{lpv2} // clients are searching for the first advertised protocol in the list ) // Number of implemented message corresponding to different protocol versions. -var ProtocolLengths = map[uint]uint64{lpv2: 22} +var ProtocolLengths = map[uint]uint64{lpv2: 22, lpv3: 24} const ( NetworkId = 1 @@ -70,6 +71,9 @@ const ( SendTxV2Msg = 0x13 GetTxStatusMsg = 0x14 TxStatusMsg = 0x15 + // Protocol messages introduced in LPV3 + StopMsg = 0x16 + ResumeMsg = 0x17 ) type requestInfo struct { diff --git a/les/retrieve.go b/les/retrieve.go index dd9d14598..d17a02e1a 100644 --- a/les/retrieve.go +++ b/les/retrieve.go @@ -78,8 +78,8 @@ type sentReq struct { // after which delivered is set to true, the validity of the response is sent on the // valid channel and no more responses are accepted. type sentReqToPeer struct { - delivered bool - valid chan bool + delivered, frozen bool + event chan int } // reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the @@ -95,6 +95,7 @@ const ( rpHardTimeout rpDeliveredValid rpDeliveredInvalid + rpNotDelivered ) // newRetrieveManager creates the retrieve manager @@ -149,7 +150,7 @@ func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc req.request = func(p distPeer) func() { // before actually sending the request, put an entry into the sentTo map r.lock.Lock() - r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)} + r.sentTo[p] = sentReqToPeer{delivered: false, frozen: false, event: make(chan int, 1)} r.lock.Unlock() return request(p) } @@ -173,6 +174,17 @@ func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error { return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) } +// frozen is called by the LES protocol manager when a server has suspended its service and we +// should not expect an answer for the requests already sent there +func (rm *retrieveManager) frozen(peer distPeer) { + rm.lock.RLock() + defer rm.lock.RUnlock() + + for _, req := range rm.sentReqs { + req.frozen(peer) + } +} + // reqStateFn represents a state of the retrieve loop state machine type reqStateFn func() reqStateFn @@ -215,7 +227,7 @@ func (r *sentReq) stateRequesting() reqStateFn { go r.tryRequest() r.lastReqQueued = true return r.stateRequesting - case rpDeliveredInvalid: + case rpDeliveredInvalid, rpNotDelivered: // if it was the last sent request (set to nil by update) then start a new one if !r.lastReqQueued && r.lastReqSentTo == nil { go r.tryRequest() @@ -277,7 +289,7 @@ func (r *sentReq) update(ev reqPeerEvent) { r.reqSrtoCount++ case rpHardTimeout: r.reqSrtoCount-- - case rpDeliveredValid, rpDeliveredInvalid: + case rpDeliveredValid, rpDeliveredInvalid, rpNotDelivered: if ev.peer == r.lastReqSentTo { r.lastReqSentTo = nil } else { @@ -343,12 +355,13 @@ func (r *sentReq) tryRequest() { }() select { - case ok := <-s.valid: - if ok { - r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} - } else { - r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} + case event := <-s.event: + if event == rpNotDelivered { + r.lock.Lock() + delete(r.sentTo, p) + r.lock.Unlock() } + r.eventsCh <- reqPeerEvent{event, p} return case <-time.After(softRequestTimeout): srto = true @@ -356,12 +369,13 @@ func (r *sentReq) tryRequest() { } select { - case ok := <-s.valid: - if ok { - r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} - } else { - r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} + case event := <-s.event: + if event == rpNotDelivered { + r.lock.Lock() + delete(r.sentTo, p) + r.lock.Unlock() } + r.eventsCh <- reqPeerEvent{event, p} case <-time.After(hardRequestTimeout): hrto = true r.eventsCh <- reqPeerEvent{rpHardTimeout, p} @@ -377,15 +391,37 @@ func (r *sentReq) deliver(peer distPeer, msg *Msg) error { if !ok || s.delivered { return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) } + if s.frozen { + return nil + } valid := r.validate(peer, msg) == nil - r.sentTo[peer] = sentReqToPeer{true, s.valid} - s.valid <- valid + r.sentTo[peer] = sentReqToPeer{delivered: true, frozen: false, event: s.event} + if valid { + s.event <- rpDeliveredValid + } else { + s.event <- rpDeliveredInvalid + } if !valid { return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID) } return nil } +// frozen sends a "not delivered" event to the peer event channel belonging to the +// given peer if the request has been sent there, causing the state machine to not +// expect an answer and potentially even send the request to the same peer again +// when canSend allows it. +func (r *sentReq) frozen(peer distPeer) { + r.lock.Lock() + defer r.lock.Unlock() + + s, ok := r.sentTo[peer] + if ok && !s.delivered && !s.frozen { + r.sentTo[peer] = sentReqToPeer{delivered: false, frozen: true, event: s.event} + s.event <- rpNotDelivered + } +} + // stop stops the retrieval process and sets an error code that will be returned // by getError func (r *sentReq) stop(err error) { diff --git a/les/server.go b/les/server.go index 6b93b846d..836fa0d55 100644 --- a/les/server.go +++ b/les/server.go @@ -19,6 +19,7 @@ package les import ( "crypto/ecdsa" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" @@ -26,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" @@ -37,26 +39,43 @@ import ( const bufLimitRatio = 6000 // fixed bufLimit/MRR ratio +const ( + logFileName = "" // csv log file name (disabled if empty) + logClientPoolMetrics = true // log client pool metrics + logClientPoolEvents = false // detailed client pool event logging + logRequestServing = true // log request serving metrics and events + logBlockProcEvents = true // log block processing events + logProtocolHandler = true // log protocol handler events +) + type LesServer struct { lesCommons fcManager *flowcontrol.ClientManager // nil if our node is client only costTracker *costTracker + testCost uint64 defParams flowcontrol.ServerParams lesTopics []discv5.Topic privateKey *ecdsa.PrivateKey quitSync chan struct{} onlyAnnounce bool + csvLogger *csvlogger.Logger + logTotalCap *csvlogger.Channel thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode - maxPeers int - freeClientCap uint64 - freeClientPool *freeClientPool - priorityClientPool *priorityClientPool + maxPeers int + minCapacity, freeClientCap uint64 + freeClientPool *freeClientPool + priorityClientPool *priorityClientPool } func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { + var csvLogger *csvlogger.Logger + if logFileName != "" { + csvLogger = csvlogger.NewLogger(logFileName, time.Second*10, "event, peerId") + } + quitSync := make(chan struct{}) pm, err := NewProtocolManager( eth.BlockChain().Config(), @@ -78,6 +97,14 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { if err != nil { return nil, err } + if logProtocolHandler { + pm.logger = csvLogger + } + requestLogger := csvLogger + if !logRequestServing { + requestLogger = nil + } + pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100, requestLogger) lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions)) for i, pv := range AdvertiseProtocolVersions { @@ -93,11 +120,13 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), protocolManager: pm, }, - costTracker: newCostTracker(eth.ChainDb(), config), quitSync: quitSync, lesTopics: lesTopics, onlyAnnounce: config.OnlyAnnounce, + csvLogger: csvLogger, + logTotalCap: requestLogger.NewChannel("totalCapacity", 0.01), } + srv.costTracker, srv.minCapacity = newCostTracker(eth.ChainDb(), config, requestLogger) logger := log.New() pm.server = srv @@ -144,7 +173,11 @@ func (s *LesServer) APIs() []rpc.API { func (s *LesServer) startEventLoop() { s.protocolManager.wg.Add(1) - var processing bool + blockProcLogger := s.csvLogger + if !logBlockProcEvents { + blockProcLogger = nil + } + var processing, procLast bool blockProcFeed := make(chan bool, 100) s.protocolManager.blockchain.(*core.BlockChain).SubscribeBlockProcessingEvent(blockProcFeed) totalRechargeCh := make(chan uint64, 100) @@ -152,17 +185,25 @@ func (s *LesServer) startEventLoop() { totalCapacityCh := make(chan uint64, 100) updateRecharge := func() { if processing { + if !procLast { + blockProcLogger.Event("block processing started") + } s.protocolManager.servingQueue.setThreads(s.thcBlockProcessing) s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}}) } else { + if procLast { + blockProcLogger.Event("block processing finished") + } s.protocolManager.servingQueue.setThreads(s.thcNormal) - s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 10, totalRecharge}, {totalRecharge, totalRecharge}}) + s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 16, totalRecharge / 2}, {totalRecharge / 2, totalRecharge / 2}, {totalRecharge, totalRecharge}}) } + procLast = processing } updateRecharge() totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh) s.priorityClientPool.setLimits(s.maxPeers, totalCapacity) + var maxFreePeers uint64 go func() { for { select { @@ -171,6 +212,12 @@ func (s *LesServer) startEventLoop() { case totalRecharge = <-totalRechargeCh: updateRecharge() case totalCapacity = <-totalCapacityCh: + s.logTotalCap.Update(float64(totalCapacity)) + newFreePeers := totalCapacity / s.freeClientCap + if newFreePeers < maxFreePeers && newFreePeers < uint64(s.maxPeers) { + log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers) + } + maxFreePeers = newFreePeers s.priorityClientPool.setLimits(s.maxPeers, totalCapacity) case <-s.protocolManager.quitSync: s.protocolManager.wg.Done() @@ -189,9 +236,9 @@ func (s *LesServer) Start(srvr *p2p.Server) { s.maxPeers = s.config.LightPeers totalRecharge := s.costTracker.totalRecharge() if s.maxPeers > 0 { - s.freeClientCap = minCapacity //totalRecharge / uint64(s.maxPeers) - if s.freeClientCap < minCapacity { - s.freeClientCap = minCapacity + s.freeClientCap = s.minCapacity //totalRecharge / uint64(s.maxPeers) + if s.freeClientCap < s.minCapacity { + s.freeClientCap = s.minCapacity } if s.freeClientCap > 0 { s.defParams = flowcontrol.ServerParams{ @@ -200,15 +247,25 @@ func (s *LesServer) Start(srvr *p2p.Server) { } } } - freePeers := int(totalRecharge / s.freeClientCap) - if freePeers < s.maxPeers { - log.Warn("Light peer count limited", "specified", s.maxPeers, "allowed", freePeers) - } - s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }) - s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool) + maxCapacity := s.freeClientCap * uint64(s.maxPeers) + if totalRecharge > maxCapacity { + maxCapacity = totalRecharge + } + s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2) + poolMetricsLogger := s.csvLogger + if !logClientPoolMetrics { + poolMetricsLogger = nil + } + poolEventLogger := s.csvLogger + if !logClientPoolEvents { + poolEventLogger = nil + } + s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }, poolMetricsLogger, poolEventLogger) + s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool, poolMetricsLogger, poolEventLogger) s.protocolManager.peers.notify(s.priorityClientPool) + s.csvLogger.Start() s.startEventLoop() s.protocolManager.Start(s.config.LightPeers) if srvr.DiscV5 != nil { @@ -233,6 +290,7 @@ func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) { // Stop stops the LES service func (s *LesServer) Stop() { + s.fcManager.Stop() s.chtIndexer.Close() // bloom trie indexer is closed by parent bloombits indexer go func() { @@ -241,6 +299,7 @@ func (s *LesServer) Stop() { s.freeClientPool.stop() s.costTracker.stop() s.protocolManager.Stop() + s.csvLogger.Stop() } // todo(rjl493456442) separate client and server implementation. diff --git a/les/servingqueue.go b/les/servingqueue.go index 2438fdfe3..26656ec01 100644 --- a/les/servingqueue.go +++ b/les/servingqueue.go @@ -17,16 +17,24 @@ package les import ( + "fmt" + "sort" "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/les/csvlogger" ) // servingQueue allows running tasks in a limited number of threads and puts the // waiting tasks in a priority queue type servingQueue struct { - tokenCh chan runToken + recentTime, queuedTime, servingTimeDiff uint64 + burstLimit, burstDropLimit uint64 + burstDecRate float64 + lastUpdate mclock.AbsTime + queueAddCh, queueBestCh chan *servingTask stopThreadCh, quit chan struct{} setThreadsCh chan int @@ -36,6 +44,10 @@ type servingQueue struct { queue *prque.Prque // priority queue for waiting or suspended tasks best *servingTask // the highest priority task (not included in the queue) suspendBias int64 // priority bias against suspending an already running task + + logger *csvlogger.Logger + logRecentTime *csvlogger.Channel + logQueuedTime *csvlogger.Channel } // servingTask represents a request serving task. Tasks can be implemented to @@ -47,12 +59,13 @@ type servingQueue struct { // - run: execute a single step; return true if finished // - after: executed after run finishes or returns an error, receives the total serving time type servingTask struct { - sq *servingQueue - servingTime uint64 - priority int64 - biasAdded bool - token runToken - tokenCh chan runToken + sq *servingQueue + servingTime, timeAdded, maxTime, expTime uint64 + peer *peer + priority int64 + biasAdded bool + token runToken + tokenCh chan runToken } // runToken received by servingTask.start allows the task to run. Closing the @@ -63,20 +76,19 @@ type runToken chan struct{} // start blocks until the task can start and returns true if it is allowed to run. // Returning false means that the task should be cancelled. func (t *servingTask) start() bool { + if t.peer.isFrozen() { + return false + } + t.tokenCh = make(chan runToken, 1) select { - case t.token = <-t.sq.tokenCh: - default: - t.tokenCh = make(chan runToken, 1) - select { - case t.sq.queueAddCh <- t: - case <-t.sq.quit: - return false - } - select { - case t.token = <-t.tokenCh: - case <-t.sq.quit: - return false - } + case t.sq.queueAddCh <- t: + case <-t.sq.quit: + return false + } + select { + case t.token = <-t.tokenCh: + case <-t.sq.quit: + return false } if t.token == nil { return false @@ -90,6 +102,14 @@ func (t *servingTask) start() bool { func (t *servingTask) done() uint64 { t.servingTime += uint64(mclock.Now()) close(t.token) + diff := t.servingTime - t.timeAdded + t.timeAdded = t.servingTime + if t.expTime > diff { + t.expTime -= diff + atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime) + } else { + t.expTime = 0 + } return t.servingTime } @@ -107,16 +127,22 @@ func (t *servingTask) waitOrStop() bool { } // newServingQueue returns a new servingQueue -func newServingQueue(suspendBias int64) *servingQueue { +func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue { sq := &servingQueue{ - queue: prque.New(nil), - suspendBias: suspendBias, - tokenCh: make(chan runToken), - queueAddCh: make(chan *servingTask, 100), - queueBestCh: make(chan *servingTask), - stopThreadCh: make(chan struct{}), - quit: make(chan struct{}), - setThreadsCh: make(chan int, 10), + queue: prque.New(nil), + suspendBias: suspendBias, + queueAddCh: make(chan *servingTask, 100), + queueBestCh: make(chan *servingTask), + stopThreadCh: make(chan struct{}), + quit: make(chan struct{}), + setThreadsCh: make(chan int, 10), + burstLimit: uint64(utilTarget * bufLimitRatio * 1200000), + burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000), + burstDecRate: utilTarget, + lastUpdate: mclock.Now(), + logger: logger, + logRecentTime: logger.NewMinMaxChannel("recentTime", false), + logQueuedTime: logger.NewMinMaxChannel("queuedTime", false), } sq.wg.Add(2) go sq.queueLoop() @@ -125,9 +151,12 @@ func newServingQueue(suspendBias int64) *servingQueue { } // newTask creates a new task with the given priority -func (sq *servingQueue) newTask(priority int64) *servingTask { +func (sq *servingQueue) newTask(peer *peer, maxTime uint64, priority int64) *servingTask { return &servingTask{ sq: sq, + peer: peer, + maxTime: maxTime, + expTime: maxTime, priority: priority, } } @@ -144,18 +173,12 @@ func (sq *servingQueue) threadController() { select { case best := <-sq.queueBestCh: best.tokenCh <- token - default: - select { - case best := <-sq.queueBestCh: - best.tokenCh <- token - case sq.tokenCh <- token: - case <-sq.stopThreadCh: - sq.wg.Done() - return - case <-sq.quit: - sq.wg.Done() - return - } + case <-sq.stopThreadCh: + sq.wg.Done() + return + case <-sq.quit: + sq.wg.Done() + return } <-token select { @@ -170,6 +193,100 @@ func (sq *servingQueue) threadController() { } } +type ( + // peerTasks lists the tasks received from a given peer when selecting peers to freeze + peerTasks struct { + peer *peer + list []*servingTask + sumTime uint64 + priority float64 + } + // peerList is a sortable list of peerTasks + peerList []*peerTasks +) + +func (l peerList) Len() int { + return len(l) +} + +func (l peerList) Less(i, j int) bool { + return l[i].priority < l[j].priority +} + +func (l peerList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + +// freezePeers selects the peers with the worst priority queued tasks and freezes +// them until burstTime goes under burstDropLimit or all peers are frozen +func (sq *servingQueue) freezePeers() { + peerMap := make(map[*peer]*peerTasks) + var peerList peerList + if sq.best != nil { + sq.queue.Push(sq.best, sq.best.priority) + } + sq.best = nil + for sq.queue.Size() > 0 { + task := sq.queue.PopItem().(*servingTask) + tasks := peerMap[task.peer] + if tasks == nil { + bufValue, bufLimit := task.peer.fcClient.BufferStatus() + if bufLimit < 1 { + bufLimit = 1 + } + tasks = &peerTasks{ + peer: task.peer, + priority: float64(bufValue) / float64(bufLimit), // lower value comes first + } + peerMap[task.peer] = tasks + peerList = append(peerList, tasks) + } + tasks.list = append(tasks.list, task) + tasks.sumTime += task.expTime + } + sort.Sort(peerList) + drop := true + sq.logger.Event("freezing peers") + for _, tasks := range peerList { + if drop { + tasks.peer.freezeClient() + tasks.peer.fcClient.Freeze() + sq.queuedTime -= tasks.sumTime + if sq.logQueuedTime != nil { + sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) + } + sq.logger.Event(fmt.Sprintf("frozen peer sumTime=%d, %v", tasks.sumTime, tasks.peer.id)) + drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit + for _, task := range tasks.list { + task.tokenCh <- nil + } + } else { + for _, task := range tasks.list { + sq.queue.Push(task, task.priority) + } + } + } + if sq.queue.Size() > 0 { + sq.best = sq.queue.PopItem().(*servingTask) + } +} + +// updateRecentTime recalculates the recent serving time value +func (sq *servingQueue) updateRecentTime() { + subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0) + now := mclock.Now() + dt := now - sq.lastUpdate + sq.lastUpdate = now + if dt > 0 { + subTime += uint64(float64(dt) * sq.burstDecRate) + } + if sq.recentTime > subTime { + sq.recentTime -= subTime + } else { + sq.recentTime = 0 + } +} + // addTask inserts a task into the priority queue func (sq *servingQueue) addTask(task *servingTask) { if sq.best == nil { @@ -177,10 +294,18 @@ func (sq *servingQueue) addTask(task *servingTask) { } else if task.priority > sq.best.priority { sq.queue.Push(sq.best, sq.best.priority) sq.best = task - return } else { sq.queue.Push(task, task.priority) } + sq.updateRecentTime() + sq.queuedTime += task.expTime + if sq.logQueuedTime != nil { + sq.logRecentTime.Update(float64(sq.recentTime) / 1000) + sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) + } + if sq.recentTime+sq.queuedTime > sq.burstLimit { + sq.freezePeers() + } } // queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh @@ -189,10 +314,18 @@ func (sq *servingQueue) addTask(task *servingTask) { func (sq *servingQueue) queueLoop() { for { if sq.best != nil { + expTime := sq.best.expTime select { case task := <-sq.queueAddCh: sq.addTask(task) case sq.queueBestCh <- sq.best: + sq.updateRecentTime() + sq.queuedTime -= expTime + sq.recentTime += expTime + if sq.logQueuedTime != nil { + sq.logRecentTime.Update(float64(sq.recentTime) / 1000) + sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) + } if sq.queue.Size() == 0 { sq.best = nil } else { |