diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-12 18:38:25 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-12 18:38:25 +0800 |
commit | acf8452c33061c200c31529e9ffae0575b859ce5 (patch) | |
tree | b4afd91f6401cb813fb4860db7e6ef29fa3d54ca /eth/downloader/chunk.go | |
parent | 61db7a71dd809a003b5c899398b02945c04890c9 (diff) | |
download | go-tangerine-acf8452c33061c200c31529e9ffae0575b859ce5.tar go-tangerine-acf8452c33061c200c31529e9ffae0575b859ce5.tar.gz go-tangerine-acf8452c33061c200c31529e9ffae0575b859ce5.tar.bz2 go-tangerine-acf8452c33061c200c31529e9ffae0575b859ce5.tar.lz go-tangerine-acf8452c33061c200c31529e9ffae0575b859ce5.tar.xz go-tangerine-acf8452c33061c200c31529e9ffae0575b859ce5.tar.zst go-tangerine-acf8452c33061c200c31529e9ffae0575b859ce5.zip |
downloader: implemented new downloader
Diffstat (limited to 'eth/downloader/chunk.go')
-rw-r--r-- | eth/downloader/chunk.go | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/eth/downloader/chunk.go b/eth/downloader/chunk.go new file mode 100644 index 000000000..b68c5bc82 --- /dev/null +++ b/eth/downloader/chunk.go @@ -0,0 +1,98 @@ +package downloader + +import ( + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "gopkg.in/fatih/set.v0" +) + +// queue represents hashes that are either need fetching or are being fetched +type queue struct { + hashPool *set.Set + + mu sync.Mutex + fetching map[string]*chunk + blocks []*types.Block +} + +func newqueue() *queue { + return &queue{ + hashPool: set.New(), + fetching: make(map[string]*chunk), + } +} + +// reserve a `max` set of hashes for `p` peer. +func (c *queue) get(p *peer, max int) *chunk { + c.mu.Lock() + defer c.mu.Unlock() + + // return nothing if the pool has been depleted + if c.hashPool.Size() == 0 { + return nil + } + + limit := int(math.Min(float64(max), float64(c.hashPool.Size()))) + // Create a new set of hashes + hashes, i := set.New(), 0 + c.hashPool.Each(func(v interface{}) bool { + if i == limit { + return false + } + + hashes.Add(v) + i++ + + return true + }) + // remove the fetchable hashes from hash pool + c.hashPool.Separate(hashes) + // Create a new chunk for the seperated hashes. The time is being used + // to reset the chunk (timeout) + chunk := &chunk{hashes, time.Now()} + // register as 'fetching' state + c.fetching[p.id] = chunk + + // create new chunk for peer + return chunk +} + +func (c *queue) deliver(id string, blocks []*types.Block) { + c.mu.Lock() + defer c.mu.Unlock() + + chunk := c.fetching[id] + // If the chunk was never requested simply ignore it + if chunk != nil { + delete(c.fetching, id) + + // seperate the blocks and the hashes + chunk.seperate(blocks) + // Add the blocks + c.blocks = append(c.blocks, blocks...) + + // Add back whatever couldn't be delivered + c.hashPool.Merge(chunk.hashes) + } +} + +func (c *queue) put(hashes *set.Set) { + c.mu.Lock() + defer c.mu.Unlock() + + c.hashPool.Merge(hashes) +} + +type chunk struct { + hashes *set.Set + itime time.Time +} + +func (ch *chunk) seperate(blocks []*types.Block) { + for _, block := range blocks { + ch.hashes.Remove(block.Hash()) + } +} |