// Source: eth/downloader/queue.go (blob b68c5bc822f794af370876b5ad07edb2f51997af)
package downloader

import (
    "math"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/core/types"
    "gopkg.in/fatih/set.v0"
)

// queue tracks the hashes that still need fetching, the chunks of hashes that
// are currently being fetched by peers, and the blocks delivered so far.
type queue struct {
    // hashPool holds the hashes that have not been handed out to any peer.
    hashPool *set.Set

    // mu is locked by get, deliver and put for the duration of each call.
    mu       sync.Mutex
    // fetching maps a peer id to the chunk most recently reserved for it.
    fetching map[string]*chunk
    // blocks accumulates the blocks handed to deliver.
    blocks   []*types.Block
}

// newqueue returns a queue whose hash pool is an empty set and whose fetching
// map is ready for writes. The blocks slice is left nil.
func newqueue() *queue {
    q := new(queue)
    q.hashPool = set.New()
    q.fetching = make(map[string]*chunk)
    return q
}

// get reserves up to `max` hashes from the pool for peer `p` and returns them
// as a new chunk stamped with the current time.
//
// It returns nil when `max` is not positive or when the pool is empty. On
// success the reserved hashes are removed from the pool and the chunk is stored
// in the fetching map under the peer's id.
//
// NOTE(review): if the peer already has an entry in the fetching map it is
// overwritten here and the hashes it held are not merged back into the pool;
// confirm that callers only ask on behalf of peers with no outstanding chunk.
func (c *queue) get(p *peer, max int) *chunk {
    // A non-positive budget cannot reserve anything. Without this guard a
    // negative `max` yields a negative limit that the counter below never
    // equals, so the whole pool would be handed to a single peer.
    if max <= 0 {
        return nil
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    // return nothing if the pool has been depleted
    available := c.hashPool.Size()
    if available == 0 {
        return nil
    }

    // Never reserve more than the pool currently holds.
    limit := int(math.Min(float64(max), float64(available)))

    // Copy the first `limit` hashes the pool iteration yields into a new set.
    hashes, taken := set.New(), 0
    c.hashPool.Each(func(v interface{}) bool {
        if taken >= limit {
            return false
        }
        hashes.Add(v)
        taken++
        return true
    })
    // remove the reserved hashes from the hash pool
    c.hashPool.Separate(hashes)

    // Create a new chunk for the separated hashes. The time is being used
    // to reset the chunk (timeout).
    reserved := &chunk{hashes: hashes, itime: time.Now()}
    // register as 'fetching' state
    c.fetching[p.id] = reserved

    return reserved
}

// deliver processes the blocks sent by peer `id` in answer to the chunk stored
// for it in the fetching map.
//
// If no chunk is stored for `id` the call is ignored. Otherwise the chunk is
// removed from the fetching map, every block whose hash is still outstanding
// in the chunk is appended to the queue's blocks, and whatever hashes remain
// unanswered are merged back into the pool. Passing nil or empty `blocks`
// therefore returns the whole chunk to the pool.
//
// Blocks whose hash is not outstanding in the chunk are dropped. That covers
// blocks the peer was never asked for and repeated blocks within one delivery.
func (c *queue) deliver(id string, blocks []*types.Block) {
    c.mu.Lock()
    defer c.mu.Unlock()

    req := c.fetching[id]
    // If the chunk was never requested simply ignore it
    if req == nil {
        return
    }
    delete(c.fetching, id)

    for _, block := range blocks {
        hash := block.Hash()
        // Only accept blocks that answer a hash reserved in this chunk.
        if !req.hashes.Has(hash) {
            continue
        }
        // Mark the hash as answered so it is neither re-queued below nor
        // accepted a second time from the same delivery.
        req.hashes.Remove(hash)
        c.blocks = append(c.blocks, block)
    }

    // Add back whatever couldn't be delivered
    c.hashPool.Merge(req.hashes)
}

// put merges the given hashes into the pool of hashes that still need
// fetching. The queue's mutex is held for the duration of the merge.
func (c *queue) put(hashes *set.Set) {
    c.mu.Lock()
    defer c.mu.Unlock()

    pool := c.hashPool
    pool.Merge(hashes)
}

// chunk is a set of hashes reserved for one peer, together with the time at
// which the reservation was made.
type chunk struct {
    // hashes are the reserved hashes; seperate removes the ones that have
    // been answered by a block.
    hashes *set.Set
    // itime is the creation time of the chunk (get sets it to time.Now()).
    // The comment in get says it is used to time the chunk out.
    itime  time.Time
}

// seperate removes the hash of every given block from the chunk's hash set,
// leaving only the hashes for which no block was supplied.
func (ch *chunk) seperate(blocks []*types.Block) {
    for i := range blocks {
        delivered := blocks[i].Hash()
        ch.hashes.Remove(delivered)
    }
}