From a45421baaf2065203fa133a932967d5ea18fd0f7 Mon Sep 17 00:00:00 2001 From: aron Date: Sat, 8 Oct 2016 12:33:52 +0200 Subject: swarm/storage: fix chunker when reader is broken * brokenLimitedReader gives error after half size * TestRandomBrokenData tests chunker with broken reader * add blocking quitC (instead of errC) and use errC only for errors * don't close chunkC in tester Split, * use quitC to quit chunk storage loop --- swarm/storage/pyramid.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) (limited to 'swarm/storage/pyramid.go') diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 3c1ef17a0..79e1927b9 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -81,7 +81,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk chunks := (size + self.chunkSize - 1) / self.chunkSize depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1 - // glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth) results := Tree{ Chunks: chunks, @@ -99,26 +98,24 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk go self.processor(pend, swg, tasks, chunkC, &results) } // Feed the chunks into the task pool + read := 0 for index := 0; ; index++ { buffer := make([]byte, self.chunkSize+8) n, err := data.Read(buffer[8:]) - last := err == io.ErrUnexpectedEOF || err == io.EOF - // glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth) + read += n + last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF if err != nil && !last { - // glog.V(logger.Info).Infof("error: %v", err) close(abortC) break } binary.LittleEndian.PutUint64(buffer[:8], uint64(n)) pend.Add(1) - // glog.V(logger.Info).Infof("-> task %v (%v)", index, n) select { case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}: case <-abortC: return nil, err } if last { - // glog.V(logger.Info).Infof("last task %v (%v)", index, n) break } } @@ -126,7 +123,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk close(tasks) pend.Wait() - // glog.V(logger.Info).Infof("len: %v", results.Levels[0][0]) key := results.Levels[0][0].Children[0][:] return key, nil } @@ -134,12 +130,10 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) { defer pend.Done() - // glog.V(logger.Info).Infof("processor started") // Start processing leaf chunks ad infinitum hasher := self.hashFunc() for task := range tasks { depth, pow := len(results.Levels)-1, self.branches - // glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last) size := task.Size data := task.Data var node *Node @@ -171,10 +165,8 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas } node = &Node{pending, 0, make([]common.Hash, pending), last} results.Levels[depth][task.Index/pow] = node - // glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending) } node.Pending-- - // glog.V(logger.Info).Infof("pending now: %v", node.Pending) i := task.Index / (pow / self.branches) % self.branches if last { node.Last = true @@ -182,7 +174,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas copy(node.Children[i][:], hash) node.Size += size left := node.Pending - // glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size) if chunkC != nil { if swg != nil { swg.Add(1) @@ -198,7 +189,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas results.Lock.Unlock() // If there's more work to be done, leave for others - // glog.V(logger.Info).Infof("left %v", left) if left > 0 { break } -- cgit v1.2.3