Diffstat (limited to 'swarm/storage/chunker.go')
-rw-r--r--  swarm/storage/chunker.go  490
1 file changed, 271 insertions(+), 219 deletions(-)
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index 2b397f801..5780742e3 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -13,7 +13,6 @@
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
package storage
import (
@@ -25,6 +24,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/swarm/log"
)
/*
@@ -65,129 +65,214 @@ var (
errOperationTimedOut = errors.New("operation timed out")
)
-//metrics variables
-var (
- newChunkCounter = metrics.NewRegisteredCounter("storage.chunks.new", nil)
+const (
+ DefaultChunkSize int64 = 4096
)
+type ChunkerParams struct {
+ chunkSize int64
+ hashSize int64
+}
+
+type SplitterParams struct {
+ ChunkerParams
+ reader io.Reader
+ putter Putter
+ addr Address
+}
+
+type TreeSplitterParams struct {
+ SplitterParams
+ size int64
+}
+
+type JoinerParams struct {
+ ChunkerParams
+ addr Address
+ getter Getter
+ // TODO: there is a bug, so depth can only be 0 today, see: https://github.com/ethersphere/go-ethereum/issues/344
+ depth int
+}
+
type TreeChunker struct {
branches int64
hashFunc SwarmHasher
+ dataSize int64
+ data io.Reader
// calculated
+ addr Address
+ depth int
hashSize int64 // self.hashFunc.New().Size()
chunkSize int64 // hashSize* branches
workerCount int64 // the number of worker routines used
workerLock sync.RWMutex // lock for the worker count
+ jobC chan *hashJob
+ wg *sync.WaitGroup
+ putter Putter
+ getter Getter
+ errC chan error
+ quitC chan bool
}
-func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
- self = &TreeChunker{}
- self.hashFunc = MakeHashFunc(params.Hash)
- self.branches = params.Branches
- self.hashSize = int64(self.hashFunc().Size())
- self.chunkSize = self.hashSize * self.branches
- self.workerCount = 0
+/*
+ Join reconstructs original content based on a root key.
+	When joining, the caller is returned a LazyChunkReader: a seekable lazy
+	SectionReader that fetches chunks on demand as and where it is read.
+	New chunks to retrieve are obtained from the getter, which the caller
+	provides. If an error is encountered during joining, it is surfaced as
+	a reader error, so partial reads from a document remain possible even
+	if other parts are corrupt or lost.
+	Chunks are not validated by the chunker when joining; it is left to
+	the DPA to decide which sources are trusted.
+*/
+func TreeJoin(addr Address, getter Getter, depth int) *LazyChunkReader {
+ jp := &JoinerParams{
+ ChunkerParams: ChunkerParams{
+ chunkSize: DefaultChunkSize,
+ hashSize: int64(len(addr)),
+ },
+ addr: addr,
+ getter: getter,
+ depth: depth,
+ }
- return
+ return NewTreeJoiner(jp).Join()
+}
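
For orientation, a minimal usage sketch of the new Join entry point follows; the readAll helper and its wiring are illustrative assumptions, not part of this patch (in practice the DPA supplies the Getter):

    package example

    import (
        "io"

        "github.com/ethereum/go-ethereum/swarm/storage"
    )

    // readAll joins the chunk tree rooted at addr and reads the whole
    // document into memory. getter is any storage.Getter the caller wires up.
    func readAll(addr storage.Address, getter storage.Getter) ([]byte, error) {
        // depth must be 0 for now, per the TODO on JoinerParams above
        reader := storage.TreeJoin(addr, getter, 0)
        size, err := reader.Size(nil)
        if err != nil {
            return nil, err
        }
        buf := make([]byte, size)
        if _, err := reader.ReadAt(buf, 0); err != nil && err != io.EOF {
            return nil, err
        }
        return buf, nil
    }
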
+
+/*
+	When splitting, data is given as a SectionReader, and the key is a
+	hashSize-long byte slice (Key); the root hash of the entire content will
+	fill it once processing finishes.
+	New chunks to store are stored using the putter, which the caller provides.
+*/
+func TreeSplit(data io.Reader, size int64, putter Putter) (k Address, wait func(), err error) {
+ tsp := &TreeSplitterParams{
+ SplitterParams: SplitterParams{
+ ChunkerParams: ChunkerParams{
+ chunkSize: DefaultChunkSize,
+ hashSize: putter.RefSize(),
+ },
+ reader: data,
+ putter: putter,
+ },
+ size: size,
+ }
+ return NewTreeSplitter(tsp).Split()
}
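
The matching split side, again only a sketch: storeBytes is a hypothetical helper, and putter stands for any storage.Putter implementation (the DPA provides one in practice):

    package example

    import (
        "bytes"

        "github.com/ethereum/go-ethereum/swarm/storage"
    )

    // storeBytes splits data into a chunk tree via the putter and returns
    // the root address once every chunk has been stored.
    func storeBytes(data []byte, putter storage.Putter) (storage.Address, error) {
        addr, wait, err := storage.TreeSplit(bytes.NewReader(data), int64(len(data)), putter)
        if err != nil {
            return nil, err
        }
        wait() // returned by Split; blocks until the putter has persisted every chunk
        return addr, nil
    }
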
-// func (self *TreeChunker) KeySize() int64 {
-// return self.hashSize
-// }
+func NewTreeJoiner(params *JoinerParams) *TreeChunker {
+ tc := &TreeChunker{}
+ tc.hashSize = params.hashSize
+ tc.branches = params.chunkSize / params.hashSize
+ tc.addr = params.addr
+ tc.getter = params.getter
+ tc.depth = params.depth
+ tc.chunkSize = params.chunkSize
+ tc.workerCount = 0
+ tc.jobC = make(chan *hashJob, 2*ChunkProcessors)
+ tc.wg = &sync.WaitGroup{}
+ tc.errC = make(chan error)
+ tc.quitC = make(chan bool)
+
+ return tc
+}
+
+func NewTreeSplitter(params *TreeSplitterParams) *TreeChunker {
+ tc := &TreeChunker{}
+ tc.data = params.reader
+ tc.dataSize = params.size
+ tc.hashSize = params.hashSize
+ tc.branches = params.chunkSize / params.hashSize
+ tc.addr = params.addr
+ tc.chunkSize = params.chunkSize
+ tc.putter = params.putter
+ tc.workerCount = 0
+ tc.jobC = make(chan *hashJob, 2*ChunkProcessors)
+ tc.wg = &sync.WaitGroup{}
+ tc.errC = make(chan error)
+ tc.quitC = make(chan bool)
+
+ return tc
+}
// String() for pretty printing
-func (self *Chunk) String() string {
- return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", self.Key.Log(), self.Size, len(self.SData))
+func (c *Chunk) String() string {
+ return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", c.Addr.Log(), c.Size, len(c.SData))
}
type hashJob struct {
- key Key
+ key Address
chunk []byte
size int64
parentWg *sync.WaitGroup
}
-func (self *TreeChunker) incrementWorkerCount() {
- self.workerLock.Lock()
- defer self.workerLock.Unlock()
- self.workerCount += 1
+func (tc *TreeChunker) incrementWorkerCount() {
+ tc.workerLock.Lock()
+ defer tc.workerLock.Unlock()
+ tc.workerCount += 1
}
-func (self *TreeChunker) getWorkerCount() int64 {
- self.workerLock.RLock()
- defer self.workerLock.RUnlock()
- return self.workerCount
+func (tc *TreeChunker) getWorkerCount() int64 {
+ tc.workerLock.RLock()
+ defer tc.workerLock.RUnlock()
+ return tc.workerCount
}
-func (self *TreeChunker) decrementWorkerCount() {
- self.workerLock.Lock()
- defer self.workerLock.Unlock()
- self.workerCount -= 1
+func (tc *TreeChunker) decrementWorkerCount() {
+ tc.workerLock.Lock()
+ defer tc.workerLock.Unlock()
+ tc.workerCount -= 1
}
-func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
- if self.chunkSize <= 0 {
+func (tc *TreeChunker) Split() (k Address, wait func(), err error) {
+ if tc.chunkSize <= 0 {
panic("chunker must be initialised")
}
- jobC := make(chan *hashJob, 2*ChunkProcessors)
- wg := &sync.WaitGroup{}
- errC := make(chan error)
- quitC := make(chan bool)
-
- // wwg = workers waitgroup keeps track of hashworkers spawned by this split call
- if wwg != nil {
- wwg.Add(1)
- }
-
- self.incrementWorkerCount()
- go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
+ tc.runWorker()
depth := 0
- treeSize := self.chunkSize
+ treeSize := tc.chunkSize
// takes lowest depth such that chunksize*HashCount^(depth+1) > size
// power series, will find the order of magnitude of the data size in base hashCount or numbers of levels of branching in the resulting tree.
- for ; treeSize < size; treeSize *= self.branches {
+ for ; treeSize < tc.dataSize; treeSize *= tc.branches {
depth++
}
- key := make([]byte, self.hashFunc().Size())
+ key := make([]byte, tc.hashSize)
// this waitgroup member is released after the root hash is calculated
- wg.Add(1)
+ tc.wg.Add(1)
//launch actual recursive function passing the waitgroups
- go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, swg, wwg)
+ go tc.split(depth, treeSize/tc.branches, key, tc.dataSize, tc.wg)
// closes internal error channel if all subprocesses in the workgroup finished
go func() {
// waiting for all threads to finish
- wg.Wait()
- // if storage waitgroup is non-nil, we wait for storage to finish too
- if swg != nil {
- swg.Wait()
- }
- close(errC)
+ tc.wg.Wait()
+ close(tc.errC)
}()
- defer close(quitC)
+ defer close(tc.quitC)
+ defer tc.putter.Close()
select {
- case err := <-errC:
+ case err := <-tc.errC:
if err != nil {
- return nil, err
+ return nil, nil, err
}
case <-time.NewTimer(splitTimeout).C:
- return nil, errOperationTimedOut
+ return nil, nil, errOperationTimedOut
}
- return key, nil
+ return key, tc.putter.Wait, nil
}
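
A worked example of the depth loop in Split, assuming the default parameters (4096-byte chunks, 32-byte hashes, hence 128 branches); the numbers are illustrative only:

    package main

    import "fmt"

    func main() {
        // Mirror of the depth calculation in Split for a 1 MiB input.
        size := int64(1 << 20)  // 1,048,576 bytes of data
        treeSize := int64(4096) // chunkSize
        branches := int64(128)  // chunkSize / hashSize = 4096 / 32
        depth := 0
        for ; treeSize < size; treeSize *= branches {
            depth++
        }
        // Prints depth=2 treeSize=67108864: a two-level tree whose root
        // covers up to 128*128*4096 bytes = 64 MiB, the smallest tree
        // that fits 1 MiB (one level would only cover 512 KiB).
        fmt.Println("depth =", depth, "treeSize =", treeSize)
    }
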
-func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
+func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) {
//
for depth > 0 && size < treeSize {
- treeSize /= self.branches
+ treeSize /= tc.branches
depth--
}
@@ -197,16 +282,16 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size))
var readBytes int64
for readBytes < size {
- n, err := data.Read(chunkData[8+readBytes:])
+ n, err := tc.data.Read(chunkData[8+readBytes:])
readBytes += int64(n)
if err != nil && !(err == io.EOF && readBytes == size) {
- errC <- err
+ tc.errC <- err
return
}
}
select {
- case jobC <- &hashJob{key, chunkData, size, parentWg}:
- case <-quitC:
+ case tc.jobC <- &hashJob{addr, chunkData, size, parentWg}:
+ case <-tc.quitC:
}
return
}
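
Leaf and intermediate chunks share the layout written here: an 8-byte little-endian subtree size followed by the payload (raw data for leaves, concatenated child hashes otherwise). A hypothetical decoder sketch:

    package example

    import (
        "encoding/binary"
        "errors"
    )

    // parseChunk illustrates the chunk layout: sdata[0:8] holds the subtree
    // size as a little-endian uint64; the rest is raw data for a leaf chunk,
    // or a run of hashSize-long child references for an intermediate chunk.
    func parseChunk(sdata []byte, chunkSize, hashSize int64) (size uint64, children int64, err error) {
        if len(sdata) < 9 {
            return 0, 0, errors.New("chunk too short")
        }
        size = binary.LittleEndian.Uint64(sdata[:8])
        if int64(size) > chunkSize {
            // a subtree larger than one chunk means this is an
            // intermediate chunk whose payload is child hashes
            children = int64(len(sdata)-8) / hashSize
        }
        return size, children, nil
    }
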
@@ -214,7 +299,7 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
// intermediate chunk containing child nodes hashes
branchCnt := (size + treeSize - 1) / treeSize
- var chunk = make([]byte, branchCnt*self.hashSize+8)
+ var chunk = make([]byte, branchCnt*tc.hashSize+8)
var pos, i int64
binary.LittleEndian.PutUint64(chunk[0:8], uint64(size))
@@ -229,10 +314,10 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
secSize = treeSize
}
// the hash of that data
- subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize]
+ subTreeKey := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize]
childrenWg.Add(1)
- self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, swg, wwg)
+ tc.split(depth-1, treeSize/tc.branches, subTreeKey, secSize, childrenWg)
i++
pos += treeSize
@@ -242,135 +327,107 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
// go func() {
childrenWg.Wait()
- worker := self.getWorkerCount()
- if int64(len(jobC)) > worker && worker < ChunkProcessors {
- if wwg != nil {
- wwg.Add(1)
- }
- self.incrementWorkerCount()
- go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
+ worker := tc.getWorkerCount()
+ if int64(len(tc.jobC)) > worker && worker < ChunkProcessors {
+ tc.runWorker()
}
select {
- case jobC <- &hashJob{key, chunk, size, parentWg}:
- case <-quitC:
+ case tc.jobC <- &hashJob{addr, chunk, size, parentWg}:
+ case <-tc.quitC:
}
}
-func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
- defer self.decrementWorkerCount()
+func (tc *TreeChunker) runWorker() {
+ tc.incrementWorkerCount()
+ go func() {
+ defer tc.decrementWorkerCount()
+ for {
+ select {
- hasher := self.hashFunc()
- if wwg != nil {
- defer wwg.Done()
- }
- for {
- select {
+ case job, ok := <-tc.jobC:
+ if !ok {
+ return
+ }
- case job, ok := <-jobC:
- if !ok {
+ h, err := tc.putter.Put(job.chunk)
+ if err != nil {
+ tc.errC <- err
+ return
+ }
+ copy(job.key, h)
+ job.parentWg.Done()
+ case <-tc.quitC:
return
}
- // now we got the hashes in the chunk, then hash the chunks
- self.hashChunk(hasher, job, chunkC, swg)
- case <-quitC:
- return
}
- }
-}
-
-// The treeChunkers own Hash hashes together
-// - the size (of the subtree encoded in the Chunk)
-// - the Chunk, ie. the contents read from the input reader
-func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
- hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
- hasher.Write(job.chunk[8:]) // minus 8 []byte length
- h := hasher.Sum(nil)
-
- newChunk := &Chunk{
- Key: h,
- SData: job.chunk,
- Size: job.size,
- wg: swg,
- }
-
- // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk)
- copy(job.key, h)
- // send off new chunk to storage
- if chunkC != nil {
- if swg != nil {
- swg.Add(1)
- }
- }
- job.parentWg.Done()
-
- if chunkC != nil {
- //NOTE: this increases the chunk count even if the local node already has this chunk;
- //on file upload the node will increase this counter even if the same file has already been uploaded
- //So it should be evaluated whether it is worth keeping this counter
- //and/or actually better track when the chunk is Put to the local database
- //(which may question the need for disambiguation when a completely new chunk has been created
- //and/or a chunk is being put to the local DB; for chunk tracking it may be worth distinguishing
- newChunkCounter.Inc(1)
- chunkC <- newChunk
- }
+ }()
}
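
runWorker folds the old hashWorker/hashChunk pair into a demand-driven pool: split() starts another hasher only while the job backlog exceeds the live worker count, capped at ChunkProcessors. The pattern in isolation, as a sketch with illustrative names:

    package example

    import "sync"

    type workerPool struct {
        jobs    chan func()
        quit    chan bool
        lock    sync.RWMutex
        workers int64
        max     int64
    }

    // submit queues a job, first spawning an extra worker if the backlog
    // has outgrown the current worker count and the cap is not reached.
    func (p *workerPool) submit(job func()) {
        p.lock.RLock()
        w := p.workers
        p.lock.RUnlock()
        if int64(len(p.jobs)) > w && w < p.max {
            p.run()
        }
        select {
        case p.jobs <- job:
        case <-p.quit:
        }
    }

    func (p *workerPool) run() {
        p.lock.Lock()
        p.workers++
        p.lock.Unlock()
        go func() {
            defer func() {
                p.lock.Lock()
                p.workers--
                p.lock.Unlock()
            }()
            for {
                select {
                case job, ok := <-p.jobs:
                    if !ok {
                        return
                    }
                    job()
                case <-p.quit:
                    return
                }
            }
        }()
    }
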
-func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
- return nil, errAppendOppNotSuported
+func (tc *TreeChunker) Append() (Address, func(), error) {
+ return nil, nil, errAppendOppNotSuported
}
// LazyChunkReader implements LazySectionReader
type LazyChunkReader struct {
- key Key // root key
- chunkC chan *Chunk // chunk channel to send retrieve requests on
- chunk *Chunk // size of the entire subtree
- off int64 // offset
- chunkSize int64 // inherit from chunker
- branches int64 // inherit from chunker
- hashSize int64 // inherit from chunker
+ key Address // root key
+ chunkData ChunkData
+ off int64 // offset
+ chunkSize int64 // inherit from chunker
+ branches int64 // inherit from chunker
+ hashSize int64 // inherit from chunker
+ depth int
+ getter Getter
}
-// implements the Joiner interface
-func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
+func (tc *TreeChunker) Join() *LazyChunkReader {
return &LazyChunkReader{
- key: key,
- chunkC: chunkC,
- chunkSize: self.chunkSize,
- branches: self.branches,
- hashSize: self.hashSize,
+ key: tc.addr,
+ chunkSize: tc.chunkSize,
+ branches: tc.branches,
+ hashSize: tc.hashSize,
+ depth: tc.depth,
+ getter: tc.getter,
}
}
// Size is meant to be called on the LazySectionReader
-func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) {
- if self.chunk != nil {
- return self.chunk.Size, nil
- }
- chunk := retrieve(self.key, self.chunkC, quitC)
- if chunk == nil {
- select {
- case <-quitC:
- return 0, errors.New("aborted")
- default:
- return 0, fmt.Errorf("root chunk not found for %v", self.key.Hex())
+func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) {
+ metrics.GetOrRegisterCounter("lazychunkreader.size", nil).Inc(1)
+
+ log.Debug("lazychunkreader.size", "key", r.key)
+ if r.chunkData == nil {
+ chunkData, err := r.getter.Get(Reference(r.key))
+ if err != nil {
+ return 0, err
}
+ if chunkData == nil {
+ select {
+ case <-quitC:
+ return 0, errors.New("aborted")
+ default:
+ return 0, fmt.Errorf("root chunk not found for %v", r.key.Hex())
+ }
+ }
+ r.chunkData = chunkData
}
- self.chunk = chunk
- return chunk.Size, nil
+ return r.chunkData.Size(), nil
}
// read at can be called numerous times
// concurrent reads are allowed
// Size() needs to be called synchronously on the LazyChunkReader first
-func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
+func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
+ metrics.GetOrRegisterCounter("lazychunkreader.readat", nil).Inc(1)
+
// this is correct, a swarm doc cannot be zero length, so no EOF is expected
if len(b) == 0 {
return 0, nil
}
quitC := make(chan bool)
- size, err := self.Size(quitC)
+ size, err := r.Size(quitC)
if err != nil {
+ log.Error("lazychunkreader.readat.size", "size", size, "err", err)
return 0, err
}
@@ -380,13 +437,18 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
var treeSize int64
var depth int
// calculate depth and max treeSize
- treeSize = self.chunkSize
- for ; treeSize < size; treeSize *= self.branches {
+ treeSize = r.chunkSize
+ for ; treeSize < size; treeSize *= r.branches {
depth++
}
wg := sync.WaitGroup{}
+ length := int64(len(b))
+ for d := 0; d < r.depth; d++ {
+ off *= r.chunkSize
+ length *= r.chunkSize
+ }
wg.Add(1)
- go self.join(b, off, off+int64(len(b)), depth, treeSize/self.branches, self.chunk, &wg, errC, quitC)
+ go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
go func() {
wg.Wait()
close(errC)
@@ -394,35 +456,31 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
err = <-errC
if err != nil {
+ log.Error("lazychunkreader.readat.errc", "err", err)
close(quitC)
-
return 0, err
}
if off+int64(len(b)) >= size {
- return len(b), io.EOF
+ return int(size - off), io.EOF
}
return len(b), nil
}
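
Note the corrected EOF behaviour: a read crossing the end of the document now reports the bytes actually copied, int(size - off), together with io.EOF, per the io.ReaderAt contract. A sketch of a caller relying on it (copyOut is hypothetical):

    package example

    import (
        "io"

        "github.com/ethereum/go-ethereum/swarm/storage"
    )

    // copyOut streams a joined document to w in fixed-size reads, honouring
    // the (count, io.EOF) pair that ReadAt returns on the final fragment.
    func copyOut(r *storage.LazyChunkReader, w io.Writer) error {
        size, err := r.Size(nil)
        if err != nil {
            return err
        }
        buf := make([]byte, 4096)
        for off := int64(0); off < size; {
            n, err := r.ReadAt(buf, off)
            if n > 0 {
                if _, werr := w.Write(buf[:n]); werr != nil {
                    return werr
                }
                off += int64(n)
            }
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
        }
        return nil
    }
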
-func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
+func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
defer parentWg.Done()
- // return NewDPA(&LocalStore{})
-
- // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
-
// find appropriate block level
- for chunk.Size < treeSize && depth > 0 {
- treeSize /= self.branches
+ for chunkData.Size() < treeSize && depth > r.depth {
+ treeSize /= r.branches
depth--
}
// leaf chunk found
- if depth == 0 {
- extra := 8 + eoff - int64(len(chunk.SData))
+ if depth == r.depth {
+ extra := 8 + eoff - int64(len(chunkData))
if extra > 0 {
eoff -= extra
}
- copy(b, chunk.SData[8+off:8+eoff])
+ copy(b, chunkData[8+off:8+eoff])
return // simply give back the chunks reader for content chunks
}
@@ -430,9 +488,14 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
start := off / treeSize
end := (eoff + treeSize - 1) / treeSize
+	// the last non-leaf chunk can be shorter than the default chunk size; don't read past its end
+ currentBranches := int64(len(chunkData)-8) / r.hashSize
+ if end > currentBranches {
+ end = currentBranches
+ }
+
wg := &sync.WaitGroup{}
defer wg.Wait()
-
for i := start; i < end; i++ {
soff := i * treeSize
roff := soff
@@ -449,11 +512,19 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
}
wg.Add(1)
go func(j int64) {
- childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize]
- chunk := retrieve(childKey, self.chunkC, quitC)
- if chunk == nil {
+ childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
+ chunkData, err := r.getter.Get(Reference(childKey))
+ if err != nil {
+ log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err)
+ select {
+ case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childKey)):
+ case <-quitC:
+ }
+ return
+ }
+ if l := len(chunkData); l < 9 {
select {
- case errC <- fmt.Errorf("chunk %v-%v not found", off, off+treeSize):
+ case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childKey), l):
case <-quitC:
}
return
@@ -461,45 +532,25 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
if soff < off {
soff = off
}
- self.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/self.branches, chunk, wg, errC, quitC)
+ r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
}(i)
} //for
}
-// the helper method submits chunks for a key to a oueue (DPA) and
-// block until they time out or arrive
-// abort if quitC is readable
-func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
- chunk := &Chunk{
- Key: key,
- C: make(chan bool), // close channel to signal data delivery
- }
- // submit chunk for retrieval
- select {
- case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally)
- case <-quitC:
- return nil
- }
- // waiting for the chunk retrieval
- select { // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
-
- case <-quitC:
- // this is how we control process leakage (quitC is closed once join is finished (after timeout))
- return nil
- case <-chunk.C: // bells are ringing, data have been delivered
- }
- if len(chunk.SData) == 0 {
- return nil // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
-
+// Read keeps a cursor so it cannot be called simultaneously, see ReadAt
+func (r *LazyChunkReader) Read(b []byte) (read int, err error) {
+ log.Debug("lazychunkreader.read", "key", r.key)
+ metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1)
+
+ read, err = r.ReadAt(b, r.off)
+ if err != nil && err != io.EOF {
+ log.Error("lazychunkreader.readat", "read", read, "err", err)
+ metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1)
}
- return chunk
-}
-// Read keeps a cursor so cannot be called simulateously, see ReadAt
-func (self *LazyChunkReader) Read(b []byte) (read int, err error) {
- read, err = self.ReadAt(b, self.off)
+ metrics.GetOrRegisterCounter("lazychunkreader.read.bytes", nil).Inc(int64(read))
- self.off += int64(read)
+ r.off += int64(read)
return
}
@@ -507,27 +558,28 @@ func (self *LazyChunkReader) Read(b []byte) (read int, err error) {
var errWhence = errors.New("Seek: invalid whence")
var errOffset = errors.New("Seek: invalid offset")
-func (s *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
+func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
+ log.Debug("lazychunkreader.seek", "key", r.key, "offset", offset)
switch whence {
default:
return 0, errWhence
case 0:
offset += 0
case 1:
- offset += s.off
+ offset += r.off
case 2:
- if s.chunk == nil { //seek from the end requires rootchunk for size. call Size first
- _, err := s.Size(nil)
+ if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first
+ _, err := r.Size(nil)
if err != nil {
return 0, fmt.Errorf("can't get size: %v", err)
}
}
- offset += s.chunk.Size
+ offset += r.chunkData.Size()
}
if offset < 0 {
return 0, errOffset
}
- s.off = offset
+ r.off = offset
return offset, nil
}
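
Seek follows the usual whence convention (0 = start, 1 = current, 2 = end); seeking from the end fetches the root chunk via Size first. A sketch of reading a document's trailer (tail is a hypothetical helper):

    package example

    import (
        "io"

        "github.com/ethereum/go-ethereum/swarm/storage"
    )

    // tail returns the last n bytes of a joined document. Seeking with
    // whence == 2 makes the reader fetch the root chunk to learn the size.
    func tail(r *storage.LazyChunkReader, n int) ([]byte, error) {
        if _, err := r.Seek(int64(-n), 2); err != nil {
            return nil, err
        }
        buf := make([]byte, n)
        read, err := r.Read(buf)
        if err != nil && err != io.EOF {
            return nil, err
        }
        return buf[:read], nil
    }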