1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
|
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package network
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/tracing"
olog "github.com/opentracing/opentracing-go/log"
)
const (
defaultSearchTimeout = 1 * time.Second
// maximum number of forwarded requests (hops), to make sure requests are not
// forwarded forever in peer loops
maxHopCount uint8 = 20
)
// Time to consider peer to be skipped.
// Also used in stream delivery.
var RequestTimeout = 10 * time.Second
type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
// keeps it alive until all active requests are completed. This can happen:
// 1. either because the chunk is delivered
// 2. or because the requester cancelled/timed out
// Fetcher self destroys itself after it is completed.
// TODO: cancel all forward requests after termination
type Fetcher struct {
protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk
addr storage.Address // the address of the chunk to be fetched
offerC chan *enode.ID // channel of sources (peer node id strings)
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
searchTimeout time.Duration
skipCheck bool
ctx context.Context
}
type Request struct {
Addr storage.Address // chunk address
Source *enode.ID // nodeID of peer to request from (can be nil)
SkipCheck bool // whether to offer the chunk first or deliver directly
peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil)
HopCount uint8 // number of forwarded requests (hops)
}
// NewRequest returns a new instance of Request based on chunk address skip check and
// a map of peers to skip.
func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request {
return &Request{
Addr: addr,
SkipCheck: skipCheck,
peersToSkip: peersToSkip,
}
}
// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk.
// Peers to skip are kept per Request and for a time period of RequestTimeout.
// This function is used in stream package in Delivery.RequestFromPeers to optimize
// requests for chunks.
func (r *Request) SkipPeer(nodeID string) bool {
val, ok := r.peersToSkip.Load(nodeID)
if !ok {
return false
}
t, ok := val.(time.Time)
if ok && time.Now().After(t.Add(RequestTimeout)) {
// deadline expired
r.peersToSkip.Delete(nodeID)
return false
}
return true
}
// FetcherFactory is initialised with a request function and can create fetchers
type FetcherFactory struct {
request RequestFunc
skipCheck bool
}
// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory
func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
return &FetcherFactory{
request: request,
skipCheck: skipCheck,
}
}
// New constructs a new Fetcher, for the given chunk. All peers in peersToSkip
// are not requested to deliver the given chunk. peersToSkip should always
// contain the peers which are actively requesting this chunk, to make sure we
// don't request back the chunks from them.
// The created Fetcher is started and returned.
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
go fetcher.run(peers)
return fetcher
}
// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
return &Fetcher{
addr: addr,
protoRequestFunc: rf,
offerC: make(chan *enode.ID),
requestC: make(chan uint8),
searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck,
ctx: ctx,
}
}
// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
func (f *Fetcher) Offer(source *enode.ID) {
// First we need to have this select to make sure that we return if context is done
select {
case <-f.ctx.Done():
return
default:
}
// This select alone would not guarantee that we return of context is done, it could potentially
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.offerC <- source:
case <-f.ctx.Done():
}
}
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
func (f *Fetcher) Request(hopCount uint8) {
// First we need to have this select to make sure that we return if context is done
select {
case <-f.ctx.Done():
return
default:
}
if hopCount >= maxHopCount {
log.Debug("fetcher request hop count limit reached", "hops", hopCount)
return
}
// This select alone would not guarantee that we return of context is done, it could potentially
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.requestC <- hopCount + 1:
case <-f.ctx.Done():
}
}
// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
func (f *Fetcher) run(peers *sync.Map) {
var (
doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout
waitC <-chan time.Time // timer channel
sources []*enode.ID // known sources, ie. peers that offered the chunk
requested bool // true if the chunk was actually requested
hopCount uint8
)
gone := make(chan *enode.ID) // channel to signal that a peer we requested from disconnected
// loop that keeps the fetching process alive
// after every request a timer is set. If this goes off we request again from another peer
// note that the previous request is still alive and has the chance to deliver, so
// requesting again extends the search. ie.,
// if a peer we requested from is gone we issue a new request, so the number of active
// requests never decreases
for {
select {
// incoming offer
case source := <-f.offerC:
log.Trace("new source", "peer addr", source, "request addr", f.addr)
// 1) the chunk is offered by a syncing peer
// add to known sources
sources = append(sources, source)
// launch a request to the source iff the chunk was requested (not just expected because its offered by a syncing peer)
doRequest = requested
// incoming request
case hopCount = <-f.requestC:
log.Trace("new request", "request addr", f.addr)
// 2) chunk is requested, set requested flag
// launch a request iff none been launched yet
doRequest = !requested
requested = true
// peer we requested from is gone. fall back to another
// and remove the peer from the peers map
case id := <-gone:
log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
peers.Delete(id.String())
doRequest = requested
// search timeout: too much time passed since the last request,
// extend the search to a new peer if we can find one
case <-waitC:
log.Trace("search timed out: requesting", "request addr", f.addr)
doRequest = requested
// all Fetcher context closed, can quit
case <-f.ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr)
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return
}
// need to issue a new request
if doRequest {
var err error
sources, err = f.doRequest(gone, peers, sources, hopCount)
if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err)
}
}
// if wait channel is not set, set it to a timer
if requested {
if wait == nil {
wait = time.NewTimer(f.searchTimeout)
defer wait.Stop()
waitC = wait.C
} else {
// stop the timer and drain the channel if it was not drained earlier
if !wait.Stop() {
select {
case <-wait.C:
default:
}
}
// reset the timer to go off after defaultSearchTimeout
wait.Reset(f.searchTimeout)
}
}
doRequest = false
}
}
// doRequest attempts at finding a peer to request the chunk from
// * first it tries to request explicitly from peers that are known to have offered the chunk
// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address
// excluding those in the peersToSkip map
// * if no such peer is found an error is returned
//
// if a request is successful,
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int
var sourceID *enode.ID
var quit chan struct{}
req := &Request{
Addr: f.addr,
SkipCheck: f.skipCheck,
peersToSkip: peersToSkip,
HopCount: hopCount,
}
foundSource := false
// iterate over known sources
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
// Note: we can modify the source although we are looping on it, because we break from the loop immediately
sources = append(sources[:i], sources[i+1:]...)
foundSource = true
break
}
}
// if there are no known sources, or none available, we try request from a closest node
if !foundSource {
req.Source = nil
var err error
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err != nil {
// if no peers found to request from
return sources, err
}
}
// add peer to the set of peers to skip from now
peersToSkip.Store(sourceID.String(), time.Now())
// if the quit channel is closed, it indicates that the source peer we requested from
// disconnected or terminated its streamer
// here start a go routine that watches this channel and reports the source peer on the gone channel
// this go routine quits if the fetcher global context is done to prevent process leak
go func() {
select {
case <-quit:
gone <- sourceID
case <-f.ctx.Done():
}
// finish the request span
spanId := fmt.Sprintf("stream.send.request.%v.%v", *sourceID, req.Addr)
span := tracing.ShiftSpanByKey(spanId)
if span != nil {
span.LogFields(olog.String("finish", "from doRequest"))
span.Finish()
}
}()
return sources, nil
}
|