aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/pyramid.go
diff options
context:
space:
mode:
authoraron <homotopycolimit@users.noreply.github.com>2016-10-08 18:33:52 +0800
committeraron <homotopycolimit@users.noreply.github.com>2016-10-11 05:34:44 +0800
commita45421baaf2065203fa133a932967d5ea18fd0f7 (patch)
tree5268c7a5c76d91906d0fd713a7b16c8f734ac673 /swarm/storage/pyramid.go
parentbe6a3696a9642c9511c565f4c35d1c8aae6434ad (diff)
downloaddexon-a45421baaf2065203fa133a932967d5ea18fd0f7.tar
dexon-a45421baaf2065203fa133a932967d5ea18fd0f7.tar.gz
dexon-a45421baaf2065203fa133a932967d5ea18fd0f7.tar.bz2
dexon-a45421baaf2065203fa133a932967d5ea18fd0f7.tar.lz
dexon-a45421baaf2065203fa133a932967d5ea18fd0f7.tar.xz
dexon-a45421baaf2065203fa133a932967d5ea18fd0f7.tar.zst
dexon-a45421baaf2065203fa133a932967d5ea18fd0f7.zip
swarm/storage: fix chunker when reader is broken
* brokenLimitedReader gives error after half size * TestRandomBrokenData tests chunker with broken reader * add blocking quitC (instead of errC) and use errC only for errors * don't close chunkC in tester Split, * use quitC to quit chunk storage loop
Diffstat (limited to 'swarm/storage/pyramid.go')
-rw-r--r--swarm/storage/pyramid.go16
1 files changed, 3 insertions, 13 deletions
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
index 3c1ef17a0..79e1927b9 100644
--- a/swarm/storage/pyramid.go
+++ b/swarm/storage/pyramid.go
@@ -81,7 +81,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
chunks := (size + self.chunkSize - 1) / self.chunkSize
depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1
- // glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth)
results := Tree{
Chunks: chunks,
@@ -99,26 +98,24 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
go self.processor(pend, swg, tasks, chunkC, &results)
}
// Feed the chunks into the task pool
+ read := 0
for index := 0; ; index++ {
buffer := make([]byte, self.chunkSize+8)
n, err := data.Read(buffer[8:])
- last := err == io.ErrUnexpectedEOF || err == io.EOF
- // glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth)
+ read += n
+ last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF
if err != nil && !last {
- // glog.V(logger.Info).Infof("error: %v", err)
close(abortC)
break
}
binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
pend.Add(1)
- // glog.V(logger.Info).Infof("-> task %v (%v)", index, n)
select {
case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
case <-abortC:
return nil, err
}
if last {
- // glog.V(logger.Info).Infof("last task %v (%v)", index, n)
break
}
}
@@ -126,7 +123,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
close(tasks)
pend.Wait()
- // glog.V(logger.Info).Infof("len: %v", results.Levels[0][0])
key := results.Levels[0][0].Children[0][:]
return key, nil
}
@@ -134,12 +130,10 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
defer pend.Done()
- // glog.V(logger.Info).Infof("processor started")
// Start processing leaf chunks ad infinitum
hasher := self.hashFunc()
for task := range tasks {
depth, pow := len(results.Levels)-1, self.branches
- // glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last)
size := task.Size
data := task.Data
var node *Node
@@ -171,10 +165,8 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas
}
node = &Node{pending, 0, make([]common.Hash, pending), last}
results.Levels[depth][task.Index/pow] = node
- // glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending)
}
node.Pending--
- // glog.V(logger.Info).Infof("pending now: %v", node.Pending)
i := task.Index / (pow / self.branches) % self.branches
if last {
node.Last = true
@@ -182,7 +174,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas
copy(node.Children[i][:], hash)
node.Size += size
left := node.Pending
- // glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size)
if chunkC != nil {
if swg != nil {
swg.Add(1)
@@ -198,7 +189,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas
results.Lock.Unlock()
// If there's more work to be done, leave for others
- // glog.V(logger.Info).Infof("left %v", left)
if left > 0 {
break
}