aboutsummaryrefslogtreecommitdiffstats
path: root/les/txrelay.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-03-23 03:44:22 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-03-23 03:44:22 +0800
commit525116dbff916825463931361f75e75e955c12e2 (patch)
treeb272801c420ad9a591f227919567c7952b0bd512 /les/txrelay.go
parent1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff)
downloadgo-tangerine-525116dbff916825463931361f75e75e955c12e2.tar
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst
go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues * core: moved header validation before chain mutex lock
Diffstat (limited to 'les/txrelay.go')
-rw-r--r--les/txrelay.go27
1 files changed, 22 insertions, 5 deletions
diff --git a/les/txrelay.go b/les/txrelay.go
index 76d416c57..1ca3467e4 100644
--- a/les/txrelay.go
+++ b/les/txrelay.go
@@ -35,13 +35,14 @@ type LesTxRelay struct {
peerList []*peer
peerStartPos int
lock sync.RWMutex
+
+ reqDist *requestDistributor
}
func NewLesTxRelay() *LesTxRelay {
return &LesTxRelay{
txSent: make(map[common.Hash]*ltrInfo),
txPending: make(map[common.Hash]struct{}),
- ps: newPeerSet(),
}
}
@@ -108,10 +109,26 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
}
for p, list := range sendTo {
- cost := p.GetRequestCost(SendTxMsg, len(list))
- go func(p *peer, list types.Transactions, cost uint64) {
- p.SendTxs(cost, list)
- }(p, list, cost)
+ pp := p
+ ll := list
+
+ reqID := getNextReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(SendTxMsg, len(ll))
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == pp
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(SendTxMsg, len(ll))
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.SendTxs(reqID, cost, ll) }
+ },
+ }
+ self.reqDist.queue(rq)
}
}