Diffstat (limited to 'swarm/storage/pyramid.go')
-rw-r--r--  swarm/storage/pyramid.go  211
1 file changed, 211 insertions(+), 0 deletions(-)
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
new file mode 100644
index 000000000..3c1ef17a0
--- /dev/null
+++ b/swarm/storage/pyramid.go
@@ -0,0 +1,211 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"math"
+	"strings"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/common"
+)
+
+const (
+	processors = 8
+)
+
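+// Tree is the in-memory bookkeeping structure for a pyramid under
+// construction: one map of partially filled nodes per level, keyed by the
+// node's index within that level, guarded by a single lock.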
+type Tree struct {
+	Chunks int64
+	Levels []map[int64]*Node
+	Lock   sync.RWMutex
+}
+
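+// Node is a chunk of the pyramid being assembled: Pending counts the
+// children still missing, Size accumulates the data bytes covered by the
+// subtree, and Last marks nodes on the right edge of the tree.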
+type Node struct {
+	Pending  int64
+	Size     uint64
+	Children []common.Hash
+	Last     bool
+}
+
+func (self *Node) String() string {
+	var children []string
+	for _, node := range self.Children {
+		children = append(children, node.Hex())
+	}
+	return fmt.Sprintf("pending: %v, size: %v, last: %v, children: %v", self.Pending, self.Size, self.Last, strings.Join(children, ", "))
+}
+
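+// Task is a unit of work handed to a processor: one size-prefixed leaf
+// chunk read from the input stream.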
+type Task struct {
+	Index int64  // Index of the chunk being processed
+	Size  uint64
+	Data  []byte // Binary blob of the chunk
+	Last  bool
+}
+
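+// PyramidChunker splits an input stream into a tree of chunks with
+// branches children per node, hashing all levels of the pyramid
+// concurrently as the data is being read.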
+type PyramidChunker struct {
+	hashFunc    Hasher
+	chunkSize   int64
+	hashSize    int64
+	branches    int64
+	workerCount int
+}
+
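+// NewPyramidChunker derives the chunker's geometry from the supplied
+// params: the chunk size is the hash size times the branching factor.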
+func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
+	self = &PyramidChunker{}
+	self.hashFunc = MakeHashFunc(params.Hash)
+	self.branches = params.Branches
+	self.hashSize = int64(self.hashFunc().Size())
+	self.chunkSize = self.hashSize * self.branches
+	self.workerCount = 1
+	return
+}
+
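+// Split chops up data into chunks of at most chunkSize payload bytes, each
+// prefixed with the byte count of its subtree as an 8-byte little-endian
+// integer, and hashes the pyramid bottom-up with a pool of processors.
+// The input is read with io.ReadFull, so every chunk except the last one
+// carries exactly chunkSize bytes of payload. Completed chunks are
+// delivered on chunkC; the key of the root chunk is returned.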
+func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+
+	chunks := (size + self.chunkSize - 1) / self.chunkSize
+	depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1
+	// glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth)
+
+	results := Tree{
+		Chunks: chunks,
+		Levels: make([]map[int64]*Node, depth),
+	}
+	for i := 0; i < depth; i++ {
+		results.Levels[i] = make(map[int64]*Node)
+	}
+	// Create a pool of workers to crunch through the file
+	tasks := make(chan *Task, 2*processors)
+	pend := new(sync.WaitGroup)
+	abortC := make(chan bool)
+	for i := 0; i < processors; i++ {
+		pend.Add(1)
+		go self.processor(pend, swg, tasks, chunkC, &results)
+	}
+	// Feed the chunks into the task pool
+	var readErr error
+	for index := 0; ; index++ {
+		buffer := make([]byte, self.chunkSize+8)
+		n, err := io.ReadFull(data, buffer[8:])
+		last := err == io.ErrUnexpectedEOF || err == io.EOF
+		// glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth)
+		if err != nil && !last {
+			// glog.V(logger.Info).Infof("error: %v", err)
+			readErr = err
+			close(abortC)
+			break
+		}
+		binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
+		pend.Add(1)
+		// glog.V(logger.Info).Infof("-> task %v (%v)", index, n)
+		select {
+		case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
+		case <-abortC:
+			return nil, err
+		}
+		if last {
+			// glog.V(logger.Info).Infof("last task %v (%v)", index, n)
+			break
+		}
+	}
+	// Wait for the workers and return
+	close(tasks)
+	pend.Wait()
+	if readErr != nil {
+		return nil, readErr
+	}
+	// glog.V(logger.Info).Infof("len: %v", results.Levels[0][0])
+	key := results.Levels[0][0].Children[0][:]
+	return key, nil
+}
+
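+// processor consumes tasks and hashes them into the tree. After finishing
+// a chunk it decrements its parent node's Pending counter; whichever
+// worker completes the last child of a node carries that node up to the
+// next level, so the pyramid is reduced bottom-up with no coordinator.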
+func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
+	defer pend.Done()
+
+	// glog.V(logger.Info).Infof("processor started")
+	// Start processing leaf chunks ad infinitum
+	hasher := self.hashFunc()
+	for task := range tasks {
+		depth, pow := len(results.Levels)-1, self.branches
+		// glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last)
+		size := task.Size
+		data := task.Data
+		var node *Node
+		for depth >= 0 {
+			// New chunk received, reset the hasher and start processing
+			hasher.Reset()
+			if node == nil { // Leaf node, hash the data chunk
+				hasher.Write(task.Data)
+			} else { // Internal node, hash the children
+				size = node.Size
+				data = make([]byte, hasher.Size()*len(node.Children)+8)
+				binary.LittleEndian.PutUint64(data[:8], size)
+
+				hasher.Write(data[:8])
+				for i, hash := range node.Children {
+					copy(data[i*hasher.Size()+8:], hash[:])
+					hasher.Write(hash[:])
+				}
+			}
+			hash := hasher.Sum(nil)
+			last := task.Last || (node != nil) && node.Last
+			// Insert the subresult into the memoization tree
+			results.Lock.Lock()
+			if node = results.Levels[depth][task.Index/pow]; node == nil {
+				// Figure out the pending tasks
+				pending := self.branches
+				if task.Index/pow == results.Chunks/pow {
+					pending = (results.Chunks + pow/self.branches - 1) / (pow / self.branches) % self.branches
+				}
+				node = &Node{pending, 0, make([]common.Hash, pending), last}
+				results.Levels[depth][task.Index/pow] = node
+				// glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending)
+			}
+			node.Pending--
+			// glog.V(logger.Info).Infof("pending now: %v", node.Pending)
+			i := task.Index / (pow / self.branches) % self.branches
+			if last {
+				node.Last = true
+			}
+			copy(node.Children[i][:], hash)
+			node.Size += size
+			left := node.Pending
+			// glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size)
+			if chunkC != nil {
+				if swg != nil {
+					swg.Add(1)
+				}
+				select {
+				case chunkC <- &Chunk{Key: hash, SData: data, wg: swg}:
+					// case <- self.quitC
+				}
+			}
+			if depth+1 < len(results.Levels) {
+				delete(results.Levels[depth+1], task.Index/(pow/self.branches))
+			}
+
+			results.Lock.Unlock()
+			// If there's more work to be done, leave for others
+			// glog.V(logger.Info).Infof("left %v", left)
+			if left > 0 {
+				break
+			}
+			// We're the last ones in this batch, merge the children together
+			depth--
+			pow *= self.branches
+		}
+		pend.Done()
+	}
+}
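
For context, here is a minimal sketch of how this chunker might be driven from within the same package. It is an illustration only: the ChunkerParams values (128 branches, the "SHA3" hasher) mirror defaults used elsewhere in swarm/storage but are assumptions, not part of this diff, and the splitExample helper is hypothetical.

func splitExample(data io.Reader, size int64) (Key, error) {
	// Assumed parameters; adjust to whatever the surrounding code base uses.
	chunker := NewPyramidChunker(&ChunkerParams{Branches: 128, Hash: "SHA3"})

	chunkC := make(chan *Chunk)
	swg := &sync.WaitGroup{} // processors call swg.Add(1) before emitting each chunk
	go func() {
		for chunk := range chunkC {
			// A real consumer would persist chunk.SData under chunk.Key here.
			_ = chunk
			swg.Done() // acknowledge the chunk
		}
	}()

	key, err := chunker.Split(data, size, chunkC, swg, nil)
	swg.Wait()    // every emitted chunk has been consumed
	close(chunkC) // stop the consumer goroutine
	return key, err
}

Note that Split only returns after all workers have drained, so waiting on swg afterwards is enough to guarantee the consumer has seen every chunk before the channel is closed.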