diff options
Diffstat (limited to 'swarm/storage/chunker.go')
-rw-r--r-- | swarm/storage/chunker.go | 490 |
1 files changed, 271 insertions, 219 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 2b397f801..5780742e3 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -13,7 +13,6 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - package storage import ( @@ -25,6 +24,7 @@ import ( "time" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/log" ) /* @@ -65,129 +65,214 @@ var ( errOperationTimedOut = errors.New("operation timed out") ) -//metrics variables -var ( - newChunkCounter = metrics.NewRegisteredCounter("storage.chunks.new", nil) +const ( + DefaultChunkSize int64 = 4096 ) +type ChunkerParams struct { + chunkSize int64 + hashSize int64 +} + +type SplitterParams struct { + ChunkerParams + reader io.Reader + putter Putter + addr Address +} + +type TreeSplitterParams struct { + SplitterParams + size int64 +} + +type JoinerParams struct { + ChunkerParams + addr Address + getter Getter + // TODO: there is a bug, so depth can only be 0 today, see: https://github.com/ethersphere/go-ethereum/issues/344 + depth int +} + type TreeChunker struct { branches int64 hashFunc SwarmHasher + dataSize int64 + data io.Reader // calculated + addr Address + depth int hashSize int64 // self.hashFunc.New().Size() chunkSize int64 // hashSize* branches workerCount int64 // the number of worker routines used workerLock sync.RWMutex // lock for the worker count + jobC chan *hashJob + wg *sync.WaitGroup + putter Putter + getter Getter + errC chan error + quitC chan bool } -func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) { - self = &TreeChunker{} - self.hashFunc = MakeHashFunc(params.Hash) - self.branches = params.Branches - self.hashSize = int64(self.hashFunc().Size()) - self.chunkSize = self.hashSize * self.branches - self.workerCount = 0 +/* + Join reconstructs original content based on a root key. + When joining, the caller gets returned a Lazy SectionReader, which is + seekable and implements on-demand fetching of chunks as and where it is read. + New chunks to retrieve are coming from the getter, which the caller provides. + If an error is encountered during joining, it appears as a reader error. + The SectionReader. + As a result, partial reads from a document are possible even if other parts + are corrupt or lost. + The chunks are not meant to be validated by the chunker when joining. This + is because it is left to the DPA to decide which sources are trusted. +*/ +func TreeJoin(addr Address, getter Getter, depth int) *LazyChunkReader { + jp := &JoinerParams{ + ChunkerParams: ChunkerParams{ + chunkSize: DefaultChunkSize, + hashSize: int64(len(addr)), + }, + addr: addr, + getter: getter, + depth: depth, + } - return + return NewTreeJoiner(jp).Join() +} + +/* + When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes. + New chunks to store are store using the putter which the caller provides. +*/ +func TreeSplit(data io.Reader, size int64, putter Putter) (k Address, wait func(), err error) { + tsp := &TreeSplitterParams{ + SplitterParams: SplitterParams{ + ChunkerParams: ChunkerParams{ + chunkSize: DefaultChunkSize, + hashSize: putter.RefSize(), + }, + reader: data, + putter: putter, + }, + size: size, + } + return NewTreeSplitter(tsp).Split() } -// func (self *TreeChunker) KeySize() int64 { -// return self.hashSize -// } +func NewTreeJoiner(params *JoinerParams) *TreeChunker { + tc := &TreeChunker{} + tc.hashSize = params.hashSize + tc.branches = params.chunkSize / params.hashSize + tc.addr = params.addr + tc.getter = params.getter + tc.depth = params.depth + tc.chunkSize = params.chunkSize + tc.workerCount = 0 + tc.jobC = make(chan *hashJob, 2*ChunkProcessors) + tc.wg = &sync.WaitGroup{} + tc.errC = make(chan error) + tc.quitC = make(chan bool) + + return tc +} + +func NewTreeSplitter(params *TreeSplitterParams) *TreeChunker { + tc := &TreeChunker{} + tc.data = params.reader + tc.dataSize = params.size + tc.hashSize = params.hashSize + tc.branches = params.chunkSize / params.hashSize + tc.addr = params.addr + tc.chunkSize = params.chunkSize + tc.putter = params.putter + tc.workerCount = 0 + tc.jobC = make(chan *hashJob, 2*ChunkProcessors) + tc.wg = &sync.WaitGroup{} + tc.errC = make(chan error) + tc.quitC = make(chan bool) + + return tc +} // String() for pretty printing -func (self *Chunk) String() string { - return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", self.Key.Log(), self.Size, len(self.SData)) +func (c *Chunk) String() string { + return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", c.Addr.Log(), c.Size, len(c.SData)) } type hashJob struct { - key Key + key Address chunk []byte size int64 parentWg *sync.WaitGroup } -func (self *TreeChunker) incrementWorkerCount() { - self.workerLock.Lock() - defer self.workerLock.Unlock() - self.workerCount += 1 +func (tc *TreeChunker) incrementWorkerCount() { + tc.workerLock.Lock() + defer tc.workerLock.Unlock() + tc.workerCount += 1 } -func (self *TreeChunker) getWorkerCount() int64 { - self.workerLock.RLock() - defer self.workerLock.RUnlock() - return self.workerCount +func (tc *TreeChunker) getWorkerCount() int64 { + tc.workerLock.RLock() + defer tc.workerLock.RUnlock() + return tc.workerCount } -func (self *TreeChunker) decrementWorkerCount() { - self.workerLock.Lock() - defer self.workerLock.Unlock() - self.workerCount -= 1 +func (tc *TreeChunker) decrementWorkerCount() { + tc.workerLock.Lock() + defer tc.workerLock.Unlock() + tc.workerCount -= 1 } -func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) { - if self.chunkSize <= 0 { +func (tc *TreeChunker) Split() (k Address, wait func(), err error) { + if tc.chunkSize <= 0 { panic("chunker must be initialised") } - jobC := make(chan *hashJob, 2*ChunkProcessors) - wg := &sync.WaitGroup{} - errC := make(chan error) - quitC := make(chan bool) - - // wwg = workers waitgroup keeps track of hashworkers spawned by this split call - if wwg != nil { - wwg.Add(1) - } - - self.incrementWorkerCount() - go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) + tc.runWorker() depth := 0 - treeSize := self.chunkSize + treeSize := tc.chunkSize // takes lowest depth such that chunksize*HashCount^(depth+1) > size // power series, will find the order of magnitude of the data size in base hashCount or numbers of levels of branching in the resulting tree. - for ; treeSize < size; treeSize *= self.branches { + for ; treeSize < tc.dataSize; treeSize *= tc.branches { depth++ } - key := make([]byte, self.hashFunc().Size()) + key := make([]byte, tc.hashSize) // this waitgroup member is released after the root hash is calculated - wg.Add(1) + tc.wg.Add(1) //launch actual recursive function passing the waitgroups - go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, swg, wwg) + go tc.split(depth, treeSize/tc.branches, key, tc.dataSize, tc.wg) // closes internal error channel if all subprocesses in the workgroup finished go func() { // waiting for all threads to finish - wg.Wait() - // if storage waitgroup is non-nil, we wait for storage to finish too - if swg != nil { - swg.Wait() - } - close(errC) + tc.wg.Wait() + close(tc.errC) }() - defer close(quitC) + defer close(tc.quitC) + defer tc.putter.Close() select { - case err := <-errC: + case err := <-tc.errC: if err != nil { - return nil, err + return nil, nil, err } case <-time.NewTimer(splitTimeout).C: - return nil, errOperationTimedOut + return nil, nil, errOperationTimedOut } - return key, nil + return key, tc.putter.Wait, nil } -func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) { +func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) { // for depth > 0 && size < treeSize { - treeSize /= self.branches + treeSize /= tc.branches depth-- } @@ -197,16 +282,16 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size)) var readBytes int64 for readBytes < size { - n, err := data.Read(chunkData[8+readBytes:]) + n, err := tc.data.Read(chunkData[8+readBytes:]) readBytes += int64(n) if err != nil && !(err == io.EOF && readBytes == size) { - errC <- err + tc.errC <- err return } } select { - case jobC <- &hashJob{key, chunkData, size, parentWg}: - case <-quitC: + case tc.jobC <- &hashJob{addr, chunkData, size, parentWg}: + case <-tc.quitC: } return } @@ -214,7 +299,7 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade // intermediate chunk containing child nodes hashes branchCnt := (size + treeSize - 1) / treeSize - var chunk = make([]byte, branchCnt*self.hashSize+8) + var chunk = make([]byte, branchCnt*tc.hashSize+8) var pos, i int64 binary.LittleEndian.PutUint64(chunk[0:8], uint64(size)) @@ -229,10 +314,10 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade secSize = treeSize } // the hash of that data - subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize] + subTreeKey := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize] childrenWg.Add(1) - self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, swg, wwg) + tc.split(depth-1, treeSize/tc.branches, subTreeKey, secSize, childrenWg) i++ pos += treeSize @@ -242,135 +327,107 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade // go func() { childrenWg.Wait() - worker := self.getWorkerCount() - if int64(len(jobC)) > worker && worker < ChunkProcessors { - if wwg != nil { - wwg.Add(1) - } - self.incrementWorkerCount() - go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) + worker := tc.getWorkerCount() + if int64(len(tc.jobC)) > worker && worker < ChunkProcessors { + tc.runWorker() } select { - case jobC <- &hashJob{key, chunk, size, parentWg}: - case <-quitC: + case tc.jobC <- &hashJob{addr, chunk, size, parentWg}: + case <-tc.quitC: } } -func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) { - defer self.decrementWorkerCount() +func (tc *TreeChunker) runWorker() { + tc.incrementWorkerCount() + go func() { + defer tc.decrementWorkerCount() + for { + select { - hasher := self.hashFunc() - if wwg != nil { - defer wwg.Done() - } - for { - select { + case job, ok := <-tc.jobC: + if !ok { + return + } - case job, ok := <-jobC: - if !ok { + h, err := tc.putter.Put(job.chunk) + if err != nil { + tc.errC <- err + return + } + copy(job.key, h) + job.parentWg.Done() + case <-tc.quitC: return } - // now we got the hashes in the chunk, then hash the chunks - self.hashChunk(hasher, job, chunkC, swg) - case <-quitC: - return } - } -} - -// The treeChunkers own Hash hashes together -// - the size (of the subtree encoded in the Chunk) -// - the Chunk, ie. the contents read from the input reader -func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) { - hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length - hasher.Write(job.chunk[8:]) // minus 8 []byte length - h := hasher.Sum(nil) - - newChunk := &Chunk{ - Key: h, - SData: job.chunk, - Size: job.size, - wg: swg, - } - - // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) - copy(job.key, h) - // send off new chunk to storage - if chunkC != nil { - if swg != nil { - swg.Add(1) - } - } - job.parentWg.Done() - - if chunkC != nil { - //NOTE: this increases the chunk count even if the local node already has this chunk; - //on file upload the node will increase this counter even if the same file has already been uploaded - //So it should be evaluated whether it is worth keeping this counter - //and/or actually better track when the chunk is Put to the local database - //(which may question the need for disambiguation when a completely new chunk has been created - //and/or a chunk is being put to the local DB; for chunk tracking it may be worth distinguishing - newChunkCounter.Inc(1) - chunkC <- newChunk - } + }() } -func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) { - return nil, errAppendOppNotSuported +func (tc *TreeChunker) Append() (Address, func(), error) { + return nil, nil, errAppendOppNotSuported } // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { - key Key // root key - chunkC chan *Chunk // chunk channel to send retrieve requests on - chunk *Chunk // size of the entire subtree - off int64 // offset - chunkSize int64 // inherit from chunker - branches int64 // inherit from chunker - hashSize int64 // inherit from chunker + key Address // root key + chunkData ChunkData + off int64 // offset + chunkSize int64 // inherit from chunker + branches int64 // inherit from chunker + hashSize int64 // inherit from chunker + depth int + getter Getter } -// implements the Joiner interface -func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader { +func (tc *TreeChunker) Join() *LazyChunkReader { return &LazyChunkReader{ - key: key, - chunkC: chunkC, - chunkSize: self.chunkSize, - branches: self.branches, - hashSize: self.hashSize, + key: tc.addr, + chunkSize: tc.chunkSize, + branches: tc.branches, + hashSize: tc.hashSize, + depth: tc.depth, + getter: tc.getter, } } // Size is meant to be called on the LazySectionReader -func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { - if self.chunk != nil { - return self.chunk.Size, nil - } - chunk := retrieve(self.key, self.chunkC, quitC) - if chunk == nil { - select { - case <-quitC: - return 0, errors.New("aborted") - default: - return 0, fmt.Errorf("root chunk not found for %v", self.key.Hex()) +func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { + metrics.GetOrRegisterCounter("lazychunkreader.size", nil).Inc(1) + + log.Debug("lazychunkreader.size", "key", r.key) + if r.chunkData == nil { + chunkData, err := r.getter.Get(Reference(r.key)) + if err != nil { + return 0, err } + if chunkData == nil { + select { + case <-quitC: + return 0, errors.New("aborted") + default: + return 0, fmt.Errorf("root chunk not found for %v", r.key.Hex()) + } + } + r.chunkData = chunkData } - self.chunk = chunk - return chunk.Size, nil + return r.chunkData.Size(), nil } // read at can be called numerous times // concurrent reads are allowed // Size() needs to be called synchronously on the LazyChunkReader first -func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { +func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { + metrics.GetOrRegisterCounter("lazychunkreader.readat", nil).Inc(1) + // this is correct, a swarm doc cannot be zero length, so no EOF is expected if len(b) == 0 { return 0, nil } quitC := make(chan bool) - size, err := self.Size(quitC) + size, err := r.Size(quitC) if err != nil { + log.Error("lazychunkreader.readat.size", "size", size, "err", err) return 0, err } @@ -380,13 +437,18 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { var treeSize int64 var depth int // calculate depth and max treeSize - treeSize = self.chunkSize - for ; treeSize < size; treeSize *= self.branches { + treeSize = r.chunkSize + for ; treeSize < size; treeSize *= r.branches { depth++ } wg := sync.WaitGroup{} + length := int64(len(b)) + for d := 0; d < r.depth; d++ { + off *= r.chunkSize + length *= r.chunkSize + } wg.Add(1) - go self.join(b, off, off+int64(len(b)), depth, treeSize/self.branches, self.chunk, &wg, errC, quitC) + go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -394,35 +456,31 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { err = <-errC if err != nil { + log.Error("lazychunkreader.readat.errc", "err", err) close(quitC) - return 0, err } if off+int64(len(b)) >= size { - return len(b), io.EOF + return int(size - off), io.EOF } return len(b), nil } -func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() - // return NewDPA(&LocalStore{}) - - // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - // find appropriate block level - for chunk.Size < treeSize && depth > 0 { - treeSize /= self.branches + for chunkData.Size() < treeSize && depth > r.depth { + treeSize /= r.branches depth-- } // leaf chunk found - if depth == 0 { - extra := 8 + eoff - int64(len(chunk.SData)) + if depth == r.depth { + extra := 8 + eoff - int64(len(chunkData)) if extra > 0 { eoff -= extra } - copy(b, chunk.SData[8+off:8+eoff]) + copy(b, chunkData[8+off:8+eoff]) return // simply give back the chunks reader for content chunks } @@ -430,9 +488,14 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr start := off / treeSize end := (eoff + treeSize - 1) / treeSize + // last non-leaf chunk can be shorter than default chunk size, let's not read it further then its end + currentBranches := int64(len(chunkData)-8) / r.hashSize + if end > currentBranches { + end = currentBranches + } + wg := &sync.WaitGroup{} defer wg.Wait() - for i := start; i < end; i++ { soff := i * treeSize roff := soff @@ -449,11 +512,19 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr } wg.Add(1) go func(j int64) { - childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize] - chunk := retrieve(childKey, self.chunkC, quitC) - if chunk == nil { + childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] + chunkData, err := r.getter.Get(Reference(childKey)) + if err != nil { + log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err) + select { + case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childKey)): + case <-quitC: + } + return + } + if l := len(chunkData); l < 9 { select { - case errC <- fmt.Errorf("chunk %v-%v not found", off, off+treeSize): + case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childKey), l): case <-quitC: } return @@ -461,45 +532,25 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr if soff < off { soff = off } - self.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/self.branches, chunk, wg, errC, quitC) + r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) }(i) } //for } -// the helper method submits chunks for a key to a oueue (DPA) and -// block until they time out or arrive -// abort if quitC is readable -func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { - chunk := &Chunk{ - Key: key, - C: make(chan bool), // close channel to signal data delivery - } - // submit chunk for retrieval - select { - case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally) - case <-quitC: - return nil - } - // waiting for the chunk retrieval - select { // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - - case <-quitC: - // this is how we control process leakage (quitC is closed once join is finished (after timeout)) - return nil - case <-chunk.C: // bells are ringing, data have been delivered - } - if len(chunk.SData) == 0 { - return nil // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - +// Read keeps a cursor so cannot be called simulateously, see ReadAt +func (r *LazyChunkReader) Read(b []byte) (read int, err error) { + log.Debug("lazychunkreader.read", "key", r.key) + metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1) + + read, err = r.ReadAt(b, r.off) + if err != nil && err != io.EOF { + log.Error("lazychunkreader.readat", "read", read, "err", err) + metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1) } - return chunk -} -// Read keeps a cursor so cannot be called simulateously, see ReadAt -func (self *LazyChunkReader) Read(b []byte) (read int, err error) { - read, err = self.ReadAt(b, self.off) + metrics.GetOrRegisterCounter("lazychunkreader.read.bytes", nil).Inc(int64(read)) - self.off += int64(read) + r.off += int64(read) return } @@ -507,27 +558,28 @@ func (self *LazyChunkReader) Read(b []byte) (read int, err error) { var errWhence = errors.New("Seek: invalid whence") var errOffset = errors.New("Seek: invalid offset") -func (s *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { +func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { + log.Debug("lazychunkreader.seek", "key", r.key, "offset", offset) switch whence { default: return 0, errWhence case 0: offset += 0 case 1: - offset += s.off + offset += r.off case 2: - if s.chunk == nil { //seek from the end requires rootchunk for size. call Size first - _, err := s.Size(nil) + if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first + _, err := r.Size(nil) if err != nil { return 0, fmt.Errorf("can't get size: %v", err) } } - offset += s.chunk.Size + offset += r.chunkData.Size() } if offset < 0 { return 0, errOffset } - s.off = offset + r.off = offset return offset, nil } |