aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
blob: 5fc0db5876c04fec560c5e6a1b9a6b3dede66292 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Contains the active peer-set of the downloader, maintaining both failures
// as well as reputation metrics to prioritize the block retrievals.

package downloader

import (
    "errors"
    "fmt"
    "math"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "gopkg.in/fatih/set.v0"
)

// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error

// Block header and body fetchers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
type receiptFetcherFn func([]common.Hash) error

var (
    errAlreadyFetching   = errors.New("already fetching blocks from peer")
    errAlreadyRegistered = errors.New("peer is already registered")
    errNotRegistered     = errors.New("peer is not registered")
)

// peer represents an active peer from which hashes and blocks are retrieved.
type peer struct {
    id   string      // Unique identifier of the peer
    head common.Hash // Hash of the peers latest known block

    blockIdle   int32 // Current block activity state of the peer (idle = 0, active = 1)
    receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
    rep         int32 // Simple peer reputation

    blockCapacity   int32     // Number of blocks (bodies) allowed to fetch per request
    receiptCapacity int32     // Number of receipts allowed to fetch per request
    blockStarted    time.Time // Time instance when the last block (body)fetch was started
    receiptStarted  time.Time // Time instance when the last receipt fetch was started

    ignored *set.Set // Set of hashes not to request (didn't have previously)

    getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
    getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
    getBlocks    blockFetcherFn        // [eth/61] Method to retrieve a batch of blocks

    getRelHeaders  relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
    getAbsHeaders  absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
    getBlockBodies blockBodyFetcherFn      // [eth/62] Method to retrieve a batch of block bodies

    getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts

    version int // Eth protocol version number to switch strategies
}

// newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms.
func newPeer(id string, version int, head common.Hash,
    getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
    getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
    getReceipts receiptFetcherFn) *peer {
    return &peer{
        id:              id,
        head:            head,
        blockCapacity:   1,
        receiptCapacity: 1,
        ignored:         set.New(),

        getRelHashes: getRelHashes,
        getAbsHashes: getAbsHashes,
        getBlocks:    getBlocks,

        getRelHeaders:  getRelHeaders,
        getAbsHeaders:  getAbsHeaders,
        getBlockBodies: getBlockBodies,

        getReceipts: getReceipts,

        version: version,
    }
}

// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
    atomic.StoreInt32(&p.blockIdle, 0)
    atomic.StoreInt32(&p.receiptIdle, 0)
    atomic.StoreInt32(&p.blockCapacity, 1)
    atomic.StoreInt32(&p.receiptCapacity, 1)
    p.ignored.Clear()
}

// Fetch61 sends a block retrieval request to the remote peer.
func (p *peer) Fetch61(request *fetchRequest) error {
    // Short circuit if the peer is already fetching
    if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
        return errAlreadyFetching
    }
    p.blockStarted = time.Now()

    // Convert the hash set to a retrievable slice
    hashes := make([]common.Hash, 0, len(request.Hashes))
    for hash, _ := range request.Hashes {
        hashes = append(hashes, hash)
    }
    go p.getBlocks(hashes)

    return nil
}

// FetchBodies sends a block body retrieval request to the remote peer.
func (p *peer) FetchBodies(request *fetchRequest) error {
    // Short circuit if the peer is already fetching
    if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
        return errAlreadyFetching
    }
    p.blockStarted = time.Now()

    // Convert the header set to a retrievable slice
    hashes := make([]common.Hash, 0, len(request.Headers))
    for _, header := range request.Headers {
        hashes = append(hashes, header.Hash())
    }
    go p.getBlockBodies(hashes)

    return nil
}

// FetchReceipts sends a receipt retrieval request to the remote peer.
func (p *peer) FetchReceipts(request *fetchRequest) error {
    // Short circuit if the peer is already fetching
    if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
        return errAlreadyFetching
    }
    p.receiptStarted = time.Now()

    // Convert the header set to a retrievable slice
    hashes := make([]common.Hash, 0, len(request.Headers))
    for _, header := range request.Headers {
        hashes = append(hashes, header.Hash())
    }
    go p.getReceipts(hashes)

    return nil
}

// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func (p *peer) SetBlocksIdle() {
    p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
}

// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block body retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func (p *peer) SetBodiesIdle() {
    p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
}

// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its receipt retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func (p *peer) SetReceiptsIdle() {
    p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
}

// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its data retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
    // Update the peer's download allowance based on previous performance
    scale := 2.0
    if time.Since(started) > softTTL {
        scale = 0.5
        if time.Since(started) > hardTTL {
            scale = 1 / float64(maxFetch) // reduces capacity to 1
        }
    }
    for {
        // Calculate the new download bandwidth allowance
        prev := atomic.LoadInt32(capacity)
        next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))

        // Try to update the old value
        if atomic.CompareAndSwapInt32(capacity, prev, next) {
            // If we're having problems at 1 capacity, try to find better peers
            if next == 1 {
                p.Demote()
            }
            break
        }
    }
    // Set the peer to idle to allow further fetch requests
    atomic.StoreInt32(idle, 0)
}

// BlockCapacity retrieves the peers block download allowance based on its
// previously discovered bandwidth capacity.
func (p *peer) BlockCapacity() int {
    return int(atomic.LoadInt32(&p.blockCapacity))
}

// ReceiptCapacity retrieves the peers block download allowance based on its
// previously discovered bandwidth capacity.
func (p *peer) ReceiptCapacity() int {
    return int(atomic.LoadInt32(&p.receiptCapacity))
}

// Promote increases the peer's reputation.
func (p *peer) Promote() {
    atomic.AddInt32(&p.rep, 1)
}

// Demote decreases the peer's reputation or leaves it at 0.
func (p *peer) Demote() {
    for {
        // Calculate the new reputation value
        prev := atomic.LoadInt32(&p.rep)
        next := prev / 2

        // Try to update the old value
        if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
            return
        }
    }
}

// String implements fmt.Stringer.
func (p *peer) String() string {
    return fmt.Sprintf("Peer %s [%s]", p.id,
        fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
            fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
            fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
            fmt.Sprintf("ignored %4d", p.ignored.Size()),
    )
}

// peerSet represents the collection of active peer participating in the block
// download procedure.
type peerSet struct {
    peers map[string]*peer
    lock  sync.RWMutex
}

// newPeerSet creates a new peer set top track the active download sources.
func newPeerSet() *peerSet {
    return &peerSet{
        peers: make(map[string]*peer),
    }
}

// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    for _, peer := range ps.peers {
        peer.Reset()
    }
}

// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
func (ps *peerSet) Register(p *peer) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()

    if _, ok := ps.peers[p.id]; ok {
        return errAlreadyRegistered
    }
    ps.peers[p.id] = p
    return nil
}

// Unregister removes a remote peer from the active set, disabling any further
// actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()

    if _, ok := ps.peers[id]; !ok {
        return errNotRegistered
    }
    delete(ps.peers, id)
    return nil
}

// Peer retrieves the registered peer with the given id.
func (ps *peerSet) Peer(id string) *peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    return ps.peers[id]
}

// Len returns if the current number of peers in the set.
func (ps *peerSet) Len() int {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    return len(ps.peers)
}

// AllPeers retrieves a flat list of all the peers within the set.
func (ps *peerSet) AllPeers() []*peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    list := make([]*peer, 0, len(ps.peers))
    for _, p := range ps.peers {
        list = append(list, p)
    }
    return list
}

// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    idle, total := make([]*peer, 0, len(ps.peers)), 0
    for _, p := range ps.peers {
        if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
            if atomic.LoadInt32(&p.blockIdle) == 0 {
                idle = append(idle, p)
            }
            total++
        }
    }
    for i := 0; i < len(idle); i++ {
        for j := i + 1; j < len(idle); j++ {
            if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
                idle[i], idle[j] = idle[j], idle[i]
            }
        }
    }
    return idle, total
}

// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    idle, total := make([]*peer, 0, len(ps.peers)), 0
    for _, p := range ps.peers {
        if p.version >= 63 {
            if atomic.LoadInt32(&p.receiptIdle) == 0 {
                idle = append(idle, p)
            }
            total++
        }
    }
    for i := 0; i < len(idle); i++ {
        for j := i + 1; j < len(idle); j++ {
            if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
                idle[i], idle[j] = idle[j], idle[i]
            }
        }
    }
    return idle, total
}