aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go44
1 files changed, 38 insertions, 6 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index b68c5bc82..4d1aa4e93 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -2,16 +2,20 @@ package downloader
import (
"math"
+ "math/big"
"sync"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"gopkg.in/fatih/set.v0"
)
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
- hashPool *set.Set
+ hashPool *set.Set
+ fetchPool *set.Set
+ blockHashes *set.Set
mu sync.Mutex
fetching map[string]*chunk
@@ -20,8 +24,10 @@ type queue struct {
func newqueue() *queue {
return &queue{
- hashPool: set.New(),
- fetching: make(map[string]*chunk),
+ hashPool: set.New(),
+ fetchPool: set.New(),
+ blockHashes: set.New(),
+ fetching: make(map[string]*chunk),
}
}
@@ -50,6 +56,8 @@ func (c *queue) get(p *peer, max int) *chunk {
})
// remove the fetchable hashes from hash pool
c.hashPool.Separate(hashes)
+ c.fetchPool.Merge(hashes)
+
// Create a new chunk for the seperated hashes. The time is being used
// to reset the chunk (timeout)
chunk := &chunk{hashes, time.Now()}
@@ -60,6 +68,22 @@ func (c *queue) get(p *peer, max int) *chunk {
return chunk
}
+func (c *queue) has(hash common.Hash) bool {
+ return c.hashPool.Has(hash) || c.fetchPool.Has(hash)
+}
+
+func (c *queue) addBlock(id string, block *types.Block, td *big.Int) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ // when adding a block make sure it doesn't already exist
+ if !c.blockHashes.Has(block.Hash()) {
+ c.hashPool.Remove(block.Hash())
+ c.blocks = append(c.blocks, block)
+ }
+}
+
+// deliver delivers a chunk to the queue that was requested of the peer
func (c *queue) deliver(id string, blocks []*types.Block) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -70,15 +94,19 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
delete(c.fetching, id)
// seperate the blocks and the hashes
- chunk.seperate(blocks)
+ blockHashes := chunk.fetchedHashes(blocks)
+ // merge block hashes
+ c.blockHashes.Merge(blockHashes)
// Add the blocks
c.blocks = append(c.blocks, blocks...)
// Add back whatever couldn't be delivered
c.hashPool.Merge(chunk.hashes)
+ c.fetchPool.Separate(chunk.hashes)
}
}
+// puts puts sets of hashes on to the queue for fetching
func (c *queue) put(hashes *set.Set) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -91,8 +119,12 @@ type chunk struct {
itime time.Time
}
-func (ch *chunk) seperate(blocks []*types.Block) {
+func (ch *chunk) fetchedHashes(blocks []*types.Block) *set.Set {
+ fhashes := set.New()
for _, block := range blocks {
- ch.hashes.Remove(block.Hash())
+ fhashes.Add(block.Hash())
}
+ ch.hashes.Separate(fhashes)
+
+ return fhashes
}