diff options
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 58 |
1 files changed, 54 insertions, 4 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index a21a44706..13768229f 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -18,7 +18,9 @@ type queue struct { mu sync.Mutex fetching map[string]*chunk - blocks []*types.Block + + blockOffset int + blocks []*types.Block } func newqueue() *queue { @@ -34,6 +36,10 @@ func (c *queue) reset() { c.mu.Lock() defer c.mu.Unlock() + c.resetNoTS() +} +func (c *queue) resetNoTS() { + c.blockOffset = 0 c.hashPool.Clear() c.fetchPool.Clear() c.blockHashes.Clear() @@ -41,6 +47,10 @@ func (c *queue) reset() { c.fetching = make(map[string]*chunk) } +func (c *queue) size() int { + return c.hashPool.Size() + c.blockHashes.Size() + c.fetchPool.Size() +} + // reserve a `max` set of hashes for `p` peer. func (c *queue) get(p *peer, max int) *chunk { c.mu.Lock() @@ -89,7 +99,7 @@ func (c *queue) get(p *peer, max int) *chunk { } func (c *queue) has(hash common.Hash) bool { - return c.hashPool.Has(hash) || c.fetchPool.Has(hash) + return c.hashPool.Has(hash) || c.fetchPool.Has(hash) || c.blockHashes.Has(hash) } func (c *queue) addBlock(id string, block *types.Block) { @@ -103,8 +113,24 @@ func (c *queue) addBlock(id string, block *types.Block) { } } +func (c *queue) getBlock(hash common.Hash) *types.Block { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.blockHashes.Has(hash) { + return nil + } + + for _, block := range c.blocks { + if block.Hash() == hash { + return block + } + } + return nil +} + // deliver delivers a chunk to the queue that was requested of the peer -func (c *queue) deliver(id string, blocks []*types.Block) { +func (c *queue) deliver(id string, blocks []*types.Block) error { c.mu.Lock() defer c.mu.Unlock() @@ -124,11 +150,35 @@ func (c *queue) deliver(id string, blocks []*types.Block) { // merge block hashes c.blockHashes.Merge(blockHashes) // Add the blocks - c.blocks = append(c.blocks, blocks...) + for _, block := range blocks { + // See (1) for future limitation + n := int(block.NumberU64()) - c.blockOffset + if n > len(c.blocks) || n < 0 { + return errBlockNumberOverflow + } + c.blocks[n] = block + } // Add back whatever couldn't be delivered c.hashPool.Merge(chunk.hashes) c.fetchPool.Separate(chunk.hashes) } + + return nil +} + +func (c *queue) alloc(offset, size int) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.blockOffset < offset { + c.blockOffset = offset + } + + // (1) XXX at some point we could limit allocation to memory and use the disk + // to store future blocks. + if len(c.blocks) < size { + c.blocks = append(c.blocks, make([]*types.Block, size)...) + } } // puts puts sets of hashes on to the queue for fetching |