aboutsummaryrefslogtreecommitdiffstats
path: root/les/txrelay.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/txrelay.go')
-rw-r--r--les/txrelay.go27
1 files changed, 22 insertions, 5 deletions
diff --git a/les/txrelay.go b/les/txrelay.go
index 76d416c57..1ca3467e4 100644
--- a/les/txrelay.go
+++ b/les/txrelay.go
@@ -35,13 +35,14 @@ type LesTxRelay struct {
peerList []*peer
peerStartPos int
lock sync.RWMutex
+
+ reqDist *requestDistributor
}
func NewLesTxRelay() *LesTxRelay {
return &LesTxRelay{
txSent: make(map[common.Hash]*ltrInfo),
txPending: make(map[common.Hash]struct{}),
- ps: newPeerSet(),
}
}
@@ -108,10 +109,26 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
}
for p, list := range sendTo {
- cost := p.GetRequestCost(SendTxMsg, len(list))
- go func(p *peer, list types.Transactions, cost uint64) {
- p.SendTxs(cost, list)
- }(p, list, cost)
+ pp := p
+ ll := list
+
+ reqID := getNextReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(SendTxMsg, len(ll))
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == pp
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(SendTxMsg, len(ll))
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.SendTxs(reqID, cost, ll) }
+ },
+ }
+ self.reqDist.queue(rq)
}
}