diff options
author | Zsolt Felfoldi <zsfelfoldi@gmail.com> | 2016-12-15 18:13:52 +0800 |
---|---|---|
committer | Zsolt Felfoldi <zsfelfoldi@gmail.com> | 2017-01-06 11:34:31 +0800 |
commit | 93f9c023ccda2256079484d6c2a3159818ba6691 (patch) | |
tree | f2ed341904459184587ca1cc8667d8169761f752 /les/handler.go | |
parent | e0ee0cc66a4416edd47232649f4d4bca4a5e3c07 (diff) | |
download | go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.gz go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.bz2 go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.lz go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.xz go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.tar.zst go-tangerine-93f9c023ccda2256079484d6c2a3159818ba6691.zip |
les: fixed selectPeer deadlock, improved request distribution
les/flowcontrol: using proper types for relative and absolute times
Diffstat (limited to 'les/handler.go')
-rw-r--r-- | les/handler.go | 69 |
1 files changed, 36 insertions, 33 deletions
diff --git a/les/handler.go b/les/handler.go index b024841f2..603ce9ad4 100644 --- a/les/handler.go +++ b/les/handler.go @@ -24,6 +24,7 @@ import ( "math/big" "net" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -228,6 +229,12 @@ func (pm *ProtocolManager) removePeer(id string) { if peer == nil { return } + if err := pm.peers.Unregister(id); err != nil { + if err == errNotRegistered { + return + } + glog.V(logger.Error).Infoln("Removal failed:", err) + } glog.V(logger.Debug).Infoln("Removing peer", id) // Unregister the peer from the downloader and Ethereum peer set @@ -241,9 +248,6 @@ func (pm *ProtocolManager) removePeer(id string) { pm.fetcher.removePeer(peer) } } - if err := pm.peers.Unregister(id); err != nil { - glog.V(logger.Error).Infoln("Removal failed:", err) - } // Hard disconnect at the networking layer if peer != nil { peer.Peer.Disconnect(p2p.DiscUselessPeer) @@ -340,12 +344,14 @@ func (pm *ProtocolManager) handle(p *peer) error { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) + p.fcServer.MustAssignRequest(reqID) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) + p.fcServer.MustAssignRequest(reqID) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } @@ -404,26 +410,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return err } - var costs *requestCosts - var reqCnt, maxReqs int - glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size) - if rc, ok := p.fcCosts[msg.Code]; ok { // check if msg is a supported request type - costs = rc - if p.fcClient == nil { - return errResp(ErrRequestRejected, "") + + costs := p.fcCosts[msg.Code] + reject := func(reqCnt, maxCnt uint64) bool { + if p.fcClient == nil || reqCnt > maxCnt { + return true } - bv, ok := p.fcClient.AcceptRequest() - if !ok || bv < costs.baseCost { - return errResp(ErrRequestRejected, "") + bufValue, _ := p.fcClient.AcceptRequest() + cost := costs.baseCost + reqCnt*costs.reqCost + if cost > pm.server.defParams.BufLimit { + cost = pm.server.defParams.BufLimit } - maxReqs = 10000 - if bv < pm.server.defParams.BufLimit { - d := bv - costs.baseCost - if d/10000 < costs.reqCost { - maxReqs = int(d / costs.reqCost) - } + if cost > bufValue { + glog.V(logger.Error).Infof("Request from %v came %v too early", p.id, time.Duration((cost-bufValue)*1000000/pm.server.defParams.MinRecharge)) + return true } + return false } if msg.Size > ProtocolMaxMsgSize { @@ -450,7 +453,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth) if pm.fetcher != nil { - go pm.fetcher.announce(p, &req) + pm.fetcher.announce(p, &req) } case GetBlockHeadersMsg: @@ -465,7 +468,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } query := req.Query - if query.Amount > uint64(maxReqs) || query.Amount > MaxHeaderFetch { + if reject(query.Amount, MaxHeaderFetch) { return errResp(ErrRequestRejected, "") } @@ -573,8 +576,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int bodies []rlp.RawValue ) - reqCnt = len(req.Hashes) - if reqCnt > maxReqs || reqCnt > MaxBodyFetch { + reqCnt := len(req.Hashes) + if reject(uint64(reqCnt), MaxBodyFetch) { return errResp(ErrRequestRejected, "") } for _, hash := range req.Hashes { @@ -627,8 +630,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int data [][]byte ) - reqCnt = len(req.Reqs) - if reqCnt > maxReqs || reqCnt > MaxCodeFetch { + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxCodeFetch) { return errResp(ErrRequestRejected, "") } for _, req := range req.Reqs { @@ -688,8 +691,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int receipts []rlp.RawValue ) - reqCnt = len(req.Hashes) - if reqCnt > maxReqs || reqCnt > MaxReceiptFetch { + reqCnt := len(req.Hashes) + if reject(uint64(reqCnt), MaxReceiptFetch) { return errResp(ErrRequestRejected, "") } for _, hash := range req.Hashes { @@ -751,8 +754,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int proofs proofsData ) - reqCnt = len(req.Reqs) - if reqCnt > maxReqs || reqCnt > MaxProofsFetch { + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxProofsFetch) { return errResp(ErrRequestRejected, "") } for _, req := range req.Reqs { @@ -818,8 +821,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int proofs []ChtResp ) - reqCnt = len(req.Reqs) - if reqCnt > maxReqs || reqCnt > MaxHeaderProofsFetch { + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxHeaderProofsFetch) { return errResp(ErrRequestRejected, "") } for _, req := range req.Reqs { @@ -872,8 +875,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&txs); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - reqCnt = len(txs) - if reqCnt > maxReqs || reqCnt > MaxTxSend { + reqCnt := len(txs) + if reject(uint64(reqCnt), MaxTxSend) { return errResp(ErrRequestRejected, "") } |