aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/chunker.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/chunker.go')
-rw-r--r--swarm/storage/chunker.go93
1 files changed, 58 insertions, 35 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index ca85e4333..0454828b9 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -20,9 +20,9 @@ import (
"encoding/binary"
"errors"
"fmt"
- "hash"
"io"
"sync"
+ "time"
)
/*
@@ -50,14 +50,6 @@ data_{i} := size(subtree_{i}) || key_{j} || key_{j+1} .... || key_{j+n-1}
The underlying hash function is configurable
*/
-const (
- defaultHash = "SHA3"
- // defaultHash = "BMTSHA3" // http://golang.org/pkg/hash/#Hash
- // defaultHash = "SHA256" // http://golang.org/pkg/hash/#Hash
- defaultBranches int64 = 128
- // hashSize int64 = hasherfunc.New().Size() // hasher knows about its own length in bytes
- // chunksize int64 = branches * hashSize // chunk is defined as this
-)
/*
Tree chunker is a concrete implementation of data chunking.
@@ -67,25 +59,19 @@ If all is well it is possible to implement this by simply composing readers so t
The hashing itself does use extra copies and allocation though, since it does need it.
*/
-type ChunkerParams struct {
- Branches int64
- Hash string
-}
-
-func NewChunkerParams() *ChunkerParams {
- return &ChunkerParams{
- Branches: defaultBranches,
- Hash: defaultHash,
- }
-}
+var (
+ errAppendOppNotSuported = errors.New("Append operation not supported")
+ errOperationTimedOut = errors.New("operation timed out")
+)
type TreeChunker struct {
branches int64
- hashFunc Hasher
+ hashFunc SwarmHasher
// calculated
hashSize int64 // self.hashFunc.New().Size()
chunkSize int64 // hashSize* branches
- workerCount int
+ workerCount int64 // the number of worker routines used
+ workerLock sync.RWMutex // lock for the worker count
}
func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
@@ -94,7 +80,8 @@ func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
self.branches = params.Branches
self.hashSize = int64(self.hashFunc().Size())
self.chunkSize = self.hashSize * self.branches
- self.workerCount = 1
+ self.workerCount = 0
+
return
}
@@ -114,13 +101,31 @@ type hashJob struct {
parentWg *sync.WaitGroup
}
-func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+func (self *TreeChunker) incrementWorkerCount() {
+ self.workerLock.Lock()
+ defer self.workerLock.Unlock()
+ self.workerCount += 1
+}
+
+func (self *TreeChunker) getWorkerCount() int64 {
+ self.workerLock.RLock()
+ defer self.workerLock.RUnlock()
+ return self.workerCount
+}
+func (self *TreeChunker) decrementWorkerCount() {
+ self.workerLock.Lock()
+ defer self.workerLock.Unlock()
+ self.workerCount -= 1
+}
+
+func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
if self.chunkSize <= 0 {
panic("chunker must be initialised")
}
- jobC := make(chan *hashJob, 2*processors)
+
+ jobC := make(chan *hashJob, 2*ChunkProcessors)
wg := &sync.WaitGroup{}
errC := make(chan error)
quitC := make(chan bool)
@@ -129,6 +134,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
if wwg != nil {
wwg.Add(1)
}
+
+ self.incrementWorkerCount()
go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
depth := 0
@@ -157,10 +164,15 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
close(errC)
}()
- //TODO: add a timeout
- if err := <-errC; err != nil {
- close(quitC)
- return nil, err
+
+ defer close(quitC)
+ select {
+ case err := <-errC:
+ if err != nil {
+ return nil, err
+ }
+ case <-time.NewTimer(splitTimeout).C:
+ return nil,errOperationTimedOut
}
return key, nil
@@ -168,6 +180,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
+ //
+
for depth > 0 && size < treeSize {
treeSize /= self.branches
depth--
@@ -223,12 +237,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
// parentWg.Add(1)
// go func() {
childrenWg.Wait()
- if len(jobC) > self.workerCount && self.workerCount < processors {
+
+ worker := self.getWorkerCount()
+ if int64(len(jobC)) > worker && worker < ChunkProcessors {
if wwg != nil {
wwg.Add(1)
}
- self.workerCount++
+ self.incrementWorkerCount()
go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
+
}
select {
case jobC <- &hashJob{key, chunk, size, parentWg}:
@@ -237,6 +254,8 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
}
func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
+ defer self.decrementWorkerCount()
+
hasher := self.hashFunc()
if wwg != nil {
defer wwg.Done()
@@ -249,7 +268,6 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
return
}
// now we got the hashes in the chunk, then hash the chunks
- hasher.Reset()
self.hashChunk(hasher, job, chunkC, swg)
case <-quitC:
return
@@ -260,9 +278,11 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
// The treeChunkers own Hash hashes together
// - the size (of the subtree encoded in the Chunk)
// - the Chunk, ie. the contents read from the input reader
-func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
- hasher.Write(job.chunk)
+func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
+ hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
+ hasher.Write(job.chunk[8:]) // minus 8 []byte length
h := hasher.Sum(nil)
+
newChunk := &Chunk{
Key: h,
SData: job.chunk,
@@ -285,6 +305,10 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *
}
}
+func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+ return nil, errAppendOppNotSuported
+}
+
// LazyChunkReader implements LazySectionReader
type LazyChunkReader struct {
key Key // root key
@@ -298,7 +322,6 @@ type LazyChunkReader struct {
// implements the Joiner interface
func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
-
return &LazyChunkReader{
key: key,
chunkC: chunkC,