aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/fetcher.go
blob: 35e2f01328efe92244a6ef6019f9fdb21907f0e7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
    "context"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/p2p/discover"
    "github.com/ethereum/go-ethereum/swarm/storage"
)

var searchTimeout = 1 * time.Second

// Time to consider peer to be skipped.
// Also used in stream delivery.
var RequestTimeout = 10 * time.Second

type RequestFunc func(context.Context, *Request) (*discover.NodeID, chan struct{}, error)

// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
// keeps it alive until all active requests are completed. This can happen:
//     1. either because the chunk is delivered
//     2. or becuse the requestor cancelled/timed out
// Fetcher self destroys itself after it is completed.
// TODO: cancel all forward requests after termination
type Fetcher struct {
    protoRequestFunc RequestFunc           // request function fetcher calls to issue retrieve request for a chunk
    addr             storage.Address       // the address of the chunk to be fetched
    offerC           chan *discover.NodeID // channel of sources (peer node id strings)
    requestC         chan struct{}
    skipCheck        bool
}

type Request struct {
    Addr        storage.Address  // chunk address
    Source      *discover.NodeID // nodeID of peer to request from (can be nil)
    SkipCheck   bool             // whether to offer the chunk first or deliver directly
    peersToSkip *sync.Map        // peers not to request chunk from (only makes sense if source is nil)
}

// NewRequest returns a new instance of Request based on chunk address skip check and
// a map of peers to skip.
func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request {
    return &Request{
        Addr:        addr,
        SkipCheck:   skipCheck,
        peersToSkip: peersToSkip,
    }
}

// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk.
// Peers to skip are kept per Request and for a time period of RequestTimeout.
// This function is used in stream package in Delivery.RequestFromPeers to optimize
// requests for chunks.
func (r *Request) SkipPeer(nodeID string) bool {
    val, ok := r.peersToSkip.Load(nodeID)
    if !ok {
        return false
    }
    t, ok := val.(time.Time)
    if ok && time.Now().After(t.Add(RequestTimeout)) {
        // deadine expired
        r.peersToSkip.Delete(nodeID)
        return false
    }
    return true
}

// FetcherFactory is initialised with a request function and can create fetchers
type FetcherFactory struct {
    request   RequestFunc
    skipCheck bool
}

// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory
func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
    return &FetcherFactory{
        request:   request,
        skipCheck: skipCheck,
    }
}

// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to
// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
// this chunk, to make sure we don't request back the chunks from them.
// The created Fetcher is started and returned.
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
    fetcher := NewFetcher(source, f.request, f.skipCheck)
    go fetcher.run(ctx, peersToSkip)
    return fetcher
}

// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
    return &Fetcher{
        addr:             addr,
        protoRequestFunc: rf,
        offerC:           make(chan *discover.NodeID),
        requestC:         make(chan struct{}),
        skipCheck:        skipCheck,
    }
}

// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
func (f *Fetcher) Offer(ctx context.Context, source *discover.NodeID) {
    // First we need to have this select to make sure that we return if context is done
    select {
    case <-ctx.Done():
        return
    default:
    }

    // This select alone would not guarantee that we return of context is done, it could potentially
    // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
    select {
    case f.offerC <- source:
    case <-ctx.Done():
    }
}

// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
func (f *Fetcher) Request(ctx context.Context) {
    // First we need to have this select to make sure that we return if context is done
    select {
    case <-ctx.Done():
        return
    default:
    }

    // This select alone would not guarantee that we return of context is done, it could potentially
    // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
    select {
    case f.requestC <- struct{}{}:
    case <-ctx.Done():
    }
}

// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
    var (
        doRequest bool               // determines if retrieval is initiated in the current iteration
        wait      *time.Timer        // timer for search timeout
        waitC     <-chan time.Time   // timer channel
        sources   []*discover.NodeID // known sources, ie. peers that offered the chunk
        requested bool               // true if the chunk was actually requested
    )
    gone := make(chan *discover.NodeID) // channel to signal that a peer we requested from disconnected

    // loop that keeps the fetching process alive
    // after every request a timer is set. If this goes off we request again from another peer
    // note that the previous request is still alive and has the chance to deliver, so
    // rerequesting extends the search. ie.,
    // if a peer we requested from is gone we issue a new request, so the number of active
    // requests never decreases
    for {
        select {

        // incoming offer
        case source := <-f.offerC:
            log.Trace("new source", "peer addr", source, "request addr", f.addr)
            // 1) the chunk is offered by a syncing peer
            // add to known sources
            sources = append(sources, source)
            // launch a request to the source iff the chunk was requested (not just expected because its offered by a syncing peer)
            doRequest = requested

        // incoming request
        case <-f.requestC:
            log.Trace("new request", "request addr", f.addr)
            // 2) chunk is requested, set requested flag
            // launch a request iff none been launched yet
            doRequest = !requested
            requested = true

            // peer we requested from is gone. fall back to another
            // and remove the peer from the peers map
        case id := <-gone:
            log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
            peers.Delete(id.String())
            doRequest = requested

        // search timeout: too much time passed since the last request,
        // extend the search to a new peer if we can find one
        case <-waitC:
            log.Trace("search timed out: rerequesting", "request addr", f.addr)
            doRequest = requested

            // all Fetcher context closed, can quit
        case <-ctx.Done():
            log.Trace("terminate fetcher", "request addr", f.addr)
            // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
            return
        }

        // need to issue a new request
        if doRequest {
            var err error
            sources, err = f.doRequest(ctx, gone, peers, sources)
            if err != nil {
                log.Warn("unable to request", "request addr", f.addr, "err", err)
            }
        }

        // if wait channel is not set, set it to a timer
        if requested {
            if wait == nil {
                wait = time.NewTimer(searchTimeout)
                defer wait.Stop()
                waitC = wait.C
            } else {
                // stop the timer and drain the channel if it was not drained earlier
                if !wait.Stop() {
                    select {
                    case <-wait.C:
                    default:
                    }
                }
                // reset the timer to go off after searchTimeout
                wait.Reset(searchTimeout)
            }
        }
        doRequest = false
    }
}

// doRequest attempts at finding a peer to request the chunk from
// * first it tries to request explicitly from peers that are known to have offered the chunk
// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address
//   excluding those in the peersToSkip map
// * if no such peer is found an error is returned
//
// if a request is successful,
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
func (f *Fetcher) doRequest(ctx context.Context, gone chan *discover.NodeID, peersToSkip *sync.Map, sources []*discover.NodeID) ([]*discover.NodeID, error) {
    var i int
    var sourceID *discover.NodeID
    var quit chan struct{}

    req := &Request{
        Addr:        f.addr,
        SkipCheck:   f.skipCheck,
        peersToSkip: peersToSkip,
    }

    foundSource := false
    // iterate over known sources
    for i = 0; i < len(sources); i++ {
        req.Source = sources[i]
        var err error
        sourceID, quit, err = f.protoRequestFunc(ctx, req)
        if err == nil {
            // remove the peer from known sources
            // Note: we can modify the source although we are looping on it, because we break from the loop immediately
            sources = append(sources[:i], sources[i+1:]...)
            foundSource = true
            break
        }
    }

    // if there are no known sources, or none available, we try request from a closest node
    if !foundSource {
        req.Source = nil
        var err error
        sourceID, quit, err = f.protoRequestFunc(ctx, req)
        if err != nil {
            // if no peers found to request from
            return sources, err
        }
    }
    // add peer to the set of peers to skip from now
    peersToSkip.Store(sourceID.String(), time.Now())

    // if the quit channel is closed, it indicates that the source peer we requested from
    // disconnected or terminated its streamer
    // here start a go routine that watches this channel and reports the source peer on the gone channel
    // this go routine quits if the fetcher global context is done to prevent process leak
    go func() {
        select {
        case <-quit:
            gone <- sourceID
        case <-ctx.Done():
        }
    }()
    return sources, nil
}