aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/pyramid.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/pyramid.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloadgo-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/pyramid.go')
-rw-r--r--swarm/storage/pyramid.go520
1 files changed, 291 insertions, 229 deletions
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
index 19d493405..01172cb77 100644
--- a/swarm/storage/pyramid.go
+++ b/swarm/storage/pyramid.go
@@ -20,8 +20,11 @@ import (
"encoding/binary"
"errors"
"io"
+ "io/ioutil"
"sync"
"time"
+
+ "github.com/ethereum/go-ethereum/swarm/log"
)
/*
@@ -62,9 +65,8 @@ var (
)
const (
- ChunkProcessors = 8
- DefaultBranches int64 = 128
- splitTimeout = time.Minute * 5
+ ChunkProcessors = 8
+ splitTimeout = time.Minute * 5
)
const (
@@ -72,18 +74,39 @@ const (
TreeChunk = 1
)
-type ChunkerParams struct {
- Branches int64
- Hash string
+type PyramidSplitterParams struct {
+ SplitterParams
+ getter Getter
}
-func NewChunkerParams() *ChunkerParams {
- return &ChunkerParams{
- Branches: DefaultBranches,
- Hash: SHA3Hash,
+func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, getter Getter, chunkSize int64) *PyramidSplitterParams {
+ hashSize := putter.RefSize()
+ return &PyramidSplitterParams{
+ SplitterParams: SplitterParams{
+ ChunkerParams: ChunkerParams{
+ chunkSize: chunkSize,
+ hashSize: hashSize,
+ },
+ reader: reader,
+ putter: putter,
+ addr: addr,
+ },
+ getter: getter,
}
}
+/*
+ When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
+ New chunks to store are store using the putter which the caller provides.
+*/
+func PyramidSplit(reader io.Reader, putter Putter, getter Getter) (Address, func(), error) {
+ return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split()
+}
+
+func PyramidAppend(addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(), error) {
+ return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, DefaultChunkSize)).Append()
+}
+
// Entry to create a tree node
type TreeEntry struct {
level int
@@ -109,264 +132,250 @@ func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry {
// Used by the hash processor to create a data/tree chunk and send to storage
type chunkJob struct {
- key Key
- chunk []byte
- size int64
- parentWg *sync.WaitGroup
- chunkType int // used to identify the tree related chunks for debugging
- chunkLvl int // leaf-1 is level 0 and goes upwards until it reaches root
+ key Address
+ chunk []byte
+ parentWg *sync.WaitGroup
}
type PyramidChunker struct {
- hashFunc SwarmHasher
chunkSize int64
hashSize int64
branches int64
+ reader io.Reader
+ putter Putter
+ getter Getter
+ key Address
workerCount int64
workerLock sync.RWMutex
+ jobC chan *chunkJob
+ wg *sync.WaitGroup
+ errC chan error
+ quitC chan bool
+ rootKey []byte
+ chunkLevel [][]*TreeEntry
}
-func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
- self = &PyramidChunker{}
- self.hashFunc = MakeHashFunc(params.Hash)
- self.branches = params.Branches
- self.hashSize = int64(self.hashFunc().Size())
- self.chunkSize = self.hashSize * self.branches
- self.workerCount = 0
+func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) {
+ pc = &PyramidChunker{}
+ pc.reader = params.reader
+ pc.hashSize = params.hashSize
+ pc.branches = params.chunkSize / pc.hashSize
+ pc.chunkSize = pc.hashSize * pc.branches
+ pc.putter = params.putter
+ pc.getter = params.getter
+ pc.key = params.addr
+ pc.workerCount = 0
+ pc.jobC = make(chan *chunkJob, 2*ChunkProcessors)
+ pc.wg = &sync.WaitGroup{}
+ pc.errC = make(chan error)
+ pc.quitC = make(chan bool)
+ pc.rootKey = make([]byte, pc.hashSize)
+ pc.chunkLevel = make([][]*TreeEntry, pc.branches)
return
}
-func (self *PyramidChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
+func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader {
return &LazyChunkReader{
- key: key,
- chunkC: chunkC,
- chunkSize: self.chunkSize,
- branches: self.branches,
- hashSize: self.hashSize,
+ key: addr,
+ depth: depth,
+ chunkSize: pc.chunkSize,
+ branches: pc.branches,
+ hashSize: pc.hashSize,
+ getter: getter,
}
}
-func (self *PyramidChunker) incrementWorkerCount() {
- self.workerLock.Lock()
- defer self.workerLock.Unlock()
- self.workerCount += 1
+func (pc *PyramidChunker) incrementWorkerCount() {
+ pc.workerLock.Lock()
+ defer pc.workerLock.Unlock()
+ pc.workerCount += 1
}
-func (self *PyramidChunker) getWorkerCount() int64 {
- self.workerLock.Lock()
- defer self.workerLock.Unlock()
- return self.workerCount
+func (pc *PyramidChunker) getWorkerCount() int64 {
+ pc.workerLock.Lock()
+ defer pc.workerLock.Unlock()
+ return pc.workerCount
}
-func (self *PyramidChunker) decrementWorkerCount() {
- self.workerLock.Lock()
- defer self.workerLock.Unlock()
- self.workerCount -= 1
+func (pc *PyramidChunker) decrementWorkerCount() {
+ pc.workerLock.Lock()
+ defer pc.workerLock.Unlock()
+ pc.workerCount -= 1
}
-func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) {
- jobC := make(chan *chunkJob, 2*ChunkProcessors)
- wg := &sync.WaitGroup{}
- errC := make(chan error)
- quitC := make(chan bool)
- rootKey := make([]byte, self.hashSize)
- chunkLevel := make([][]*TreeEntry, self.branches)
+func (pc *PyramidChunker) Split() (k Address, wait func(), err error) {
+ log.Debug("pyramid.chunker: Split()")
- wg.Add(1)
- go self.prepareChunks(false, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG)
+ pc.wg.Add(1)
+ pc.prepareChunks(false)
// closes internal error channel if all subprocesses in the workgroup finished
go func() {
// waiting for all chunks to finish
- wg.Wait()
+ pc.wg.Wait()
- // if storage waitgroup is non-nil, we wait for storage to finish too
- if storageWG != nil {
- storageWG.Wait()
- }
//We close errC here because this is passed down to 8 parallel routines underneath.
// if a error happens in one of them.. that particular routine raises error...
// once they all complete successfully, the control comes back and we can safely close this here.
- close(errC)
+ close(pc.errC)
}()
- defer close(quitC)
+ defer close(pc.quitC)
+ defer pc.putter.Close()
select {
- case err := <-errC:
+ case err := <-pc.errC:
if err != nil {
- return nil, err
+ return nil, nil, err
}
case <-time.NewTimer(splitTimeout).C:
}
- return rootKey, nil
+ return pc.rootKey, pc.putter.Wait, nil
}
-func (self *PyramidChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) {
- quitC := make(chan bool)
- rootKey := make([]byte, self.hashSize)
- chunkLevel := make([][]*TreeEntry, self.branches)
-
+func (pc *PyramidChunker) Append() (k Address, wait func(), err error) {
+ log.Debug("pyramid.chunker: Append()")
// Load the right most unfinished tree chunks in every level
- self.loadTree(chunkLevel, key, chunkC, quitC)
+ pc.loadTree()
- jobC := make(chan *chunkJob, 2*ChunkProcessors)
- wg := &sync.WaitGroup{}
- errC := make(chan error)
-
- wg.Add(1)
- go self.prepareChunks(true, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG)
+ pc.wg.Add(1)
+ pc.prepareChunks(true)
// closes internal error channel if all subprocesses in the workgroup finished
go func() {
// waiting for all chunks to finish
- wg.Wait()
+ pc.wg.Wait()
- // if storage waitgroup is non-nil, we wait for storage to finish too
- if storageWG != nil {
- storageWG.Wait()
- }
- close(errC)
+ close(pc.errC)
}()
- defer close(quitC)
+ defer close(pc.quitC)
+ defer pc.putter.Close()
select {
- case err := <-errC:
+ case err := <-pc.errC:
if err != nil {
- return nil, err
+ return nil, nil, err
}
case <-time.NewTimer(splitTimeout).C:
}
- return rootKey, nil
-}
+ return pc.rootKey, pc.putter.Wait, nil
-func (self *PyramidChunker) processor(id int64, jobC chan *chunkJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
- defer self.decrementWorkerCount()
+}
- hasher := self.hashFunc()
- if wwg != nil {
- defer wwg.Done()
- }
+func (pc *PyramidChunker) processor(id int64) {
+ defer pc.decrementWorkerCount()
for {
select {
- case job, ok := <-jobC:
+ case job, ok := <-pc.jobC:
if !ok {
return
}
- self.processChunk(id, hasher, job, chunkC, swg)
- case <-quitC:
+ pc.processChunk(id, job)
+ case <-pc.quitC:
return
}
}
}
-func (self *PyramidChunker) processChunk(id int64, hasher SwarmHash, job *chunkJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
- hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
- hasher.Write(job.chunk[8:]) // minus 8 []byte length
- h := hasher.Sum(nil)
+func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) {
+ log.Debug("pyramid.chunker: processChunk()", "id", id)
- newChunk := &Chunk{
- Key: h,
- SData: job.chunk,
- Size: job.size,
- wg: swg,
+ ref, err := pc.putter.Put(job.chunk)
+ if err != nil {
+ pc.errC <- err
}
// report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk)
- copy(job.key, h)
+ copy(job.key, ref)
// send off new chunk to storage
- if chunkC != nil {
- if swg != nil {
- swg.Add(1)
- }
- }
job.parentWg.Done()
-
- if chunkC != nil {
- chunkC <- newChunk
- }
}
-func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC chan *Chunk, quitC chan bool) error {
+func (pc *PyramidChunker) loadTree() error {
+ log.Debug("pyramid.chunker: loadTree()")
// Get the root chunk to get the total size
- chunk := retrieve(key, chunkC, quitC)
- if chunk == nil {
+ chunkData, err := pc.getter.Get(Reference(pc.key))
+ if err != nil {
return errLoadingTreeRootChunk
}
+ chunkSize := chunkData.Size()
+ log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize)
//if data size is less than a chunk... add a parent with update as pending
- if chunk.Size <= self.chunkSize {
+ if chunkSize <= pc.chunkSize {
newEntry := &TreeEntry{
level: 0,
branchCount: 1,
- subtreeSize: uint64(chunk.Size),
- chunk: make([]byte, self.chunkSize+8),
- key: make([]byte, self.hashSize),
+ subtreeSize: uint64(chunkSize),
+ chunk: make([]byte, pc.chunkSize+8),
+ key: make([]byte, pc.hashSize),
index: 0,
updatePending: true,
}
- copy(newEntry.chunk[8:], chunk.Key)
- chunkLevel[0] = append(chunkLevel[0], newEntry)
+ copy(newEntry.chunk[8:], pc.key)
+ pc.chunkLevel[0] = append(pc.chunkLevel[0], newEntry)
return nil
}
var treeSize int64
var depth int
- treeSize = self.chunkSize
- for ; treeSize < chunk.Size; treeSize *= self.branches {
+ treeSize = pc.chunkSize
+ for ; treeSize < chunkSize; treeSize *= pc.branches {
depth++
}
+ log.Trace("pyramid.chunker", "depth", depth)
// Add the root chunk entry
- branchCount := int64(len(chunk.SData)-8) / self.hashSize
+ branchCount := int64(len(chunkData)-8) / pc.hashSize
newEntry := &TreeEntry{
level: depth - 1,
branchCount: branchCount,
- subtreeSize: uint64(chunk.Size),
- chunk: chunk.SData,
- key: key,
+ subtreeSize: uint64(chunkSize),
+ chunk: chunkData,
+ key: pc.key,
index: 0,
updatePending: true,
}
- chunkLevel[depth-1] = append(chunkLevel[depth-1], newEntry)
+ pc.chunkLevel[depth-1] = append(pc.chunkLevel[depth-1], newEntry)
// Add the rest of the tree
for lvl := depth - 1; lvl >= 1; lvl-- {
//TODO(jmozah): instead of loading finished branches and then trim in the end,
//avoid loading them in the first place
- for _, ent := range chunkLevel[lvl] {
- branchCount = int64(len(ent.chunk)-8) / self.hashSize
+ for _, ent := range pc.chunkLevel[lvl] {
+ branchCount = int64(len(ent.chunk)-8) / pc.hashSize
for i := int64(0); i < branchCount; i++ {
- key := ent.chunk[8+(i*self.hashSize) : 8+((i+1)*self.hashSize)]
- newChunk := retrieve(key, chunkC, quitC)
- if newChunk == nil {
+ key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)]
+ newChunkData, err := pc.getter.Get(Reference(key))
+ if err != nil {
return errLoadingTreeChunk
}
- bewBranchCount := int64(len(newChunk.SData)-8) / self.hashSize
+ newChunkSize := newChunkData.Size()
+ bewBranchCount := int64(len(newChunkData)-8) / pc.hashSize
newEntry := &TreeEntry{
level: lvl - 1,
branchCount: bewBranchCount,
- subtreeSize: uint64(newChunk.Size),
- chunk: newChunk.SData,
+ subtreeSize: uint64(newChunkSize),
+ chunk: newChunkData,
key: key,
index: 0,
updatePending: true,
}
- chunkLevel[lvl-1] = append(chunkLevel[lvl-1], newEntry)
+ pc.chunkLevel[lvl-1] = append(pc.chunkLevel[lvl-1], newEntry)
}
// We need to get only the right most unfinished branch.. so trim all finished branches
- if int64(len(chunkLevel[lvl-1])) >= self.branches {
- chunkLevel[lvl-1] = nil
+ if int64(len(pc.chunkLevel[lvl-1])) >= pc.branches {
+ pc.chunkLevel[lvl-1] = nil
}
}
}
@@ -374,29 +383,25 @@ func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC
return nil
}
-func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEntry, data io.Reader, rootKey []byte, quitC chan bool, wg *sync.WaitGroup, jobC chan *chunkJob, processorWG *sync.WaitGroup, chunkC chan *Chunk, errC chan error, storageWG *sync.WaitGroup) {
- defer wg.Done()
+func (pc *PyramidChunker) prepareChunks(isAppend bool) {
+ log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend)
+ defer pc.wg.Done()
chunkWG := &sync.WaitGroup{}
- totalDataSize := 0
- // processorWG keeps track of workers spawned for hashing chunks
- if processorWG != nil {
- processorWG.Add(1)
- }
-
- self.incrementWorkerCount()
- go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG, processorWG)
+ pc.incrementWorkerCount()
- parent := NewTreeEntry(self)
- var unFinishedChunk *Chunk
+ go pc.processor(pc.workerCount)
- if isAppend && len(chunkLevel[0]) != 0 {
+ parent := NewTreeEntry(pc)
+ var unfinishedChunkData ChunkData
+ var unfinishedChunkSize int64
- lastIndex := len(chunkLevel[0]) - 1
- ent := chunkLevel[0][lastIndex]
+ if isAppend && len(pc.chunkLevel[0]) != 0 {
+ lastIndex := len(pc.chunkLevel[0]) - 1
+ ent := pc.chunkLevel[0][lastIndex]
- if ent.branchCount < self.branches {
+ if ent.branchCount < pc.branches {
parent = &TreeEntry{
level: 0,
branchCount: ent.branchCount,
@@ -408,104 +413,132 @@ func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEnt
}
lastBranch := parent.branchCount - 1
- lastKey := parent.chunk[8+lastBranch*self.hashSize : 8+(lastBranch+1)*self.hashSize]
-
- unFinishedChunk = retrieve(lastKey, chunkC, quitC)
- if unFinishedChunk.Size < self.chunkSize {
+ lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize]
- parent.subtreeSize = parent.subtreeSize - uint64(unFinishedChunk.Size)
+ var err error
+ unfinishedChunkData, err = pc.getter.Get(lastKey)
+ if err != nil {
+ pc.errC <- err
+ }
+ unfinishedChunkSize = unfinishedChunkData.Size()
+ if unfinishedChunkSize < pc.chunkSize {
+ parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize)
parent.branchCount = parent.branchCount - 1
} else {
- unFinishedChunk = nil
+ unfinishedChunkData = nil
}
}
}
for index := 0; ; index++ {
-
- var n int
var err error
- chunkData := make([]byte, self.chunkSize+8)
- if unFinishedChunk != nil {
- copy(chunkData, unFinishedChunk.SData)
- n, err = data.Read(chunkData[8+unFinishedChunk.Size:])
- n += int(unFinishedChunk.Size)
- unFinishedChunk = nil
- } else {
- n, err = data.Read(chunkData[8:])
+ chunkData := make([]byte, pc.chunkSize+8)
+
+ var readBytes int
+
+ if unfinishedChunkData != nil {
+ copy(chunkData, unfinishedChunkData)
+ readBytes += int(unfinishedChunkSize)
+ unfinishedChunkData = nil
+ log.Trace("pyramid.chunker: found unfinished chunk", "readBytes", readBytes)
+ }
+
+ var res []byte
+ res, err = ioutil.ReadAll(io.LimitReader(pc.reader, int64(len(chunkData)-(8+readBytes))))
+
+ // hack for ioutil.ReadAll:
+ // a successful call to ioutil.ReadAll returns err == nil, not err == EOF, whereas we
+ // want to propagate the io.EOF error
+ if len(res) == 0 && err == nil {
+ err = io.EOF
}
+ copy(chunkData[8+readBytes:], res)
+
+ readBytes += len(res)
+ log.Trace("pyramid.chunker: copied all data", "readBytes", readBytes)
- totalDataSize += n
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
- if parent.branchCount == 1 {
+
+ pc.cleanChunkLevels()
+
+ // Check if we are appending or the chunk is the only one.
+ if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) {
// Data is exactly one chunk.. pick the last chunk key as root
chunkWG.Wait()
- lastChunksKey := parent.chunk[8 : 8+self.hashSize]
- copy(rootKey, lastChunksKey)
+ lastChunksKey := parent.chunk[8 : 8+pc.hashSize]
+ copy(pc.rootKey, lastChunksKey)
break
}
} else {
- close(quitC)
+ close(pc.quitC)
break
}
}
// Data ended in chunk boundary.. just signal to start bulding tree
- if n == 0 {
- self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey)
+ if readBytes == 0 {
+ pc.buildTree(isAppend, parent, chunkWG, true, nil)
break
} else {
-
- pkey := self.enqueueDataChunk(chunkData, uint64(n), parent, chunkWG, jobC, quitC)
+ pkey := pc.enqueueDataChunk(chunkData, uint64(readBytes), parent, chunkWG)
// update tree related parent data structures
- parent.subtreeSize += uint64(n)
+ parent.subtreeSize += uint64(readBytes)
parent.branchCount++
// Data got exhausted... signal to send any parent tree related chunks
- if int64(n) < self.chunkSize {
+ if int64(readBytes) < pc.chunkSize {
+
+ pc.cleanChunkLevels()
// only one data chunk .. so dont add any parent chunk
if parent.branchCount <= 1 {
chunkWG.Wait()
- copy(rootKey, pkey)
+
+ if isAppend || pc.depth() == 0 {
+ // No need to build the tree if the depth is 0
+ // or we are appending.
+ // Just use the last key.
+ copy(pc.rootKey, pkey)
+ } else {
+ // We need to build the tree and and provide the lonely
+ // chunk key to replace the last tree chunk key.
+ pc.buildTree(isAppend, parent, chunkWG, true, pkey)
+ }
break
}
- self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey)
+ pc.buildTree(isAppend, parent, chunkWG, true, nil)
break
}
- if parent.branchCount == self.branches {
- self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, false, rootKey)
- parent = NewTreeEntry(self)
+ if parent.branchCount == pc.branches {
+ pc.buildTree(isAppend, parent, chunkWG, false, nil)
+ parent = NewTreeEntry(pc)
}
}
- workers := self.getWorkerCount()
- if int64(len(jobC)) > workers && workers < ChunkProcessors {
- if processorWG != nil {
- processorWG.Add(1)
- }
- self.incrementWorkerCount()
- go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG, processorWG)
+ workers := pc.getWorkerCount()
+ if int64(len(pc.jobC)) > workers && workers < ChunkProcessors {
+ pc.incrementWorkerCount()
+ go pc.processor(pc.workerCount)
}
}
}
-func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool, rootKey []byte) {
+func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync.WaitGroup, last bool, lonelyChunkKey []byte) {
chunkWG.Wait()
- self.enqueueTreeChunk(chunkLevel, ent, chunkWG, jobC, quitC, last)
+ pc.enqueueTreeChunk(ent, chunkWG, last)
compress := false
- endLvl := self.branches
- for lvl := int64(0); lvl < self.branches; lvl++ {
- lvlCount := int64(len(chunkLevel[lvl]))
- if lvlCount >= self.branches {
+ endLvl := pc.branches
+ for lvl := int64(0); lvl < pc.branches; lvl++ {
+ lvlCount := int64(len(pc.chunkLevel[lvl]))
+ if lvlCount >= pc.branches {
endLvl = lvl + 1
compress = true
break
@@ -521,42 +554,42 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry,
for lvl := int64(ent.level); lvl < endLvl; lvl++ {
- lvlCount := int64(len(chunkLevel[lvl]))
+ lvlCount := int64(len(pc.chunkLevel[lvl]))
if lvlCount == 1 && last {
- copy(rootKey, chunkLevel[lvl][0].key)
+ copy(pc.rootKey, pc.chunkLevel[lvl][0].key)
return
}
- for startCount := int64(0); startCount < lvlCount; startCount += self.branches {
+ for startCount := int64(0); startCount < lvlCount; startCount += pc.branches {
- endCount := startCount + self.branches
+ endCount := startCount + pc.branches
if endCount > lvlCount {
endCount = lvlCount
}
var nextLvlCount int64
var tempEntry *TreeEntry
- if len(chunkLevel[lvl+1]) > 0 {
- nextLvlCount = int64(len(chunkLevel[lvl+1]) - 1)
- tempEntry = chunkLevel[lvl+1][nextLvlCount]
+ if len(pc.chunkLevel[lvl+1]) > 0 {
+ nextLvlCount = int64(len(pc.chunkLevel[lvl+1]) - 1)
+ tempEntry = pc.chunkLevel[lvl+1][nextLvlCount]
}
if isAppend && tempEntry != nil && tempEntry.updatePending {
updateEntry := &TreeEntry{
level: int(lvl + 1),
branchCount: 0,
subtreeSize: 0,
- chunk: make([]byte, self.chunkSize+8),
- key: make([]byte, self.hashSize),
+ chunk: make([]byte, pc.chunkSize+8),
+ key: make([]byte, pc.hashSize),
index: int(nextLvlCount),
updatePending: true,
}
for index := int64(0); index < lvlCount; index++ {
updateEntry.branchCount++
- updateEntry.subtreeSize += chunkLevel[lvl][index].subtreeSize
- copy(updateEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], chunkLevel[lvl][index].key[:self.hashSize])
+ updateEntry.subtreeSize += pc.chunkLevel[lvl][index].subtreeSize
+ copy(updateEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], pc.chunkLevel[lvl][index].key[:pc.hashSize])
}
- self.enqueueTreeChunk(chunkLevel, updateEntry, chunkWG, jobC, quitC, last)
+ pc.enqueueTreeChunk(updateEntry, chunkWG, last)
} else {
@@ -565,21 +598,27 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry,
level: int(lvl + 1),
branchCount: noOfBranches,
subtreeSize: 0,
- chunk: make([]byte, (noOfBranches*self.hashSize)+8),
- key: make([]byte, self.hashSize),
+ chunk: make([]byte, (noOfBranches*pc.hashSize)+8),
+ key: make([]byte, pc.hashSize),
index: int(nextLvlCount),
updatePending: false,
}
index := int64(0)
for i := startCount; i < endCount; i++ {
- entry := chunkLevel[lvl][i]
+ entry := pc.chunkLevel[lvl][i]
newEntry.subtreeSize += entry.subtreeSize
- copy(newEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], entry.key[:self.hashSize])
+ copy(newEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], entry.key[:pc.hashSize])
index++
}
+ // Lonely chunk key is the key of the last chunk that is only one on the last branch.
+ // In this case, ignore the its tree chunk key and replace it with the lonely chunk key.
+ if lonelyChunkKey != nil {
+ // Overwrite the last tree chunk key with the lonely data chunk key.
+ copy(newEntry.chunk[int64(len(newEntry.chunk))-pc.hashSize:], lonelyChunkKey[:pc.hashSize])
+ }
- self.enqueueTreeChunk(chunkLevel, newEntry, chunkWG, jobC, quitC, last)
+ pc.enqueueTreeChunk(newEntry, chunkWG, last)
}
@@ -588,15 +627,15 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry,
if !isAppend {
chunkWG.Wait()
if compress {
- chunkLevel[lvl] = nil
+ pc.chunkLevel[lvl] = nil
}
}
}
}
-func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool) {
- if ent != nil {
+func (pc *PyramidChunker) enqueueTreeChunk(ent *TreeEntry, chunkWG *sync.WaitGroup, last bool) {
+ if ent != nil && ent.branchCount > 0 {
// wait for data chunks to get over before processing the tree chunk
if last {
@@ -604,34 +643,57 @@ func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *Tre
}
binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize)
- ent.key = make([]byte, self.hashSize)
+ ent.key = make([]byte, pc.hashSize)
chunkWG.Add(1)
select {
- case jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*self.hashSize+8], int64(ent.subtreeSize), chunkWG, TreeChunk, 0}:
- case <-quitC:
+ case pc.jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*pc.hashSize+8], chunkWG}:
+ case <-pc.quitC:
}
// Update or append based on weather it is a new entry or being reused
if ent.updatePending {
chunkWG.Wait()
- chunkLevel[ent.level][ent.index] = ent
+ pc.chunkLevel[ent.level][ent.index] = ent
} else {
- chunkLevel[ent.level] = append(chunkLevel[ent.level], ent)
+ pc.chunkLevel[ent.level] = append(pc.chunkLevel[ent.level], ent)
}
}
}
-func (self *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool) Key {
+func (pc *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup) Address {
binary.LittleEndian.PutUint64(chunkData[:8], size)
- pkey := parent.chunk[8+parent.branchCount*self.hashSize : 8+(parent.branchCount+1)*self.hashSize]
+ pkey := parent.chunk[8+parent.branchCount*pc.hashSize : 8+(parent.branchCount+1)*pc.hashSize]
chunkWG.Add(1)
select {
- case jobC <- &chunkJob{pkey, chunkData[:size+8], int64(size), chunkWG, DataChunk, -1}:
- case <-quitC:
+ case pc.jobC <- &chunkJob{pkey, chunkData[:size+8], chunkWG}:
+ case <-pc.quitC:
}
return pkey
}
+
+// depth returns the number of chunk levels.
+// It is used to detect if there is only one data chunk
+// left for the last branch.
+func (pc *PyramidChunker) depth() (d int) {
+ for _, l := range pc.chunkLevel {
+ if l == nil {
+ return
+ }
+ d++
+ }
+ return
+}
+
+// cleanChunkLevels removes gaps (nil levels) between chunk levels
+// that are not nil.
+func (pc *PyramidChunker) cleanChunkLevels() {
+ for i, l := range pc.chunkLevel {
+ if l == nil {
+ pc.chunkLevel = append(pc.chunkLevel[:i], append(pc.chunkLevel[i+1:], nil)...)
+ }
+ }
+}