diff options
author | Zahoor Mohamed <zahoor@zahoor.in> | 2017-09-22 04:22:51 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-09-22 04:22:51 +0800 |
commit | d558a595adf4e89bab5b28ffde1448dc1e5768b0 (patch) | |
tree | 1316cca927bfd4dfc4a8673ae0b9c2f75724f07e /swarm/storage/chunker.go | |
parent | 3c8656347f67dbc8e57c663ec5c26d24c4151678 (diff) | |
download | go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.gz go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.bz2 go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.lz go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.xz go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.zst go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.zip |
swarm/storage: pyramid chunker re-write (#14382)
Diffstat (limited to 'swarm/storage/chunker.go')
-rw-r--r-- | swarm/storage/chunker.go | 93 |
1 files changed, 58 insertions, 35 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index ca85e4333..0454828b9 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -20,9 +20,9 @@ import ( "encoding/binary" "errors" "fmt" - "hash" "io" "sync" + "time" ) /* @@ -50,14 +50,6 @@ data_{i} := size(subtree_{i}) || key_{j} || key_{j+1} .... || key_{j+n-1} The underlying hash function is configurable */ -const ( - defaultHash = "SHA3" - // defaultHash = "BMTSHA3" // http://golang.org/pkg/hash/#Hash - // defaultHash = "SHA256" // http://golang.org/pkg/hash/#Hash - defaultBranches int64 = 128 - // hashSize int64 = hasherfunc.New().Size() // hasher knows about its own length in bytes - // chunksize int64 = branches * hashSize // chunk is defined as this -) /* Tree chunker is a concrete implementation of data chunking. @@ -67,25 +59,19 @@ If all is well it is possible to implement this by simply composing readers so t The hashing itself does use extra copies and allocation though, since it does need it. */ -type ChunkerParams struct { - Branches int64 - Hash string -} - -func NewChunkerParams() *ChunkerParams { - return &ChunkerParams{ - Branches: defaultBranches, - Hash: defaultHash, - } -} +var ( + errAppendOppNotSuported = errors.New("Append operation not supported") + errOperationTimedOut = errors.New("operation timed out") +) type TreeChunker struct { branches int64 - hashFunc Hasher + hashFunc SwarmHasher // calculated hashSize int64 // self.hashFunc.New().Size() chunkSize int64 // hashSize* branches - workerCount int + workerCount int64 // the number of worker routines used + workerLock sync.RWMutex // lock for the worker count } func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) { @@ -94,7 +80,8 @@ func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) { self.branches = params.Branches self.hashSize = int64(self.hashFunc().Size()) self.chunkSize = self.hashSize * self.branches - self.workerCount = 1 + self.workerCount = 0 + return } @@ -114,13 +101,31 @@ type hashJob struct { parentWg *sync.WaitGroup } -func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) { +func (self *TreeChunker) incrementWorkerCount() { + self.workerLock.Lock() + defer self.workerLock.Unlock() + self.workerCount += 1 +} + +func (self *TreeChunker) getWorkerCount() int64 { + self.workerLock.RLock() + defer self.workerLock.RUnlock() + return self.workerCount +} +func (self *TreeChunker) decrementWorkerCount() { + self.workerLock.Lock() + defer self.workerLock.Unlock() + self.workerCount -= 1 +} + +func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) { if self.chunkSize <= 0 { panic("chunker must be initialised") } - jobC := make(chan *hashJob, 2*processors) + + jobC := make(chan *hashJob, 2*ChunkProcessors) wg := &sync.WaitGroup{} errC := make(chan error) quitC := make(chan bool) @@ -129,6 +134,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s if wwg != nil { wwg.Add(1) } + + self.incrementWorkerCount() go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) depth := 0 @@ -157,10 +164,15 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s close(errC) }() - //TODO: add a timeout - if err := <-errC; err != nil { - close(quitC) - return nil, err + + defer close(quitC) + select { + case err := <-errC: + if err != nil { + return nil, err + } + case <-time.NewTimer(splitTimeout).C: + return nil,errOperationTimedOut } return key, nil @@ -168,6 +180,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) { + // + for depth > 0 && size < treeSize { treeSize /= self.branches depth-- @@ -223,12 +237,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade // parentWg.Add(1) // go func() { childrenWg.Wait() - if len(jobC) > self.workerCount && self.workerCount < processors { + + worker := self.getWorkerCount() + if int64(len(jobC)) > worker && worker < ChunkProcessors { if wwg != nil { wwg.Add(1) } - self.workerCount++ + self.incrementWorkerCount() go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) + } select { case jobC <- &hashJob{key, chunk, size, parentWg}: @@ -237,6 +254,8 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade } func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) { + defer self.decrementWorkerCount() + hasher := self.hashFunc() if wwg != nil { defer wwg.Done() @@ -249,7 +268,6 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC return } // now we got the hashes in the chunk, then hash the chunks - hasher.Reset() self.hashChunk(hasher, job, chunkC, swg) case <-quitC: return @@ -260,9 +278,11 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC // The treeChunkers own Hash hashes together // - the size (of the subtree encoded in the Chunk) // - the Chunk, ie. the contents read from the input reader -func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) { - hasher.Write(job.chunk) +func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) { + hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length + hasher.Write(job.chunk[8:]) // minus 8 []byte length h := hasher.Sum(nil) + newChunk := &Chunk{ Key: h, SData: job.chunk, @@ -285,6 +305,10 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan * } } +func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) { + return nil, errAppendOppNotSuported +} + // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { key Key // root key @@ -298,7 +322,6 @@ type LazyChunkReader struct { // implements the Joiner interface func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader { - return &LazyChunkReader{ key: key, chunkC: chunkC, |