diff options
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 65 |
1 files changed, 55 insertions, 10 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index a21a44706..1b63a5ffb 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -1,6 +1,7 @@ package downloader import ( + "fmt" "math" "sync" "time" @@ -18,7 +19,9 @@ type queue struct { mu sync.Mutex fetching map[string]*chunk - blocks []*types.Block + + blockOffset int + blocks []*types.Block } func newqueue() *queue { @@ -34,6 +37,10 @@ func (c *queue) reset() { c.mu.Lock() defer c.mu.Unlock() + c.resetNoTS() +} +func (c *queue) resetNoTS() { + c.blockOffset = 0 c.hashPool.Clear() c.fetchPool.Clear() c.blockHashes.Clear() @@ -41,6 +48,10 @@ func (c *queue) reset() { c.fetching = make(map[string]*chunk) } +func (c *queue) size() int { + return c.hashPool.Size() + c.blockHashes.Size() + c.fetchPool.Size() +} + // reserve a `max` set of hashes for `p` peer. func (c *queue) get(p *peer, max int) *chunk { c.mu.Lock() @@ -89,22 +100,27 @@ func (c *queue) get(p *peer, max int) *chunk { } func (c *queue) has(hash common.Hash) bool { - return c.hashPool.Has(hash) || c.fetchPool.Has(hash) + return c.hashPool.Has(hash) || c.fetchPool.Has(hash) || c.blockHashes.Has(hash) } -func (c *queue) addBlock(id string, block *types.Block) { +func (c *queue) getBlock(hash common.Hash) *types.Block { c.mu.Lock() defer c.mu.Unlock() - // when adding a block make sure it doesn't already exist - if !c.blockHashes.Has(block.Hash()) { - c.hashPool.Remove(block.Hash()) - c.blocks = append(c.blocks, block) + if !c.blockHashes.Has(hash) { + return nil + } + + for _, block := range c.blocks { + if block.Hash() == hash { + return block + } } + return nil } // deliver delivers a chunk to the queue that was requested of the peer -func (c *queue) deliver(id string, blocks []*types.Block) { +func (c *queue) deliver(id string, blocks []*types.Block) (err error) { c.mu.Lock() defer c.mu.Unlock() @@ -119,16 +135,45 @@ func (c *queue) deliver(id string, blocks []*types.Block) { chunk.peer.ignored.Merge(chunk.hashes) } + // Add the blocks + for i, block := range blocks { + // See (1) for future limitation + n := int(block.NumberU64()) - c.blockOffset + if n > len(c.blocks) || n < 0 { + // set the error and set the blocks which could be processed + // abort the rest of the blocks (FIXME this could be improved) + err = fmt.Errorf("received block which overflow (N=%v O=%v)", block.Number(), c.blockOffset) + blocks = blocks[:i] + break + } + c.blocks[n] = block + } // seperate the blocks and the hashes blockHashes := chunk.fetchedHashes(blocks) // merge block hashes c.blockHashes.Merge(blockHashes) - // Add the blocks - c.blocks = append(c.blocks, blocks...) // Add back whatever couldn't be delivered c.hashPool.Merge(chunk.hashes) + // Remove the hashes from the fetch pool c.fetchPool.Separate(chunk.hashes) } + + return +} + +func (c *queue) alloc(offset, size int) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.blockOffset < offset { + c.blockOffset = offset + } + + // (1) XXX at some point we could limit allocation to memory and use the disk + // to store future blocks. + if len(c.blocks) < size { + c.blocks = append(c.blocks, make([]*types.Block, size)...) + } } // puts puts sets of hashes on to the queue for fetching |