aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2019-02-26 19:32:48 +0800
committerFelix Lange <fjl@users.noreply.github.com>2019-02-26 19:32:48 +0800
commitc2003ed63b975c6318e4dd7e65b69c60777b0ddf (patch)
tree0045c39070c4d2b9fcaa93cbcbdd8d5ea4f4a29f /les/handler.go
parentc2b33a117f686069e36d58d937e1c75d72d7b94c (diff)
downloadgo-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.gz
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.bz2
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.lz
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.xz
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.tar.zst
go-tangerine-c2003ed63b975c6318e4dd7e65b69c60777b0ddf.zip
les, les/flowcontrol: improved request serving and flow control (#18230)
This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go803
1 files changed, 422 insertions, 381 deletions
diff --git a/les/handler.go b/les/handler.go
index 680e115b0..0352f5b03 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-// Package les implements the Light Ethereum Subprotocol.
package les
import (
@@ -22,12 +21,10 @@ import (
"encoding/json"
"fmt"
"math/big"
- "net"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -90,21 +87,21 @@ type txPool interface {
}
type ProtocolManager struct {
- lightSync bool
- txpool txPool
- txrelay *LesTxRelay
- networkId uint64
- chainConfig *params.ChainConfig
- iConfig *light.IndexerConfig
- blockchain BlockChain
- chainDb ethdb.Database
- odr *LesOdr
- server *LesServer
- serverPool *serverPool
- clientPool *freeClientPool
- lesTopic discv5.Topic
- reqDist *requestDistributor
- retriever *retrieveManager
+ lightSync bool
+ txpool txPool
+ txrelay *LesTxRelay
+ networkId uint64
+ chainConfig *params.ChainConfig
+ iConfig *light.IndexerConfig
+ blockchain BlockChain
+ chainDb ethdb.Database
+ odr *LesOdr
+ server *LesServer
+ serverPool *serverPool
+ lesTopic discv5.Topic
+ reqDist *requestDistributor
+ retriever *retrieveManager
+ servingQueue *servingQueue
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -165,6 +162,8 @@ func NewProtocolManager(
if odr != nil {
manager.retriever = odr.retriever
manager.reqDist = odr.retriever.dist
+ } else {
+ manager.servingQueue = newServingQueue(int64(time.Millisecond * 10))
}
if ulcConfig != nil {
@@ -181,7 +180,6 @@ func NewProtocolManager(
manager.peers.notify((*downloaderPeerNotify)(manager))
manager.fetcher = newLightFetcher(manager)
}
-
return manager, nil
}
@@ -192,11 +190,9 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
-
if pm.lightSync {
go pm.syncer()
} else {
- pm.clientPool = newFreeClientPool(pm.chainDb, maxPeers, 10000, mclock.System{})
go func() {
for range pm.newPeerCh {
}
@@ -214,8 +210,9 @@ func (pm *ProtocolManager) Stop() {
pm.noMorePeers <- struct{}{}
close(pm.quitSync) // quits syncer, fetcher
- if pm.clientPool != nil {
- pm.clientPool.stop()
+
+ if pm.servingQueue != nil {
+ pm.servingQueue.stop()
}
// Disconnect existing sessions.
@@ -286,17 +283,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
-
- if !pm.lightSync && !p.Peer.Info().Network.Trusted {
- addr, ok := p.RemoteAddr().(*net.TCPAddr)
- // test peer address is not a tcp address, don't use client pool if can not typecast
- if ok {
- id := addr.IP.String()
- if !pm.clientPool.connect(id, func() { go pm.removePeer(p.id) }) {
- return p2p.DiscTooManyPeers
- }
- defer pm.clientPool.disconnect(id)
- }
+ if p.fcClient != nil {
+ defer p.fcClient.Disconnect()
}
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
@@ -309,9 +297,6 @@ func (pm *ProtocolManager) handle(p *peer) error {
return err
}
defer func() {
- if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
- p.fcClient.Remove(pm.server.fcManager)
- }
pm.removePeer(p.id)
}()
@@ -329,31 +314,18 @@ func (pm *ProtocolManager) handle(p *peer) error {
}
}
- stop := make(chan struct{})
- defer close(stop)
- go func() {
- // new block announce loop
- for {
- select {
- case announce := <-p.announceChn:
- p.SendAnnounce(announce)
- case <-stop:
- return
- }
- }
- }()
-
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Light Ethereum message handling failed", "err", err)
+ if p.fcServer != nil {
+ p.fcServer.DumpLogs()
+ }
return err
}
}
}
-var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
-
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
@@ -364,22 +336,31 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
- costs := p.fcCosts[msg.Code]
- reject := func(reqCnt, maxCnt uint64) bool {
- if p.fcClient == nil || reqCnt > maxCnt {
- return true
+ p.responseCount++
+ responseCount := p.responseCount
+ var (
+ maxCost uint64
+ task *servingTask
+ )
+
+ accept := func(reqID, reqCnt, maxCnt uint64) bool {
+ if reqCnt == 0 {
+ return false
}
- bufValue, _ := p.fcClient.AcceptRequest()
- cost := costs.baseCost + reqCnt*costs.reqCost
- if cost > pm.server.defParams.BufLimit {
- cost = pm.server.defParams.BufLimit
+ if p.fcClient == nil || reqCnt > maxCnt {
+ return false
}
- if cost > bufValue {
- recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
- p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
- return true
+ maxCost = p.fcCosts.getCost(msg.Code, reqCnt)
+
+ if accepted, bufShort, servingPriority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost); !accepted {
+ if bufShort > 0 {
+ p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
+ }
+ return false
+ } else {
+ task = pm.servingQueue.newTask(servingPriority)
}
- return false
+ return task.start()
}
if msg.Size > ProtocolMaxMsgSize {
@@ -389,6 +370,31 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
var deliverMsg *Msg
+ sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
+ p.responseLock.Lock()
+ defer p.responseLock.Unlock()
+
+ var replySize uint32
+ if reply != nil {
+ replySize = reply.size()
+ }
+ var realCost uint64
+ if pm.server.costTracker != nil {
+ realCost = pm.server.costTracker.realCost(servingTime, msg.Size, replySize)
+ pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
+ } else {
+ realCost = maxCost
+ }
+ bv := p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
+ if reply != nil {
+ p.queueSend(func() {
+ if err := reply.send(bv); err != nil {
+ p.errCh <- err
+ }
+ })
+ }
+ }
+
// Handle the message depending on its contents
switch msg.Code {
case StatusMsg:
@@ -399,25 +405,33 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Block header query, collect the requested headers and reply
case AnnounceMsg:
p.Log().Trace("Received announce message")
- if p.announceType == announceTypeNone {
- return errResp(ErrUnexpectedResponse, "")
- }
var req announceData
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
- if p.announceType == announceTypeSigned {
- if err := req.checkSignature(p.ID()); err != nil {
- p.Log().Trace("Invalid announcement signature", "err", err)
- return err
- }
- p.Log().Trace("Valid announcement signature")
+ update, size := req.Update.decode()
+ if p.rejectUpdate(size) {
+ return errResp(ErrRequestRejected, "")
}
+ p.updateFlowControl(update)
- p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
- if pm.fetcher != nil {
- pm.fetcher.announce(p, &req)
+ if req.Hash != (common.Hash{}) {
+ if p.announceType == announceTypeNone {
+ return errResp(ErrUnexpectedResponse, "")
+ }
+ if p.announceType == announceTypeSigned {
+ if err := req.checkSignature(p.ID(), update); err != nil {
+ p.Log().Trace("Invalid announcement signature", "err", err)
+ return err
+ }
+ p.Log().Trace("Valid announcement signature")
+ }
+
+ p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
+ if pm.fetcher != nil {
+ pm.fetcher.announce(p, &req)
+ }
}
case GetBlockHeadersMsg:
@@ -432,93 +446,94 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
query := req.Query
- if reject(query.Amount, MaxHeaderFetch) {
+ if !accept(req.ReqID, query.Amount, MaxHeaderFetch) {
return errResp(ErrRequestRejected, "")
}
-
- hashMode := query.Origin.Hash != (common.Hash{})
- first := true
- maxNonCanonical := uint64(100)
-
- // Gather headers until the fetch or network limits is reached
- var (
- bytes common.StorageSize
- headers []*types.Header
- unknown bool
- )
- for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
- // Retrieve the next header satisfying the query
- var origin *types.Header
- if hashMode {
- if first {
- first = false
- origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
- if origin != nil {
- query.Origin.Number = origin.Number.Uint64()
+ go func() {
+ hashMode := query.Origin.Hash != (common.Hash{})
+ first := true
+ maxNonCanonical := uint64(100)
+
+ // Gather headers until the fetch or network limits is reached
+ var (
+ bytes common.StorageSize
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
+ if !first && !task.waitOrStop() {
+ return
+ }
+ // Retrieve the next header satisfying the query
+ var origin *types.Header
+ if hashMode {
+ if first {
+ origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
+ if origin != nil {
+ query.Origin.Number = origin.Number.Uint64()
+ }
+ } else {
+ origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
}
} else {
- origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
+ origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
- } else {
- origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
- }
- if origin == nil {
- break
- }
- headers = append(headers, origin)
- bytes += estHeaderRlpSize
-
- // Advance to the next header of the query
- switch {
- case hashMode && query.Reverse:
- // Hash based traversal towards the genesis block
- ancestor := query.Skip + 1
- if ancestor == 0 {
- unknown = true
- } else {
- query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
- unknown = (query.Origin.Hash == common.Hash{})
+ if origin == nil {
+ break
}
- case hashMode && !query.Reverse:
- // Hash based traversal towards the leaf block
- var (
- current = origin.Number.Uint64()
- next = current + query.Skip + 1
- )
- if next <= current {
- infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
- p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
- unknown = true
- } else {
- if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
- nextHash := header.Hash()
- expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
- if expOldHash == query.Origin.Hash {
- query.Origin.Hash, query.Origin.Number = nextHash, next
+ headers = append(headers, origin)
+ bytes += estHeaderRlpSize
+
+ // Advance to the next header of the query
+ switch {
+ case hashMode && query.Reverse:
+ // Hash based traversal towards the genesis block
+ ancestor := query.Skip + 1
+ if ancestor == 0 {
+ unknown = true
+ } else {
+ query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
+ unknown = (query.Origin.Hash == common.Hash{})
+ }
+ case hashMode && !query.Reverse:
+ // Hash based traversal towards the leaf block
+ var (
+ current = origin.Number.Uint64()
+ next = current + query.Skip + 1
+ )
+ if next <= current {
+ infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
+ p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
+ unknown = true
+ } else {
+ if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
+ nextHash := header.Hash()
+ expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
+ if expOldHash == query.Origin.Hash {
+ query.Origin.Hash, query.Origin.Number = nextHash, next
+ } else {
+ unknown = true
+ }
} else {
unknown = true
}
+ }
+ case query.Reverse:
+ // Number based traversal towards the genesis block
+ if query.Origin.Number >= query.Skip+1 {
+ query.Origin.Number -= query.Skip + 1
} else {
unknown = true
}
- }
- case query.Reverse:
- // Number based traversal towards the genesis block
- if query.Origin.Number >= query.Skip+1 {
- query.Origin.Number -= query.Skip + 1
- } else {
- unknown = true
- }
- case !query.Reverse:
- // Number based traversal towards the leaf block
- query.Origin.Number += query.Skip + 1
+ case !query.Reverse:
+ // Number based traversal towards the leaf block
+ query.Origin.Number += query.Skip + 1
+ }
+ first = false
}
- }
-
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
- return p.SendBlockHeaders(req.ReqID, bv, headers)
+ sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done())
+ }()
case BlockHeadersMsg:
if pm.downloader == nil {
@@ -534,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
@@ -560,24 +575,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bodies []rlp.RawValue
)
reqCnt := len(req.Hashes)
- if reject(uint64(reqCnt), MaxBodyFetch) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
return errResp(ErrRequestRejected, "")
}
- for _, hash := range req.Hashes {
- if bytes >= softResponseLimit {
- break
- }
- // Retrieve the requested block body, stopping if enough was found
- if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
- if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 {
- bodies = append(bodies, data)
- bytes += len(data)
+ go func() {
+ for i, hash := range req.Hashes {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ if bytes >= softResponseLimit {
+ break
+ }
+ // Retrieve the requested block body, stopping if enough was found
+ if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
+ if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 {
+ bodies = append(bodies, data)
+ bytes += len(data)
+ }
}
}
- }
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
- return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyBlockBodiesRLP(req.ReqID, bodies), task.done())
+ }()
case BlockBodiesMsg:
if pm.odr == nil {
@@ -593,7 +611,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgBlockBodies,
ReqID: resp.ReqID,
@@ -616,33 +634,36 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
data [][]byte
)
reqCnt := len(req.Reqs)
- if reject(uint64(reqCnt), MaxCodeFetch) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
return errResp(ErrRequestRejected, "")
}
- for _, req := range req.Reqs {
- // Retrieve the requested state entry, stopping if enough was found
- if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
- if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
- statedb, err := pm.blockchain.State()
- if err != nil {
- continue
- }
- account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
- if err != nil {
- continue
- }
- code, _ := statedb.Database().TrieDB().Node(common.BytesToHash(account.CodeHash))
+ go func() {
+ for i, req := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ // Retrieve the requested state entry, stopping if enough was found
+ if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
+ if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
+ statedb, err := pm.blockchain.State()
+ if err != nil {
+ continue
+ }
+ account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
+ if err != nil {
+ continue
+ }
+ code, _ := statedb.Database().TrieDB().Node(common.BytesToHash(account.CodeHash))
- data = append(data, code)
- if bytes += len(code); bytes >= softResponseLimit {
- break
+ data = append(data, code)
+ if bytes += len(code); bytes >= softResponseLimit {
+ break
+ }
}
}
}
- }
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
- return p.SendCode(req.ReqID, bv, data)
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyCode(req.ReqID, data), task.done())
+ }()
case CodeMsg:
if pm.odr == nil {
@@ -658,7 +679,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgCode,
ReqID: resp.ReqID,
@@ -681,34 +702,37 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
receipts []rlp.RawValue
)
reqCnt := len(req.Hashes)
- if reject(uint64(reqCnt), MaxReceiptFetch) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
return errResp(ErrRequestRejected, "")
}
- for _, hash := range req.Hashes {
- if bytes >= softResponseLimit {
- break
- }
- // Retrieve the requested block's receipts, skipping if unknown to us
- var results types.Receipts
- if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
- results = rawdb.ReadReceipts(pm.chainDb, hash, *number)
- }
- if results == nil {
- if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
- continue
+ go func() {
+ for i, hash := range req.Hashes {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ if bytes >= softResponseLimit {
+ break
+ }
+ // Retrieve the requested block's receipts, skipping if unknown to us
+ var results types.Receipts
+ if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
+ results = rawdb.ReadReceipts(pm.chainDb, hash, *number)
+ }
+ if results == nil {
+ if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
+ continue
+ }
+ }
+ // If known, encode and queue for response packet
+ if encoded, err := rlp.EncodeToBytes(results); err != nil {
+ log.Error("Failed to encode receipt", "err", err)
+ } else {
+ receipts = append(receipts, encoded)
+ bytes += len(encoded)
}
}
- // If known, encode and queue for response packet
- if encoded, err := rlp.EncodeToBytes(results); err != nil {
- log.Error("Failed to encode receipt", "err", err)
- } else {
- receipts = append(receipts, encoded)
- bytes += len(encoded)
- }
- }
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
- return p.SendReceiptsRLP(req.ReqID, bv, receipts)
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyReceiptsRLP(req.ReqID, receipts), task.done())
+ }()
case ReceiptsMsg:
if pm.odr == nil {
@@ -724,7 +748,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgReceipts,
ReqID: resp.ReqID,
@@ -747,42 +771,45 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
proofs proofsData
)
reqCnt := len(req.Reqs)
- if reject(uint64(reqCnt), MaxProofsFetch) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
return errResp(ErrRequestRejected, "")
}
- for _, req := range req.Reqs {
- // Retrieve the requested state entry, stopping if enough was found
- if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
- if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
- statedb, err := pm.blockchain.State()
- if err != nil {
- continue
- }
- var trie state.Trie
- if len(req.AccKey) > 0 {
- account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
+ go func() {
+ for i, req := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ // Retrieve the requested state entry, stopping if enough was found
+ if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
+ if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
+ statedb, err := pm.blockchain.State()
if err != nil {
continue
}
- trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
- } else {
- trie, _ = statedb.Database().OpenTrie(header.Root)
- }
- if trie != nil {
- var proof light.NodeList
- trie.Prove(req.Key, 0, &proof)
-
- proofs = append(proofs, proof)
- if bytes += proof.DataSize(); bytes >= softResponseLimit {
- break
+ var trie state.Trie
+ if len(req.AccKey) > 0 {
+ account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
+ if err != nil {
+ continue
+ }
+ trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
+ } else {
+ trie, _ = statedb.Database().OpenTrie(header.Root)
+ }
+ if trie != nil {
+ var proof light.NodeList
+ trie.Prove(req.Key, 0, &proof)
+
+ proofs = append(proofs, proof)
+ if bytes += proof.DataSize(); bytes >= softResponseLimit {
+ break
+ }
}
}
}
}
- }
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
- return p.SendProofs(req.ReqID, bv, proofs)
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyProofs(req.ReqID, proofs), task.done())
+ }()
case GetProofsV2Msg:
p.Log().Trace("Received les/2 proofs request")
@@ -801,50 +828,53 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
root common.Hash
)
reqCnt := len(req.Reqs)
- if reject(uint64(reqCnt), MaxProofsFetch) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
return errResp(ErrRequestRejected, "")
}
+ go func() {
- nodes := light.NewNodeSet()
-
- for _, req := range req.Reqs {
- // Look up the state belonging to the request
- if statedb == nil || req.BHash != lastBHash {
- statedb, root, lastBHash = nil, common.Hash{}, req.BHash
+ nodes := light.NewNodeSet()
- if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
- if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
- statedb, _ = pm.blockchain.State()
- root = header.Root
+ for i, req := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ // Look up the state belonging to the request
+ if statedb == nil || req.BHash != lastBHash {
+ statedb, root, lastBHash = nil, common.Hash{}, req.BHash
+
+ if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
+ if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
+ statedb, _ = pm.blockchain.State()
+ root = header.Root
+ }
}
}
- }
- if statedb == nil {
- continue
- }
- // Pull the account or storage trie of the request
- var trie state.Trie
- if len(req.AccKey) > 0 {
- account, err := pm.getAccount(statedb, root, common.BytesToHash(req.AccKey))
- if err != nil {
+ if statedb == nil {
continue
}
- trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
- } else {
- trie, _ = statedb.Database().OpenTrie(root)
- }
- if trie == nil {
- continue
- }
- // Prove the user's request from the account or stroage trie
- trie.Prove(req.Key, req.FromLevel, nodes)
- if nodes.DataSize() >= softResponseLimit {
- break
+ // Pull the account or storage trie of the request
+ var trie state.Trie
+ if len(req.AccKey) > 0 {
+ account, err := pm.getAccount(statedb, root, common.BytesToHash(req.AccKey))
+ if err != nil {
+ continue
+ }
+ trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
+ } else {
+ trie, _ = statedb.Database().OpenTrie(root)
+ }
+ if trie == nil {
+ continue
+ }
+ // Prove the user's request from the account or stroage trie
+ trie.Prove(req.Key, req.FromLevel, nodes)
+ if nodes.DataSize() >= softResponseLimit {
+ break
+ }
}
- }
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
- return p.SendProofsV2(req.ReqID, bv, nodes.NodeList())
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyProofsV2(req.ReqID, nodes.NodeList()), task.done())
+ }()
case ProofsV1Msg:
if pm.odr == nil {
@@ -860,7 +890,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgProofsV1,
ReqID: resp.ReqID,
@@ -881,7 +911,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgProofsV2,
ReqID: resp.ReqID,
@@ -904,34 +934,37 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
proofs []ChtResp
)
reqCnt := len(req.Reqs)
- if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
return errResp(ErrRequestRejected, "")
}
- trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
- for _, req := range req.Reqs {
- if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
- sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*pm.iConfig.ChtSize-1)
- if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
- trie, err := trie.New(root, trieDb)
- if err != nil {
- continue
- }
- var encNumber [8]byte
- binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
+ go func() {
+ trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
+ for i, req := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
+ sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*pm.iConfig.ChtSize-1)
+ if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
+ trie, err := trie.New(root, trieDb)
+ if err != nil {
+ continue
+ }
+ var encNumber [8]byte
+ binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
- var proof light.NodeList
- trie.Prove(encNumber[:], 0, &proof)
+ var proof light.NodeList
+ trie.Prove(encNumber[:], 0, &proof)
- proofs = append(proofs, ChtResp{Header: header, Proof: proof})
- if bytes += proof.DataSize() + estHeaderRlpSize; bytes >= softResponseLimit {
- break
+ proofs = append(proofs, ChtResp{Header: header, Proof: proof})
+ if bytes += proof.DataSize() + estHeaderRlpSize; bytes >= softResponseLimit {
+ break
+ }
}
}
}
- }
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
- return p.SendHeaderProofs(req.ReqID, bv, proofs)
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyHeaderProofs(req.ReqID, proofs), task.done())
+ }()
case GetHelperTrieProofsMsg:
p.Log().Trace("Received helper trie proof request")
@@ -949,50 +982,53 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
auxData [][]byte
)
reqCnt := len(req.Reqs)
- if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
return errResp(ErrRequestRejected, "")
}
+ go func() {
- var (
- lastIdx uint64
- lastType uint
- root common.Hash
- auxTrie *trie.Trie
- )
- nodes := light.NewNodeSet()
- for _, req := range req.Reqs {
- if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx {
- auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx
-
- var prefix string
- if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) {
- auxTrie, _ = trie.New(root, trie.NewDatabase(ethdb.NewTable(pm.chainDb, prefix)))
- }
- }
- if req.AuxReq == auxRoot {
- var data []byte
- if root != (common.Hash{}) {
- data = root[:]
+ var (
+ lastIdx uint64
+ lastType uint
+ root common.Hash
+ auxTrie *trie.Trie
+ )
+ nodes := light.NewNodeSet()
+ for i, req := range req.Reqs {
+ if i != 0 && !task.waitOrStop() {
+ return
}
- auxData = append(auxData, data)
- auxBytes += len(data)
- } else {
- if auxTrie != nil {
- auxTrie.Prove(req.Key, req.FromLevel, nodes)
+ if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx {
+ auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx
+
+ var prefix string
+ if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) {
+ auxTrie, _ = trie.New(root, trie.NewDatabase(ethdb.NewTable(pm.chainDb, prefix)))
+ }
}
- if req.AuxReq != 0 {
- data := pm.getHelperTrieAuxData(req)
+ if req.AuxReq == auxRoot {
+ var data []byte
+ if root != (common.Hash{}) {
+ data = root[:]
+ }
auxData = append(auxData, data)
auxBytes += len(data)
+ } else {
+ if auxTrie != nil {
+ auxTrie.Prove(req.Key, req.FromLevel, nodes)
+ }
+ if req.AuxReq != 0 {
+ data := pm.getHelperTrieAuxData(req)
+ auxData = append(auxData, data)
+ auxBytes += len(data)
+ }
+ }
+ if nodes.DataSize()+auxBytes >= softResponseLimit {
+ break
}
}
- if nodes.DataSize()+auxBytes >= softResponseLimit {
- break
- }
- }
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
- return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}), task.done())
+ }()
case HeaderProofsMsg:
if pm.odr == nil {
@@ -1007,7 +1043,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgHeaderProofs,
ReqID: resp.ReqID,
@@ -1028,7 +1064,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgHelperTrieProofs,
ReqID: resp.ReqID,
@@ -1045,13 +1081,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
reqCnt := len(txs)
- if reject(uint64(reqCnt), MaxTxSend) {
+ if !accept(0, uint64(reqCnt), MaxTxSend) {
return errResp(ErrRequestRejected, "")
}
- pm.txpool.AddRemotes(txs)
-
- _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
+ go func() {
+ for i, tx := range txs {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ pm.txpool.AddRemotes([]*types.Transaction{tx})
+ }
+ sendResponse(0, uint64(reqCnt), nil, task.done())
+ }()
case SendTxV2Msg:
if pm.txpool == nil {
@@ -1066,29 +1107,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
reqCnt := len(req.Txs)
- if reject(uint64(reqCnt), MaxTxSend) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
return errResp(ErrRequestRejected, "")
}
-
- hashes := make([]common.Hash, len(req.Txs))
- for i, tx := range req.Txs {
- hashes[i] = tx.Hash()
- }
- stats := pm.txStatus(hashes)
- for i, stat := range stats {
- if stat.Status == core.TxStatusUnknown {
- if errs := pm.txpool.AddRemotes([]*types.Transaction{req.Txs[i]}); errs[0] != nil {
- stats[i].Error = errs[0].Error()
- continue
+ go func() {
+ stats := make([]txStatus, len(req.Txs))
+ for i, tx := range req.Txs {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ hash := tx.Hash()
+ stats[i] = pm.txStatus(hash)
+ if stats[i].Status == core.TxStatusUnknown {
+ if errs := pm.txpool.AddRemotes([]*types.Transaction{tx}); errs[0] != nil {
+ stats[i].Error = errs[0].Error()
+ continue
+ }
+ stats[i] = pm.txStatus(hash)
}
- stats[i] = pm.txStatus([]common.Hash{hashes[i]})[0]
}
- }
-
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
-
- return p.SendTxStatus(req.ReqID, bv, stats)
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
+ }()
case GetTxStatusMsg:
if pm.txpool == nil {
@@ -1103,13 +1142,19 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
reqCnt := len(req.Hashes)
- if reject(uint64(reqCnt), MaxTxStatus) {
+ if !accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
return errResp(ErrRequestRejected, "")
}
- bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
- pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
-
- return p.SendTxStatus(req.ReqID, bv, pm.txStatus(req.Hashes))
+ go func() {
+ stats := make([]txStatus, len(req.Hashes))
+ for i, hash := range req.Hashes {
+ if i != 0 && !task.waitOrStop() {
+ return
+ }
+ stats[i] = pm.txStatus(hash)
+ }
+ sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
+ }()
case TxStatusMsg:
if pm.odr == nil {
@@ -1125,7 +1170,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.fcServer.GotReply(resp.ReqID, resp.BV)
+ p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
default:
p.Log().Trace("Received unknown message", "code", msg.Code)
@@ -1185,21 +1230,17 @@ func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
return nil
}
-func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus {
- stats := make([]txStatus, len(hashes))
- for i, stat := range pm.txpool.Status(hashes) {
- // Save the status we've got from the transaction pool
- stats[i].Status = stat
-
- // If the transaction is unknown to the pool, try looking it up locally
- if stat == core.TxStatusUnknown {
- if tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(pm.chainDb, hashes[i]); tx != nil {
- stats[i].Status = core.TxStatusIncluded
- stats[i].Lookup = &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex}
- }
+func (pm *ProtocolManager) txStatus(hash common.Hash) txStatus {
+ var stat txStatus
+ stat.Status = pm.txpool.Status([]common.Hash{hash})[0]
+ // If the transaction is unknown to the pool, try looking it up locally
+ if stat.Status == core.TxStatusUnknown {
+ if tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(pm.chainDb, hash); tx != nil {
+ stat.Status = core.TxStatusIncluded
+ stat.Lookup = &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex}
}
}
- return stats
+ return stat
}
// isULCEnabled returns true if we can use ULC
@@ -1235,7 +1276,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
- peer.fcServer.QueueRequest(reqID, cost)
+ peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
},
}
@@ -1259,7 +1300,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
- peer.fcServer.QueueRequest(reqID, cost)
+ peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
},
}