diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-03-23 03:44:22 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-03-23 03:44:22 +0800 |
commit | 525116dbff916825463931361f75e75e955c12e2 (patch) | |
tree | b272801c420ad9a591f227919567c7952b0bd512 /les/txrelay.go | |
parent | 1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff) | |
download | go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2 go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip |
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
Diffstat (limited to 'les/txrelay.go')
-rw-r--r-- | les/txrelay.go | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/les/txrelay.go b/les/txrelay.go index 76d416c57..1ca3467e4 100644 --- a/les/txrelay.go +++ b/les/txrelay.go @@ -35,13 +35,14 @@ type LesTxRelay struct { peerList []*peer peerStartPos int lock sync.RWMutex + + reqDist *requestDistributor } func NewLesTxRelay() *LesTxRelay { return &LesTxRelay{ txSent: make(map[common.Hash]*ltrInfo), txPending: make(map[common.Hash]struct{}), - ps: newPeerSet(), } } @@ -108,10 +109,26 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { } for p, list := range sendTo { - cost := p.GetRequestCost(SendTxMsg, len(list)) - go func(p *peer, list types.Transactions, cost uint64) { - p.SendTxs(cost, list) - }(p, list, cost) + pp := p + ll := list + + reqID := getNextReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(SendTxMsg, len(ll)) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == pp + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(SendTxMsg, len(ll)) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.SendTxs(reqID, cost, ll) } + }, + } + self.reqDist.queue(rq) } } |