aboutsummaryrefslogtreecommitdiffstats
path: root/les/distributor.go
blob: f90765b624b40b192f9088bac4dc2b3d1325e900 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les

import (
    "container/list"
    "sync"
    "time"
)

// requestDistributor implements a mechanism that distributes requests to
// suitable peers, obeying flow control rules and prioritizing them in creation
// order (even when a resend is necessary).
type requestDistributor struct {
    reqQueue         *list.List
    lastReqOrder     uint64
    peers            map[distPeer]struct{}
    peerLock         sync.RWMutex
    stopChn, loopChn chan struct{}
    loopNextSent     bool
    lock             sync.Mutex
}

// distPeer is an LES server peer interface for the request distributor.
// waitBefore returns either the necessary waiting time before sending a request
// with the given upper estimated cost or the estimated remaining relative buffer
// value after sending such a request (in which case the request can be sent
// immediately). At least one of these values is always zero.
type distPeer interface {
    waitBefore(uint64) (time.Duration, float64)
    canQueue() bool
    queueSend(f func())
}

// distReq is the request abstraction used by the distributor. It is based on
// three callback functions:
// - getCost returns the upper estimate of the cost of sending the request to a given peer
// - canSend tells if the server peer is suitable to serve the request
// - request prepares sending the request to the given peer and returns a function that
// does the actual sending. Request order should be preserved but the callback itself should not
// block until it is sent because other peers might still be able to receive requests while
// one of them is blocking. Instead, the returned function is put in the peer's send queue.
type distReq struct {
    getCost func(distPeer) uint64
    canSend func(distPeer) bool
    request func(distPeer) func()

    reqOrder uint64
    sentChn  chan distPeer
    element  *list.Element
}

// newRequestDistributor creates a new request distributor
func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor {
    d := &requestDistributor{
        reqQueue: list.New(),
        loopChn:  make(chan struct{}, 2),
        stopChn:  stopChn,
        peers:    make(map[distPeer]struct{}),
    }
    if peers != nil {
        peers.notify(d)
    }
    go d.loop()
    return d
}

// registerPeer implements peerSetNotify
func (d *requestDistributor) registerPeer(p *peer) {
    d.peerLock.Lock()
    d.peers[p] = struct{}{}
    d.peerLock.Unlock()
}

// unregisterPeer implements peerSetNotify
func (d *requestDistributor) unregisterPeer(p *peer) {
    d.peerLock.Lock()
    delete(d.peers, p)
    d.peerLock.Unlock()
}

// registerTestPeer adds a new test peer
func (d *requestDistributor) registerTestPeer(p distPeer) {
    d.peerLock.Lock()
    d.peers[p] = struct{}{}
    d.peerLock.Unlock()
}

// distMaxWait is the maximum waiting time after which further necessary waiting
// times are recalculated based on new feedback from the servers
const distMaxWait = time.Millisecond * 10

// main event loop
func (d *requestDistributor) loop() {
    for {
        select {
        case <-d.stopChn:
            d.lock.Lock()
            elem := d.reqQueue.Front()
            for elem != nil {
                req := elem.Value.(*distReq)
                close(req.sentChn)
                req.sentChn = nil
                elem = elem.Next()
            }
            d.lock.Unlock()
            return
        case <-d.loopChn:
            d.lock.Lock()
            d.loopNextSent = false
        loop:
            for {
                peer, req, wait := d.nextRequest()
                if req != nil && wait == 0 {
                    chn := req.sentChn // save sentChn because remove sets it to nil
                    d.remove(req)
                    send := req.request(peer)
                    if send != nil {
                        peer.queueSend(send)
                    }
                    chn <- peer
                    close(chn)
                } else {
                    if wait == 0 {
                        // no request to send and nothing to wait for; the next
                        // queued request will wake up the loop
                        break loop
                    }
                    d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
                    if wait > distMaxWait {
                        // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
                        wait = distMaxWait
                    }
                    go func() {
                        time.Sleep(wait)
                        d.loopChn <- struct{}{}
                    }()
                    break loop
                }
            }
            d.lock.Unlock()
        }
    }
}

// selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
type selectPeerItem struct {
    peer   distPeer
    req    *distReq
    weight int64
}

// Weight implements wrsItem interface
func (sp selectPeerItem) Weight() int64 {
    return sp.weight
}

// nextRequest returns the next possible request from any peer, along with the
// associated peer and necessary waiting time
func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
    checkedPeers := make(map[distPeer]struct{})
    elem := d.reqQueue.Front()
    var (
        bestPeer distPeer
        bestReq  *distReq
        bestWait time.Duration
        sel      *weightedRandomSelect
    )

    d.peerLock.RLock()
    defer d.peerLock.RUnlock()

    for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
        req := elem.Value.(*distReq)
        canSend := false
        for peer := range d.peers {
            if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
                canSend = true
                cost := req.getCost(peer)
                wait, bufRemain := peer.waitBefore(cost)
                if wait == 0 {
                    if sel == nil {
                        sel = newWeightedRandomSelect()
                    }
                    sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
                } else {
                    if bestReq == nil || wait < bestWait {
                        bestPeer = peer
                        bestReq = req
                        bestWait = wait
                    }
                }
                checkedPeers[peer] = struct{}{}
            }
        }
        next := elem.Next()
        if !canSend && elem == d.reqQueue.Front() {
            close(req.sentChn)
            d.remove(req)
        }
        elem = next
    }

    if sel != nil {
        c := sel.choose().(selectPeerItem)
        return c.peer, c.req, 0
    }
    return bestPeer, bestReq, bestWait
}

// queue adds a request to the distribution queue, returns a channel where the
// receiving peer is sent once the request has been sent (request callback returned).
// If the request is cancelled or timed out without suitable peers, the channel is
// closed without sending any peer references to it.
func (d *requestDistributor) queue(r *distReq) chan distPeer {
    d.lock.Lock()
    defer d.lock.Unlock()

    if r.reqOrder == 0 {
        d.lastReqOrder++
        r.reqOrder = d.lastReqOrder
    }

    back := d.reqQueue.Back()
    if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
        r.element = d.reqQueue.PushBack(r)
    } else {
        before := d.reqQueue.Front()
        for before.Value.(*distReq).reqOrder < r.reqOrder {
            before = before.Next()
        }
        r.element = d.reqQueue.InsertBefore(r, before)
    }

    if !d.loopNextSent {
        d.loopNextSent = true
        d.loopChn <- struct{}{}
    }

    r.sentChn = make(chan distPeer, 1)
    return r.sentChn
}

// cancel removes a request from the queue if it has not been sent yet (returns
// false if it has been sent already). It is guaranteed that the callback functions
// will not be called after cancel returns.
func (d *requestDistributor) cancel(r *distReq) bool {
    d.lock.Lock()
    defer d.lock.Unlock()

    if r.sentChn == nil {
        return false
    }

    close(r.sentChn)
    d.remove(r)
    return true
}

// remove removes a request from the queue
func (d *requestDistributor) remove(r *distReq) {
    r.sentChn = nil
    if r.element != nil {
        d.reqQueue.Remove(r.element)
        r.element = nil
    }
}