diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/pyramid.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/pyramid.go')
-rw-r--r-- | swarm/storage/pyramid.go | 520 |
1 files changed, 291 insertions, 229 deletions
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 19d493405..01172cb77 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -20,8 +20,11 @@ import ( "encoding/binary" "errors" "io" + "io/ioutil" "sync" "time" + + "github.com/ethereum/go-ethereum/swarm/log" ) /* @@ -62,9 +65,8 @@ var ( ) const ( - ChunkProcessors = 8 - DefaultBranches int64 = 128 - splitTimeout = time.Minute * 5 + ChunkProcessors = 8 + splitTimeout = time.Minute * 5 ) const ( @@ -72,18 +74,39 @@ const ( TreeChunk = 1 ) -type ChunkerParams struct { - Branches int64 - Hash string +type PyramidSplitterParams struct { + SplitterParams + getter Getter } -func NewChunkerParams() *ChunkerParams { - return &ChunkerParams{ - Branches: DefaultBranches, - Hash: SHA3Hash, +func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, getter Getter, chunkSize int64) *PyramidSplitterParams { + hashSize := putter.RefSize() + return &PyramidSplitterParams{ + SplitterParams: SplitterParams{ + ChunkerParams: ChunkerParams{ + chunkSize: chunkSize, + hashSize: hashSize, + }, + reader: reader, + putter: putter, + addr: addr, + }, + getter: getter, } } +/* + When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes. + New chunks to store are store using the putter which the caller provides. +*/ +func PyramidSplit(reader io.Reader, putter Putter, getter Getter) (Address, func(), error) { + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split() +} + +func PyramidAppend(addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(), error) { + return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, DefaultChunkSize)).Append() +} + // Entry to create a tree node type TreeEntry struct { level int @@ -109,264 +132,250 @@ func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry { // Used by the hash processor to create a data/tree chunk and send to storage type chunkJob struct { - key Key - chunk []byte - size int64 - parentWg *sync.WaitGroup - chunkType int // used to identify the tree related chunks for debugging - chunkLvl int // leaf-1 is level 0 and goes upwards until it reaches root + key Address + chunk []byte + parentWg *sync.WaitGroup } type PyramidChunker struct { - hashFunc SwarmHasher chunkSize int64 hashSize int64 branches int64 + reader io.Reader + putter Putter + getter Getter + key Address workerCount int64 workerLock sync.RWMutex + jobC chan *chunkJob + wg *sync.WaitGroup + errC chan error + quitC chan bool + rootKey []byte + chunkLevel [][]*TreeEntry } -func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) { - self = &PyramidChunker{} - self.hashFunc = MakeHashFunc(params.Hash) - self.branches = params.Branches - self.hashSize = int64(self.hashFunc().Size()) - self.chunkSize = self.hashSize * self.branches - self.workerCount = 0 +func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { + pc = &PyramidChunker{} + pc.reader = params.reader + pc.hashSize = params.hashSize + pc.branches = params.chunkSize / pc.hashSize + pc.chunkSize = pc.hashSize * pc.branches + pc.putter = params.putter + pc.getter = params.getter + pc.key = params.addr + pc.workerCount = 0 + pc.jobC = make(chan *chunkJob, 2*ChunkProcessors) + pc.wg = &sync.WaitGroup{} + pc.errC = make(chan error) + pc.quitC = make(chan bool) + pc.rootKey = make([]byte, pc.hashSize) + pc.chunkLevel = make([][]*TreeEntry, pc.branches) return } -func (self *PyramidChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader { +func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader { return &LazyChunkReader{ - key: key, - chunkC: chunkC, - chunkSize: self.chunkSize, - branches: self.branches, - hashSize: self.hashSize, + key: addr, + depth: depth, + chunkSize: pc.chunkSize, + branches: pc.branches, + hashSize: pc.hashSize, + getter: getter, } } -func (self *PyramidChunker) incrementWorkerCount() { - self.workerLock.Lock() - defer self.workerLock.Unlock() - self.workerCount += 1 +func (pc *PyramidChunker) incrementWorkerCount() { + pc.workerLock.Lock() + defer pc.workerLock.Unlock() + pc.workerCount += 1 } -func (self *PyramidChunker) getWorkerCount() int64 { - self.workerLock.Lock() - defer self.workerLock.Unlock() - return self.workerCount +func (pc *PyramidChunker) getWorkerCount() int64 { + pc.workerLock.Lock() + defer pc.workerLock.Unlock() + return pc.workerCount } -func (self *PyramidChunker) decrementWorkerCount() { - self.workerLock.Lock() - defer self.workerLock.Unlock() - self.workerCount -= 1 +func (pc *PyramidChunker) decrementWorkerCount() { + pc.workerLock.Lock() + defer pc.workerLock.Unlock() + pc.workerCount -= 1 } -func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) { - jobC := make(chan *chunkJob, 2*ChunkProcessors) - wg := &sync.WaitGroup{} - errC := make(chan error) - quitC := make(chan bool) - rootKey := make([]byte, self.hashSize) - chunkLevel := make([][]*TreeEntry, self.branches) +func (pc *PyramidChunker) Split() (k Address, wait func(), err error) { + log.Debug("pyramid.chunker: Split()") - wg.Add(1) - go self.prepareChunks(false, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG) + pc.wg.Add(1) + pc.prepareChunks(false) // closes internal error channel if all subprocesses in the workgroup finished go func() { // waiting for all chunks to finish - wg.Wait() + pc.wg.Wait() - // if storage waitgroup is non-nil, we wait for storage to finish too - if storageWG != nil { - storageWG.Wait() - } //We close errC here because this is passed down to 8 parallel routines underneath. // if a error happens in one of them.. that particular routine raises error... // once they all complete successfully, the control comes back and we can safely close this here. - close(errC) + close(pc.errC) }() - defer close(quitC) + defer close(pc.quitC) + defer pc.putter.Close() select { - case err := <-errC: + case err := <-pc.errC: if err != nil { - return nil, err + return nil, nil, err } case <-time.NewTimer(splitTimeout).C: } - return rootKey, nil + return pc.rootKey, pc.putter.Wait, nil } -func (self *PyramidChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) { - quitC := make(chan bool) - rootKey := make([]byte, self.hashSize) - chunkLevel := make([][]*TreeEntry, self.branches) - +func (pc *PyramidChunker) Append() (k Address, wait func(), err error) { + log.Debug("pyramid.chunker: Append()") // Load the right most unfinished tree chunks in every level - self.loadTree(chunkLevel, key, chunkC, quitC) + pc.loadTree() - jobC := make(chan *chunkJob, 2*ChunkProcessors) - wg := &sync.WaitGroup{} - errC := make(chan error) - - wg.Add(1) - go self.prepareChunks(true, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG) + pc.wg.Add(1) + pc.prepareChunks(true) // closes internal error channel if all subprocesses in the workgroup finished go func() { // waiting for all chunks to finish - wg.Wait() + pc.wg.Wait() - // if storage waitgroup is non-nil, we wait for storage to finish too - if storageWG != nil { - storageWG.Wait() - } - close(errC) + close(pc.errC) }() - defer close(quitC) + defer close(pc.quitC) + defer pc.putter.Close() select { - case err := <-errC: + case err := <-pc.errC: if err != nil { - return nil, err + return nil, nil, err } case <-time.NewTimer(splitTimeout).C: } - return rootKey, nil -} + return pc.rootKey, pc.putter.Wait, nil -func (self *PyramidChunker) processor(id int64, jobC chan *chunkJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) { - defer self.decrementWorkerCount() +} - hasher := self.hashFunc() - if wwg != nil { - defer wwg.Done() - } +func (pc *PyramidChunker) processor(id int64) { + defer pc.decrementWorkerCount() for { select { - case job, ok := <-jobC: + case job, ok := <-pc.jobC: if !ok { return } - self.processChunk(id, hasher, job, chunkC, swg) - case <-quitC: + pc.processChunk(id, job) + case <-pc.quitC: return } } } -func (self *PyramidChunker) processChunk(id int64, hasher SwarmHash, job *chunkJob, chunkC chan *Chunk, swg *sync.WaitGroup) { - hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length - hasher.Write(job.chunk[8:]) // minus 8 []byte length - h := hasher.Sum(nil) +func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { + log.Debug("pyramid.chunker: processChunk()", "id", id) - newChunk := &Chunk{ - Key: h, - SData: job.chunk, - Size: job.size, - wg: swg, + ref, err := pc.putter.Put(job.chunk) + if err != nil { + pc.errC <- err } // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) - copy(job.key, h) + copy(job.key, ref) // send off new chunk to storage - if chunkC != nil { - if swg != nil { - swg.Add(1) - } - } job.parentWg.Done() - - if chunkC != nil { - chunkC <- newChunk - } } -func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC chan *Chunk, quitC chan bool) error { +func (pc *PyramidChunker) loadTree() error { + log.Debug("pyramid.chunker: loadTree()") // Get the root chunk to get the total size - chunk := retrieve(key, chunkC, quitC) - if chunk == nil { + chunkData, err := pc.getter.Get(Reference(pc.key)) + if err != nil { return errLoadingTreeRootChunk } + chunkSize := chunkData.Size() + log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize) //if data size is less than a chunk... add a parent with update as pending - if chunk.Size <= self.chunkSize { + if chunkSize <= pc.chunkSize { newEntry := &TreeEntry{ level: 0, branchCount: 1, - subtreeSize: uint64(chunk.Size), - chunk: make([]byte, self.chunkSize+8), - key: make([]byte, self.hashSize), + subtreeSize: uint64(chunkSize), + chunk: make([]byte, pc.chunkSize+8), + key: make([]byte, pc.hashSize), index: 0, updatePending: true, } - copy(newEntry.chunk[8:], chunk.Key) - chunkLevel[0] = append(chunkLevel[0], newEntry) + copy(newEntry.chunk[8:], pc.key) + pc.chunkLevel[0] = append(pc.chunkLevel[0], newEntry) return nil } var treeSize int64 var depth int - treeSize = self.chunkSize - for ; treeSize < chunk.Size; treeSize *= self.branches { + treeSize = pc.chunkSize + for ; treeSize < chunkSize; treeSize *= pc.branches { depth++ } + log.Trace("pyramid.chunker", "depth", depth) // Add the root chunk entry - branchCount := int64(len(chunk.SData)-8) / self.hashSize + branchCount := int64(len(chunkData)-8) / pc.hashSize newEntry := &TreeEntry{ level: depth - 1, branchCount: branchCount, - subtreeSize: uint64(chunk.Size), - chunk: chunk.SData, - key: key, + subtreeSize: uint64(chunkSize), + chunk: chunkData, + key: pc.key, index: 0, updatePending: true, } - chunkLevel[depth-1] = append(chunkLevel[depth-1], newEntry) + pc.chunkLevel[depth-1] = append(pc.chunkLevel[depth-1], newEntry) // Add the rest of the tree for lvl := depth - 1; lvl >= 1; lvl-- { //TODO(jmozah): instead of loading finished branches and then trim in the end, //avoid loading them in the first place - for _, ent := range chunkLevel[lvl] { - branchCount = int64(len(ent.chunk)-8) / self.hashSize + for _, ent := range pc.chunkLevel[lvl] { + branchCount = int64(len(ent.chunk)-8) / pc.hashSize for i := int64(0); i < branchCount; i++ { - key := ent.chunk[8+(i*self.hashSize) : 8+((i+1)*self.hashSize)] - newChunk := retrieve(key, chunkC, quitC) - if newChunk == nil { + key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] + newChunkData, err := pc.getter.Get(Reference(key)) + if err != nil { return errLoadingTreeChunk } - bewBranchCount := int64(len(newChunk.SData)-8) / self.hashSize + newChunkSize := newChunkData.Size() + bewBranchCount := int64(len(newChunkData)-8) / pc.hashSize newEntry := &TreeEntry{ level: lvl - 1, branchCount: bewBranchCount, - subtreeSize: uint64(newChunk.Size), - chunk: newChunk.SData, + subtreeSize: uint64(newChunkSize), + chunk: newChunkData, key: key, index: 0, updatePending: true, } - chunkLevel[lvl-1] = append(chunkLevel[lvl-1], newEntry) + pc.chunkLevel[lvl-1] = append(pc.chunkLevel[lvl-1], newEntry) } // We need to get only the right most unfinished branch.. so trim all finished branches - if int64(len(chunkLevel[lvl-1])) >= self.branches { - chunkLevel[lvl-1] = nil + if int64(len(pc.chunkLevel[lvl-1])) >= pc.branches { + pc.chunkLevel[lvl-1] = nil } } } @@ -374,29 +383,25 @@ func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC return nil } -func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEntry, data io.Reader, rootKey []byte, quitC chan bool, wg *sync.WaitGroup, jobC chan *chunkJob, processorWG *sync.WaitGroup, chunkC chan *Chunk, errC chan error, storageWG *sync.WaitGroup) { - defer wg.Done() +func (pc *PyramidChunker) prepareChunks(isAppend bool) { + log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend) + defer pc.wg.Done() chunkWG := &sync.WaitGroup{} - totalDataSize := 0 - // processorWG keeps track of workers spawned for hashing chunks - if processorWG != nil { - processorWG.Add(1) - } - - self.incrementWorkerCount() - go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG, processorWG) + pc.incrementWorkerCount() - parent := NewTreeEntry(self) - var unFinishedChunk *Chunk + go pc.processor(pc.workerCount) - if isAppend && len(chunkLevel[0]) != 0 { + parent := NewTreeEntry(pc) + var unfinishedChunkData ChunkData + var unfinishedChunkSize int64 - lastIndex := len(chunkLevel[0]) - 1 - ent := chunkLevel[0][lastIndex] + if isAppend && len(pc.chunkLevel[0]) != 0 { + lastIndex := len(pc.chunkLevel[0]) - 1 + ent := pc.chunkLevel[0][lastIndex] - if ent.branchCount < self.branches { + if ent.branchCount < pc.branches { parent = &TreeEntry{ level: 0, branchCount: ent.branchCount, @@ -408,104 +413,132 @@ func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEnt } lastBranch := parent.branchCount - 1 - lastKey := parent.chunk[8+lastBranch*self.hashSize : 8+(lastBranch+1)*self.hashSize] - - unFinishedChunk = retrieve(lastKey, chunkC, quitC) - if unFinishedChunk.Size < self.chunkSize { + lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] - parent.subtreeSize = parent.subtreeSize - uint64(unFinishedChunk.Size) + var err error + unfinishedChunkData, err = pc.getter.Get(lastKey) + if err != nil { + pc.errC <- err + } + unfinishedChunkSize = unfinishedChunkData.Size() + if unfinishedChunkSize < pc.chunkSize { + parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize) parent.branchCount = parent.branchCount - 1 } else { - unFinishedChunk = nil + unfinishedChunkData = nil } } } for index := 0; ; index++ { - - var n int var err error - chunkData := make([]byte, self.chunkSize+8) - if unFinishedChunk != nil { - copy(chunkData, unFinishedChunk.SData) - n, err = data.Read(chunkData[8+unFinishedChunk.Size:]) - n += int(unFinishedChunk.Size) - unFinishedChunk = nil - } else { - n, err = data.Read(chunkData[8:]) + chunkData := make([]byte, pc.chunkSize+8) + + var readBytes int + + if unfinishedChunkData != nil { + copy(chunkData, unfinishedChunkData) + readBytes += int(unfinishedChunkSize) + unfinishedChunkData = nil + log.Trace("pyramid.chunker: found unfinished chunk", "readBytes", readBytes) + } + + var res []byte + res, err = ioutil.ReadAll(io.LimitReader(pc.reader, int64(len(chunkData)-(8+readBytes)))) + + // hack for ioutil.ReadAll: + // a successful call to ioutil.ReadAll returns err == nil, not err == EOF, whereas we + // want to propagate the io.EOF error + if len(res) == 0 && err == nil { + err = io.EOF } + copy(chunkData[8+readBytes:], res) + + readBytes += len(res) + log.Trace("pyramid.chunker: copied all data", "readBytes", readBytes) - totalDataSize += n if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { - if parent.branchCount == 1 { + + pc.cleanChunkLevels() + + // Check if we are appending or the chunk is the only one. + if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) { // Data is exactly one chunk.. pick the last chunk key as root chunkWG.Wait() - lastChunksKey := parent.chunk[8 : 8+self.hashSize] - copy(rootKey, lastChunksKey) + lastChunksKey := parent.chunk[8 : 8+pc.hashSize] + copy(pc.rootKey, lastChunksKey) break } } else { - close(quitC) + close(pc.quitC) break } } // Data ended in chunk boundary.. just signal to start bulding tree - if n == 0 { - self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey) + if readBytes == 0 { + pc.buildTree(isAppend, parent, chunkWG, true, nil) break } else { - - pkey := self.enqueueDataChunk(chunkData, uint64(n), parent, chunkWG, jobC, quitC) + pkey := pc.enqueueDataChunk(chunkData, uint64(readBytes), parent, chunkWG) // update tree related parent data structures - parent.subtreeSize += uint64(n) + parent.subtreeSize += uint64(readBytes) parent.branchCount++ // Data got exhausted... signal to send any parent tree related chunks - if int64(n) < self.chunkSize { + if int64(readBytes) < pc.chunkSize { + + pc.cleanChunkLevels() // only one data chunk .. so dont add any parent chunk if parent.branchCount <= 1 { chunkWG.Wait() - copy(rootKey, pkey) + + if isAppend || pc.depth() == 0 { + // No need to build the tree if the depth is 0 + // or we are appending. + // Just use the last key. + copy(pc.rootKey, pkey) + } else { + // We need to build the tree and and provide the lonely + // chunk key to replace the last tree chunk key. + pc.buildTree(isAppend, parent, chunkWG, true, pkey) + } break } - self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey) + pc.buildTree(isAppend, parent, chunkWG, true, nil) break } - if parent.branchCount == self.branches { - self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, false, rootKey) - parent = NewTreeEntry(self) + if parent.branchCount == pc.branches { + pc.buildTree(isAppend, parent, chunkWG, false, nil) + parent = NewTreeEntry(pc) } } - workers := self.getWorkerCount() - if int64(len(jobC)) > workers && workers < ChunkProcessors { - if processorWG != nil { - processorWG.Add(1) - } - self.incrementWorkerCount() - go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG, processorWG) + workers := pc.getWorkerCount() + if int64(len(pc.jobC)) > workers && workers < ChunkProcessors { + pc.incrementWorkerCount() + go pc.processor(pc.workerCount) } } } -func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool, rootKey []byte) { +func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync.WaitGroup, last bool, lonelyChunkKey []byte) { chunkWG.Wait() - self.enqueueTreeChunk(chunkLevel, ent, chunkWG, jobC, quitC, last) + pc.enqueueTreeChunk(ent, chunkWG, last) compress := false - endLvl := self.branches - for lvl := int64(0); lvl < self.branches; lvl++ { - lvlCount := int64(len(chunkLevel[lvl])) - if lvlCount >= self.branches { + endLvl := pc.branches + for lvl := int64(0); lvl < pc.branches; lvl++ { + lvlCount := int64(len(pc.chunkLevel[lvl])) + if lvlCount >= pc.branches { endLvl = lvl + 1 compress = true break @@ -521,42 +554,42 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, for lvl := int64(ent.level); lvl < endLvl; lvl++ { - lvlCount := int64(len(chunkLevel[lvl])) + lvlCount := int64(len(pc.chunkLevel[lvl])) if lvlCount == 1 && last { - copy(rootKey, chunkLevel[lvl][0].key) + copy(pc.rootKey, pc.chunkLevel[lvl][0].key) return } - for startCount := int64(0); startCount < lvlCount; startCount += self.branches { + for startCount := int64(0); startCount < lvlCount; startCount += pc.branches { - endCount := startCount + self.branches + endCount := startCount + pc.branches if endCount > lvlCount { endCount = lvlCount } var nextLvlCount int64 var tempEntry *TreeEntry - if len(chunkLevel[lvl+1]) > 0 { - nextLvlCount = int64(len(chunkLevel[lvl+1]) - 1) - tempEntry = chunkLevel[lvl+1][nextLvlCount] + if len(pc.chunkLevel[lvl+1]) > 0 { + nextLvlCount = int64(len(pc.chunkLevel[lvl+1]) - 1) + tempEntry = pc.chunkLevel[lvl+1][nextLvlCount] } if isAppend && tempEntry != nil && tempEntry.updatePending { updateEntry := &TreeEntry{ level: int(lvl + 1), branchCount: 0, subtreeSize: 0, - chunk: make([]byte, self.chunkSize+8), - key: make([]byte, self.hashSize), + chunk: make([]byte, pc.chunkSize+8), + key: make([]byte, pc.hashSize), index: int(nextLvlCount), updatePending: true, } for index := int64(0); index < lvlCount; index++ { updateEntry.branchCount++ - updateEntry.subtreeSize += chunkLevel[lvl][index].subtreeSize - copy(updateEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], chunkLevel[lvl][index].key[:self.hashSize]) + updateEntry.subtreeSize += pc.chunkLevel[lvl][index].subtreeSize + copy(updateEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], pc.chunkLevel[lvl][index].key[:pc.hashSize]) } - self.enqueueTreeChunk(chunkLevel, updateEntry, chunkWG, jobC, quitC, last) + pc.enqueueTreeChunk(updateEntry, chunkWG, last) } else { @@ -565,21 +598,27 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, level: int(lvl + 1), branchCount: noOfBranches, subtreeSize: 0, - chunk: make([]byte, (noOfBranches*self.hashSize)+8), - key: make([]byte, self.hashSize), + chunk: make([]byte, (noOfBranches*pc.hashSize)+8), + key: make([]byte, pc.hashSize), index: int(nextLvlCount), updatePending: false, } index := int64(0) for i := startCount; i < endCount; i++ { - entry := chunkLevel[lvl][i] + entry := pc.chunkLevel[lvl][i] newEntry.subtreeSize += entry.subtreeSize - copy(newEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], entry.key[:self.hashSize]) + copy(newEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], entry.key[:pc.hashSize]) index++ } + // Lonely chunk key is the key of the last chunk that is only one on the last branch. + // In this case, ignore the its tree chunk key and replace it with the lonely chunk key. + if lonelyChunkKey != nil { + // Overwrite the last tree chunk key with the lonely data chunk key. + copy(newEntry.chunk[int64(len(newEntry.chunk))-pc.hashSize:], lonelyChunkKey[:pc.hashSize]) + } - self.enqueueTreeChunk(chunkLevel, newEntry, chunkWG, jobC, quitC, last) + pc.enqueueTreeChunk(newEntry, chunkWG, last) } @@ -588,15 +627,15 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, if !isAppend { chunkWG.Wait() if compress { - chunkLevel[lvl] = nil + pc.chunkLevel[lvl] = nil } } } } -func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool) { - if ent != nil { +func (pc *PyramidChunker) enqueueTreeChunk(ent *TreeEntry, chunkWG *sync.WaitGroup, last bool) { + if ent != nil && ent.branchCount > 0 { // wait for data chunks to get over before processing the tree chunk if last { @@ -604,34 +643,57 @@ func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *Tre } binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize) - ent.key = make([]byte, self.hashSize) + ent.key = make([]byte, pc.hashSize) chunkWG.Add(1) select { - case jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*self.hashSize+8], int64(ent.subtreeSize), chunkWG, TreeChunk, 0}: - case <-quitC: + case pc.jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*pc.hashSize+8], chunkWG}: + case <-pc.quitC: } // Update or append based on weather it is a new entry or being reused if ent.updatePending { chunkWG.Wait() - chunkLevel[ent.level][ent.index] = ent + pc.chunkLevel[ent.level][ent.index] = ent } else { - chunkLevel[ent.level] = append(chunkLevel[ent.level], ent) + pc.chunkLevel[ent.level] = append(pc.chunkLevel[ent.level], ent) } } } -func (self *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool) Key { +func (pc *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup) Address { binary.LittleEndian.PutUint64(chunkData[:8], size) - pkey := parent.chunk[8+parent.branchCount*self.hashSize : 8+(parent.branchCount+1)*self.hashSize] + pkey := parent.chunk[8+parent.branchCount*pc.hashSize : 8+(parent.branchCount+1)*pc.hashSize] chunkWG.Add(1) select { - case jobC <- &chunkJob{pkey, chunkData[:size+8], int64(size), chunkWG, DataChunk, -1}: - case <-quitC: + case pc.jobC <- &chunkJob{pkey, chunkData[:size+8], chunkWG}: + case <-pc.quitC: } return pkey } + +// depth returns the number of chunk levels. +// It is used to detect if there is only one data chunk +// left for the last branch. +func (pc *PyramidChunker) depth() (d int) { + for _, l := range pc.chunkLevel { + if l == nil { + return + } + d++ + } + return +} + +// cleanChunkLevels removes gaps (nil levels) between chunk levels +// that are not nil. +func (pc *PyramidChunker) cleanChunkLevels() { + for i, l := range pc.chunkLevel { + if l == nil { + pc.chunkLevel = append(pc.chunkLevel[:i], append(pc.chunkLevel[i+1:], nil)...) + } + } +} |