blob: b68c5bc822f794af370876b5ad07edb2f51997af (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
package downloader
import (
"math"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"gopkg.in/fatih/set.v0"
)
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
hashPool *set.Set
mu sync.Mutex
fetching map[string]*chunk
blocks []*types.Block
}
func newqueue() *queue {
return &queue{
hashPool: set.New(),
fetching: make(map[string]*chunk),
}
}
// reserve a `max` set of hashes for `p` peer.
func (c *queue) get(p *peer, max int) *chunk {
c.mu.Lock()
defer c.mu.Unlock()
// return nothing if the pool has been depleted
if c.hashPool.Size() == 0 {
return nil
}
limit := int(math.Min(float64(max), float64(c.hashPool.Size())))
// Create a new set of hashes
hashes, i := set.New(), 0
c.hashPool.Each(func(v interface{}) bool {
if i == limit {
return false
}
hashes.Add(v)
i++
return true
})
// remove the fetchable hashes from hash pool
c.hashPool.Separate(hashes)
// Create a new chunk for the seperated hashes. The time is being used
// to reset the chunk (timeout)
chunk := &chunk{hashes, time.Now()}
// register as 'fetching' state
c.fetching[p.id] = chunk
// create new chunk for peer
return chunk
}
func (c *queue) deliver(id string, blocks []*types.Block) {
c.mu.Lock()
defer c.mu.Unlock()
chunk := c.fetching[id]
// If the chunk was never requested simply ignore it
if chunk != nil {
delete(c.fetching, id)
// seperate the blocks and the hashes
chunk.seperate(blocks)
// Add the blocks
c.blocks = append(c.blocks, blocks...)
// Add back whatever couldn't be delivered
c.hashPool.Merge(chunk.hashes)
}
}
func (c *queue) put(hashes *set.Set) {
c.mu.Lock()
defer c.mu.Unlock()
c.hashPool.Merge(hashes)
}
type chunk struct {
hashes *set.Set
itime time.Time
}
func (ch *chunk) seperate(blocks []*types.Block) {
for _, block := range blocks {
ch.hashes.Remove(block.Hash())
}
}
|