| author | aron &lt;homotopycolimit@users.noreply.github.com&gt; | 2016-10-08 18:33:52 +0800 |
|---|---|---|
| committer | aron &lt;homotopycolimit@users.noreply.github.com&gt; | 2016-10-11 05:34:44 +0800 |
| commit | a45421baaf2065203fa133a932967d5ea18fd0f7 (patch) | |
| tree | 5268c7a5c76d91906d0fd713a7b16c8f734ac673 /swarm/storage/pyramid.go | |
| parent | be6a3696a9642c9511c565f4c35d1c8aae6434ad (diff) | |
swarm/storage: fix chunker when reader is broken
* brokenLimitedReader returns an error once half of its declared size has been read (sketched below)
* TestRandomBrokenData exercises the chunker with such a broken reader
* add a blocking quitC (instead of errC) and use errC only for errors
* don't close chunkC in the tester's Split
* use quitC to quit the chunk storage loop
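
For readers unfamiliar with the test setup: a "broken" reader here is one that fails partway through the declared length. Below is a minimal sketch of such a reader, using a wrapper type invented for illustration rather than the actual brokenLimitedReader helper from the test suite:

```go
package storage

import (
	"errors"
	"io"
)

// errBroken simulates an upload stream that fails mid-transfer.
var errBroken = errors.New("broken reader")

// halfBrokenReader is a hypothetical example (not the actual brokenLimitedReader
// used by the test): it reads from an underlying reader but returns an error
// once half of the declared size has been consumed.
type halfBrokenReader struct {
	r    io.Reader
	size int // declared total size in bytes
	off  int // bytes handed out so far
}

func (b *halfBrokenReader) Read(p []byte) (int, error) {
	if b.off >= b.size/2 {
		return 0, errBroken
	}
	n, err := b.r.Read(p)
	b.off += n
	return n, err
}
```

TestRandomBrokenData feeds a reader along these lines to the chunker, presumably expecting the error to surface on errC rather than the split hanging or chunkC being closed prematurely.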
Diffstat (limited to 'swarm/storage/pyramid.go')
-rw-r--r-- | swarm/storage/pyramid.go | 16 |
1 file changed, 3 insertions(+), 13 deletions(-)
```diff
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
index 3c1ef17a0..79e1927b9 100644
--- a/swarm/storage/pyramid.go
+++ b/swarm/storage/pyramid.go
@@ -81,7 +81,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
 	chunks := (size + self.chunkSize - 1) / self.chunkSize
 	depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1
-	// glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth)
 
 	results := Tree{
 		Chunks: chunks,
@@ -99,26 +98,24 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
 		go self.processor(pend, swg, tasks, chunkC, &results)
 	}
 	// Feed the chunks into the task pool
+	read := 0
 	for index := 0; ; index++ {
 		buffer := make([]byte, self.chunkSize+8)
 		n, err := data.Read(buffer[8:])
-		last := err == io.ErrUnexpectedEOF || err == io.EOF
-		// glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth)
+		read += n
+		last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF
 		if err != nil && !last {
-			// glog.V(logger.Info).Infof("error: %v", err)
 			close(abortC)
 			break
 		}
 		binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
 		pend.Add(1)
-		// glog.V(logger.Info).Infof("-> task %v (%v)", index, n)
 		select {
 		case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
 		case <-abortC:
 			return nil, err
 		}
 		if last {
-			// glog.V(logger.Info).Infof("last task %v (%v)", index, n)
 			break
 		}
 	}
@@ -126,7 +123,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
 	close(tasks)
 	pend.Wait()
-	// glog.V(logger.Info).Infof("len: %v", results.Levels[0][0])
 	key := results.Levels[0][0].Children[0][:]
 	return key, nil
 }
@@ -134,12 +130,10 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
 
 func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
 	defer pend.Done()
-	// glog.V(logger.Info).Infof("processor started")
 	// Start processing leaf chunks ad infinitum
 	hasher := self.hashFunc()
 	for task := range tasks {
 		depth, pow := len(results.Levels)-1, self.branches
-		// glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last)
 		size := task.Size
 		data := task.Data
 		var node *Node
@@ -171,10 +165,8 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas
 			}
 			node = &Node{pending, 0, make([]common.Hash, pending), last}
 			results.Levels[depth][task.Index/pow] = node
-			// glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending)
 		}
 		node.Pending--
-		// glog.V(logger.Info).Infof("pending now: %v", node.Pending)
 		i := task.Index / (pow / self.branches) % self.branches
 		if last {
 			node.Last = true
@@ -182,7 +174,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas
 		copy(node.Children[i][:], hash)
 		node.Size += size
 		left := node.Pending
-		// glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size)
 		if chunkC != nil {
 			if swg != nil {
 				swg.Add(1)
@@ -198,7 +189,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas
 			results.Lock.Unlock()
 
 			// If there's more work to be done, leave for others
-			// glog.V(logger.Info).Infof("left %v", left)
 			if left > 0 {
 				break
 			}
```
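
The substantive change in Split, visible in the second hunk, is that the end of input is now detected by tracking the cumulative number of bytes read and comparing it to the declared size, rather than relying solely on io.EOF / io.ErrUnexpectedEOF; any other read error now aborts the task feed. A reduced, self-contained sketch of that loop shape (the function name and signature are illustrative, not from the source):

```go
package storage

import "io"

// feedChunks is a reduced sketch of the reworked read loop in Split: the last
// chunk is recognised either when the declared size has been consumed or when
// the reader reports EOF; any other error aborts the feed.
func feedChunks(data io.Reader, size, chunkSize int64) error {
	read := int64(0)
	for {
		buffer := make([]byte, chunkSize)
		n, err := data.Read(buffer)
		read += int64(n)
		last := read == size || err == io.ErrUnexpectedEOF || err == io.EOF
		if err != nil && !last {
			// Broken reader: stop feeding and surface the error,
			// analogous to the close(abortC) path in the real Split.
			return err
		}
		// buffer[:n] would be handed to a processor goroutine here.
		if last {
			return nil
		}
	}
}
```

With the size check, the final chunk is recognised on the read that completes the declared length, without needing a further Read call just to observe io.EOF.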