diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-03-23 03:44:22 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-03-23 03:44:22 +0800 |
commit | 525116dbff916825463931361f75e75e955c12e2 (patch) | |
tree | b272801c420ad9a591f227919567c7952b0bd512 /les/peer.go | |
parent | 1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff) | |
download | go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2 go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip |
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
Diffstat (limited to 'les/peer.go')
-rw-r--r-- | les/peer.go | 35 |
1 files changed, 27 insertions, 8 deletions
diff --git a/les/peer.go b/les/peer.go index ef5f8a6ce..4793da296 100644 --- a/les/peer.go +++ b/les/peer.go @@ -22,6 +22,7 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -37,7 +38,10 @@ var ( errNotRegistered = errors.New("peer is not registered") ) -const maxHeadInfoLen = 20 +const ( + maxHeadInfoLen = 20 + maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) +) type peer struct { *p2p.Peer @@ -53,9 +57,11 @@ type peer struct { lock sync.RWMutex announceChn chan announceData + sendQueue *execQueue - poolEntry *poolEntry - hasBlock func(common.Hash, uint64) bool + poolEntry *poolEntry + hasBlock func(common.Hash, uint64) bool + responseErrors int fcClient *flowcontrol.ClientNode // nil if the peer is server only fcServer *flowcontrol.ServerNode // nil if the peer is client only @@ -76,6 +82,14 @@ func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { } } +func (p *peer) canQueue() bool { + return p.sendQueue.canQueue() +} + +func (p *peer) queueSend(f func()) { + p.sendQueue.queue(f) +} + // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *eth.PeerInfo { return ð.PeerInfo{ @@ -117,6 +131,11 @@ func (p *peer) Td() *big.Int { return new(big.Int).Set(p.headInfo.Td) } +// waitBefore implements distPeer interface +func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) { + return p.fcServer.CanSend(maxCost) +} + func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error { type req struct { ReqID uint64 @@ -237,11 +256,8 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error { return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs) } -func (p *peer) SendTxs(cost uint64, txs types.Transactions) error { +func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error { p.Log().Debug("Fetching batch of transactions", "count", len(txs)) - reqID := getNextReqID() - p.fcServer.MustAssignRequest(reqID) - p.fcServer.SendRequest(reqID, cost) return p2p.Send(p.rw, SendTxMsg, txs) } @@ -444,6 +460,7 @@ func (ps *peerSet) Register(p *peer) error { return errAlreadyRegistered } ps.peers[p.id] = p + p.sendQueue = newExecQueue(100) return nil } @@ -453,8 +470,10 @@ func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() defer ps.lock.Unlock() - if _, ok := ps.peers[id]; !ok { + if p, ok := ps.peers[id]; !ok { return errNotRegistered + } else { + p.sendQueue.quit() } delete(ps.peers, id) return nil |