diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-03-23 03:44:22 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-03-23 03:44:22 +0800 |
commit | 525116dbff916825463931361f75e75e955c12e2 (patch) | |
tree | b272801c420ad9a591f227919567c7952b0bd512 /les | |
parent | 1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff) | |
download | go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2 go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip |
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
Diffstat (limited to 'les')
-rw-r--r-- | les/backend.go | 2 | ||||
-rw-r--r-- | les/distributor.go | 259 | ||||
-rw-r--r-- | les/distributor_test.go | 192 | ||||
-rw-r--r-- | les/execqueue.go | 71 | ||||
-rw-r--r-- | les/fetcher.go | 188 | ||||
-rw-r--r-- | les/flowcontrol/control.go | 104 | ||||
-rw-r--r-- | les/handler.go | 66 | ||||
-rw-r--r-- | les/helper_test.go | 8 | ||||
-rw-r--r-- | les/odr.go | 99 | ||||
-rw-r--r-- | les/odr_requests.go | 12 | ||||
-rw-r--r-- | les/odr_test.go | 3 | ||||
-rw-r--r-- | les/peer.go | 35 | ||||
-rw-r--r-- | les/request_test.go | 3 | ||||
-rw-r--r-- | les/serverpool.go | 76 | ||||
-rw-r--r-- | les/txrelay.go | 27 |
15 files changed, 839 insertions, 306 deletions
diff --git a/les/backend.go b/les/backend.go index 404728c0e..3cab75f33 100644 --- a/les/backend.go +++ b/les/backend.go @@ -107,6 +107,8 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.LightMode, config.NetworkId, eth.eventMux, eth.pow, eth.blockchain, nil, chainDb, odr, relay); err != nil { return nil, err } + relay.ps = eth.protocolManager.peers + relay.reqDist = eth.protocolManager.reqDist eth.ApiBackend = &LesApiBackend{eth, nil} eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend) diff --git a/les/distributor.go b/les/distributor.go new file mode 100644 index 000000000..c59b36146 --- /dev/null +++ b/les/distributor.go @@ -0,0 +1,259 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package light implements on-demand retrieval capable state and chain objects +// for the Ethereum Light Client. +package les + +import ( + "container/list" + "errors" + "sync" + "time" +) + +// ErrNoPeers is returned if no peers capable of serving a queued request are available +var ErrNoPeers = errors.New("no suitable peers available") + +// requestDistributor implements a mechanism that distributes requests to +// suitable peers, obeying flow control rules and prioritizing them in creation +// order (even when a resend is necessary). +type requestDistributor struct { + reqQueue *list.List + lastReqOrder uint64 + stopChn, loopChn chan struct{} + loopNextSent bool + lock sync.Mutex + + getAllPeers func() map[distPeer]struct{} +} + +// distPeer is an LES server peer interface for the request distributor. +// waitBefore returns either the necessary waiting time before sending a request +// with the given upper estimated cost or the estimated remaining relative buffer +// value after sending such a request (in which case the request can be sent +// immediately). At least one of these values is always zero. +type distPeer interface { + waitBefore(uint64) (time.Duration, float64) + canQueue() bool + queueSend(f func()) +} + +// distReq is the request abstraction used by the distributor. It is based on +// three callback functions: +// - getCost returns the upper estimate of the cost of sending the request to a given peer +// - canSend tells if the server peer is suitable to serve the request +// - request prepares sending the request to the given peer and returns a function that +// does the actual sending. Request order should be preserved but the callback itself should not +// block until it is sent because other peers might still be able to receive requests while +// one of them is blocking. Instead, the returned function is put in the peer's send queue. +type distReq struct { + getCost func(distPeer) uint64 + canSend func(distPeer) bool + request func(distPeer) func() + + reqOrder uint64 + sentChn chan distPeer + element *list.Element +} + +// newRequestDistributor creates a new request distributor +func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor { + r := &requestDistributor{ + reqQueue: list.New(), + loopChn: make(chan struct{}, 2), + stopChn: stopChn, + getAllPeers: getAllPeers, + } + go r.loop() + return r +} + +// distMaxWait is the maximum waiting time after which further necessary waiting +// times are recalculated based on new feedback from the servers +const distMaxWait = time.Millisecond * 10 + +// main event loop +func (d *requestDistributor) loop() { + for { + select { + case <-d.stopChn: + d.lock.Lock() + elem := d.reqQueue.Front() + for elem != nil { + close(elem.Value.(*distReq).sentChn) + elem = elem.Next() + } + d.lock.Unlock() + return + case <-d.loopChn: + d.lock.Lock() + d.loopNextSent = false + loop: + for { + peer, req, wait := d.nextRequest() + if req != nil && wait == 0 { + chn := req.sentChn // save sentChn because remove sets it to nil + d.remove(req) + send := req.request(peer) + if send != nil { + peer.queueSend(send) + } + chn <- peer + close(chn) + } else { + if wait == 0 { + // no request to send and nothing to wait for; the next + // queued request will wake up the loop + break loop + } + d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received + if wait > distMaxWait { + // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically + wait = distMaxWait + } + go func() { + time.Sleep(wait) + d.loopChn <- struct{}{} + }() + break loop + } + } + d.lock.Unlock() + } + } +} + +// selectPeerItem represents a peer to be selected for a request by weightedRandomSelect +type selectPeerItem struct { + peer distPeer + req *distReq + weight int64 +} + +// Weight implements wrsItem interface +func (sp selectPeerItem) Weight() int64 { + return sp.weight +} + +// nextRequest returns the next possible request from any peer, along with the +// associated peer and necessary waiting time +func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { + peers := d.getAllPeers() + + elem := d.reqQueue.Front() + var ( + bestPeer distPeer + bestReq *distReq + bestWait time.Duration + sel *weightedRandomSelect + ) + + for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { + req := elem.Value.(*distReq) + canSend := false + for peer, _ := range peers { + if peer.canQueue() && req.canSend(peer) { + canSend = true + cost := req.getCost(peer) + wait, bufRemain := peer.waitBefore(cost) + if wait == 0 { + if sel == nil { + sel = newWeightedRandomSelect() + } + sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) + } else { + if bestReq == nil || wait < bestWait { + bestPeer = peer + bestReq = req + bestWait = wait + } + } + delete(peers, peer) + } + } + next := elem.Next() + if !canSend && elem == d.reqQueue.Front() { + close(req.sentChn) + d.remove(req) + } + elem = next + } + + if sel != nil { + c := sel.choose().(selectPeerItem) + return c.peer, c.req, 0 + } + return bestPeer, bestReq, bestWait +} + +// queue adds a request to the distribution queue, returns a channel where the +// receiving peer is sent once the request has been sent (request callback returned). +// If the request is cancelled or timed out without suitable peers, the channel is +// closed without sending any peer references to it. +func (d *requestDistributor) queue(r *distReq) chan distPeer { + d.lock.Lock() + defer d.lock.Unlock() + + if r.reqOrder == 0 { + d.lastReqOrder++ + r.reqOrder = d.lastReqOrder + } + + back := d.reqQueue.Back() + if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder { + r.element = d.reqQueue.PushBack(r) + } else { + before := d.reqQueue.Front() + for before.Value.(*distReq).reqOrder < r.reqOrder { + before = before.Next() + } + r.element = d.reqQueue.InsertBefore(r, before) + } + + if !d.loopNextSent { + d.loopNextSent = true + d.loopChn <- struct{}{} + } + + r.sentChn = make(chan distPeer, 1) + return r.sentChn +} + +// cancel removes a request from the queue if it has not been sent yet (returns +// false if it has been sent already). It is guaranteed that the callback functions +// will not be called after cancel returns. +func (d *requestDistributor) cancel(r *distReq) bool { + d.lock.Lock() + defer d.lock.Unlock() + + if r.sentChn == nil { + return false + } + + close(r.sentChn) + d.remove(r) + return true +} + +// remove removes a request from the queue +func (d *requestDistributor) remove(r *distReq) { + r.sentChn = nil + if r.element != nil { + d.reqQueue.Remove(r.element) + r.element = nil + } +} diff --git a/les/distributor_test.go b/les/distributor_test.go new file mode 100644 index 000000000..f2eb80729 --- /dev/null +++ b/les/distributor_test.go @@ -0,0 +1,192 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package light implements on-demand retrieval capable state and chain objects +// for the Ethereum Light Client. +package les + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +type testDistReq struct { + cost, procTime, order uint64 + canSendTo map[*testDistPeer]struct{} +} + +func (r *testDistReq) getCost(dp distPeer) uint64 { + return r.cost +} + +func (r *testDistReq) canSend(dp distPeer) bool { + _, ok := r.canSendTo[dp.(*testDistPeer)] + return ok +} + +func (r *testDistReq) request(dp distPeer) func() { + return func() { dp.(*testDistPeer).send(r) } +} + +type testDistPeer struct { + sent []*testDistReq + sumCost uint64 + lock sync.RWMutex +} + +func (p *testDistPeer) send(r *testDistReq) { + p.lock.Lock() + defer p.lock.Unlock() + + p.sent = append(p.sent, r) + p.sumCost += r.cost +} + +func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) { + var last uint64 + for { + wait := time.Millisecond + p.lock.Lock() + if len(p.sent) > 0 { + rq := p.sent[0] + wait = time.Duration(rq.procTime) + p.sumCost -= rq.cost + if checkOrder { + if rq.order <= last { + t.Errorf("Requests processed in wrong order") + } + last = rq.order + } + p.sent = p.sent[1:] + } + p.lock.Unlock() + select { + case <-stop: + return + case <-time.After(wait): + } + } +} + +const ( + testDistBufLimit = 10000000 + testDistMaxCost = 1000000 + testDistPeerCount = 5 + testDistReqCount = 50000 + testDistMaxResendCount = 3 +) + +func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) { + p.lock.RLock() + sumCost := p.sumCost + cost + p.lock.RUnlock() + if sumCost < testDistBufLimit { + return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit) + } else { + return time.Duration(sumCost - testDistBufLimit), 0 + } +} + +func (p *testDistPeer) canQueue() bool { + return true +} + +func (p *testDistPeer) queueSend(f func()) { + f() +} + +func TestRequestDistributor(t *testing.T) { + testRequestDistributor(t, false) +} + +func TestRequestDistributorResend(t *testing.T) { + testRequestDistributor(t, true) +} + +func testRequestDistributor(t *testing.T, resend bool) { + stop := make(chan struct{}) + defer close(stop) + + var peers [testDistPeerCount]*testDistPeer + for i, _ := range peers { + peers[i] = &testDistPeer{} + go peers[i].worker(t, !resend, stop) + } + + dist := newRequestDistributor(func() map[distPeer]struct{} { + m := make(map[distPeer]struct{}) + for _, peer := range peers { + m[peer] = struct{}{} + } + return m + }, stop) + + var wg sync.WaitGroup + + for i := 1; i <= testDistReqCount; i++ { + cost := uint64(rand.Int63n(testDistMaxCost)) + procTime := uint64(rand.Int63n(int64(cost + 1))) + rq := &testDistReq{ + cost: cost, + procTime: procTime, + order: uint64(i), + canSendTo: make(map[*testDistPeer]struct{}), + } + for _, peer := range peers { + if rand.Intn(2) != 0 { + rq.canSendTo[peer] = struct{}{} + } + } + + wg.Add(1) + req := &distReq{ + getCost: rq.getCost, + canSend: rq.canSend, + request: rq.request, + } + chn := dist.queue(req) + go func() { + cnt := 1 + if resend && len(rq.canSendTo) != 0 { + cnt = rand.Intn(testDistMaxResendCount) + 1 + } + for i := 0; i < cnt; i++ { + if i != 0 { + chn = dist.queue(req) + } + p := <-chn + if p == nil { + if len(rq.canSendTo) != 0 { + t.Errorf("Request that could have been sent was dropped") + } + } else { + peer := p.(*testDistPeer) + if _, ok := rq.canSendTo[peer]; !ok { + t.Errorf("Request sent to wrong peer") + } + } + } + wg.Done() + }() + if rand.Intn(1000) == 0 { + time.Sleep(time.Duration(rand.Intn(5000000))) + } + } + + wg.Wait() +} diff --git a/les/execqueue.go b/les/execqueue.go new file mode 100644 index 000000000..ac779003b --- /dev/null +++ b/les/execqueue.go @@ -0,0 +1,71 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package les + +import ( + "sync/atomic" +) + +// ExecQueue implements a queue that executes function calls in a single thread, +// in the same order as they have been queued. +type execQueue struct { + chn chan func() + cnt, stop, capacity int32 +} + +// NewExecQueue creates a new execution queue. +func newExecQueue(capacity int32) *execQueue { + q := &execQueue{ + chn: make(chan func(), capacity), + capacity: capacity, + } + go q.loop() + return q +} + +func (q *execQueue) loop() { + for f := range q.chn { + atomic.AddInt32(&q.cnt, -1) + if atomic.LoadInt32(&q.stop) != 0 { + return + } + f() + } +} + +// CanQueue returns true if more function calls can be added to the execution queue. +func (q *execQueue) canQueue() bool { + return atomic.LoadInt32(&q.stop) == 0 && atomic.LoadInt32(&q.cnt) < q.capacity +} + +// Queue adds a function call to the execution queue. Returns true if successful. +func (q *execQueue) queue(f func()) bool { + if atomic.LoadInt32(&q.stop) != 0 { + return false + } + if atomic.AddInt32(&q.cnt, 1) > q.capacity { + atomic.AddInt32(&q.cnt, -1) + return false + } + q.chn <- f + return true +} + +// Stop stops the exec queue. +func (q *execQueue) quit() { + atomic.StoreInt32(&q.stop, 1) +} diff --git a/les/fetcher.go b/les/fetcher.go index f9e517d25..353e91932 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -135,35 +135,38 @@ func (f *lightFetcher) syncLoop() { f.lock.Lock() s := requesting requesting = false + var ( + rq *distReq + reqID uint64 + ) if !f.syncing && !(newAnnounce && s) { - reqID := getNextReqID() - if peer, node, amount, retry := f.nextRequest(reqID); node != nil { - requesting = true - if reqID, ok := f.request(peer, reqID, node, amount); ok { - go func() { - time.Sleep(softRequestTimeout) - f.reqMu.Lock() - req, ok := f.requested[reqID] - if ok { - req.timeout = true - f.requested[reqID] = req - } - f.reqMu.Unlock() - // keep starting new requests while possible - f.requestChn <- false - }() - } - } else { - if retry { - requesting = true - go func() { - time.Sleep(time.Millisecond * 100) - f.requestChn <- false - }() - } - } + rq, reqID = f.nextRequest() } + syncing := f.syncing f.lock.Unlock() + + if rq != nil { + requesting = true + _, ok := <-f.pm.reqDist.queue(rq) + if !ok { + f.requestChn <- false + } + + if !syncing { + go func() { + time.Sleep(softRequestTimeout) + f.reqMu.Lock() + req, ok := f.requested[reqID] + if ok { + req.timeout = true + f.requested[reqID] = req + } + f.reqMu.Unlock() + // keep starting new requests while possible + f.requestChn <- false + }() + } + } case reqID := <-f.timeoutChn: f.reqMu.Lock() req, ok := f.requested[reqID] @@ -334,6 +337,12 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo f.lock.Lock() defer f.lock.Unlock() + if f.syncing { + // always return true when syncing + // false positives are acceptable, a more sophisticated condition can be implemented later + return true + } + fp := f.peers[p] if fp == nil || fp.root == nil { return false @@ -346,43 +355,13 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo f.chain.LockChain() defer f.chain.UnlockChain() // if it's older than the peer's block tree root but it's in the same canonical chain - // than the root, we can still be sure the peer knows it + // as the root, we can still be sure the peer knows it + // + // when syncing, just check if it is part of the known chain, there is nothing better we + // can do since we do not know the most recent block hash yet return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash } -// request initiates a header download request from a certain peer -func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) { - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Requesting from unknown peer") - p.fcServer.DeassignRequest(reqID) - return 0, false - } - if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) { - f.syncing = true - go func() { - p.Log().Debug("Synchronisation started") - f.pm.synchronise(p) - f.syncDone <- p - }() - p.fcServer.DeassignRequest(reqID) - return 0, false - } - - n.requested = true - cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) - p.fcServer.SendRequest(reqID, cost) - f.reqMu.Lock() - f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()} - f.reqMu.Unlock() - go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true) - go func() { - time.Sleep(hardRequestTimeout) - f.timeoutChn <- reqID - }() - return reqID, true -} - // requestAmount calculates the amount of headers to be downloaded starting // from a certain head backwards func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 { @@ -408,12 +387,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { // nextRequest selects the peer and announced head to be requested next, amount // to be downloaded starting from the head backwards is also returned -func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) { +func (f *lightFetcher) nextRequest() (*distReq, uint64) { var ( bestHash common.Hash bestAmount uint64 ) bestTd := f.maxConfirmedTd + bestSyncing := false for p, fp := range f.peers { for hash, n := range fp.nodeByHash { @@ -423,29 +403,83 @@ func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint6 bestHash = hash bestAmount = amount bestTd = n.td + bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) } } } } if bestTd == f.maxConfirmedTd { - return nil, nil, 0, false - } - - peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) { - fp := f.peers[p] - if fp == nil || fp.nodeByHash[bestHash] == nil { - return false, 0 + return nil, 0 + } + + f.syncing = bestSyncing + + var rq *distReq + reqID := getNextReqID() + if f.syncing { + rq = &distReq{ + getCost: func(dp distPeer) uint64 { + return 0 + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + fp := f.peers[p] + return fp != nil && fp.nodeByHash[bestHash] != nil + }, + request: func(dp distPeer) func() { + go func() { + p := dp.(*peer) + p.Log().Debug("Synchronisation started") + f.pm.synchronise(p) + f.syncDone <- p + }() + return nil + }, + } + } else { + rq = &distReq{ + getCost: func(dp distPeer) uint64 { + p := dp.(*peer) + return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + f.lock.Lock() + defer f.lock.Unlock() + + fp := f.peers[p] + if fp == nil { + return false + } + n := fp.nodeByHash[bestHash] + return n != nil && !n.requested + }, + request: func(dp distPeer) func() { + p := dp.(*peer) + f.lock.Lock() + fp := f.peers[p] + if fp != nil { + n := fp.nodeByHash[bestHash] + if n != nil { + n.requested = true + } + } + f.lock.Unlock() + + cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + p.fcServer.QueueRequest(reqID, cost) + f.reqMu.Lock() + f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} + f.reqMu.Unlock() + go func() { + time.Sleep(hardRequestTimeout) + f.timeoutChn <- reqID + }() + return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } + }, } - return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))) - }) - if !locked { - return nil, nil, 0, true - } - var node *fetcherTreeNode - if peer != nil { - node = f.peers[peer].nodeByHash[bestHash] } - return peer, node, bestAmount, false + return rq, reqID } // deliverHeaders delivers header download request responses for processing diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go index e45537cf5..e40e69346 100644 --- a/les/flowcontrol/control.go +++ b/les/flowcontrol/control.go @@ -94,14 +94,12 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { } type ServerNode struct { - bufEstimate uint64 - lastTime mclock.AbsTime - params *ServerParams - sumCost uint64 // sum of req costs sent to this server - pending map[uint64]uint64 // value = sumCost after sending the given req - assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer - assignToken chan struct{} // send to this channel before assigning, read from it after deassigning - lock sync.RWMutex + bufEstimate uint64 + lastTime mclock.AbsTime + params *ServerParams + sumCost uint64 // sum of req costs sent to this server + pending map[uint64]uint64 // value = sumCost after sending the given req + lock sync.RWMutex } func NewServerNode(params *ServerParams) *ServerNode { @@ -110,7 +108,6 @@ func NewServerNode(params *ServerParams) *ServerNode { lastTime: mclock.Now(), params: params, pending: make(map[uint64]uint64), - assignToken: make(chan struct{}, 1), } } @@ -127,94 +124,37 @@ func (peer *ServerNode) recalcBLE(time mclock.AbsTime) { } // safetyMargin is added to the flow control waiting time when estimated buffer value is low -const safetyMargin = time.Millisecond * 200 +const safetyMargin = time.Millisecond -func (peer *ServerNode) canSend(maxCost uint64) time.Duration { +func (peer *ServerNode) canSend(maxCost uint64) (time.Duration, float64) { + peer.recalcBLE(mclock.Now()) maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst) if maxCost > peer.params.BufLimit { maxCost = peer.params.BufLimit } if peer.bufEstimate >= maxCost { - return 0 + return 0, float64(peer.bufEstimate-maxCost) / float64(peer.params.BufLimit) } - return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge) + return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge), 0 } // CanSend returns the minimum waiting time required before sending a request -// with the given maximum estimated cost -func (peer *ServerNode) CanSend(maxCost uint64) time.Duration { +// with the given maximum estimated cost. Second return value is the relative +// estimated buffer level after sending the request (divided by BufLimit). +func (peer *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) { peer.lock.RLock() defer peer.lock.RUnlock() return peer.canSend(maxCost) } -// AssignRequest tries to assign the server node to the given request, guaranteeing -// that once it returns true, no request will be sent to the node before this one -func (peer *ServerNode) AssignRequest(reqID uint64) bool { - select { - case peer.assignToken <- struct{}{}: - default: - return false - } - peer.lock.Lock() - peer.assignedRequest = reqID - peer.lock.Unlock() - return true -} - -// MustAssignRequest waits until the node can be assigned to the given request. -// It is always guaranteed that assignments are released in a short amount of time. -func (peer *ServerNode) MustAssignRequest(reqID uint64) { - peer.assignToken <- struct{}{} - peer.lock.Lock() - peer.assignedRequest = reqID - peer.lock.Unlock() -} - -// DeassignRequest releases a request assignment in case the planned request -// is not being sent. -func (peer *ServerNode) DeassignRequest(reqID uint64) { - peer.lock.Lock() - if peer.assignedRequest == reqID { - peer.assignedRequest = 0 - <-peer.assignToken - } - peer.lock.Unlock() -} - -// IsAssigned returns true if the server node has already been assigned to a request -// (note that this function returning false does not guarantee that you can assign a request -// immediately afterwards, its only purpose is to help peer selection) -func (peer *ServerNode) IsAssigned() bool { - peer.lock.RLock() - locked := peer.assignedRequest != 0 - peer.lock.RUnlock() - return locked -} - -// blocks until request can be sent -func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { +// QueueRequest should be called when the request has been assigned to the given +// server node, before putting it in the send queue. It is mandatory that requests +// are sent in the same order as the QueueRequest calls are made. +func (peer *ServerNode) QueueRequest(reqID, maxCost uint64) { peer.lock.Lock() defer peer.lock.Unlock() - if peer.assignedRequest != reqID { - peer.lock.Unlock() - peer.MustAssignRequest(reqID) - peer.lock.Lock() - } - - peer.recalcBLE(mclock.Now()) - wait := peer.canSend(maxCost) - for wait > 0 { - peer.lock.Unlock() - time.Sleep(wait) - peer.lock.Lock() - peer.recalcBLE(mclock.Now()) - wait = peer.canSend(maxCost) - } - peer.assignedRequest = 0 - <-peer.assignToken peer.bufEstimate -= maxCost peer.sumCost += maxCost if reqID >= 0 { @@ -222,6 +162,8 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { } } +// GotReply adjusts estimated buffer value according to the value included in +// the latest request reply. func (peer *ServerNode) GotReply(reqID, bv uint64) { peer.lock.Lock() @@ -235,6 +177,10 @@ func (peer *ServerNode) GotReply(reqID, bv uint64) { return } delete(peer.pending, reqID) - peer.bufEstimate = bv - (peer.sumCost - sc) + cc := peer.sumCost - sc + peer.bufEstimate = 0 + if bv > cc { + peer.bufEstimate = bv - cc + } peer.lastTime = mclock.Now() } diff --git a/les/handler.go b/les/handler.go index 4271da8b8..ece2060ee 100644 --- a/les/handler.go +++ b/les/handler.go @@ -102,6 +102,7 @@ type ProtocolManager struct { odr *LesOdr server *LesServer serverPool *serverPool + reqDist *requestDistributor downloader *downloader.Downloader fetcher *lightFetcher @@ -203,8 +204,17 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) } + manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} { + m := make(map[distPeer]struct{}) + peers := manager.peers.AllPeers() + for _, peer := range peers { + m[peer] = struct{}{} + } + return m + }, manager.quitSync) if odr != nil { odr.removePeer = removePeer + odr.reqDist = manager.reqDist } /*validator := func(block *types.Block, parent *types.Block) error { @@ -334,17 +344,49 @@ func (pm *ProtocolManager) handle(p *peer) error { if pm.lightSync { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { reqID := getNextReqID() - cost := p.GetRequestCost(GetBlockHeadersMsg, amount) - p.fcServer.MustAssignRequest(reqID) - p.fcServer.SendRequest(reqID, cost) - return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil } requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { reqID := getNextReqID() - cost := p.GetRequestCost(GetBlockHeadersMsg, amount) - p.fcServer.MustAssignRequest(reqID) - p.fcServer.SendRequest(reqID, cost) - return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil } if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { @@ -884,7 +926,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } if deliverMsg != nil { - return pm.odr.Deliver(p, deliverMsg) + err := pm.odr.Deliver(p, deliverMsg) + if err != nil { + p.responseErrors++ + if p.responseErrors > maxResponseErrors { + return err + } + } } return nil } diff --git a/les/helper_test.go b/les/helper_test.go index f6293ad1a..3e8ce57b6 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -352,11 +352,15 @@ func (p *testServerPool) setPeer(peer *peer) { p.peer = peer } -func (p *testServerPool) selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer { +func (p *testServerPool) getAllPeers() map[distPeer]struct{} { p.lock.RLock() defer p.lock.RUnlock() - return p.peer + m := make(map[distPeer]struct{}) + if p.peer != nil { + m[p.peer] = struct{}{} + } + return m } func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) { diff --git a/les/odr.go b/les/odr.go index afc894ab5..06b44d318 100644 --- a/les/odr.go +++ b/les/odr.go @@ -32,14 +32,12 @@ import ( var ( softRequestTimeout = time.Millisecond * 500 hardRequestTimeout = time.Second * 10 - retryPeers = time.Second * 1 ) // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) type odrPeerSelector interface { - selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer adjustResponseTime(*poolEntry, time.Duration, bool) } @@ -51,6 +49,7 @@ type LesOdr struct { mlock, clock sync.Mutex sentReqs map[uint64]*sentReq serverPool odrPeerSelector + reqDist *requestDistributor } func NewLesOdr(db ethdb.Database) *LesOdr { @@ -165,69 +164,81 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error { answered := make(chan struct{}) req := &sentReq{ - valFunc: lreq.Valid, + valFunc: lreq.Validate, sentTo: make(map[*peer]chan struct{}), answered: answered, // reply delivered by any peer } - reqID := getNextReqID() - self.mlock.Lock() - self.sentReqs[reqID] = req - self.mlock.Unlock() + + exclude := make(map[*peer]struct{}) reqWg := new(sync.WaitGroup) reqWg.Add(1) defer reqWg.Done() - go func() { - reqWg.Wait() - self.mlock.Lock() - delete(self.sentReqs, reqID) - self.mlock.Unlock() - }() - exclude := make(map[*peer]struct{}) - for { - var p *peer - if self.serverPool != nil { - p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) { - if _, ok := exclude[p]; ok || !lreq.CanSend(p) { - return false, 0 - } - return true, p.fcServer.CanSend(lreq.GetCost(p)) - }, ctx.Done()) - } - if p == nil { - select { - case <-ctx.Done(): - return ctx.Err() - case <-req.answered: - return nil - case <-time.After(retryPeers): - } - } else { + var timeout chan struct{} + reqID := getNextReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + return lreq.GetCost(dp.(*peer)) + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + _, ok := exclude[p] + return !ok && lreq.CanSend(p) + }, + request: func(dp distPeer) func() { + p := dp.(*peer) exclude[p] = struct{}{} delivered := make(chan struct{}) - timeout := make(chan struct{}) + timeout = make(chan struct{}) req.lock.Lock() req.sentTo[p] = delivered req.lock.Unlock() reqWg.Add(1) cost := lreq.GetCost(p) - p.fcServer.SendRequest(reqID, cost) + p.fcServer.QueueRequest(reqID, cost) go self.requestPeer(req, p, delivered, timeout, reqWg) - lreq.Request(reqID, p) - - select { - case <-ctx.Done(): - return ctx.Err() - case <-answered: - return nil - case <-timeout: + return func() { lreq.Request(reqID, p) } + }, + } + + self.mlock.Lock() + self.sentReqs[reqID] = req + self.mlock.Unlock() + + go func() { + reqWg.Wait() + self.mlock.Lock() + delete(self.sentReqs, reqID) + self.mlock.Unlock() + }() + + for { + peerChn := self.reqDist.queue(rq) + select { + case <-ctx.Done(): + self.reqDist.cancel(rq) + return ctx.Err() + case <-answered: + self.reqDist.cancel(rq) + return nil + case _, ok := <-peerChn: + if !ok { + return ErrNoPeers } } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-answered: + return nil + case <-timeout: + } } } -// Retrieve tries to fetch an object from the local db, then from the LES network. +// Retrieve tries to fetch an object from the LES network. // If the network retrieval was successful, it stores the object in local db. func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { lreq := LesRequest(req) diff --git a/les/odr_requests.go b/les/odr_requests.go index 53aced93c..1f853b341 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -49,7 +49,7 @@ type LesOdrRequest interface { GetCost(*peer) uint64 CanSend(*peer) bool Request(uint64, *peer) error - Valid(ethdb.Database, *Msg) error // if true, keeps the retrieved object + Validate(ethdb.Database, *Msg) error } func LesRequest(req light.OdrRequest) LesOdrRequest { @@ -92,7 +92,7 @@ func (r *BlockRequest) Request(reqID uint64, peer *peer) error { // Valid processes an ODR request reply message from the LES network // returns true and stores results in memory if the message was a valid reply // to the request (implementation of LesOdrRequest) -func (r *BlockRequest) Valid(db ethdb.Database, msg *Msg) error { +func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error { log.Debug("Validating block body", "hash", r.Hash) // Ensure we have a correct message with a single block body @@ -148,7 +148,7 @@ func (r *ReceiptsRequest) Request(reqID uint64, peer *peer) error { // Valid processes an ODR request reply message from the LES network // returns true and stores results in memory if the message was a valid reply // to the request (implementation of LesOdrRequest) -func (r *ReceiptsRequest) Valid(db ethdb.Database, msg *Msg) error { +func (r *ReceiptsRequest) Validate(db ethdb.Database, msg *Msg) error { log.Debug("Validating block receipts", "hash", r.Hash) // Ensure we have a correct message with a single block receipt @@ -208,7 +208,7 @@ func (r *TrieRequest) Request(reqID uint64, peer *peer) error { // Valid processes an ODR request reply message from the LES network // returns true and stores results in memory if the message was a valid reply // to the request (implementation of LesOdrRequest) -func (r *TrieRequest) Valid(db ethdb.Database, msg *Msg) error { +func (r *TrieRequest) Validate(db ethdb.Database, msg *Msg) error { log.Debug("Validating trie proof", "root", r.Id.Root, "key", r.Key) // Ensure we have a correct message with a single proof @@ -259,7 +259,7 @@ func (r *CodeRequest) Request(reqID uint64, peer *peer) error { // Valid processes an ODR request reply message from the LES network // returns true and stores results in memory if the message was a valid reply // to the request (implementation of LesOdrRequest) -func (r *CodeRequest) Valid(db ethdb.Database, msg *Msg) error { +func (r *CodeRequest) Validate(db ethdb.Database, msg *Msg) error { log.Debug("Validating code data", "hash", r.Hash) // Ensure we have a correct message with a single code element @@ -319,7 +319,7 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error { // Valid processes an ODR request reply message from the LES network // returns true and stores results in memory if the message was a valid reply // to the request (implementation of LesOdrRequest) -func (r *ChtRequest) Valid(db ethdb.Database, msg *Msg) error { +func (r *ChtRequest) Validate(db ethdb.Database, msg *Msg) error { log.Debug("Validating CHT", "cht", r.ChtNum, "block", r.BlockNum) // Ensure we have a correct message with a single proof element diff --git a/les/odr_test.go b/les/odr_test.go index 4f1fccb24..1b436b8e6 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -162,8 +162,11 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) pool := &testServerPool{} + lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync) + odr.reqDist = lpm.reqDist pool.setPeer(lpeer) odr.serverPool = pool + lpeer.hasBlock = func(common.Hash, uint64) bool { return true } select { case <-time.After(time.Millisecond * 100): case err := <-err1: diff --git a/les/peer.go b/les/peer.go index ef5f8a6ce..4793da296 100644 --- a/les/peer.go +++ b/les/peer.go @@ -22,6 +22,7 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -37,7 +38,10 @@ var ( errNotRegistered = errors.New("peer is not registered") ) -const maxHeadInfoLen = 20 +const ( + maxHeadInfoLen = 20 + maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) +) type peer struct { *p2p.Peer @@ -53,9 +57,11 @@ type peer struct { lock sync.RWMutex announceChn chan announceData + sendQueue *execQueue - poolEntry *poolEntry - hasBlock func(common.Hash, uint64) bool + poolEntry *poolEntry + hasBlock func(common.Hash, uint64) bool + responseErrors int fcClient *flowcontrol.ClientNode // nil if the peer is server only fcServer *flowcontrol.ServerNode // nil if the peer is client only @@ -76,6 +82,14 @@ func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { } } +func (p *peer) canQueue() bool { + return p.sendQueue.canQueue() +} + +func (p *peer) queueSend(f func()) { + p.sendQueue.queue(f) +} + // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *eth.PeerInfo { return ð.PeerInfo{ @@ -117,6 +131,11 @@ func (p *peer) Td() *big.Int { return new(big.Int).Set(p.headInfo.Td) } +// waitBefore implements distPeer interface +func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) { + return p.fcServer.CanSend(maxCost) +} + func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error { type req struct { ReqID uint64 @@ -237,11 +256,8 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error { return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs) } -func (p *peer) SendTxs(cost uint64, txs types.Transactions) error { +func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error { p.Log().Debug("Fetching batch of transactions", "count", len(txs)) - reqID := getNextReqID() - p.fcServer.MustAssignRequest(reqID) - p.fcServer.SendRequest(reqID, cost) return p2p.Send(p.rw, SendTxMsg, txs) } @@ -444,6 +460,7 @@ func (ps *peerSet) Register(p *peer) error { return errAlreadyRegistered } ps.peers[p.id] = p + p.sendQueue = newExecQueue(100) return nil } @@ -453,8 +470,10 @@ func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() defer ps.lock.Unlock() - if _, ok := ps.peers[id]; !ok { + if p, ok := ps.peers[id]; !ok { return errNotRegistered + } else { + p.sendQueue.quit() } delete(ps.peers, id) return nil diff --git a/les/request_test.go b/les/request_test.go index 10e9edf8b..bec6bf1bc 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -72,8 +72,11 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) pool := &testServerPool{} + lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync) + odr.reqDist = lpm.reqDist pool.setPeer(lpeer) odr.serverPool = pool + lpeer.hasBlock = func(common.Hash, uint64) bool { return true } select { case <-time.After(time.Millisecond * 100): case err := <-err1: diff --git a/les/serverpool.go b/les/serverpool.go index 55d481dbf..64fe991c6 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -268,82 +268,6 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, } } -type selectPeerItem struct { - peer *peer - weight int64 - wait time.Duration -} - -func (sp selectPeerItem) Weight() int64 { - return sp.weight -} - -// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request -// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed -// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time. -func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) { - pool.lock.Lock() - type selectPeer struct { - peer *peer - rstat, tstat float64 - } - var list []selectPeer - sel := newWeightedRandomSelect() - for _, entry := range pool.entries { - if entry.state == psRegistered { - if !entry.peer.fcServer.IsAssigned() { - list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()}) - } - } - } - pool.lock.Unlock() - - for _, sp := range list { - ok, wait := canSend(sp.peer) - if ok { - w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow))) - sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait}) - } - } - choice := sel.choose() - if choice == nil { - return nil, 0, false - } - peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait - locked := false - if wait < time.Millisecond*100 { - if peer.fcServer.AssignRequest(reqID) { - ok, w := canSend(peer) - wait = time.Duration(w) - if ok && wait < time.Millisecond*100 { - locked = true - } else { - peer.fcServer.DeassignRequest(reqID) - wait = time.Millisecond * 100 - } - } - } else { - wait = time.Millisecond * 100 - } - return peer, wait, locked -} - -// selectPeer selects a suitable peer for a request, waiting until an assignment to -// the request is guaranteed or the process is aborted. -func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer { - for { - peer, wait, locked := pool.selectPeer(reqID, canSend) - if locked { - return peer - } - select { - case <-abort: - return nil - case <-time.After(wait): - } - } -} - // eventLoop handles pool events and mutex locking for all internal functions func (pool *serverPool) eventLoop() { lookupCnt := 0 diff --git a/les/txrelay.go b/les/txrelay.go index 76d416c57..1ca3467e4 100644 --- a/les/txrelay.go +++ b/les/txrelay.go @@ -35,13 +35,14 @@ type LesTxRelay struct { peerList []*peer peerStartPos int lock sync.RWMutex + + reqDist *requestDistributor } func NewLesTxRelay() *LesTxRelay { return &LesTxRelay{ txSent: make(map[common.Hash]*ltrInfo), txPending: make(map[common.Hash]struct{}), - ps: newPeerSet(), } } @@ -108,10 +109,26 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { } for p, list := range sendTo { - cost := p.GetRequestCost(SendTxMsg, len(list)) - go func(p *peer, list types.Transactions, cost uint64) { - p.SendTxs(cost, list) - }(p, list, cost) + pp := p + ll := list + + reqID := getNextReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(SendTxMsg, len(ll)) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == pp + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(SendTxMsg, len(ll)) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.SendTxs(reqID, cost, ll) } + }, + } + self.reqDist.queue(rq) } } |