Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--  eth/downloader/queue.go  65
1 file changed, 55 insertions, 10 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index a21a44706..1b63a5ffb 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -1,6 +1,7 @@
package downloader
import (
+ "fmt"
"math"
"sync"
"time"
@@ -18,7 +19,9 @@ type queue struct {
mu sync.Mutex
fetching map[string]*chunk
- blocks []*types.Block
+
+ blockOffset int
+ blocks []*types.Block
}
func newqueue() *queue {
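The `blocks` field changes from an append-only slice into a positional buffer: the new `blockOffset` records which block number lives in slot 0, so a fetched block can be stored at its final position even when batches arrive out of order. A minimal, self-contained sketch of that index arithmetic, using a hypothetical `blockBuffer` type with strings standing in for `*types.Block`:

```go
package main

import "fmt"

// blockBuffer is a toy stand-in for the queue's new blockOffset/blocks pair.
type blockBuffer struct {
	blockOffset int      // block number stored in slot 0
	blocks      []string // stand-in for []*types.Block
}

// slot maps an absolute block number onto an index into blocks, mirroring
// n := int(block.NumberU64()) - c.blockOffset in the deliver change below.
func (b *blockBuffer) slot(number int) (int, bool) {
	n := number - b.blockOffset
	if n < 0 || n >= len(b.blocks) {
		return 0, false // outside the allocated window
	}
	return n, true
}

func main() {
	b := &blockBuffer{blockOffset: 100, blocks: make([]string, 4)} // window 100..103
	if n, ok := b.slot(102); ok {
		b.blocks[n] = "block #102"
	}
	fmt.Printf("%q\n", b.blocks) // ["" "" "block #102" ""]
}
```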
@@ -34,6 +37,10 @@ func (c *queue) reset() {
c.mu.Lock()
defer c.mu.Unlock()
+ c.resetNoTS()
+}
+func (c *queue) resetNoTS() {
+ c.blockOffset = 0
c.hashPool.Clear()
c.fetchPool.Clear()
c.blockHashes.Clear()
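reset keeps taking the mutex and delegates the actual clearing to resetNoTS (presumably "no thread safety"), so code paths that already hold c.mu can reuse the clearing logic without deadlocking on Go's non-reentrant sync.Mutex. A minimal sketch of that locked-wrapper pattern, with a hypothetical counter type:

```go
package main

import "sync"

// counter illustrates the locked-wrapper / unlocked-worker split used by
// reset and resetNoTS (hypothetical type, for illustration only).
type counter struct {
	mu sync.Mutex
	n  int
}

// Reset is the thread-safe entry point.
func (c *counter) Reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.resetNoTS()
}

// resetNoTS assumes the caller already holds c.mu; calling Reset from a
// method that holds the lock would deadlock, since sync.Mutex is not reentrant.
func (c *counter) resetNoTS() {
	c.n = 0
}

func main() {
	c := &counter{n: 42}
	c.Reset()
	_ = c.n
}
```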
@@ -41,6 +48,10 @@ func (c *queue) reset() {
c.fetching = make(map[string]*chunk)
}
+func (c *queue) size() int {
+ return c.hashPool.Size() + c.blockHashes.Size() + c.fetchPool.Size()
+}
+
// reserve a `max` set of hashes for `p` peer.
func (c *queue) get(p *peer, max int) *chunk {
c.mu.Lock()
@@ -89,22 +100,27 @@ func (c *queue) get(p *peer, max int) *chunk {
}
func (c *queue) has(hash common.Hash) bool {
- return c.hashPool.Has(hash) || c.fetchPool.Has(hash)
+ return c.hashPool.Has(hash) || c.fetchPool.Has(hash) || c.blockHashes.Has(hash)
}
-func (c *queue) addBlock(id string, block *types.Block) {
+func (c *queue) getBlock(hash common.Hash) *types.Block {
c.mu.Lock()
defer c.mu.Unlock()
- // when adding a block make sure it doesn't already exist
- if !c.blockHashes.Has(block.Hash()) {
- c.hashPool.Remove(block.Hash())
- c.blocks = append(c.blocks, block)
+ if !c.blockHashes.Has(hash) {
+ return nil
+ }
+
+ for _, block := range c.blocks {
+ if block != nil && block.Hash() == hash {
+ return block
+ }
}
+ return nil
}
// deliver delivers a chunk to the queue that was requested of the peer
-func (c *queue) deliver(id string, blocks []*types.Block) {
+func (c *queue) deliver(id string, blocks []*types.Block) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
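getBlock answers only for hashes already recorded in blockHashes, i.e. blocks that have actually been fetched, and then scans the block buffer linearly; since alloc (further down) preallocates the slice, the scan has to tolerate empty slots. A stand-alone sketch of the same lookup shape, with hypothetical names and strings in place of common.Hash:

```go
package main

import "fmt"

type block struct {
	hash   string
	number int
}

// queueView mimics the relevant queue fields (hypothetical names, strings
// instead of common.Hash, a plain map instead of the *set.Set blockHashes).
type queueView struct {
	blockHashes map[string]bool
	blocks      []*block
}

// getBlock mirrors the diff's lookup: a fast set check, then a linear scan
// that skips empty (nil) slots left by preallocation.
func (q *queueView) getBlock(hash string) *block {
	if !q.blockHashes[hash] {
		return nil // not fetched (or never requested)
	}
	for _, b := range q.blocks {
		if b != nil && b.hash == hash {
			return b
		}
	}
	return nil
}

func main() {
	q := &queueView{
		blockHashes: map[string]bool{"0xaa": true},
		blocks:      []*block{nil, {hash: "0xaa", number: 100}},
	}
	fmt.Println(q.getBlock("0xaa").number) // 100
	fmt.Println(q.getBlock("0xbb"))        // <nil>
}
```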
@@ -119,16 +135,45 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
chunk.peer.ignored.Merge(chunk.hashes)
}
+ // Add the blocks
+ for i, block := range blocks {
+ // See (1) for future limitation
+ n := int(block.NumberU64()) - c.blockOffset
+ if n >= len(c.blocks) || n < 0 {
+ // set the error and set the blocks which could be processed
+ // abort the rest of the blocks (FIXME this could be improved)
+ err = fmt.Errorf("received block which overflows (N=%v O=%v)", block.Number(), c.blockOffset)
+ blocks = blocks[:i]
+ break
+ }
+ c.blocks[n] = block
+ }
// separate the blocks and the hashes
blockHashes := chunk.fetchedHashes(blocks)
// merge block hashes
c.blockHashes.Merge(blockHashes)
- // Add the blocks
- c.blocks = append(c.blocks, blocks...)
// Add back whatever couldn't be delivered
c.hashPool.Merge(chunk.hashes)
+ // Remove the hashes from the fetch pool
c.fetchPool.Separate(chunk.hashes)
}
+
+ return
+}
+
+func (c *queue) alloc(offset, size int) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.blockOffset < offset {
+ c.blockOffset = offset
+ }
+
+ // (1) XXX at some point we could limit allocation to memory and use the disk
+ // to store future blocks.
+ if len(c.blocks) < size {
+ c.blocks = append(c.blocks, make([]*types.Block, size)...)
+ }
}
// put puts sets of hashes on to the queue for fetching
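Taken together, alloc and deliver turn the queue into a sliding window over block numbers: alloc(offset, size), presumably called when a batch of hashes is scheduled, raises blockOffset and grows the buffer, while deliver drops each incoming block into slot number - blockOffset and rejects anything outside the window with the overflow error, keeping the blocks that did fit. A self-contained sketch of that interplay under those assumptions, with ints standing in for blocks:

```go
package main

import "fmt"

// window is a toy model of the queue's alloc/deliver interplay
// (hypothetical type; ints stand in for *types.Block).
type window struct {
	blockOffset int
	blocks      []*int
}

// alloc mirrors (*queue).alloc: raise the offset and grow the slot slice.
// Like the diff, it appends `size` fresh slots whenever the buffer is short,
// which may over-allocate slightly.
func (w *window) alloc(offset, size int) {
	if w.blockOffset < offset {
		w.blockOffset = offset
	}
	if len(w.blocks) < size {
		w.blocks = append(w.blocks, make([]*int, size)...)
	}
}

// deliver mirrors the indexed placement in (*queue).deliver: each block goes
// into slot number-blockOffset; anything outside the window aborts the batch.
func (w *window) deliver(numbers []int) error {
	for _, num := range numbers {
		n := num - w.blockOffset
		if n < 0 || n >= len(w.blocks) {
			return fmt.Errorf("received block which overflows (N=%v O=%v)", num, w.blockOffset)
		}
		v := num
		w.blocks[n] = &v
	}
	return nil
}

func main() {
	w := &window{}
	w.alloc(100, 4)                         // slots for blocks 100..103 (at least)
	fmt.Println(w.deliver([]int{101, 100})) // <nil>: out-of-order delivery is fine
	fmt.Println(w.deliver([]int{999}))      // overflow: outside the allocated window
}
```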