aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-05-01 06:23:51 +0800
committerobscuren <geffobscura@gmail.com>2015-05-01 21:58:44 +0800
commit016f152b36106130fa42514ef6cfacc09dfc3142 (patch)
treea38fa42c59a8a4e0c18b68fc8e5dcb6bd533719b /eth/downloader/queue.go
parent8595198c1b56364bb9589a912d2a9797b93a3357 (diff)
downloadgo-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.gz
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.bz2
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.lz
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.xz
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.tar.zst
go-tangerine-016f152b36106130fa42514ef6cfacc09dfc3142.zip
eth, eth/downloader: Moved block processing & graceful shutdown
The downloader is no longer responsible for processing blocks. The eth-protocol handler now takes care of this instead. Added graceful shutdown during block processing. Closes #846
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go58
1 files changed, 54 insertions, 4 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index a21a44706..13768229f 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -18,7 +18,9 @@ type queue struct {
mu sync.Mutex
fetching map[string]*chunk
- blocks []*types.Block
+
+ blockOffset int
+ blocks []*types.Block
}
func newqueue() *queue {
@@ -34,6 +36,10 @@ func (c *queue) reset() {
c.mu.Lock()
defer c.mu.Unlock()
+ c.resetNoTS()
+}
+func (c *queue) resetNoTS() {
+ c.blockOffset = 0
c.hashPool.Clear()
c.fetchPool.Clear()
c.blockHashes.Clear()
@@ -41,6 +47,10 @@ func (c *queue) reset() {
c.fetching = make(map[string]*chunk)
}
+func (c *queue) size() int {
+ return c.hashPool.Size() + c.blockHashes.Size() + c.fetchPool.Size()
+}
+
// reserve a `max` set of hashes for `p` peer.
func (c *queue) get(p *peer, max int) *chunk {
c.mu.Lock()
@@ -89,7 +99,7 @@ func (c *queue) get(p *peer, max int) *chunk {
}
func (c *queue) has(hash common.Hash) bool {
- return c.hashPool.Has(hash) || c.fetchPool.Has(hash)
+ return c.hashPool.Has(hash) || c.fetchPool.Has(hash) || c.blockHashes.Has(hash)
}
func (c *queue) addBlock(id string, block *types.Block) {
@@ -103,8 +113,24 @@ func (c *queue) addBlock(id string, block *types.Block) {
}
}
+func (c *queue) getBlock(hash common.Hash) *types.Block {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if !c.blockHashes.Has(hash) {
+ return nil
+ }
+
+ for _, block := range c.blocks {
+ if block.Hash() == hash {
+ return block
+ }
+ }
+ return nil
+}
+
// deliver delivers a chunk to the queue that was requested of the peer
-func (c *queue) deliver(id string, blocks []*types.Block) {
+func (c *queue) deliver(id string, blocks []*types.Block) error {
c.mu.Lock()
defer c.mu.Unlock()
@@ -124,11 +150,35 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
// merge block hashes
c.blockHashes.Merge(blockHashes)
// Add the blocks
- c.blocks = append(c.blocks, blocks...)
+ for _, block := range blocks {
+ // See (1) for future limitation
+ n := int(block.NumberU64()) - c.blockOffset
+ if n > len(c.blocks) || n < 0 {
+ return errBlockNumberOverflow
+ }
+ c.blocks[n] = block
+ }
// Add back whatever couldn't be delivered
c.hashPool.Merge(chunk.hashes)
c.fetchPool.Separate(chunk.hashes)
}
+
+ return nil
+}
+
+func (c *queue) alloc(offset, size int) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.blockOffset < offset {
+ c.blockOffset = offset
+ }
+
+ // (1) XXX at some point we could limit allocation to memory and use the disk
+ // to store future blocks.
+ if len(c.blocks) < size {
+ c.blocks = append(c.blocks, make([]*types.Block, size)...)
+ }
}
// puts puts sets of hashes on to the queue for fetching