| author | Balint Gabor <balint.g@gmail.com> | 2018-09-13 17:42:19 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-09-13 17:42:19 +0800 |
| commit | 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch) | |
| tree | 62a2896b3b824449595272f0b92dda877ba1c58d /swarm/storage | |
| parent | ff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff) | |
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org>
Co-authored-by: Balint Gabor <balint.g@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/storage')
27 files changed, 1765 insertions, 1322 deletions
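The hunks below replace the mutable `*Chunk` struct with a `Chunk` interface (`Address()`, `Data()`) and turn `ChunkStore.Put` into a context-aware call that returns an error instead of signalling completion through `dbStoredC`; callers now bound `Put`/`Get`/`Split` with their own context rather than the removed internal timeouts. As rough orientation, here is a minimal, self-contained sketch of the new calling shape — the interface and method names are taken from the diff, but the stand-in types and the toy map store are simplifications for illustration, not the package's actual definitions:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Simplified stand-ins for the swarm/storage types touched by this refactor.
// In the real package, Address is a hash and chunk data carries an 8-byte span prefix.
type Address []byte

type Chunk interface {
	Address() Address
	Data() []byte
}

// ChunkStore after the refactor: Put reports errors directly instead of
// signalling completion through a stored channel on the chunk.
type ChunkStore interface {
	Put(ctx context.Context, ch Chunk) error
	Get(ctx context.Context, ref Address) (Chunk, error)
	Close()
}

type chunk struct {
	addr  Address
	sdata []byte
}

func (c *chunk) Address() Address { return c.addr }
func (c *chunk) Data() []byte     { return c.sdata }

// mapChunkStore mirrors the MapChunkStore the diff moves into common_test.go:
// a plain map guarded by an RWMutex.
type mapChunkStore struct {
	mu     sync.RWMutex
	chunks map[string]Chunk
}

func newMapChunkStore() *mapChunkStore {
	return &mapChunkStore{chunks: make(map[string]Chunk)}
}

func (m *mapChunkStore) Put(_ context.Context, ch Chunk) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.chunks[fmt.Sprintf("%x", ch.Address())] = ch
	return nil
}

func (m *mapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	c, ok := m.chunks[fmt.Sprintf("%x", ref)]
	if !ok {
		return nil, errors.New("chunk not found")
	}
	return c, nil
}

func (m *mapChunkStore) Close() {}

func main() {
	// Callers now bound storage operations with their own context deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var store ChunkStore = newMapChunkStore()
	c := &chunk{addr: Address{0x01, 0x02}, sdata: []byte("span+payload")}
	if err := store.Put(ctx, c); err != nil {
		fmt.Println("put failed:", err)
		return
	}
	got, err := store.Get(ctx, c.Address())
	fmt.Printf("got %x, err=%v\n", got.Address(), err)
}
```

The same context then propagates down through `TreeChunker.Split` and `LazyChunkReader`, which in the diff return `ctx.Err()` where they previously raced a hard-coded timeout (`errOperationTimedOut` is removed).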
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 6d805b8e2..40292e88f 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -22,10 +22,9 @@ import ( "fmt" "io" "sync" - "time" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" @@ -67,7 +66,6 @@ The hashing itself does use extra copies and allocation though, since it does ne var ( errAppendOppNotSuported = errors.New("Append operation not supported") - errOperationTimedOut = errors.New("operation timed out") ) type ChunkerParams struct { @@ -133,7 +131,7 @@ type TreeChunker struct { func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *LazyChunkReader { jp := &JoinerParams{ ChunkerParams: ChunkerParams{ - chunkSize: chunk.DefaultSize, + chunkSize: ch.DefaultSize, hashSize: int64(len(addr)), }, addr: addr, @@ -153,7 +151,7 @@ func TreeSplit(ctx context.Context, data io.Reader, size int64, putter Putter) ( tsp := &TreeSplitterParams{ SplitterParams: SplitterParams{ ChunkerParams: ChunkerParams{ - chunkSize: chunk.DefaultSize, + chunkSize: ch.DefaultSize, hashSize: putter.RefSize(), }, reader: data, @@ -201,11 +199,6 @@ func NewTreeSplitter(params *TreeSplitterParams) *TreeChunker { return tc } -// String() for pretty printing -func (c *Chunk) String() string { - return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", c.Addr.Log(), c.Size, len(c.SData)) -} - type hashJob struct { key Address chunk []byte @@ -236,7 +229,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context. panic("chunker must be initialised") } - tc.runWorker() + tc.runWorker(ctx) depth := 0 treeSize := tc.chunkSize @@ -251,7 +244,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context. // this waitgroup member is released after the root hash is calculated tc.wg.Add(1) //launch actual recursive function passing the waitgroups - go tc.split(depth, treeSize/tc.branches, key, tc.dataSize, tc.wg) + go tc.split(ctx, depth, treeSize/tc.branches, key, tc.dataSize, tc.wg) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -267,14 +260,14 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context. 
if err != nil { return nil, nil, err } - case <-time.NewTimer(splitTimeout).C: - return nil, nil, errOperationTimedOut + case <-ctx.Done(): + return nil, nil, ctx.Err() } return key, tc.putter.Wait, nil } -func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) { +func (tc *TreeChunker) split(ctx context.Context, depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) { // @@ -321,10 +314,10 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64 secSize = treeSize } // the hash of that data - subTreeKey := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize] + subTreeAddress := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize] childrenWg.Add(1) - tc.split(depth-1, treeSize/tc.branches, subTreeKey, secSize, childrenWg) + tc.split(ctx, depth-1, treeSize/tc.branches, subTreeAddress, secSize, childrenWg) i++ pos += treeSize @@ -336,7 +329,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64 worker := tc.getWorkerCount() if int64(len(tc.jobC)) > worker && worker < ChunkProcessors { - tc.runWorker() + tc.runWorker(ctx) } select { @@ -345,7 +338,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64 } } -func (tc *TreeChunker) runWorker() { +func (tc *TreeChunker) runWorker(ctx context.Context) { tc.incrementWorkerCount() go func() { defer tc.decrementWorkerCount() @@ -357,7 +350,7 @@ func (tc *TreeChunker) runWorker() { return } - h, err := tc.putter.Put(tc.ctx, job.chunk) + h, err := tc.putter.Put(ctx, job.chunk) if err != nil { tc.errC <- err return @@ -377,8 +370,8 @@ func (tc *TreeChunker) Append() (Address, func(), error) { // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { - Ctx context.Context - key Address // root key + ctx context.Context + addr Address // root address chunkData ChunkData off int64 // offset chunkSize int64 // inherit from chunker @@ -390,18 +383,18 @@ type LazyChunkReader struct { func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader { return &LazyChunkReader{ - key: tc.addr, + addr: tc.addr, chunkSize: tc.chunkSize, branches: tc.branches, hashSize: tc.hashSize, depth: tc.depth, getter: tc.getter, - Ctx: tc.ctx, + ctx: tc.ctx, } } func (r *LazyChunkReader) Context() context.Context { - return r.Ctx + return r.ctx } // Size is meant to be called on the LazySectionReader @@ -415,23 +408,24 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e "lcr.size") defer sp.Finish() - log.Debug("lazychunkreader.size", "key", r.key) + log.Debug("lazychunkreader.size", "addr", r.addr) if r.chunkData == nil { - chunkData, err := r.getter.Get(cctx, Reference(r.key)) + chunkData, err := r.getter.Get(cctx, Reference(r.addr)) if err != nil { return 0, err } - if chunkData == nil { - select { - case <-quitC: - return 0, errors.New("aborted") - default: - return 0, fmt.Errorf("root chunk not found for %v", r.key.Hex()) - } - } r.chunkData = chunkData + s := r.chunkData.Size() + log.Debug("lazychunkreader.size", "key", r.addr, "size", s) + if s < 0 { + return 0, errors.New("corrupt size") + } + return int64(s), nil } - return r.chunkData.Size(), nil + s := r.chunkData.Size() + log.Debug("lazychunkreader.size", "key", r.addr, "size", s) + + return int64(s), nil } // read at can be called numerous times @@ -443,7 +437,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { var sp opentracing.Span var cctx context.Context cctx, sp = 
spancontext.StartSpan( - r.Ctx, + r.ctx, "lcr.read") defer sp.Finish() @@ -460,7 +454,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { quitC := make(chan bool) size, err := r.Size(cctx, quitC) if err != nil { - log.Error("lazychunkreader.readat.size", "size", size, "err", err) + log.Debug("lazychunkreader.readat.size", "size", size, "err", err) return 0, err } @@ -481,7 +475,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { length *= r.chunkSize } wg.Add(1) - go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) + go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -489,20 +483,22 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { err = <-errC if err != nil { - log.Error("lazychunkreader.readat.errc", "err", err) + log.Debug("lazychunkreader.readat.errc", "err", err) close(quitC) return 0, err } if off+int64(len(b)) >= size { + log.Debug("lazychunkreader.readat.return at end", "size", size, "off", off) return int(size - off), io.EOF } + log.Debug("lazychunkreader.readat.errc", "buff", len(b)) return len(b), nil } -func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // find appropriate block level - for chunkData.Size() < treeSize && depth > r.depth { + for chunkData.Size() < uint64(treeSize) && depth > r.depth { treeSize /= r.branches depth-- } @@ -545,19 +541,19 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in } wg.Add(1) go func(j int64) { - childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] - chunkData, err := r.getter.Get(ctx, Reference(childKey)) + childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] + chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) if err != nil { - log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err) + log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) select { - case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childKey)): + case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): case <-quitC: } return } if l := len(chunkData); l < 9 { select { - case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childKey), l): + case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l): case <-quitC: } return @@ -565,26 +561,26 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in if soff < off { soff = off } - r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) + r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) }(i) } //for } // Read keeps a cursor so cannot be called simulateously, see ReadAt func (r *LazyChunkReader) Read(b []byte) (read int, err error) { - log.Debug("lazychunkreader.read", "key", r.key) + 
log.Debug("lazychunkreader.read", "key", r.addr) metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1) read, err = r.ReadAt(b, r.off) if err != nil && err != io.EOF { - log.Error("lazychunkreader.readat", "read", read, "err", err) + log.Debug("lazychunkreader.readat", "read", read, "err", err) metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1) } metrics.GetOrRegisterCounter("lazychunkreader.read.bytes", nil).Inc(int64(read)) r.off += int64(read) - return + return read, err } // completely analogous to standard SectionReader implementation @@ -592,7 +588,7 @@ var errWhence = errors.New("Seek: invalid whence") var errOffset = errors.New("Seek: invalid offset") func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { - log.Debug("lazychunkreader.seek", "key", r.key, "offset", offset) + log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset) switch whence { default: return 0, errWhence @@ -607,7 +603,7 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { return 0, fmt.Errorf("can't get size: %v", err) } } - offset += r.chunkData.Size() + offset += int64(r.chunkData.Size()) } if offset < 0 { diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index dbcc8700d..db719ca04 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -21,7 +21,6 @@ import ( "context" "crypto/rand" "encoding/binary" - "errors" "fmt" "io" "testing" @@ -43,27 +42,8 @@ type chunkerTester struct { t test } -// fakeChunkStore doesn't store anything, just implements the ChunkStore interface -// It can be used to inject into a hasherStore if you don't want to actually store data just do the -// hashing -type fakeChunkStore struct { -} - -// Put doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Put(context.Context, *Chunk) { -} - -// Gut doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Get(context.Context, Address) (*Chunk, error) { - return nil, errors.New("FakeChunkStore doesn't support Get") -} - -// Close doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Close() { -} - -func newTestHasherStore(chunkStore ChunkStore, hash string) *hasherStore { - return NewHasherStore(chunkStore, MakeHashFunc(hash), false) +func newTestHasherStore(store ChunkStore, hash string) *hasherStore { + return NewHasherStore(store, MakeHashFunc(hash), false) } func testRandomBrokenData(n int, tester *chunkerTester) { @@ -82,11 +62,12 @@ func testRandomBrokenData(n int, tester *chunkerTester) { putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash) expectedError := fmt.Errorf("Broken reader") - addr, _, err := TreeSplit(context.TODO(), brokendata, int64(n), putGetter) + ctx := context.Background() + key, _, err := TreeSplit(ctx, brokendata, int64(n), putGetter) if err == nil || err.Error() != expectedError.Error() { tester.t.Fatalf("Not receiving the correct error! 
Expected %v, received %v", expectedError, err) } - tester.t.Logf(" Key = %v\n", addr) + tester.t.Logf(" Address = %v\n", key) } func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) Address { @@ -96,7 +77,7 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) input, found := tester.inputs[uint64(n)] var data io.Reader if !found { - data, input = generateRandomData(n) + data, input = GenerateRandomData(n) tester.inputs[uint64(n)] = input } else { data = io.LimitReader(bytes.NewReader(input), int64(n)) @@ -116,13 +97,13 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) if err != nil { tester.t.Fatalf(err.Error()) } - tester.t.Logf(" Key = %v\n", addr) + tester.t.Logf(" Address = %v\n", addr) err = wait(ctx) if err != nil { tester.t.Fatalf(err.Error()) } - reader := TreeJoin(context.TODO(), addr, putGetter, 0) + reader := TreeJoin(ctx, addr, putGetter, 0) output := make([]byte, n) r, err := reader.Read(output) if r != n || err != io.EOF { @@ -196,14 +177,14 @@ func TestDataAppend(t *testing.T) { input, found := tester.inputs[uint64(n)] var data io.Reader if !found { - data, input = generateRandomData(n) + data, input = GenerateRandomData(n) tester.inputs[uint64(n)] = input } else { data = io.LimitReader(bytes.NewReader(input), int64(n)) } - chunkStore := NewMapChunkStore() - putGetter := newTestHasherStore(chunkStore, SHA3Hash) + store := NewMapChunkStore() + putGetter := newTestHasherStore(store, SHA3Hash) ctx := context.TODO() addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) @@ -214,18 +195,17 @@ func TestDataAppend(t *testing.T) { if err != nil { tester.t.Fatalf(err.Error()) } - //create a append data stream appendInput, found := tester.inputs[uint64(m)] var appendData io.Reader if !found { - appendData, appendInput = generateRandomData(m) + appendData, appendInput = GenerateRandomData(m) tester.inputs[uint64(m)] = appendInput } else { appendData = io.LimitReader(bytes.NewReader(appendInput), int64(m)) } - putGetter = newTestHasherStore(chunkStore, SHA3Hash) + putGetter = newTestHasherStore(store, SHA3Hash) newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter) if err != nil { tester.t.Fatalf(err.Error()) @@ -256,18 +236,18 @@ func TestRandomData(t *testing.T) { tester := &chunkerTester{t: t} for _, s := range sizes { - treeChunkerKey := testRandomData(false, SHA3Hash, s, tester) - pyramidChunkerKey := testRandomData(true, SHA3Hash, s, tester) - if treeChunkerKey.String() != pyramidChunkerKey.String() { - tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String()) + treeChunkerAddress := testRandomData(false, SHA3Hash, s, tester) + pyramidChunkerAddress := testRandomData(true, SHA3Hash, s, tester) + if treeChunkerAddress.String() != pyramidChunkerAddress.String() { + tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerAddress.String(), pyramidChunkerAddress.String()) } } for _, s := range sizes { - treeChunkerKey := testRandomData(false, BMTHash, s, tester) - pyramidChunkerKey := testRandomData(true, BMTHash, s, tester) - if treeChunkerKey.String() != pyramidChunkerKey.String() { - tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String()) + treeChunkerAddress := testRandomData(false, BMTHash, s, tester) + 
pyramidChunkerAddress := testRandomData(true, BMTHash, s, tester) + if treeChunkerAddress.String() != pyramidChunkerAddress.String() { + tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerAddress.String(), pyramidChunkerAddress.String()) } } } @@ -312,12 +292,18 @@ func benchmarkSplitTreeSHA3(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash) + putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash) - _, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter) + ctx := context.Background() + _, wait, err := TreeSplit(ctx, data, int64(n), putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } + } } @@ -325,36 +311,50 @@ func benchmarkSplitTreeBMT(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash) + putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash) - _, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter) + ctx := context.Background() + _, wait, err := TreeSplit(ctx, data, int64(n), putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } } } -func benchmarkSplitPyramidSHA3(n int, t *testing.B) { +func benchmarkSplitPyramidBMT(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash) + putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash) - _, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter) + ctx := context.Background() + _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } - } } -func benchmarkSplitPyramidBMT(n int, t *testing.B) { +func benchmarkSplitPyramidSHA3(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash) + putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash) - _, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter) + ctx := context.Background() + _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } @@ -367,10 +367,10 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { data := testDataReader(n) data1 := testDataReader(m) - chunkStore := NewMapChunkStore() - putGetter := newTestHasherStore(chunkStore, SHA3Hash) + store := NewMapChunkStore() + putGetter := newTestHasherStore(store, SHA3Hash) - ctx := context.TODO() + ctx := context.Background() key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) if err != nil { t.Fatalf(err.Error()) @@ -380,7 +380,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { t.Fatalf(err.Error()) } - putGetter = newTestHasherStore(chunkStore, SHA3Hash) + putGetter = newTestHasherStore(store, SHA3Hash) _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter) if err != nil { t.Fatalf(err.Error()) diff --git a/swarm/storage/chunkstore.go b/swarm/storage/chunkstore.go deleted file mode 100644 index 3b4d97a7a..000000000 --- a/swarm/storage/chunkstore.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is 
part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package storage - -import ( - "context" - "sync" -) - -/* -ChunkStore interface is implemented by : - -- MemStore: a memory cache -- DbStore: local disk/db store -- LocalStore: a combination (sequence of) memStore and dbStore -- NetStore: cloud storage abstraction layer -- FakeChunkStore: dummy store which doesn't store anything just implements the interface -*/ -type ChunkStore interface { - Put(context.Context, *Chunk) // effectively there is no error even if there is an error - Get(context.Context, Address) (*Chunk, error) - Close() -} - -// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory. -type MapChunkStore struct { - chunks map[string]*Chunk - mu sync.RWMutex -} - -func NewMapChunkStore() *MapChunkStore { - return &MapChunkStore{ - chunks: make(map[string]*Chunk), - } -} - -func (m *MapChunkStore) Put(ctx context.Context, chunk *Chunk) { - m.mu.Lock() - defer m.mu.Unlock() - m.chunks[chunk.Addr.Hex()] = chunk - chunk.markAsStored() -} - -func (m *MapChunkStore) Get(ctx context.Context, addr Address) (*Chunk, error) { - m.mu.RLock() - defer m.mu.RUnlock() - chunk := m.chunks[addr.Hex()] - if chunk == nil { - return nil, ErrChunkNotFound - } - return chunk, nil -} - -func (m *MapChunkStore) Close() { -} diff --git a/swarm/storage/common.go b/swarm/storage/common.go deleted file mode 100644 index d6352820e..000000000 --- a/swarm/storage/common.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
-package storage - -import ( - "context" - "sync" - - "github.com/ethereum/go-ethereum/swarm/log" -) - -// PutChunks adds chunks to localstore -// It waits for receive on the stored channel -// It logs but does not fail on delivery error -func PutChunks(store *LocalStore, chunks ...*Chunk) { - wg := sync.WaitGroup{} - wg.Add(len(chunks)) - go func() { - for _, c := range chunks { - <-c.dbStoredC - if err := c.GetErrored(); err != nil { - log.Error("chunk store fail", "err", err, "key", c.Addr) - } - wg.Done() - } - }() - for _, c := range chunks { - go store.Put(context.TODO(), c) - } - wg.Wait() -} diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index dc1a3ab35..33133edd7 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -23,16 +23,20 @@ import ( "flag" "fmt" "io" + "io/ioutil" + "os" "sync" "testing" "time" "github.com/ethereum/go-ethereum/log" + ch "github.com/ethereum/go-ethereum/swarm/chunk" colorable "github.com/mattn/go-colorable" ) var ( - loglevel = flag.Int("loglevel", 3, "verbosity of logs") + loglevel = flag.Int("loglevel", 3, "verbosity of logs") + getTimeout = 30 * time.Second ) func init() { @@ -56,47 +60,73 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader } } -func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) { - return mput(store, processors, n, GenerateRandomChunk) +func newLDBStore(t *testing.T) (*LDBStore, func()) { + dir, err := ioutil.TempDir("", "bzz-storage-test") + if err != nil { + t.Fatal(err) + } + log.Trace("memstore.tempdir", "dir", dir) + + ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir) + db, err := NewLDBStore(ldbparams) + if err != nil { + t.Fatal(err) + } + + cleanup := func() { + db.Close() + err := os.RemoveAll(dir) + if err != nil { + t.Fatal(err) + } + } + + return db, cleanup } -func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) { - wg := sync.WaitGroup{} - wg.Add(processors) - c := make(chan *Chunk) - for i := 0; i < processors; i++ { +func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error) { + return mput(store, n, GenerateRandomChunk) +} + +func mputChunks(store ChunkStore, chunks ...Chunk) error { + i := 0 + f := func(n int64) Chunk { + chunk := chunks[i] + i++ + return chunk + } + _, err := mput(store, len(chunks), f) + return err +} + +func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) { + // put to localstore and wait for stored channel + // does not check delivery error state + errc := make(chan error) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + for i := int64(0); i < int64(n); i++ { + chunk := f(ch.DefaultSize) go func() { - defer wg.Done() - for chunk := range c { - wg.Add(1) - chunk := chunk - store.Put(context.TODO(), chunk) - go func() { - defer wg.Done() - <-chunk.dbStoredC - }() + select { + case errc <- store.Put(ctx, chunk): + case <-ctx.Done(): } }() + hs = append(hs, chunk) } - fa := f - if _, ok := store.(*MemStore); ok { - fa = func(i int64) *Chunk { - chunk := f(i) - chunk.markAsStored() - return chunk - } - } + + // wait for all chunks to be stored for i := 0; i < n; i++ { - chunk := fa(int64(i)) - hs = append(hs, chunk.Addr) - c <- chunk + err := <-errc + if err != nil { + return nil, err + } } - close(c) - wg.Wait() - return hs + return hs, nil } -func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error { 
+func mget(store ChunkStore, hs []Address, f func(h Address, chunk Chunk) error) error { wg := sync.WaitGroup{} wg.Add(len(hs)) errc := make(chan error) @@ -104,6 +134,7 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) for _, k := range hs { go func(h Address) { defer wg.Done() + // TODO: write timeout with context chunk, err := store.Get(context.TODO(), h) if err != nil { errc <- err @@ -143,57 +174,54 @@ func (r *brokenLimitedReader) Read(buf []byte) (int, error) { return r.lr.Read(buf) } -func generateRandomData(l int) (r io.Reader, slice []byte) { - slice = make([]byte, l) - if _, err := rand.Read(slice); err != nil { - panic("rand error") +func testStoreRandom(m ChunkStore, n int, chunksize int64, t *testing.T) { + chunks, err := mputRandomChunks(m, n, chunksize) + if err != nil { + t.Fatalf("expected no error, got %v", err) } - r = io.LimitReader(bytes.NewReader(slice), int64(l)) - return -} - -func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { - hs := mputRandomChunks(m, processors, n, chunksize) - err := mget(m, hs, nil) + err = mget(m, chunkAddresses(chunks), nil) if err != nil { t.Fatalf("testStore failed: %v", err) } } -func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { - hs := mputRandomChunks(m, processors, n, chunksize) - f := func(h Address, chunk *Chunk) error { - if !bytes.Equal(h, chunk.Addr) { - return fmt.Errorf("key does not match retrieved chunk Key") +func testStoreCorrect(m ChunkStore, n int, chunksize int64, t *testing.T) { + chunks, err := mputRandomChunks(m, n, chunksize) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + f := func(h Address, chunk Chunk) error { + if !bytes.Equal(h, chunk.Address()) { + return fmt.Errorf("key does not match retrieved chunk Address") } hasher := MakeHashFunc(DefaultHash)() - hasher.ResetWithLength(chunk.SData[:8]) - hasher.Write(chunk.SData[8:]) + hasher.ResetWithLength(chunk.SpanBytes()) + hasher.Write(chunk.Payload()) exp := hasher.Sum(nil) if !bytes.Equal(h, exp) { return fmt.Errorf("key is not hash of chunk data") } return nil } - err := mget(m, hs, f) + err = mget(m, chunkAddresses(chunks), f) if err != nil { t.Fatalf("testStore failed: %v", err) } } -func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { - chunks := make([]*Chunk, n) +func benchmarkStorePut(store ChunkStore, n int, chunksize int64, b *testing.B) { + chunks := make([]Chunk, n) i := 0 - f := func(dataSize int64) *Chunk { + f := func(dataSize int64) Chunk { chunk := GenerateRandomChunk(dataSize) chunks[i] = chunk i++ return chunk } - mput(store, processors, n, f) + mput(store, n, f) - f = func(dataSize int64) *Chunk { + f = func(dataSize int64) Chunk { chunk := chunks[i] i++ return chunk @@ -204,18 +232,62 @@ func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, for j := 0; j < b.N; j++ { i = 0 - mput(store, processors, n, f) + mput(store, n, f) } } -func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { - hs := mputRandomChunks(store, processors, n, chunksize) +func benchmarkStoreGet(store ChunkStore, n int, chunksize int64, b *testing.B) { + chunks, err := mputRandomChunks(store, n, chunksize) + if err != nil { + b.Fatalf("expected no error, got %v", err) + } b.ReportAllocs() b.ResetTimer() + addrs := chunkAddresses(chunks) for i := 0; i < b.N; i++ { - err := mget(store, hs, nil) + err := mget(store, addrs, nil) if 
err != nil { b.Fatalf("mget failed: %v", err) } } } + +// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory. +type MapChunkStore struct { + chunks map[string]Chunk + mu sync.RWMutex +} + +func NewMapChunkStore() *MapChunkStore { + return &MapChunkStore{ + chunks: make(map[string]Chunk), + } +} + +func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error { + m.mu.Lock() + defer m.mu.Unlock() + m.chunks[ch.Address().Hex()] = ch + return nil +} + +func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) { + m.mu.RLock() + defer m.mu.RUnlock() + chunk := m.chunks[ref.Hex()] + if chunk == nil { + return nil, ErrChunkNotFound + } + return chunk, nil +} + +func (m *MapChunkStore) Close() { +} + +func chunkAddresses(chunks []Chunk) []Address { + addrs := make([]Address, len(chunks)) + for i, ch := range chunks { + addrs[i] = ch.Address() + } + return addrs +} diff --git a/swarm/storage/dbapi.go b/swarm/storage/dbapi.go deleted file mode 100644 index dd71752eb..000000000 --- a/swarm/storage/dbapi.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
- -package storage - -import "context" - -// wrapper of db-s to provide mockable custom local chunk store access to syncer -type DBAPI struct { - db *LDBStore - loc *LocalStore -} - -func NewDBAPI(loc *LocalStore) *DBAPI { - return &DBAPI{loc.DbStore, loc} -} - -// to obtain the chunks from address or request db entry only -func (d *DBAPI) Get(ctx context.Context, addr Address) (*Chunk, error) { - return d.loc.Get(ctx, addr) -} - -// current storage counter of chunk db -func (d *DBAPI) CurrentBucketStorageIndex(po uint8) uint64 { - return d.db.CurrentBucketStorageIndex(po) -} - -// iteration storage counter and proximity order -func (d *DBAPI) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { - return d.db.SyncIterator(from, to, po, f) -} - -// to obtain the chunks from address or request db entry only -func (d *DBAPI) GetOrCreateRequest(ctx context.Context, addr Address) (*Chunk, bool) { - return d.loc.GetOrCreateRequest(ctx, addr) -} - -// to obtain the chunks from key or request db entry only -func (d *DBAPI) Put(ctx context.Context, chunk *Chunk) { - d.loc.Put(ctx, chunk) -} diff --git a/swarm/storage/filestore_test.go b/swarm/storage/filestore_test.go index f3f597255..d79efb530 100644 --- a/swarm/storage/filestore_test.go +++ b/swarm/storage/filestore_test.go @@ -49,11 +49,11 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { fileStore := NewFileStore(localStore, NewFileStoreParams()) defer os.RemoveAll("/tmp/bzz") - reader, slice := generateRandomData(testDataSize) + reader, slice := GenerateRandomData(testDataSize) ctx := context.TODO() key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt) if err != nil { - t.Errorf("Store error: %v", err) + t.Fatalf("Store error: %v", err) } err = wait(ctx) if err != nil { @@ -66,13 +66,13 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { resultSlice := make([]byte, len(slice)) n, err := resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - t.Errorf("Retrieve error: %v", err) + t.Fatalf("Retrieve error: %v", err) } if n != len(slice) { - t.Errorf("Slice size error got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error.") + t.Fatalf("Comparison error.") } ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666) ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666) @@ -86,13 +86,13 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { } n, err = resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - t.Errorf("Retrieve error after removing memStore: %v", err) + t.Fatalf("Retrieve error after removing memStore: %v", err) } if n != len(slice) { - t.Errorf("Slice size error after removing memStore got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error after removing memStore got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error after removing memStore.") + t.Fatalf("Comparison error after removing memStore.") } } @@ -114,7 +114,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { DbStore: db, } fileStore := NewFileStore(localStore, NewFileStoreParams()) - reader, slice := generateRandomData(testDataSize) + reader, slice := GenerateRandomData(testDataSize) ctx := context.TODO() key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt) if err != nil { @@ -122,7 +122,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { } err = wait(ctx) if err != nil { - 
t.Errorf("Store error: %v", err) + t.Fatalf("Store error: %v", err) } resultReader, isEncrypted := fileStore.Retrieve(context.TODO(), key) if isEncrypted != toEncrypt { @@ -131,13 +131,13 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { resultSlice := make([]byte, len(slice)) n, err := resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - t.Errorf("Retrieve error: %v", err) + t.Fatalf("Retrieve error: %v", err) } if n != len(slice) { - t.Errorf("Slice size error got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error.") + t.Fatalf("Comparison error.") } // Clear memStore memStore.setCapacity(0) @@ -148,7 +148,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted) } if _, err = resultReader.ReadAt(resultSlice, 0); err == nil { - t.Errorf("Was able to read %d bytes from an empty memStore.", len(slice)) + t.Fatalf("Was able to read %d bytes from an empty memStore.", len(slice)) } // check how it works with localStore fileStore.ChunkStore = localStore @@ -162,12 +162,12 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { } n, err = resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - t.Errorf("Retrieve error after clearing memStore: %v", err) + t.Fatalf("Retrieve error after clearing memStore: %v", err) } if n != len(slice) { - t.Errorf("Slice size error after clearing memStore got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error after clearing memStore got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error after clearing memStore.") + t.Fatalf("Comparison error after clearing memStore.") } } diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index 766207eae..879622b9a 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -19,10 +19,10 @@ package storage import ( "context" "fmt" - "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/crypto/sha3" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage/encryption" ) @@ -30,31 +30,36 @@ type hasherStore struct { store ChunkStore toEncrypt bool hashFunc SwarmHasher - hashSize int // content hash size - refSize int64 // reference size (content hash + possibly encryption key) - wg *sync.WaitGroup - closed chan struct{} + hashSize int // content hash size + refSize int64 // reference size (content hash + possibly encryption key) + nrChunks uint64 // number of chunks to store + errC chan error // global error channel + doneC chan struct{} // closed by Close() call to indicate that count is the final number of chunks + quitC chan struct{} // closed to quit unterminated routines } // NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces. 
// With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore // and the hasherStore will take core of encryption/decryption of data if necessary -func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore { +func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore { hashSize := hashFunc().Size() refSize := int64(hashSize) if toEncrypt { refSize += encryption.KeyLength } - return &hasherStore{ - store: chunkStore, + h := &hasherStore{ + store: store, toEncrypt: toEncrypt, hashFunc: hashFunc, hashSize: hashSize, refSize: refSize, - wg: &sync.WaitGroup{}, - closed: make(chan struct{}), + errC: make(chan error), + doneC: make(chan struct{}), + quitC: make(chan struct{}), } + + return h } // Put stores the chunkData into the ChunkStore of the hasherStore and returns the reference. @@ -62,7 +67,6 @@ func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) // Asynchronous function, the data will not necessarily be stored when it returns. func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference, error) { c := chunkData - size := chunkData.Size() var encryptionKey encryption.Key if h.toEncrypt { var err error @@ -71,29 +75,28 @@ func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference, return nil, err } } - chunk := h.createChunk(c, size) - + chunk := h.createChunk(c) h.storeChunk(ctx, chunk) - return Reference(append(chunk.Addr, encryptionKey...)), nil + return Reference(append(chunk.Address(), encryptionKey...)), nil } // Get returns data of the chunk with the given reference (retrieved from the ChunkStore of hasherStore). // If the data is encrypted and the reference contains an encryption key, it will be decrypted before // return. func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error) { - key, encryptionKey, err := parseReference(ref, h.hashSize) + addr, encryptionKey, err := parseReference(ref, h.hashSize) if err != nil { return nil, err } - toDecrypt := (encryptionKey != nil) - chunk, err := h.store.Get(ctx, key) + chunk, err := h.store.Get(ctx, addr) if err != nil { return nil, err } - chunkData := chunk.SData + chunkData := ChunkData(chunk.Data()) + toDecrypt := (encryptionKey != nil) if toDecrypt { var err error chunkData, err = h.decryptChunkData(chunkData, encryptionKey) @@ -107,16 +110,40 @@ func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error) // Close indicates that no more chunks will be put with the hasherStore, so the Wait // function can return when all the previously put chunks has been stored. 
func (h *hasherStore) Close() { - close(h.closed) + close(h.doneC) } // Wait returns when // 1) the Close() function has been called and // 2) all the chunks which has been Put has been stored func (h *hasherStore) Wait(ctx context.Context) error { - <-h.closed - h.wg.Wait() - return nil + defer close(h.quitC) + var nrStoredChunks uint64 // number of stored chunks + var done bool + doneC := h.doneC + for { + select { + // if context is done earlier, just return with the error + case <-ctx.Done(): + return ctx.Err() + // doneC is closed if all chunks have been submitted, from then we just wait until all of them are also stored + case <-doneC: + done = true + doneC = nil + // a chunk has been stored, if err is nil, then successfully, so increase the stored chunk counter + case err := <-h.errC: + if err != nil { + return err + } + nrStoredChunks++ + } + // if all the chunks have been submitted and all of them are stored, then we can return + if done { + if nrStoredChunks >= atomic.LoadUint64(&h.nrChunks) { + return nil + } + } + } } func (h *hasherStore) createHash(chunkData ChunkData) Address { @@ -126,12 +153,9 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address { return hasher.Sum(nil) } -func (h *hasherStore) createChunk(chunkData ChunkData, chunkSize int64) *Chunk { +func (h *hasherStore) createChunk(chunkData ChunkData) *chunk { hash := h.createHash(chunkData) - chunk := NewChunk(hash, nil) - chunk.SData = chunkData - chunk.Size = chunkSize - + chunk := NewChunk(hash, chunkData) return chunk } @@ -162,10 +186,10 @@ func (h *hasherStore) decryptChunkData(chunkData ChunkData, encryptionKey encryp // removing extra bytes which were just added for padding length := ChunkData(decryptedSpan).Size() - for length > chunk.DefaultSize { - length = length + (chunk.DefaultSize - 1) - length = length / chunk.DefaultSize - length *= h.refSize + for length > ch.DefaultSize { + length = length + (ch.DefaultSize - 1) + length = length / ch.DefaultSize + length *= uint64(h.refSize) } c := make(ChunkData, length+8) @@ -205,32 +229,32 @@ func (h *hasherStore) decrypt(chunkData ChunkData, key encryption.Key) ([]byte, } func (h *hasherStore) newSpanEncryption(key encryption.Key) encryption.Encryption { - return encryption.New(key, 0, uint32(chunk.DefaultSize/h.refSize), sha3.NewKeccak256) + return encryption.New(key, 0, uint32(ch.DefaultSize/h.refSize), sha3.NewKeccak256) } func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryption { - return encryption.New(key, int(chunk.DefaultSize), 0, sha3.NewKeccak256) + return encryption.New(key, int(ch.DefaultSize), 0, sha3.NewKeccak256) } -func (h *hasherStore) storeChunk(ctx context.Context, chunk *Chunk) { - h.wg.Add(1) +func (h *hasherStore) storeChunk(ctx context.Context, chunk *chunk) { + atomic.AddUint64(&h.nrChunks, 1) go func() { - <-chunk.dbStoredC - h.wg.Done() + select { + case h.errC <- h.store.Put(ctx, chunk): + case <-h.quitC: + } }() - h.store.Put(ctx, chunk) } func parseReference(ref Reference, hashSize int) (Address, encryption.Key, error) { - encryptedKeyLength := hashSize + encryption.KeyLength + encryptedRefLength := hashSize + encryption.KeyLength switch len(ref) { - case KeyLength: + case AddressLength: return Address(ref), nil, nil - case encryptedKeyLength: + case encryptedRefLength: encKeyIdx := len(ref) - encryption.KeyLength return Address(ref[:encKeyIdx]), encryption.Key(ref[encKeyIdx:]), nil default: - return nil, nil, fmt.Errorf("Invalid reference length, expected %v or %v got %v", hashSize, 
encryptedKeyLength, len(ref)) + return nil, nil, fmt.Errorf("Invalid reference length, expected %v or %v got %v", hashSize, encryptedRefLength, len(ref)) } - } diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go index ddf1c39b0..22cf98d0e 100644 --- a/swarm/storage/hasherstore_test.go +++ b/swarm/storage/hasherstore_test.go @@ -46,14 +46,16 @@ func TestHasherStore(t *testing.T) { hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt) // Put two random chunks into the hasherStore - chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key1, err := hasherStore.Put(context.TODO(), chunkData1) + chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data() + ctx, cancel := context.WithTimeout(context.Background(), getTimeout) + defer cancel() + key1, err := hasherStore.Put(ctx, chunkData1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } - chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key2, err := hasherStore.Put(context.TODO(), chunkData2) + chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).Data() + key2, err := hasherStore.Put(ctx, chunkData2) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } @@ -61,13 +63,13 @@ func TestHasherStore(t *testing.T) { hasherStore.Close() // Wait until chunks are really stored - err = hasherStore.Wait(context.TODO()) + err = hasherStore.Wait(ctx) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } // Get the first chunk - retrievedChunkData1, err := hasherStore.Get(context.TODO(), key1) + retrievedChunkData1, err := hasherStore.Get(ctx, key1) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -78,7 +80,7 @@ func TestHasherStore(t *testing.T) { } // Get the second chunk - retrievedChunkData2, err := hasherStore.Get(context.TODO(), key2) + retrievedChunkData2, err := hasherStore.Get(ctx, key2) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -105,12 +107,12 @@ func TestHasherStore(t *testing.T) { } // Check if chunk data in store is encrypted or not - chunkInStore, err := chunkStore.Get(context.TODO(), hash1) + chunkInStore, err := chunkStore.Get(ctx, hash1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } - chunkDataInStore := chunkInStore.SData + chunkDataInStore := chunkInStore.Data() if tt.toEncrypt && bytes.Equal(chunkData1, chunkDataInStore) { t.Fatalf("Chunk expected to be encrypted but it is stored without encryption") diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 675b5de01..8ab7e60b3 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -28,6 +28,7 @@ import ( "context" "encoding/binary" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -36,7 +37,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/syndtr/goleveldb/leveldb" @@ -62,6 +63,10 @@ var ( keyDistanceCnt = byte(7) ) +var ( + ErrDBClosed = errors.New("LDBStore closed") +) + type gcItem struct { idx uint64 value uint64 @@ -99,18 +104,29 @@ type LDBStore struct { batchC chan bool batchesC chan struct{} - batch *leveldb.Batch + closed bool + batch *dbBatch lock sync.RWMutex quit chan struct{} // Functions encodeDataFunc is used to bypass // the default functionality of DbStore with // mock.NodeStore for testing purposes. 
- encodeDataFunc func(chunk *Chunk) []byte + encodeDataFunc func(chunk Chunk) []byte // If getDataFunc is defined, it will be used for // retrieving the chunk data instead from the local // LevelDB database. - getDataFunc func(addr Address) (data []byte, err error) + getDataFunc func(key Address) (data []byte, err error) +} + +type dbBatch struct { + *leveldb.Batch + err error + c chan struct{} +} + +func newBatch() *dbBatch { + return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})} } // TODO: Instead of passing the distance function, just pass the address from which distances are calculated @@ -121,10 +137,9 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { s.hashfunc = params.Hash s.quit = make(chan struct{}) - s.batchC = make(chan bool) s.batchesC = make(chan struct{}, 1) go s.writeBatches() - s.batch = new(leveldb.Batch) + s.batch = newBatch() // associate encodeData with default functionality s.encodeDataFunc = encodeData @@ -143,7 +158,6 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { k[1] = uint8(i) cnt, _ := s.db.Get(k) s.bucketCnt[i] = BytesToU64(cnt) - s.bucketCnt[i]++ } data, _ := s.db.Get(keyEntryCnt) s.entryCnt = BytesToU64(data) @@ -202,14 +216,6 @@ func getIndexKey(hash Address) []byte { return key } -func getOldDataKey(idx uint64) []byte { - key := make([]byte, 9) - key[0] = keyOldData - binary.BigEndian.PutUint64(key[1:9], idx) - - return key -} - func getDataKey(idx uint64, po uint8) []byte { key := make([]byte, 10) key[0] = keyData @@ -224,12 +230,12 @@ func encodeIndex(index *dpaDBIndex) []byte { return data } -func encodeData(chunk *Chunk) []byte { +func encodeData(chunk Chunk) []byte { // Always create a new underlying array for the returned byte slice. - // The chunk.Key array may be used in the returned slice which + // The chunk.Address array may be used in the returned slice which // may be changed later in the code or by the LevelDB, resulting - // that the Key is changed as well. - return append(append([]byte{}, chunk.Addr[:]...), chunk.SData...) + // that the Address is changed as well. + return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...) 
} func decodeIndex(data []byte, index *dpaDBIndex) error { @@ -237,14 +243,8 @@ func decodeIndex(data []byte, index *dpaDBIndex) error { return dec.Decode(index) } -func decodeData(data []byte, chunk *Chunk) { - chunk.SData = data[32:] - chunk.Size = int64(binary.BigEndian.Uint64(data[32:40])) -} - -func decodeOldData(data []byte, chunk *Chunk) { - chunk.SData = data - chunk.Size = int64(binary.BigEndian.Uint64(data[0:8])) +func decodeData(addr Address, data []byte) (*chunk, error) { + return NewChunk(addr, data[32:]), nil } func (s *LDBStore) collectGarbage(ratio float32) { @@ -347,44 +347,75 @@ func (s *LDBStore) Export(out io.Writer) (int64, error) { func (s *LDBStore) Import(in io.Reader) (int64, error) { tr := tar.NewReader(in) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + countC := make(chan int64) + errC := make(chan error) var count int64 - var wg sync.WaitGroup - for { - hdr, err := tr.Next() - if err == io.EOF { - break - } else if err != nil { - return count, err - } + go func() { + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } else if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } - if len(hdr.Name) != 64 { - log.Warn("ignoring non-chunk file", "name", hdr.Name) - continue - } + if len(hdr.Name) != 64 { + log.Warn("ignoring non-chunk file", "name", hdr.Name) + continue + } - keybytes, err := hex.DecodeString(hdr.Name) - if err != nil { - log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) - continue + keybytes, err := hex.DecodeString(hdr.Name) + if err != nil { + log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) + continue + } + + data, err := ioutil.ReadAll(tr) + if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } + key := Address(keybytes) + chunk := NewChunk(key, data[32:]) + + go func() { + select { + case errC <- s.Put(ctx, chunk): + case <-ctx.Done(): + } + }() + + count++ } + countC <- count + }() - data, err := ioutil.ReadAll(tr) - if err != nil { - return count, err + // wait for all chunks to be stored + i := int64(0) + var total int64 + for { + select { + case err := <-errC: + if err != nil { + return count, err + } + i++ + case total = <-countC: + case <-ctx.Done(): + return i, ctx.Err() + } + if total > 0 && i == total { + return total, nil } - key := Address(keybytes) - chunk := NewChunk(key, nil) - chunk.SData = data[32:] - s.Put(context.TODO(), chunk) - wg.Add(1) - go func() { - defer wg.Done() - <-chunk.dbStoredC - }() - count++ } - wg.Wait() - return count, nil } func (s *LDBStore) Cleanup() { @@ -430,15 +461,18 @@ func (s *LDBStore) Cleanup() { } } - c := &Chunk{} ck := data[:32] - decodeData(data, c) + c, err := decodeData(ck, data) + if err != nil { + log.Error("decodeData error", "err", err) + continue + } - cs := int64(binary.LittleEndian.Uint64(c.SData[:8])) - log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + cs := int64(binary.LittleEndian.Uint64(c.sdata[:8])) + log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) - if len(c.SData) > chunk.DefaultSize+8 { - log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", 
index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + if len(c.sdata) > ch.DefaultSize+8 { + log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) s.delete(index.Idx, getIndexKey(key[1:]), po) removed++ errorsFound++ @@ -511,7 +545,6 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { batch.Delete(getDataKey(idx, po)) s.entryCnt-- dbEntryCount.Dec(1) - s.bucketCnt[po]-- cntKey := make([]byte, 2) cntKey[0] = keyDistanceCnt cntKey[1] = po @@ -520,10 +553,9 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { s.db.Write(batch) } -func (s *LDBStore) CurrentBucketStorageIndex(po uint8) uint64 { +func (s *LDBStore) BinIndex(po uint8) uint64 { s.lock.RLock() defer s.lock.RUnlock() - return s.bucketCnt[po] } @@ -539,43 +571,53 @@ func (s *LDBStore) CurrentStorageIndex() uint64 { return s.dataIdx } -func (s *LDBStore) Put(ctx context.Context, chunk *Chunk) { +func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1) - log.Trace("ldbstore.put", "key", chunk.Addr) + log.Trace("ldbstore.put", "key", chunk.Address()) - ikey := getIndexKey(chunk.Addr) + ikey := getIndexKey(chunk.Address()) var index dpaDBIndex - po := s.po(chunk.Addr) + po := s.po(chunk.Address()) + s.lock.Lock() - defer s.lock.Unlock() - log.Trace("ldbstore.put: s.db.Get", "key", chunk.Addr, "ikey", fmt.Sprintf("%x", ikey)) + if s.closed { + s.lock.Unlock() + return ErrDBClosed + } + batch := s.batch + + log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey)) idata, err := s.db.Get(ikey) if err != nil { s.doPut(chunk, &index, po) - batchC := s.batchC - go func() { - <-batchC - chunk.markAsStored() - }() } else { - log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Addr) + log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address) decodeIndex(idata, &index) - chunk.markAsStored() } index.Access = s.accessCnt s.accessCnt++ idata = encodeIndex(&index) s.batch.Put(ikey, idata) + + s.lock.Unlock() + select { case s.batchesC <- struct{}{}: default: } + + select { + case <-batch.c: + return batch.err + case <-ctx.Done(): + return ctx.Err() + } } // force putting into db, does not check access index -func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) { +func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) { data := s.encodeDataFunc(chunk) dkey := getDataKey(s.dataIdx, po) s.batch.Put(dkey, data) @@ -592,58 +634,64 @@ func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) { } func (s *LDBStore) writeBatches() { -mainLoop: for { select { case <-s.quit: - break mainLoop + log.Debug("DbStore: quit batch write loop") + return case <-s.batchesC: - s.lock.Lock() - b := s.batch - e := s.entryCnt - d := s.dataIdx - a := s.accessCnt - c := s.batchC - s.batchC = make(chan bool) - s.batch = new(leveldb.Batch) - err := s.writeBatch(b, e, d, a) - // TODO: set this error on the batch, then tell the chunk + err := s.writeCurrentBatch() if err != nil { - log.Error(fmt.Sprintf("spawn batch write (%d entries): %v", b.Len(), err)) + log.Debug("DbStore: quit batch write loop", "err", err.Error()) + return } - close(c) - for e > s.capacity { - log.Trace("for >", "e", e, "s.capacity", s.capacity) - // Collect garbage in a separate 
goroutine - // to be able to interrupt this loop by s.quit. - done := make(chan struct{}) - go func() { - s.collectGarbage(gcArrayFreeRatio) - log.Trace("collectGarbage closing done") - close(done) - }() + } + } - select { - case <-s.quit: - s.lock.Unlock() - break mainLoop - case <-done: - } - e = s.entryCnt - } - s.lock.Unlock() +} + +func (s *LDBStore) writeCurrentBatch() error { + s.lock.Lock() + defer s.lock.Unlock() + b := s.batch + l := b.Len() + if l == 0 { + return nil + } + e := s.entryCnt + d := s.dataIdx + a := s.accessCnt + s.batch = newBatch() + b.err = s.writeBatch(b, e, d, a) + close(b.c) + for e > s.capacity { + log.Trace("for >", "e", e, "s.capacity", s.capacity) + // Collect garbage in a separate goroutine + // to be able to interrupt this loop by s.quit. + done := make(chan struct{}) + go func() { + s.collectGarbage(gcArrayFreeRatio) + log.Trace("collectGarbage closing done") + close(done) + }() + + select { + case <-s.quit: + return errors.New("CollectGarbage terminated due to quit") + case <-done: } + e = s.entryCnt } - log.Trace(fmt.Sprintf("DbStore: quit batch write loop")) + return nil } // must be called non concurrently -func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uint64) error { +func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error { b.Put(keyEntryCnt, U64ToBytes(entryCnt)) b.Put(keyDataIdx, U64ToBytes(dataIdx)) b.Put(keyAccessCnt, U64ToBytes(accessCnt)) l := b.Len() - if err := s.db.Write(b); err != nil { + if err := s.db.Write(b.Batch); err != nil { return fmt.Errorf("unable to write batch: %v", err) } log.Trace(fmt.Sprintf("batch write (%d entries)", l)) @@ -654,12 +702,12 @@ func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uin // to a mock store to bypass the default functionality encodeData. // The constructed function always returns the nil data, as DbStore does // not need to store the data, but still need to create the index. 
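The writeBatches/writeCurrentBatch refactor above replaces the old per-chunk bookkeeping with one shared batch that carries its own completion channel and error: writers append to the current batch, the write loop swaps in a fresh batch under the lock, flushes the old one, records the result in b.err and closes b.c so every blocked Put can return. What follows is a minimal sketch of that handshake under simplified assumptions (a hypothetical kvStore with string keys and no real LevelDB write), not the actual LDBStore code:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// dbBatch mimics a write batch with a completion signal: err is written
// before c is closed, so a waiter that sees <-c may safely read err.
type dbBatch struct {
	entries map[string][]byte
	err     error
	c       chan struct{}
}

func newBatch() *dbBatch {
	return &dbBatch{entries: make(map[string][]byte), c: make(chan struct{})}
}

type kvStore struct {
	mu       sync.Mutex
	batch    *dbBatch
	batchesC chan struct{} // nudges the write loop; capacity 1, extra nudges are dropped
	quit     chan struct{}
}

func newKVStore() *kvStore {
	s := &kvStore{batch: newBatch(), batchesC: make(chan struct{}, 1), quit: make(chan struct{})}
	go s.writeBatches()
	return s
}

// Put adds the entry to the current batch and blocks until that batch has
// been flushed or the context is done, mirroring the new synchronous contract.
func (s *kvStore) Put(ctx context.Context, key string, val []byte) error {
	s.mu.Lock()
	b := s.batch
	b.entries[key] = val
	s.mu.Unlock()

	select {
	case s.batchesC <- struct{}{}:
	default:
	}

	select {
	case <-b.c:
		return b.err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// writeBatches swaps in a fresh batch under the lock, flushes the old one and
// wakes every Put call that was waiting on it.
func (s *kvStore) writeBatches() {
	for {
		select {
		case <-s.quit:
			return
		case <-s.batchesC:
			s.mu.Lock()
			b := s.batch
			s.batch = newBatch()
			s.mu.Unlock()
			b.err = nil // stand-in for the real leveldb write of b.entries
			close(b.c)
		}
	}
}

func main() {
	s := newKVStore()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("put:", s.Put(ctx, "chunk", []byte("data")))
}

In the real store the flush also persists the entry, data-index and access counters and may trigger garbage collection, but the completion-channel handshake is what lets Put report an error synchronously instead of handing out a per-chunk dbStoredC channel.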
-func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk *Chunk) []byte { - return func(chunk *Chunk) []byte { - if err := mockStore.Put(chunk.Addr, encodeData(chunk)); err != nil { - log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Addr.Log(), err)) +func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte { + return func(chunk Chunk) []byte { + if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil { + log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err)) } - return chunk.Addr[:] + return chunk.Address()[:] } } @@ -682,7 +730,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { return true } -func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) { metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1) log.Trace("ldbstore.get", "key", addr) @@ -691,9 +739,11 @@ func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err err return s.get(addr) } -func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { var indx dpaDBIndex - + if s.closed { + return nil, ErrDBClosed + } if s.tryAccessIdx(getIndexKey(addr), &indx) { var data []byte if s.getDataFunc != nil { @@ -716,9 +766,7 @@ func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) { } } - chunk = NewChunk(addr, nil) - chunk.markAsStored() - decodeData(data, chunk) + return decodeData(addr, data) } else { err = ErrChunkNotFound } @@ -772,6 +820,12 @@ func (s *LDBStore) setCapacity(c uint64) { func (s *LDBStore) Close() { close(s.quit) + s.lock.Lock() + s.closed = true + s.lock.Unlock() + // force writing out current batch + s.writeCurrentBatch() + close(s.batchesC) s.db.Close() } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 9694a724e..ae70ee259 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -22,13 +22,12 @@ import ( "fmt" "io/ioutil" "os" - "sync" "testing" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" ldberrors "github.com/syndtr/goleveldb/leveldb/errors" @@ -86,70 +85,54 @@ func (db *testDbStore) close() { } } -func testDbStoreRandom(n int, processors int, chunksize int64, mock bool, t *testing.T) { +func testDbStoreRandom(n int, chunksize int64, mock bool, t *testing.T) { db, cleanup, err := newTestDbStore(mock, true) defer cleanup() if err != nil { t.Fatalf("init dbStore failed: %v", err) } - testStoreRandom(db, processors, n, chunksize, t) + testStoreRandom(db, n, chunksize, t) } -func testDbStoreCorrect(n int, processors int, chunksize int64, mock bool, t *testing.T) { +func testDbStoreCorrect(n int, chunksize int64, mock bool, t *testing.T) { db, cleanup, err := newTestDbStore(mock, false) defer cleanup() if err != nil { t.Fatalf("init dbStore failed: %v", err) } - testStoreCorrect(db, processors, n, chunksize, t) + testStoreCorrect(db, n, chunksize, t) } func TestDbStoreRandom_1(t *testing.T) { - testDbStoreRandom(1, 1, 0, false, t) + testDbStoreRandom(1, 0, false, t) } func TestDbStoreCorrect_1(t *testing.T) { - testDbStoreCorrect(1, 1, 4096, false, t) + testDbStoreCorrect(1, 4096, false, t) } 
-func TestDbStoreRandom_1_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, false, t) +func TestDbStoreRandom_5k(t *testing.T) { + testDbStoreRandom(5000, 0, false, t) } -func TestDbStoreRandom_8_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, false, t) -} - -func TestDbStoreCorrect_1_5k(t *testing.T) { - testDbStoreCorrect(1, 5000, 4096, false, t) -} - -func TestDbStoreCorrect_8_5k(t *testing.T) { - testDbStoreCorrect(8, 5000, 4096, false, t) +func TestDbStoreCorrect_5k(t *testing.T) { + testDbStoreCorrect(5000, 4096, false, t) } func TestMockDbStoreRandom_1(t *testing.T) { - testDbStoreRandom(1, 1, 0, true, t) + testDbStoreRandom(1, 0, true, t) } func TestMockDbStoreCorrect_1(t *testing.T) { - testDbStoreCorrect(1, 1, 4096, true, t) + testDbStoreCorrect(1, 4096, true, t) } -func TestMockDbStoreRandom_1_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, true, t) +func TestMockDbStoreRandom_5k(t *testing.T) { + testDbStoreRandom(5000, 0, true, t) } -func TestMockDbStoreRandom_8_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, true, t) -} - -func TestMockDbStoreCorrect_1_5k(t *testing.T) { - testDbStoreCorrect(1, 5000, 4096, true, t) -} - -func TestMockDbStoreCorrect_8_5k(t *testing.T) { - testDbStoreCorrect(8, 5000, 4096, true, t) +func TestMockDbStoreCorrect_5k(t *testing.T) { + testDbStoreCorrect(5000, 4096, true, t) } func testDbStoreNotFound(t *testing.T, mock bool) { @@ -185,26 +168,19 @@ func testIterator(t *testing.T, mock bool) { t.Fatalf("init dbStore failed: %v", err) } - chunks := GenerateRandomChunks(chunk.DefaultSize, chunkcount) + chunks := GenerateRandomChunks(ch.DefaultSize, chunkcount) - wg := &sync.WaitGroup{} - wg.Add(len(chunks)) for i = 0; i < len(chunks); i++ { - db.Put(context.TODO(), chunks[i]) - chunkkeys[i] = chunks[i].Addr - j := i - go func() { - defer wg.Done() - <-chunks[j].dbStoredC - }() + chunkkeys[i] = chunks[i].Address() + err := db.Put(context.TODO(), chunks[i]) + if err != nil { + t.Fatalf("dbStore.Put failed: %v", err) + } } - //testSplit(m, l, 128, chunkkeys, t) - for i = 0; i < len(chunkkeys); i++ { log.Trace(fmt.Sprintf("Chunk array pos %d/%d: '%v'", i, chunkcount, chunkkeys[i])) } - wg.Wait() i = 0 for poc = 0; poc <= 255; poc++ { err := db.SyncIterator(0, uint64(chunkkeys.Len()), uint8(poc), func(k Address, n uint64) bool { @@ -239,7 +215,7 @@ func benchmarkDbStorePut(n int, processors int, chunksize int64, mock bool, b *t if err != nil { b.Fatalf("init dbStore failed: %v", err) } - benchmarkStorePut(db, processors, n, chunksize, b) + benchmarkStorePut(db, n, chunksize, b) } func benchmarkDbStoreGet(n int, processors int, chunksize int64, mock bool, b *testing.B) { @@ -248,7 +224,7 @@ func benchmarkDbStoreGet(n int, processors int, chunksize int64, mock bool, b *t if err != nil { b.Fatalf("init dbStore failed: %v", err) } - benchmarkStoreGet(db, processors, n, chunksize, b) + benchmarkStoreGet(db, n, chunksize, b) } func BenchmarkDbStorePut_1_500(b *testing.B) { @@ -293,35 +269,22 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) { ldb.setCapacity(uint64(capacity)) defer cleanup() - chunks := []*Chunk{} - for i := 0; i < n; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) - chunks = append(chunks, c) - log.Trace("generate random chunk", "idx", i, "chunk", c) - } - - for i := 0; i < n; i++ { - go ldb.Put(context.TODO(), chunks[i]) - } - - // wait for all chunks to be stored - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC + chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) + if err != nil { + t.Fatal(err.Error()) } 
log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) - for i := 0; i < n; i++ { - ret, err := ldb.Get(context.TODO(), chunks[i].Addr) + for _, ch := range chunks { + ret, err := ldb.Get(context.TODO(), ch.Address()) if err != nil { t.Fatal(err) } - if !bytes.Equal(ret.SData, chunks[i].SData) { + if !bytes.Equal(ret.Data(), ch.Data()) { t.Fatal("expected to get the same data back, but got smth else") } - - log.Info("got back chunk", "chunk", ret) } if ldb.entryCnt != uint64(n) { @@ -343,30 +306,18 @@ func TestLDBStoreCollectGarbage(t *testing.T) { ldb.setCapacity(uint64(capacity)) defer cleanup() - chunks := []*Chunk{} - for i := 0; i < n; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) - chunks = append(chunks, c) - log.Trace("generate random chunk", "idx", i, "chunk", c) - } - - for i := 0; i < n; i++ { - ldb.Put(context.TODO(), chunks[i]) - } - - // wait for all chunks to be stored - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC + chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) + if err != nil { + t.Fatal(err.Error()) } - log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) // wait for garbage collection to kick in on the responsible actor time.Sleep(1 * time.Second) var missing int - for i := 0; i < n; i++ { - ret, err := ldb.Get(context.TODO(), chunks[i].Addr) + for _, ch := range chunks { + ret, err := ldb.Get(context.Background(), ch.Address()) if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { missing++ continue @@ -375,7 +326,7 @@ func TestLDBStoreCollectGarbage(t *testing.T) { t.Fatal(err) } - if !bytes.Equal(ret.SData, chunks[i].SData) { + if !bytes.Equal(ret.Data(), ch.Data()) { t.Fatal("expected to get the same data back, but got smth else") } @@ -396,38 +347,27 @@ func TestLDBStoreAddRemove(t *testing.T) { defer cleanup() n := 100 - - chunks := []*Chunk{} - for i := 0; i < n; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) - chunks = append(chunks, c) - log.Trace("generate random chunk", "idx", i, "chunk", c) - } - - for i := 0; i < n; i++ { - go ldb.Put(context.TODO(), chunks[i]) - } - - // wait for all chunks to be stored before continuing - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC + chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) + if err != nil { + t.Fatalf(err.Error()) } for i := 0; i < n; i++ { // delete all even index chunks if i%2 == 0 { - ldb.Delete(chunks[i].Addr) + ldb.Delete(chunks[i].Address()) } } log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) for i := 0; i < n; i++ { - ret, err := ldb.Get(context.TODO(), chunks[i].Addr) + ret, err := ldb.Get(nil, chunks[i].Address()) if i%2 == 0 { // expect even chunks to be missing - if err == nil || ret != nil { + if err == nil { + // if err != ErrChunkNotFound { t.Fatal("expected chunk to be missing, but got no error") } } else { @@ -436,7 +376,7 @@ func TestLDBStoreAddRemove(t *testing.T) { t.Fatalf("expected no error, but got %s", err) } - if !bytes.Equal(ret.SData, chunks[i].SData) { + if !bytes.Equal(ret.Data(), chunks[i].Data()) { t.Fatal("expected to get the same data back, but got smth else") } } @@ -446,15 +386,16 @@ func TestLDBStoreAddRemove(t *testing.T) { // TestLDBStoreRemoveThenCollectGarbage tests that we can delete chunks and that we can trigger garbage collection func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { capacity := 11 + surplus := 4 ldb, cleanup := newLDBStore(t) ldb.setCapacity(uint64(capacity)) - n := 11 + n := capacity - chunks := []*Chunk{} - for i := 0; i < 
capacity; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) + chunks := []Chunk{} + for i := 0; i < n+surplus; i++ { + c := GenerateRandomChunk(ch.DefaultSize) chunks = append(chunks, c) log.Trace("generate random chunk", "idx", i, "chunk", c) } @@ -463,53 +404,54 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { ldb.Put(context.TODO(), chunks[i]) } - // wait for all chunks to be stored before continuing - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC - } - // delete all chunks for i := 0; i < n; i++ { - ldb.Delete(chunks[i].Addr) + ldb.Delete(chunks[i].Address()) } log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + if ldb.entryCnt != 0 { + t.Fatalf("ldb.entrCnt expected 0 got %v", ldb.entryCnt) + } + + expAccessCnt := uint64(n * 2) + if ldb.accessCnt != expAccessCnt { + t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.entryCnt) + } + cleanup() ldb, cleanup = newLDBStore(t) capacity = 10 ldb.setCapacity(uint64(capacity)) + defer cleanup() - n = 11 + n = capacity + surplus for i := 0; i < n; i++ { ldb.Put(context.TODO(), chunks[i]) } - // wait for all chunks to be stored before continuing - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC - } - // wait for garbage collection time.Sleep(1 * time.Second) - // expect for first chunk to be missing, because it has the smallest access value - idx := 0 - ret, err := ldb.Get(context.TODO(), chunks[idx].Addr) - if err == nil || ret != nil { - t.Fatal("expected first chunk to be missing, but got no error") - } - - // expect for last chunk to be present, as it has the largest access value - idx = 9 - ret, err = ldb.Get(context.TODO(), chunks[idx].Addr) - if err != nil { - t.Fatalf("expected no error, but got %s", err) + // expect first surplus chunks to be missing, because they have the smallest access value + for i := 0; i < surplus; i++ { + _, err := ldb.Get(context.TODO(), chunks[i].Address()) + if err == nil { + t.Fatal("expected surplus chunk to be missing, but got no error") + } } - if !bytes.Equal(ret.SData, chunks[idx].SData) { - t.Fatal("expected to get the same data back, but got smth else") + // expect last chunks to be present, as they have the largest access value + for i := surplus; i < surplus+capacity; i++ { + ret, err := ldb.Get(context.TODO(), chunks[i].Address()) + if err != nil { + t.Fatalf("chunk %v: expected no error, but got %s", i, err) + } + if !bytes.Equal(ret.Data(), chunks[i].Data()) { + t.Fatal("expected to get the same data back, but got smth else") + } } } diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 9e3474979..04701ee69 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -18,8 +18,6 @@ package storage import ( "context" - "encoding/binary" - "fmt" "path/filepath" "sync" @@ -97,123 +95,89 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { // when the chunk is stored in memstore. // After the LDBStore.Put, it is ensured that the MemStore // contains the chunk with the same data, but nil ReqC channel. -func (ls *LocalStore) Put(ctx context.Context, chunk *Chunk) { +func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error { valid := true // ls.Validators contains a list of one validator per chunk type. 
// if one validator succeeds, then the chunk is valid for _, v := range ls.Validators { - if valid = v.Validate(chunk.Addr, chunk.SData); valid { + if valid = v.Validate(chunk.Address(), chunk.Data()); valid { break } } if !valid { - log.Trace("invalid chunk", "addr", chunk.Addr, "len", len(chunk.SData)) - chunk.SetErrored(ErrChunkInvalid) - chunk.markAsStored() - return + return ErrChunkInvalid } - log.Trace("localstore.put", "addr", chunk.Addr) - + log.Trace("localstore.put", "key", chunk.Address()) ls.mu.Lock() defer ls.mu.Unlock() - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - - memChunk, err := ls.memStore.Get(ctx, chunk.Addr) - switch err { - case nil: - if memChunk.ReqC == nil { - chunk.markAsStored() - return - } - case ErrChunkNotFound: - default: - chunk.SetErrored(err) - return + _, err := ls.memStore.Get(ctx, chunk.Address()) + if err == nil { + return nil } - - ls.DbStore.Put(ctx, chunk) - - // chunk is no longer a request, but a chunk with data, so replace it in memStore - newc := NewChunk(chunk.Addr, nil) - newc.SData = chunk.SData - newc.Size = chunk.Size - newc.dbStoredC = chunk.dbStoredC - - ls.memStore.Put(ctx, newc) - - if memChunk != nil && memChunk.ReqC != nil { - close(memChunk.ReqC) + if err != nil && err != ErrChunkNotFound { + return err } + ls.memStore.Put(ctx, chunk) + err = ls.DbStore.Put(ctx, chunk) + return err } // Get(chunk *Chunk) looks up a chunk in the local stores // This method is blocking until the chunk is retrieved // so additional timeout may be needed to wrap this call if // ChunkStores are remote and can have long latency -func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) { ls.mu.Lock() defer ls.mu.Unlock() return ls.get(ctx, addr) } -func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err error) { chunk, err = ls.memStore.Get(ctx, addr) + + if err != nil && err != ErrChunkNotFound { + metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) + return nil, err + } + if err == nil { - if chunk.ReqC != nil { - select { - case <-chunk.ReqC: - default: - metrics.GetOrRegisterCounter("localstore.get.errfetching", nil).Inc(1) - return chunk, ErrFetching - } - } metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1) - return + return chunk, nil } + metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1) chunk, err = ls.DbStore.Get(ctx, addr) if err != nil { metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) - return + return nil, err } - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) + ls.memStore.Put(ctx, chunk) - return + return chunk, nil } -// retrieve logic common for local and network chunk retrieval requests -func (ls *LocalStore) GetOrCreateRequest(ctx context.Context, addr Address) (chunk *Chunk, created bool) { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1) - +func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error { ls.mu.Lock() defer ls.mu.Unlock() - var err error - chunk, err = ls.get(ctx, addr) - if err == nil && chunk.GetErrored() == nil { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr)) - return chunk, false + _, err := ls.get(ctx, addr) + if 
err == nil { + return nil } - if err == ErrFetching && chunk.GetErrored() == nil { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.errfetching", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v hit on an existing request %v", addr, chunk.ReqC)) - return chunk, false + return func(context.Context) error { + return err } - // no data and no request status - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr)) - chunk = NewChunk(addr, make(chan bool)) - ls.memStore.Put(ctx, chunk) - return chunk, true } -// RequestsCacheLen returns the current number of outgoing requests stored in the cache -func (ls *LocalStore) RequestsCacheLen() int { - return ls.memStore.requests.Len() +func (ls *LocalStore) BinIndex(po uint8) uint64 { + return ls.DbStore.BinIndex(po) +} + +func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { + return ls.DbStore.SyncIterator(from, to, po, f) } // Close the local store diff --git a/swarm/storage/localstore_test.go b/swarm/storage/localstore_test.go index ae62218fe..814d270d3 100644 --- a/swarm/storage/localstore_test.go +++ b/swarm/storage/localstore_test.go @@ -17,11 +17,12 @@ package storage import ( + "context" "io/ioutil" "os" "testing" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" ) var ( @@ -50,29 +51,29 @@ func TestValidator(t *testing.T) { chunks := GenerateRandomChunks(259, 2) goodChunk := chunks[0] badChunk := chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs := putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good content address chunk in spite of no validation, but got: %s", err) } - if err := badChunk.GetErrored(); err != nil { + if errs[1] != nil { t.Fatalf("expected no error on bad content address chunk in spite of no validation, but got: %s", err) } // add content address validator and check puts // bad should fail, good should pass store.Validators = append(store.Validators, NewContentAddressValidator(hashfunc)) - chunks = GenerateRandomChunks(chunk.DefaultSize, 2) + chunks = GenerateRandomChunks(ch.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs = putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good content address chunk with content address validator only, but got: %s", err) } - if err := badChunk.GetErrored(); err == nil { + if errs[1] == nil { t.Fatal("expected error on bad content address chunk with content address validator only, but got nil") } @@ -81,16 +82,16 @@ func TestValidator(t *testing.T) { var negV boolTestValidator store.Validators = append(store.Validators, negV) - chunks = GenerateRandomChunks(chunk.DefaultSize, 2) + chunks = GenerateRandomChunks(ch.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs = putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good 
content address chunk with content address validator only, but got: %s", err) } - if err := badChunk.GetErrored(); err == nil { + if errs[1] == nil { t.Fatal("expected error on bad content address chunk with content address validator only, but got nil") } @@ -99,18 +100,19 @@ func TestValidator(t *testing.T) { var posV boolTestValidator = true store.Validators = append(store.Validators, posV) - chunks = GenerateRandomChunks(chunk.DefaultSize, 2) + chunks = GenerateRandomChunks(ch.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs = putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good content address chunk with content address validator only, but got: %s", err) } - if err := badChunk.GetErrored(); err != nil { - t.Fatalf("expected no error on bad content address chunk with content address validator only, but got: %s", err) + if errs[1] != nil { + t.Fatalf("expected no error on bad content address chunk in spite of no validation, but got: %s", err) } + } type boolTestValidator bool @@ -118,3 +120,27 @@ type boolTestValidator bool func (self boolTestValidator) Validate(addr Address, data []byte) bool { return bool(self) } + +// putChunks adds chunks to localstore +// It waits for receive on the stored channel +// It logs but does not fail on delivery error +func putChunks(store *LocalStore, chunks ...Chunk) []error { + i := 0 + f := func(n int64) Chunk { + chunk := chunks[i] + i++ + return chunk + } + _, errs := put(store, len(chunks), f) + return errs +} + +func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs []error) { + for i := int64(0); i < int64(n); i++ { + chunk := f(ch.DefaultSize) + err := store.Put(context.TODO(), chunk) + errs = append(errs, err) + hs = append(hs, chunk.Address()) + } + return hs, errs +} diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 55cfcbfea..36b1e00d9 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -20,24 +20,17 @@ package storage import ( "context" - "sync" lru "github.com/hashicorp/golang-lru" ) type MemStore struct { cache *lru.Cache - requests *lru.Cache - mu sync.RWMutex disabled bool } -//NewMemStore is instantiating a MemStore cache. We are keeping a record of all outgoing requests for chunks, that -//should later be delivered by peer nodes, in the `requests` LRU cache. We are also keeping all frequently requested +//NewMemStore is instantiating a MemStore cache keeping all frequently requested //chunks in the `cache` LRU cache. -// -//`requests` LRU cache capacity should ideally never be reached, this is why for the time being it should be initialised -//with the same value as the LDBStore capacity. 
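After this change MemStore is reduced to a plain LRU of fully delivered chunks; the separate requests cache and the eviction hook that blocked on dbStoredC are gone. A stripped-down sketch of that shape, using the same github.com/hashicorp/golang-lru package the store already imports, with a stand-in chunk type rather than the real one:

package main

import (
	"errors"
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

var errNotFound = errors.New("chunk not found")

type chunk struct {
	addr string
	data []byte
}

type memStore struct {
	cache *lru.Cache
}

func newMemStore(capacity int) *memStore {
	c, err := lru.New(capacity) // plain LRU, no eviction callback needed any more
	if err != nil {
		panic(err)
	}
	return &memStore{cache: c}
}

func (m *memStore) Put(c *chunk) {
	m.cache.Add(c.addr, c)
}

func (m *memStore) Get(addr string) (*chunk, error) {
	v, ok := m.cache.Get(addr)
	if !ok {
		return nil, errNotFound
	}
	return v.(*chunk), nil
}

func main() {
	m := newMemStore(2)
	m.Put(&chunk{addr: "a", data: []byte("1")})
	m.Put(&chunk{addr: "b", data: []byte("2")})
	m.Put(&chunk{addr: "c", data: []byte("3")}) // evicts "a", the least recently used entry
	if _, err := m.Get("a"); err != nil {
		fmt.Println("a was evicted:", err)
	}
}

Outstanding retrievals are no longer parked here; they move to the NetStore fetchers introduced later in this diff.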
func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { if params.CacheCapacity == 0 { return &MemStore{ @@ -45,102 +38,48 @@ func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { } } - onEvicted := func(key interface{}, value interface{}) { - v := value.(*Chunk) - <-v.dbStoredC - } - c, err := lru.NewWithEvict(int(params.CacheCapacity), onEvicted) - if err != nil { - panic(err) - } - - requestEvicted := func(key interface{}, value interface{}) { - // temporary remove of the error log, until we figure out the problem, as it is too spamy - //log.Error("evict called on outgoing request") - } - r, err := lru.NewWithEvict(int(params.ChunkRequestsCacheCapacity), requestEvicted) + c, err := lru.New(int(params.CacheCapacity)) if err != nil { panic(err) } return &MemStore{ - cache: c, - requests: r, + cache: c, } } -func (m *MemStore) Get(ctx context.Context, addr Address) (*Chunk, error) { +func (m *MemStore) Get(_ context.Context, addr Address) (Chunk, error) { if m.disabled { return nil, ErrChunkNotFound } - m.mu.RLock() - defer m.mu.RUnlock() - - r, ok := m.requests.Get(string(addr)) - // it is a request - if ok { - return r.(*Chunk), nil - } - - // it is not a request c, ok := m.cache.Get(string(addr)) if !ok { return nil, ErrChunkNotFound } - return c.(*Chunk), nil + return c.(*chunk), nil } -func (m *MemStore) Put(ctx context.Context, c *Chunk) { +func (m *MemStore) Put(_ context.Context, c Chunk) error { if m.disabled { - return + return nil } - m.mu.Lock() - defer m.mu.Unlock() - - // it is a request - if c.ReqC != nil { - select { - case <-c.ReqC: - if c.GetErrored() != nil { - m.requests.Remove(string(c.Addr)) - return - } - m.cache.Add(string(c.Addr), c) - m.requests.Remove(string(c.Addr)) - default: - m.requests.Add(string(c.Addr), c) - } - return - } - - // it is not a request - m.cache.Add(string(c.Addr), c) - m.requests.Remove(string(c.Addr)) + m.cache.Add(string(c.Address()), c) + return nil } func (m *MemStore) setCapacity(n int) { if n <= 0 { m.disabled = true } else { - onEvicted := func(key interface{}, value interface{}) { - v := value.(*Chunk) - <-v.dbStoredC - } - c, err := lru.NewWithEvict(n, onEvicted) - if err != nil { - panic(err) - } - - r, err := lru.New(defaultChunkRequestsCacheCapacity) + c, err := lru.New(n) if err != nil { panic(err) } - m = &MemStore{ - cache: c, - requests: r, + *m = MemStore{ + cache: c, } } } diff --git a/swarm/storage/memstore_test.go b/swarm/storage/memstore_test.go index 2c1b0e89e..6b370d2b4 100644 --- a/swarm/storage/memstore_test.go +++ b/swarm/storage/memstore_test.go @@ -18,11 +18,6 @@ package storage import ( "context" - "crypto/rand" - "encoding/binary" - "io/ioutil" - "os" - "sync" "testing" "github.com/ethereum/go-ethereum/swarm/log" @@ -33,40 +28,32 @@ func newTestMemStore() *MemStore { return NewMemStore(storeparams, nil) } -func testMemStoreRandom(n int, processors int, chunksize int64, t *testing.T) { +func testMemStoreRandom(n int, chunksize int64, t *testing.T) { m := newTestMemStore() defer m.Close() - testStoreRandom(m, processors, n, chunksize, t) + testStoreRandom(m, n, chunksize, t) } -func testMemStoreCorrect(n int, processors int, chunksize int64, t *testing.T) { +func testMemStoreCorrect(n int, chunksize int64, t *testing.T) { m := newTestMemStore() defer m.Close() - testStoreCorrect(m, processors, n, chunksize, t) + testStoreCorrect(m, n, chunksize, t) } func TestMemStoreRandom_1(t *testing.T) { - testMemStoreRandom(1, 1, 0, t) + testMemStoreRandom(1, 0, t) } func TestMemStoreCorrect_1(t 
*testing.T) { - testMemStoreCorrect(1, 1, 4104, t) + testMemStoreCorrect(1, 4104, t) } -func TestMemStoreRandom_1_1k(t *testing.T) { - testMemStoreRandom(1, 1000, 0, t) +func TestMemStoreRandom_1k(t *testing.T) { + testMemStoreRandom(1000, 0, t) } -func TestMemStoreCorrect_1_1k(t *testing.T) { - testMemStoreCorrect(1, 100, 4096, t) -} - -func TestMemStoreRandom_8_1k(t *testing.T) { - testMemStoreRandom(8, 1000, 0, t) -} - -func TestMemStoreCorrect_8_1k(t *testing.T) { - testMemStoreCorrect(8, 1000, 4096, t) +func TestMemStoreCorrect_1k(t *testing.T) { + testMemStoreCorrect(100, 4096, t) } func TestMemStoreNotFound(t *testing.T) { @@ -82,13 +69,13 @@ func TestMemStoreNotFound(t *testing.T) { func benchmarkMemStorePut(n int, processors int, chunksize int64, b *testing.B) { m := newTestMemStore() defer m.Close() - benchmarkStorePut(m, processors, n, chunksize, b) + benchmarkStorePut(m, n, chunksize, b) } func benchmarkMemStoreGet(n int, processors int, chunksize int64, b *testing.B) { m := newTestMemStore() defer m.Close() - benchmarkStoreGet(m, processors, n, chunksize, b) + benchmarkStoreGet(m, n, chunksize, b) } func BenchmarkMemStorePut_1_500(b *testing.B) { @@ -107,104 +94,70 @@ func BenchmarkMemStoreGet_8_500(b *testing.B) { benchmarkMemStoreGet(500, 8, 4096, b) } -func newLDBStore(t *testing.T) (*LDBStore, func()) { - dir, err := ioutil.TempDir("", "bzz-storage-test") - if err != nil { - t.Fatal(err) - } - log.Trace("memstore.tempdir", "dir", dir) - - ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir) - db, err := NewLDBStore(ldbparams) - if err != nil { - t.Fatal(err) - } - - cleanup := func() { - db.Close() - err := os.RemoveAll(dir) - if err != nil { - t.Fatal(err) - } - } - - return db, cleanup -} - func TestMemStoreAndLDBStore(t *testing.T) { ldb, cleanup := newLDBStore(t) ldb.setCapacity(4000) defer cleanup() cacheCap := 200 - requestsCap := 200 - memStore := NewMemStore(NewStoreParams(4000, 200, 200, nil, nil), nil) + memStore := NewMemStore(NewStoreParams(4000, 200, nil, nil), nil) tests := []struct { - n int // number of chunks to push to memStore - chunkSize uint64 // size of chunk (by default in Swarm - 4096) - request bool // whether or not to set the ReqC channel on the random chunks + n int // number of chunks to push to memStore + chunkSize int64 // size of chunk (by default in Swarm - 4096) }{ { n: 1, chunkSize: 4096, - request: false, }, { n: 201, chunkSize: 4096, - request: false, }, { n: 501, chunkSize: 4096, - request: false, }, { n: 3100, chunkSize: 4096, - request: false, }, { n: 100, chunkSize: 4096, - request: true, }, } for i, tt := range tests { log.Info("running test", "idx", i, "tt", tt) - var chunks []*Chunk + var chunks []Chunk for i := 0; i < tt.n; i++ { - var c *Chunk - if tt.request { - c = NewRandomRequestChunk(tt.chunkSize) - } else { - c = NewRandomChunk(tt.chunkSize) - } - + c := GenerateRandomChunk(tt.chunkSize) chunks = append(chunks, c) } for i := 0; i < tt.n; i++ { - go ldb.Put(context.TODO(), chunks[i]) - memStore.Put(context.TODO(), chunks[i]) + err := ldb.Put(context.TODO(), chunks[i]) + if err != nil { + t.Fatal(err) + } + err = memStore.Put(context.TODO(), chunks[i]) + if err != nil { + t.Fatal(err) + } if got := memStore.cache.Len(); got > cacheCap { t.Fatalf("expected to get cache capacity less than %v, but got %v", cacheCap, got) } - if got := memStore.requests.Len(); got > requestsCap { - t.Fatalf("expected to get requests capacity less than %v, but got %v", requestsCap, got) - } } for i := 0; i < tt.n; i++ { - _, err := 
memStore.Get(context.TODO(), chunks[i].Addr) + _, err := memStore.Get(context.TODO(), chunks[i].Address()) if err != nil { if err == ErrChunkNotFound { - _, err := ldb.Get(context.TODO(), chunks[i].Addr) + _, err := ldb.Get(context.TODO(), chunks[i].Address()) if err != nil { t.Fatalf("couldn't get chunk %v from ldb, got error: %v", i, err) } @@ -213,37 +166,5 @@ func TestMemStoreAndLDBStore(t *testing.T) { } } } - - // wait for all chunks to be stored before ending the test are cleaning up - for i := 0; i < tt.n; i++ { - <-chunks[i].dbStoredC - } - } -} - -func NewRandomChunk(chunkSize uint64) *Chunk { - c := &Chunk{ - Addr: make([]byte, 32), - ReqC: nil, - SData: make([]byte, chunkSize+8), // SData should be chunkSize + 8 bytes reserved for length - dbStoredC: make(chan bool), - dbStoredMu: &sync.Mutex{}, } - - rand.Read(c.SData) - - binary.LittleEndian.PutUint64(c.SData[:8], chunkSize) - - hasher := MakeHashFunc(SHA3Hash)() - hasher.Write(c.SData) - copy(c.Addr, hasher.Sum(nil)) - - return c -} - -func NewRandomRequestChunk(chunkSize uint64) *Chunk { - c := NewRandomChunk(chunkSize) - c.ReqC = make(chan bool) - - return c } diff --git a/swarm/storage/mru/handler.go b/swarm/storage/mru/handler.go index 57561fd14..18c667f14 100644 --- a/swarm/storage/mru/handler.go +++ b/swarm/storage/mru/handler.go @@ -187,12 +187,12 @@ func (h *Handler) New(ctx context.Context, request *Request) error { return err } if request.metaHash != nil && !bytes.Equal(request.metaHash, metaHash) || - request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Addr) { + request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Address()) { return NewError(ErrInvalidValue, "metaHash in UpdateRequest does not match actual metadata") } request.metaHash = metaHash - request.rootAddr = chunk.Addr + request.rootAddr = chunk.Address() h.chunkStore.Put(ctx, chunk) log.Debug("new resource", "name", request.metadata.Name, "startTime", request.metadata.StartTime, "frequency", request.metadata.Frequency, "owner", request.metadata.Owner) @@ -202,14 +202,14 @@ func (h *Handler) New(ctx context.Context, request *Request) error { resourceUpdate: resourceUpdate{ updateHeader: updateHeader{ UpdateLookup: UpdateLookup{ - rootAddr: chunk.Addr, + rootAddr: chunk.Address(), }, }, }, ResourceMetadata: request.metadata, updated: time.Now(), } - h.set(chunk.Addr, rsrc) + h.set(chunk.Address(), rsrc) return nil } @@ -348,7 +348,11 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error return nil, NewErrorf(ErrPeriodDepth, "Lookup exceeded max period hops (%d)", lp.Limit) } updateAddr := lp.UpdateAddr() - chunk, err := h.chunkStore.GetWithTimeout(context.TODO(), updateAddr, defaultRetrieveTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) + defer cancel() + + chunk, err := h.chunkStore.Get(ctx, updateAddr) if err == nil { if specificversion { return h.updateIndex(rsrc, chunk) @@ -358,7 +362,11 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error for { newversion := lp.version + 1 updateAddr := lp.UpdateAddr() - newchunk, err := h.chunkStore.GetWithTimeout(context.TODO(), updateAddr, defaultRetrieveTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) + defer cancel() + + newchunk, err := h.chunkStore.Get(ctx, updateAddr) if err != nil { return h.updateIndex(rsrc, chunk) } @@ -380,7 +388,10 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error // Load retrieves 
the Mutable Resource metadata chunk stored at rootAddr // Upon retrieval it creates/updates the index entry for it with metadata corresponding to the chunk contents func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource, error) { - chunk, err := h.chunkStore.GetWithTimeout(ctx, rootAddr, defaultRetrieveTimeout) + //TODO: Maybe add timeout to context, defaultRetrieveTimeout? + ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) + defer cancel() + chunk, err := h.chunkStore.Get(ctx, rootAddr) if err != nil { return nil, NewError(ErrNotFound, err.Error()) } @@ -388,11 +399,11 @@ func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource // create the index entry rsrc := &resource{} - if err := rsrc.ResourceMetadata.binaryGet(chunk.SData); err != nil { // Will fail if this is not really a metadata chunk + if err := rsrc.ResourceMetadata.binaryGet(chunk.Data()); err != nil { // Will fail if this is not really a metadata chunk return nil, err } - rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.SData) + rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.Data()) if !bytes.Equal(rsrc.rootAddr, rootAddr) { return nil, NewError(ErrCorruptData, "Corrupt metadata chunk") } @@ -402,17 +413,17 @@ func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource } // update mutable resource index map with specified content -func (h *Handler) updateIndex(rsrc *resource, chunk *storage.Chunk) (*resource, error) { +func (h *Handler) updateIndex(rsrc *resource, chunk storage.Chunk) (*resource, error) { // retrieve metadata from chunk data and check that it matches this mutable resource var r SignedResourceUpdate - if err := r.fromChunk(chunk.Addr, chunk.SData); err != nil { + if err := r.fromChunk(chunk.Address(), chunk.Data()); err != nil { return nil, err } - log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Addr, "period", r.period, "version", r.version) + log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Address(), "period", r.period, "version", r.version) // update our rsrcs entry map - rsrc.lastKey = chunk.Addr + rsrc.lastKey = chunk.Address() rsrc.period = r.period rsrc.version = r.version rsrc.updated = time.Now() @@ -420,8 +431,8 @@ func (h *Handler) updateIndex(rsrc *resource, chunk *storage.Chunk) (*resource, rsrc.multihash = r.multihash copy(rsrc.data, r.data) rsrc.Reader = bytes.NewReader(rsrc.data) - log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Addr, "period", rsrc.period, "version", rsrc.version) - h.set(chunk.Addr, rsrc) + log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Address(), "period", rsrc.period, "version", rsrc.version) + h.set(chunk.Address(), rsrc) return rsrc, nil } @@ -457,7 +468,7 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd // send the chunk h.chunkStore.Put(ctx, chunk) - log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.SData, "multihash", r.multihash) + log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.Data(), "multihash", r.multihash) // update our resources map entry if the new update is older than the one we have, if we have it. 
if rsrc != nil && (r.period > rsrc.period || (rsrc.period == r.period && r.version > rsrc.version)) { @@ -475,7 +486,7 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd // Retrieves the resource index value for the given nameHash func (h *Handler) get(rootAddr storage.Address) *resource { - if len(rootAddr) < storage.KeyLength { + if len(rootAddr) < storage.AddressLength { log.Warn("Handler.get with invalid rootAddr") return nil } @@ -488,7 +499,7 @@ func (h *Handler) get(rootAddr storage.Address) *resource { // Sets the resource index value for the given nameHash func (h *Handler) set(rootAddr storage.Address, rsrc *resource) { - if len(rootAddr) < storage.KeyLength { + if len(rootAddr) < storage.AddressLength { log.Warn("Handler.set with invalid rootAddr") return } diff --git a/swarm/storage/mru/lookup.go b/swarm/storage/mru/lookup.go index eb28336e1..b52cd5b4f 100644 --- a/swarm/storage/mru/lookup.go +++ b/swarm/storage/mru/lookup.go @@ -72,7 +72,7 @@ type UpdateLookup struct { // 4 bytes period // 4 bytes version // storage.Keylength for rootAddr -const updateLookupLength = 4 + 4 + storage.KeyLength +const updateLookupLength = 4 + 4 + storage.AddressLength // UpdateAddr calculates the resource update chunk address corresponding to this lookup key func (u *UpdateLookup) UpdateAddr() (updateAddr storage.Address) { @@ -90,7 +90,7 @@ func (u *UpdateLookup) binaryPut(serializedData []byte) error { if len(serializedData) != updateLookupLength { return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize UpdateLookup. Expected %d, got %d", updateLookupLength, len(serializedData)) } - if len(u.rootAddr) != storage.KeyLength { + if len(u.rootAddr) != storage.AddressLength { return NewError(ErrInvalidValue, "UpdateLookup.binaryPut called without rootAddr set") } binary.LittleEndian.PutUint32(serializedData[:4], u.period) @@ -111,7 +111,7 @@ func (u *UpdateLookup) binaryGet(serializedData []byte) error { } u.period = binary.LittleEndian.Uint32(serializedData[:4]) u.version = binary.LittleEndian.Uint32(serializedData[4:8]) - u.rootAddr = storage.Address(make([]byte, storage.KeyLength)) + u.rootAddr = storage.Address(make([]byte, storage.AddressLength)) copy(u.rootAddr[:], serializedData[8:]) return nil } diff --git a/swarm/storage/mru/metadata.go b/swarm/storage/mru/metadata.go index 0ab0ed1d9..509114895 100644 --- a/swarm/storage/mru/metadata.go +++ b/swarm/storage/mru/metadata.go @@ -142,7 +142,7 @@ func (r *ResourceMetadata) serializeAndHash() (rootAddr, metaHash []byte, chunkD } // creates a metadata chunk out of a resourceMetadata structure -func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []byte, err error) { +func (metadata *ResourceMetadata) newChunk() (chunk storage.Chunk, metaHash []byte, err error) { // the metadata chunk contains a timestamp of when the resource starts to be valid // and also how frequently it is expected to be updated // from this we know at what time we should look for updates, and how often @@ -157,9 +157,7 @@ func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []b } // make the chunk and send it to swarm - chunk = storage.NewChunk(rootAddr, nil) - chunk.SData = chunkData - chunk.Size = int64(len(chunkData)) + chunk = storage.NewChunk(rootAddr, chunkData) return chunk, metaHash, nil } diff --git a/swarm/storage/mru/request.go b/swarm/storage/mru/request.go index dd71f855d..af2ccf5c7 100644 --- a/swarm/storage/mru/request.go +++ b/swarm/storage/mru/request.go @@ -182,7 
+182,7 @@ func (r *Request) fromJSON(j *updateRequestJSON) error { var declaredRootAddr storage.Address var declaredMetaHash []byte - declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.KeyLength, "rootAddr") + declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.AddressLength, "rootAddr") if err != nil { return err } diff --git a/swarm/storage/mru/resource_test.go b/swarm/storage/mru/resource_test.go index 76d7c58a1..0fb465bb0 100644 --- a/swarm/storage/mru/resource_test.go +++ b/swarm/storage/mru/resource_test.go @@ -87,8 +87,7 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { resourceUpdate: resourceUpdate{ updateHeader: updateHeader{ UpdateLookup: UpdateLookup{ - - rootAddr: make([]byte, 79), // put the wrong length, should be storage.KeyLength + rootAddr: make([]byte, 79), // put the wrong length, should be storage.AddressLength }, metaHash: nil, multihash: false, @@ -99,8 +98,8 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { if err == nil { t.Fatal("Expected newUpdateChunk to fail when rootAddr or metaHash have the wrong length") } - r.rootAddr = make([]byte, storage.KeyLength) - r.metaHash = make([]byte, storage.KeyLength) + r.rootAddr = make([]byte, storage.AddressLength) + r.metaHash = make([]byte, storage.AddressLength) _, err = r.toChunk() if err == nil { t.Fatal("Expected newUpdateChunk to fail when there is no data") @@ -197,7 +196,7 @@ func TestReverse(t *testing.T) { // check that we can recover the owner account from the update chunk's signature var checkUpdate SignedResourceUpdate - if err := checkUpdate.fromChunk(chunk.Addr, chunk.SData); err != nil { + if err := checkUpdate.fromChunk(chunk.Address(), chunk.Data()); err != nil { t.Fatal(err) } checkdigest, err := checkUpdate.GetDigest() @@ -215,8 +214,8 @@ func TestReverse(t *testing.T) { t.Fatalf("addresses dont match: %x != %x", originaladdress, recoveredaddress) } - if !bytes.Equal(key[:], chunk.Addr[:]) { - t.Fatalf("Expected chunk key '%x', was '%x'", key, chunk.Addr) + if !bytes.Equal(key[:], chunk.Address()[:]) { + t.Fatalf("Expected chunk key '%x', was '%x'", key, chunk.Address()) } if period != checkUpdate.period { t.Fatalf("Expected period '%d', was '%d'", period, checkUpdate.period) @@ -270,16 +269,16 @@ func TestResourceHandler(t *testing.T) { t.Fatal(err) } - chunk, err := rh.chunkStore.Get(context.TODO(), storage.Address(request.rootAddr)) + chunk, err := rh.chunkStore.Get(ctx, storage.Address(request.rootAddr)) if err != nil { t.Fatal(err) - } else if len(chunk.SData) < 16 { - t.Fatalf("chunk data must be minimum 16 bytes, is %d", len(chunk.SData)) + } else if len(chunk.Data()) < 16 { + t.Fatalf("chunk data must be minimum 16 bytes, is %d", len(chunk.Data())) } var recoveredMetadata ResourceMetadata - recoveredMetadata.binaryGet(chunk.SData) + recoveredMetadata.binaryGet(chunk.Data()) if err != nil { t.Fatal(err) } @@ -704,7 +703,7 @@ func TestValidator(t *testing.T) { if err != nil { t.Fatal(err) } - if !rh.Validate(chunk.Addr, chunk.SData) { + if !rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator fail on update chunk") } @@ -724,7 +723,7 @@ func TestValidator(t *testing.T) { t.Fatal(err) } - if rh.Validate(chunk.Addr, chunk.SData) { + if rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator did not fail on update chunk with false address") } @@ -742,7 +741,7 @@ func TestValidator(t *testing.T) { t.Fatal(err) } - if !rh.Validate(chunk.Addr, chunk.SData) { + if !rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk 
validator fail on metadata chunk") } } @@ -783,8 +782,7 @@ func TestValidatorInStore(t *testing.T) { // create content addressed chunks, one good, one faulty chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2) goodChunk := chunks[0] - badChunk := chunks[1] - badChunk.SData = goodChunk.SData + badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data()) metadata := &ResourceMetadata{ StartTime: startTime, @@ -801,7 +799,7 @@ func TestValidatorInStore(t *testing.T) { updateLookup := UpdateLookup{ period: 42, version: 1, - rootAddr: rootChunk.Addr, + rootAddr: rootChunk.Address(), } updateAddr := updateLookup.UpdateAddr() @@ -826,16 +824,16 @@ func TestValidatorInStore(t *testing.T) { } // put the chunks in the store and check their error status - storage.PutChunks(store, goodChunk) - if goodChunk.GetErrored() == nil { + err = store.Put(context.Background(), goodChunk) + if err == nil { t.Fatal("expected error on good content address chunk with resource validator only, but got nil") } - storage.PutChunks(store, badChunk) - if badChunk.GetErrored() == nil { + err = store.Put(context.Background(), badChunk) + if err == nil { t.Fatal("expected error on bad content address chunk with resource validator only, but got nil") } - storage.PutChunks(store, uglyChunk) - if err := uglyChunk.GetErrored(); err != nil { + err = store.Put(context.Background(), uglyChunk) + if err != nil { t.Fatalf("expected no error on resource update chunk with resource validator only, but got: %s", err) } } @@ -897,7 +895,7 @@ func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) { return nil, err } var r SignedResourceUpdate - if err := r.fromChunk(addr, chunk.SData); err != nil { + if err := r.fromChunk(addr, chunk.Data()); err != nil { return nil, err } return r.data, nil diff --git a/swarm/storage/mru/signedupdate.go b/swarm/storage/mru/signedupdate.go index 1c6d02e82..41a5a5e63 100644 --- a/swarm/storage/mru/signedupdate.go +++ b/swarm/storage/mru/signedupdate.go @@ -96,7 +96,7 @@ func (r *SignedResourceUpdate) Sign(signer Signer) error { } // create an update chunk. -func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) { +func (r *SignedResourceUpdate) toChunk() (storage.Chunk, error) { // Check that the update is signed and serialized // For efficiency, data is serialized during signature and cached in @@ -105,14 +105,11 @@ func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) { return nil, NewError(ErrInvalidSignature, "newUpdateChunk called without a valid signature or payload data. 
Call .Sign() first.") } - chunk := storage.NewChunk(r.updateAddr, nil) resourceUpdateLength := r.resourceUpdate.binaryLength() - chunk.SData = r.binaryData - // signature is the last item in the chunk data - copy(chunk.SData[resourceUpdateLength:], r.signature[:]) + copy(r.binaryData[resourceUpdateLength:], r.signature[:]) - chunk.Size = int64(len(chunk.SData)) + chunk := storage.NewChunk(r.updateAddr, r.binaryData) return chunk, nil } diff --git a/swarm/storage/mru/testutil.go b/swarm/storage/mru/testutil.go index 6efcba9ab..a30baaa1d 100644 --- a/swarm/storage/mru/testutil.go +++ b/swarm/storage/mru/testutil.go @@ -17,8 +17,12 @@ package mru import ( + "context" "fmt" "path/filepath" + "sync" + + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -35,6 +39,17 @@ func (t *TestHandler) Close() { t.chunkStore.Close() } +type mockNetFetcher struct{} + +func (m *mockNetFetcher) Request(ctx context.Context) { +} +func (m *mockNetFetcher) Offer(ctx context.Context, source *discover.NodeID) { +} + +func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher { + return &mockNetFetcher{} +} + // NewTestHandler creates Handler object to be used for testing purposes. func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) { path := filepath.Join(datadir, testDbDirName) @@ -47,7 +62,11 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) } localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(resourceHashAlgorithm))) localStore.Validators = append(localStore.Validators, rh) - netStore := storage.NewNetStore(localStore, nil) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, err + } + netStore.NewNetFetcherFunc = newFakeNetFetcher rh.SetStore(netStore) return &TestHandler{rh}, nil } diff --git a/swarm/storage/mru/updateheader.go b/swarm/storage/mru/updateheader.go index 3ac20c189..f0039eaf6 100644 --- a/swarm/storage/mru/updateheader.go +++ b/swarm/storage/mru/updateheader.go @@ -27,7 +27,7 @@ type updateHeader struct { metaHash []byte // SHA3 hash of the metadata chunk (less ownerAddr). Used to prove ownerhsip of the resource. } -const metaHashLength = storage.KeyLength +const metaHashLength = storage.AddressLength // updateLookupLength bytes // 1 byte flags (multihash bool for now) @@ -76,7 +76,7 @@ func (h *updateHeader) binaryGet(serializedData []byte) error { } cursor := updateLookupLength h.metaHash = make([]byte, metaHashLength) - copy(h.metaHash[:storage.KeyLength], serializedData[cursor:cursor+storage.KeyLength]) + copy(h.metaHash[:storage.AddressLength], serializedData[cursor:cursor+storage.AddressLength]) cursor += metaHashLength flags := serializedData[cursor] diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 96a7e51f7..de2d82d2b 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -18,181 +18,275 @@ package storage import ( "context" + "encoding/hex" + "fmt" + "sync" + "sync/atomic" "time" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/spancontext" - opentracing "github.com/opentracing/opentracing-go" + + lru "github.com/hashicorp/golang-lru" ) -var ( - // NetStore.Get timeout for get and get retries - // This is the maximum period that the Get will block. - // If it is reached, Get will return ErrChunkNotFound. 
- netStoreRetryTimeout = 30 * time.Second - // Minimal period between calling get method on NetStore - // on retry. It protects calling get very frequently if - // it returns ErrChunkNotFound very fast. - netStoreMinRetryDelay = 3 * time.Second - // Timeout interval before retrieval is timed out. - // It is used in NetStore.get on waiting for ReqC to be - // closed on a single retrieve request. - searchTimeout = 10 * time.Second +type ( + NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher ) -// NetStore implements the ChunkStore interface, -// this chunk access layer assumed 2 chunk stores -// local storage eg. LocalStore and network storage eg., NetStore -// access by calling network is blocking with a timeout +type NetFetcher interface { + Request(ctx context.Context) + Offer(ctx context.Context, source *discover.NodeID) +} + +// NetStore is an extension of local storage +// it implements the ChunkStore interface +// on request it initiates remote cloud retrieval using a fetcher +// fetchers are unique to a chunk and are stored in fetchers LRU memory cache +// fetchFuncFactory is a factory object to create a fetch function for a specific chunk address type NetStore struct { - localStore *LocalStore - retrieve func(ctx context.Context, chunk *Chunk) error + mu sync.Mutex + store SyncChunkStore + fetchers *lru.Cache + NewNetFetcherFunc NewNetFetcherFunc + closeC chan struct{} } -func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore { - return &NetStore{localStore, retrieve} +// NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a +// constructor function that can create a fetch function for a specific chunk address. +func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) { + fetchers, err := lru.New(defaultChunkRequestsCacheCapacity) + if err != nil { + return nil, err + } + return &NetStore{ + store: store, + fetchers: fetchers, + NewNetFetcherFunc: nnf, + closeC: make(chan struct{}), + }, nil } -// Get is the entrypoint for local retrieve requests -// waits for response or times out -// -// Get uses get method to retrieve request, but retries if the -// ErrChunkNotFound is returned by get, until the netStoreRetryTimeout -// is reached. -func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { - - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "netstore.get.global") - defer sp.Finish() - - timer := time.NewTimer(netStoreRetryTimeout) - defer timer.Stop() - - // result and resultC provide results from the goroutine - // where NetStore.get is called. - type result struct { - chunk *Chunk - err error +// Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in +// the fetchers cache +func (n *NetStore) Put(ctx context.Context, ch Chunk) error { + n.mu.Lock() + defer n.mu.Unlock() + + // put to the chunk to the store, there should be no error + err := n.store.Put(ctx, ch) + if err != nil { + return err } - resultC := make(chan result) - - // quitC ensures that retring goroutine is terminated - // when this function returns. - quitC := make(chan struct{}) - defer close(quitC) - - // do retries in a goroutine so that the timer can - // force this method to return after the netStoreRetryTimeout. - go func() { - // limiter ensures that NetStore.get is not called more frequently - // then netStoreMinRetryDelay. 
If NetStore.get takes longer - // then netStoreMinRetryDelay, the next retry call will be - // without a delay. - limiter := time.NewTimer(netStoreMinRetryDelay) - defer limiter.Stop() - - for { - chunk, err := ns.get(ctx, addr, 0) - if err != ErrChunkNotFound { - // break retry only if the error is nil - // or other error then ErrChunkNotFound - select { - case <-quitC: - // Maybe NetStore.Get function has returned - // by the timer.C while we were waiting for the - // results. Terminate this goroutine. - case resultC <- result{chunk: chunk, err: err}: - // Send the result to the parrent goroutine. - } - return - - } - select { - case <-quitC: - // NetStore.Get function has returned, possibly - // by the timer.C, which makes this goroutine - // not needed. - return - case <-limiter.C: - } - // Reset the limiter for the next iteration. - limiter.Reset(netStoreMinRetryDelay) - log.Debug("NetStore.Get retry chunk", "key", addr) - } - }() - select { - case r := <-resultC: - return r.chunk, r.err - case <-timer.C: - return nil, ErrChunkNotFound + // if chunk is now put in the store, check if there was an active fetcher and call deliver on it + // (this delivers the chunk to requestors via the fetcher) + if f := n.getFetcher(ch.Address()); f != nil { + f.deliver(ctx, ch) + } + return nil +} + +// Get retrieves the chunk from the NetStore DPA synchronously. +// It calls NetStore.get, and if the chunk is not in local Storage +// it calls fetch with the request, which blocks until the chunk +// arrived or context is done +func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) { + chunk, fetch, err := n.get(rctx, ref) + if err != nil { + return nil, err + } + if chunk != nil { + return chunk, nil } + return fetch(rctx) } -// GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter -func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { - return ns.get(ctx, addr, timeout) +func (n *NetStore) BinIndex(po uint8) uint64 { + return n.store.BinIndex(po) } -func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { - if timeout == 0 { - timeout = searchTimeout +func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { + return n.store.Iterator(from, to, po, f) +} + +// FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function, +// which returns after the chunk is available or the context is done +func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error { + chunk, fetch, _ := n.get(ctx, ref) + if chunk != nil { + return nil + } + return func(ctx context.Context) error { + _, err := fetch(ctx) + return err } +} - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "netstore.get") - defer sp.Finish() +// Close chunk store +func (n *NetStore) Close() { + close(n.closeC) + n.store.Close() + // TODO: loop through fetchers to cancel them +} - if ns.retrieve == nil { - chunk, err = ns.localStore.Get(ctx, addr) - if err == nil { - return chunk, nil - } - if err != ErrFetching { - return nil, err - } - } else { - var created bool - chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr) +// get attempts at retrieving the chunk from LocalStore +// If it is not found then using getOrCreateFetcher: +// 1. Either there is already a fetcher to retrieve it +// 2. 
A new fetcher is created and saved in the fetchers cache +// From here on, all Get will hit on this fetcher until the chunk is delivered +// or all fetcher contexts are done. +// It returns a chunk, a fetcher function and an error +// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk. +func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) { + n.mu.Lock() + defer n.mu.Unlock() - if chunk.ReqC == nil { - return chunk, nil + chunk, err := n.store.Get(ctx, ref) + if err != nil { + if err != ErrChunkNotFound { + log.Debug("Received error from LocalStore other than ErrNotFound", "err", err) } + // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one + // if it doesn't exist yet + f := n.getOrCreateFetcher(ref) + // If the caller needs the chunk, it has to use the returned fetch function to get it + return nil, f.Fetch, nil + } - if created { - err := ns.retrieve(ctx, chunk) - if err != nil { - // mark chunk request as failed so that we can retry it later - chunk.SetErrored(ErrChunkUnavailable) - return nil, err - } - } + return chunk, nil, nil +} + +// getOrCreateFetcher attempts at retrieving an existing fetchers +// if none exists, creates one and saves it in the fetchers cache +// caller must hold the lock +func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { + if f := n.getFetcher(ref); f != nil { + return f } - t := time.NewTicker(timeout) - defer t.Stop() + // no fetcher for the given address, we have to create a new one + key := hex.EncodeToString(ref) + // create the context during which fetching is kept alive + ctx, cancel := context.WithCancel(context.Background()) + // destroy is called when all requests finish + destroy := func() { + // remove fetcher from fetchers + n.fetchers.Remove(key) + // stop fetcher by cancelling context called when + // all requests cancelled/timedout or chunk is delivered + cancel() + } + // peers always stores all the peers which have an active request for the chunk. It is shared + // between fetcher and the NewFetchFunc function. It is needed by the NewFetchFunc because + // the peers which requested the chunk should not be requested to deliver it. + peers := &sync.Map{} - select { - case <-t.C: - // mark chunk request as failed so that we can retry - chunk.SetErrored(ErrChunkNotFound) - return nil, ErrChunkNotFound - case <-chunk.ReqC: + fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC) + n.fetchers.Add(key, fetcher) + + return fetcher +} + +// getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists, +// otherwise it returns nil +func (n *NetStore) getFetcher(ref Address) *fetcher { + key := hex.EncodeToString(ref) + f, ok := n.fetchers.Get(key) + if ok { + return f.(*fetcher) } - chunk.SetErrored(nil) - return chunk, nil + return nil } -// Put is the entrypoint for local store requests coming from storeLoop -func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) { - ns.localStore.Put(ctx, chunk) +// RequestsCacheLen returns the current number of outgoing requests stored in the cache +func (n *NetStore) RequestsCacheLen() int { + return n.fetchers.Len() } -// Close chunk store -func (ns *NetStore) Close() { - ns.localStore.Close() +// One fetcher object is responsible to fetch one chunk for one address, and keep track of all the +// peers who have requested it and did not receive it yet. 
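The lifecycle described in the comment above — at most one fetcher per chunk address, shared by every concurrent request, torn down once the chunk is delivered or the last request gives up — is a reference-counted, single-flight pattern; the actual fetcher type follows right after this sketch. A minimal, self-contained illustration of that pattern only (flight, registry, getOrCreate, wait and deliver are illustrative names, not the swarm API):

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// flight tracks one in-flight retrieval that many callers share.
type flight struct {
	deliveredC chan struct{} // closed exactly once when the value arrives
	once       sync.Once     // guards the single close of deliveredC
	value      string        // the delivered payload
	refs       int32         // number of callers currently waiting
	cancel     func()        // tears the flight down when refs drops to zero
}

// registry maps keys to flights, playing the role of the fetchers cache.
type registry struct {
	mu      sync.Mutex
	flights map[string]*flight
}

// getOrCreate returns the existing flight for key or registers a new one,
// mirroring NetStore.getOrCreateFetcher.
func (r *registry) getOrCreate(key string) *flight {
	r.mu.Lock()
	defer r.mu.Unlock()
	if f, ok := r.flights[key]; ok {
		return f
	}
	f := &flight{deliveredC: make(chan struct{})}
	f.cancel = func() {
		r.mu.Lock()
		delete(r.flights, key) // analogous to removing the fetcher from the cache
		r.mu.Unlock()
	}
	r.flights[key] = f
	return f
}

// wait blocks until delivery or ctx is done; the last waiter triggers cancel,
// just as fetcher.Fetch decrements requestCnt and cancels at zero.
func (f *flight) wait(ctx context.Context) (string, error) {
	atomic.AddInt32(&f.refs, 1)
	defer func() {
		if atomic.AddInt32(&f.refs, -1) == 0 {
			f.cancel()
		}
	}()
	select {
	case <-f.deliveredC:
		return f.value, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// deliver hands the value to all waiters at most once, like fetcher.deliver.
func (f *flight) deliver(v string) {
	f.once.Do(func() {
		f.value = v
		close(f.deliveredC)
	})
}

func main() {
	r := &registry{flights: make(map[string]*flight)}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	f := r.getOrCreate("chunk-address")
	go func() {
		time.Sleep(100 * time.Millisecond)
		f.deliver("chunk payload") // plays the role of NetStore.Put
	}()
	fmt.Println(f.wait(ctx)) // prints: chunk payload <nil>
}

The real fetcher differs in that the registry is an LRU cache, cancellation also aborts the context driving the remote retrieval, and delivery carries a Chunk rather than a string, but the counting and the once-only close are the same idea.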
+type fetcher struct { + addr Address // address of chunk + chunk Chunk // fetcher can set the chunk on the fetcher + deliveredC chan struct{} // chan signalling chunk delivery to requests + cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore) + netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context + cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called + peers *sync.Map // the peers which asked for the chunk + requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called + deliverOnce *sync.Once // guarantees that we only close deliveredC once +} + +// newFetcher creates a new fetcher object for the given addr. nf is the NetFetcher which actually +// does the retrieval (in non-test cases this comes from the network package). The cancel function is +// called either +// 1. when the chunk has been fetched and all peers have been either notified or their context has been done +// 2. when the chunk has not been fetched but all contexts from all the requests have been done +// The peers map stores all the peers which have requested the chunk. +func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher { + cancelOnce := &sync.Once{} // cancel should only be called once + return &fetcher{ + addr: addr, + deliveredC: make(chan struct{}), + deliverOnce: &sync.Once{}, + cancelledC: closeC, + netFetcher: nf, + cancel: func() { + cancelOnce.Do(func() { + cancel() + }) + }, + peers: peers, + } +} + +// Fetch fetches the chunk synchronously; it is called by NetStore.Get if the chunk is not available +// locally. +func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { + atomic.AddInt32(&f.requestCnt, 1) + defer func() { + // if all the requests are done the fetcher can be cancelled + if atomic.AddInt32(&f.requestCnt, -1) == 0 { + f.cancel() + } + }() + + // The peer asking for the chunk. 
Store in the shared peers map, but delete after the request + // has been delivered + peer := rctx.Value("peer") + if peer != nil { + f.peers.Store(peer, time.Now()) + defer f.peers.Delete(peer) + } + + // If there is a source in the context then it is an offer, otherwise a request + sourceIF := rctx.Value("source") + if sourceIF != nil { + var source *discover.NodeID + id := discover.MustHexID(sourceIF.(string)) + source = &id + f.netFetcher.Offer(rctx, source) + } else { + f.netFetcher.Request(rctx) + } + + // wait until either the chunk is delivered or the context is done + select { + case <-rctx.Done(): + return nil, rctx.Err() + case <-f.deliveredC: + return f.chunk, nil + case <-f.cancelledC: + return nil, fmt.Errorf("fetcher cancelled") + } +} + +// deliver is called by NetStore.Put to notify all pending requests +func (f *fetcher) deliver(ctx context.Context, ch Chunk) { + f.deliverOnce.Do(func() { + f.chunk = ch + // closing the deliveredC channel will terminate ongoing requests + close(f.deliveredC) + }) } diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index 7babbf5e0..f08968f0e 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -17,107 +17,622 @@ package storage import ( + "bytes" "context" - "encoding/hex" - "errors" + "crypto/rand" "io/ioutil" + "sync" "testing" "time" - "github.com/ethereum/go-ethereum/swarm/network" -) + "github.com/ethereum/go-ethereum/p2p/discover" + ch "github.com/ethereum/go-ethereum/swarm/chunk" -var ( - errUnknown = errors.New("unknown error") + "github.com/ethereum/go-ethereum/common" ) -type mockRetrieve struct { - requests map[string]int +var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + +type mockNetFetcher struct { + peers *sync.Map + sources []*discover.NodeID + peersPerRequest [][]Address + requestCalled bool + offerCalled bool + quit <-chan struct{} + ctx context.Context +} + +func (m *mockNetFetcher) Offer(ctx context.Context, source *discover.NodeID) { + m.offerCalled = true + m.sources = append(m.sources, source) +} + +func (m *mockNetFetcher) Request(ctx context.Context) { + m.requestCalled = true + var peers []Address + m.peers.Range(func(key interface{}, _ interface{}) bool { + peers = append(peers, common.FromHex(key.(string))) + return true + }) + m.peersPerRequest = append(m.peersPerRequest, peers) +} + +type mockNetFetchFuncFactory struct { + fetcher *mockNetFetcher +} + +func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Address, peers *sync.Map) NetFetcher { + m.fetcher.peers = peers + m.fetcher.quit = ctx.Done() + m.fetcher.ctx = ctx + return m.fetcher +} + +func mustNewNetStore(t *testing.T) *NetStore { + netStore, _ := mustNewNetStoreWithFetcher(t) + return netStore } -func NewMockRetrieve() *mockRetrieve { - return &mockRetrieve{requests: make(map[string]int)} +func mustNewNetStoreWithFetcher(t *testing.T) (*NetStore, *mockNetFetcher) { + t.Helper() + + datadir, err := ioutil.TempDir("", "netstore") + if err != nil { + t.Fatal(err) + } + naddr := make([]byte, 32) + params := NewDefaultLocalStoreParams() + params.Init(datadir) + params.BaseKey = naddr + localStore, err := NewTestLocalStoreForAddr(params) + if err != nil { + t.Fatal(err) + } + + fetcher := &mockNetFetcher{} + mockNetFetchFuncFactory := &mockNetFetchFuncFactory{ + fetcher: fetcher, + } + netStore, err := NewNetStore(localStore, 
mockNetFetchFuncFactory.newMockNetFetcher) + if err != nil { + t.Fatal(err) + } + return netStore, fetcher } -func newDummyChunk(addr Address) *Chunk { - chunk := NewChunk(addr, make(chan bool)) - chunk.SData = []byte{3, 4, 5} - chunk.Size = 3 +// TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put. +// After the Put there should be no active fetchers, and the context created for the fetcher should +// be cancelled. +func TestNetStoreGetAndPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + c := make(chan struct{}) // this channel ensures that the goroutine with the Put does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + }() + + close(c) + recChunk, err := netStore.Get(ctx, chunk.Address()) // this is blocked until the Put above is done + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // the chunk is already available locally, so there should be no active fetchers waiting for it + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } + + // A fetcher was created when the Get was called (and the chunk was not available). The chunk + // was delivered with the Put call, so the fetcher should be cancelled now. + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } - } -func (m *mockRetrieve) retrieve(ctx context.Context, chunk *Chunk) error { - hkey := hex.EncodeToString(chunk.Addr) - m.requests[hkey] += 1 +// TestNetStoreGetAfterPut tests calling NetStore.Put and then NetStore.Get. +// After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore, +// there is no need to create fetchers. 
+func TestNetStoreGetAfterPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) - // on second call return error - if m.requests[hkey] == 2 { - return errUnknown + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // First we Put the chunk, so the chunk will be available locally + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) } - // on third call return data - if m.requests[hkey] == 3 { - *chunk = *newDummyChunk(chunk.Addr) + // Get should retrieve the chunk from LocalStore, without creating fetcher + recChunk, err := netStore.Get(ctx, chunk.Address()) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // no fetcher offer or request should be created for a locally available chunk + if fetcher.offerCalled || fetcher.requestCalled { + t.Fatal("NetFetcher.offerCalled or requestCalled not expected to be called") + } + // no fetchers should be created for a locally available chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } + +} + +// TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout +func TestNetStoreGetTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + c := make(chan struct{}) // this channel ensures that the gouroutine does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + }() + + close(c) + // We call Get on this chunk, which is not in LocalStore. 
We don't Put it at all, so there will + // be a timeout + _, err := netStore.Get(ctx, chunk.Address()) + + // Check if the timeout happened + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + // A fetcher was created, check if it has been removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // Check if the fetcher context has been cancelled after the timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks +// the errors +func TestNetStoreGetCancel(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + + c := make(chan struct{}) // this channel ensures that the gouroutine with the cancel does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + cancel() + }() + + close(c) + // We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery + _, err := netStore.Get(ctx, chunk.Address()) + + // After the context is cancelled above Get should return with an error + if err != context.Canceled { + t.Fatalf("Expected context.Canceled err got %v", err) + } + + // A fetcher was created, check if it has been removed after cancel + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after cancel") + } + + // Check if the fetcher context has been cancelled after the request context cancel + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreMultipleGetAndPut tests four Get calls for the same unavailable chunk. The chunk is +// delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher +// for the chunk retrieval +func TestNetStoreMultipleGetAndPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go func() { + // sleep to make sure Put is called after all the Get + time.Sleep(500 * time.Millisecond) + // check if netStore created exactly one fetcher for all Get calls + if netStore.fetchers.Len() != 1 { + t.Fatal("Expected netStore to use one fetcher for all Get calls") + } + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + }() + + // call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above. 
+ getWG := sync.WaitGroup{} + for i := 0; i < 4; i++ { + getWG.Add(1) go func() { - time.Sleep(100 * time.Millisecond) - close(chunk.ReqC) + defer getWG.Done() + recChunk, err := netStore.Get(ctx, chunk.Address()) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } }() + } + + finishedC := make(chan struct{}) + go func() { + getWG.Wait() + close(finishedC) + }() + + // The Get calls should return after Put, so no timeout expected + select { + case <-finishedC: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout waiting for Get calls to return") + } + + // A fetcher was created, check if it has been removed after cancel + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } - return nil + // A fetcher was created, check if it has been removed after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") } - return nil } -func TestNetstoreFailedRequest(t *testing.T) { - searchTimeout = 300 * time.Millisecond +// TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout +func TestNetStoreFetchFuncTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) - // setup - addr := network.RandomAddr() // tested peers peer address + chunk := GenerateRandomChunk(ch.DefaultSize) - // temp datadir - datadir, err := ioutil.TempDir("", "netstore") + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // FetchFunc is called for an unavaible chunk, so the returned wait function should not be nil + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should an active fetcher for the chunk after the FetchFunc call + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // wait function should timeout because we don't deliver the chunk with a Put + err := wait(ctx) + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + // the fetcher should be removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // the fetcher context should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk +func TestNetStoreFetchFuncAfterPut(t *testing.T) { + netStore := mustNewNetStore(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // We deliver the created the chunk with a Put + err := netStore.Put(ctx, chunk) if err != nil { - t.Fatal(err) + t.Fatalf("Expected no err got %v", err) } - params := NewDefaultLocalStoreParams() - params.Init(datadir) - params.BaseKey = addr.Over() - localStore, err := NewTestLocalStoreForAddr(params) + + // FetchFunc should return nil, because the chunk is available locally, no need to fetch it + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait != nil { + t.Fatal("Expected wait to be nil") 
+ } + + // No fetchers should be created at all + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } +} + +// TestNetStoreGetCallsRequest tests if Get created a request on the NetFetcher for an unavailable chunk +func TestNetStoreGetCallsRequest(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + _, err := netStore.Get(ctx, chunk.Address()) + + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadlineExceeded err got %v", err) + } + + // NetStore should call NetFetcher.Request and wait for the chunk + if !fetcher.requestCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } +} + +// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk +// in case of a source peer provided in the context. +func TestNetStoreGetCallsOffer(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + // If a source peer is added to the context, NetStore will handle it as an offer + ctx := context.WithValue(context.Background(), "source", sourcePeerID.String()) + ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + chunk, err := netStore.Get(ctx, chunk.Address()) + + if err != context.DeadlineExceeded { + t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err) + } + + // NetStore should call NetFetcher.Offer with the source peer + if !fetcher.offerCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } + + if len(fetcher.sources) != 1 { + t.Fatalf("Expected fetcher sources length 1 got %v", len(fetcher.sources)) + } + + if fetcher.sources[0].String() != sourcePeerID.String() { + t.Fatalf("Expected fetcher source %v got %v", sourcePeerID, fetcher.sources[0]) + } + +} + +// TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with peer in the context. 
+// There is no Put call, so the Get calls timeout +func TestNetStoreFetcherCountPeers(t *testing.T) { + + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + addr := randomAddr() + peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + errC := make(chan error) + nrGets := 3 + + // Call Get 3 times with a peer in context + for i := 0; i < nrGets; i++ { + peer := peers[i] + go func() { + ctx := context.WithValue(ctx, "peer", peer) + _, err := netStore.Get(ctx, addr) + errC <- err + }() + } + + // All 3 Get calls should timeout + for i := 0; i < nrGets; i++ { + err := <-errC + if err != context.DeadlineExceeded { + t.Fatalf("Expected \"%v\" error got \"%v\"", context.DeadlineExceeded, err) + } + } + + // fetcher should be closed after timeout + select { + case <-fetcher.quit: + case <-time.After(3 * time.Second): + t.Fatalf("mockNetFetcher not closed after timeout") + } + + // All 3 peers should be given to NetFetcher after the 3 Get calls + if len(fetcher.peersPerRequest) != nrGets { + t.Fatalf("Expected 3 got %v", len(fetcher.peersPerRequest)) + } + + for i, peers := range fetcher.peersPerRequest { + if len(peers) < i+1 { + t.Fatalf("Expected at least %v got %v", i+1, len(peers)) + } + } +} + +// TestNetStoreFetchFuncCalledMultipleTimes calls the wait function given by FetchFunc three times, +// and checks there is still exactly one fetcher for one chunk. Afthe chunk is delivered, it checks +// if the fetcher is closed. +func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // Call wait three times parallelly + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + err := wait(ctx) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + wg.Done() + }() + } + + // sleep a little so the wait functions are called above + time.Sleep(100 * time.Millisecond) + + // there should be still only one fetcher, because all wait calls are for the same chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to have one fetcher for the requested chunk") + } + + // Deliver the chunk with a Put + err := netStore.Put(ctx, chunk) if err != nil { - t.Fatal(err) + t.Fatalf("Expected no err got %v", err) + } + + // wait until all wait calls return (because the chunk is delivered) + wg.Wait() + + // There should be no more fetchers for the delivered chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") } - r := NewMockRetrieve() - netStore := NewNetStore(localStore, r.retrieve) + // The context for the fetcher should be cancelled after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetcherLifeCycleWithTimeout is similar 
to TestNetStoreFetchFuncCalledMultipleTimes, +// the only difference is that we don't deilver the chunk, just wait for timeout +func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) - key := Address{} + chunk := GenerateRandomChunk(ch.DefaultSize) - // first call is done by the retry on ErrChunkNotFound, no need to do it here - // _, err = netStore.Get(key) - // if err == nil || err != ErrChunkNotFound { - // t.Fatalf("expected to get ErrChunkNotFound, but got: %s", err) - // } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() - // second call - _, err = netStore.Get(context.TODO(), key) - if got := r.requests[hex.EncodeToString(key)]; got != 2 { - t.Fatalf("expected to have called retrieve two times, but got: %v", got) + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") } - if err != errUnknown { - t.Fatalf("expected to get an unknown error, but got: %s", err) + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") } - // third call - chunk, err := netStore.Get(context.TODO(), key) - if got := r.requests[hex.EncodeToString(key)]; got != 3 { - t.Fatalf("expected to have called retrieve three times, but got: %v", got) + // Call wait three times parallelly + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer rcancel() + err := wait(rctx) + if err != context.DeadlineExceeded { + t.Fatalf("Expected err %v got %v", context.DeadlineExceeded, err) + } + }() } - if err != nil || chunk == nil { - t.Fatalf("expected to get a chunk but got: %v, %s", chunk, err) + + // wait until all wait calls timeout + wg.Wait() + + // There should be no more fetchers after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") } - if len(chunk.SData) != 3 { - t.Fatalf("expected to get a chunk with size 3, but got: %v", chunk.SData) + + // The context for the fetcher should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") } } + +func randomAddr() Address { + addr := make([]byte, 32) + rand.Read(addr) + return Address(addr) +} diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 36ff66d04..f74eef06b 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" ) @@ -57,7 +57,7 @@ import ( When certain no of data chunks are created (defaultBranches), a signal is sent to create a tree entry. When the level 0 tree entries reaches certain threshold (defaultBranches), another signal is sent to a tree entry one level up.. and so on... until only the data is exhausted AND only one - tree entry is present in certain level. The key of tree entry is given out as the rootKey of the file. + tree entry is present in certain level. The key of tree entry is given out as the rootAddress of the file. 
*/ @@ -98,15 +98,15 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get } /* - When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes. + When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Address), the root hash of the entire content will fill this once processing finishes. New chunks to store are store using the putter which the caller provides. */ func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, ch.DefaultSize)).Split(ctx) } func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, ch.DefaultSize)).Append(ctx) } // Entry to create a tree node @@ -153,7 +153,7 @@ type PyramidChunker struct { wg *sync.WaitGroup errC chan error quitC chan bool - rootKey []byte + rootAddress []byte chunkLevel [][]*TreeEntry } @@ -171,14 +171,14 @@ func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { pc.wg = &sync.WaitGroup{} pc.errC = make(chan error) pc.quitC = make(chan bool) - pc.rootKey = make([]byte, pc.hashSize) + pc.rootAddress = make([]byte, pc.hashSize) pc.chunkLevel = make([][]*TreeEntry, pc.branches) return } func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader { return &LazyChunkReader{ - key: addr, + addr: addr, depth: depth, chunkSize: pc.chunkSize, branches: pc.branches, @@ -209,7 +209,7 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte log.Debug("pyramid.chunker: Split()") pc.wg.Add(1) - pc.prepareChunks(false) + pc.prepareChunks(ctx, false) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -231,19 +231,21 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte if err != nil { return nil, nil, err } - case <-time.NewTimer(splitTimeout).C: + case <-ctx.Done(): + _ = pc.putter.Wait(ctx) //??? 
+ return nil, nil, ctx.Err() } - return pc.rootKey, pc.putter.Wait, nil + return pc.rootAddress, pc.putter.Wait, nil } func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) { log.Debug("pyramid.chunker: Append()") // Load the right most unfinished tree chunks in every level - pc.loadTree() + pc.loadTree(ctx) pc.wg.Add(1) - pc.prepareChunks(true) + pc.prepareChunks(ctx, true) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -265,11 +267,11 @@ func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(cont case <-time.NewTimer(splitTimeout).C: } - return pc.rootKey, pc.putter.Wait, nil + return pc.rootAddress, pc.putter.Wait, nil } -func (pc *PyramidChunker) processor(id int64) { +func (pc *PyramidChunker) processor(ctx context.Context, id int64) { defer pc.decrementWorkerCount() for { select { @@ -278,19 +280,22 @@ func (pc *PyramidChunker) processor(id int64) { if !ok { return } - pc.processChunk(id, job) + pc.processChunk(ctx, id, job) case <-pc.quitC: return } } } -func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { +func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) { log.Debug("pyramid.chunker: processChunk()", "id", id) - ref, err := pc.putter.Put(context.TODO(), job.chunk) + ref, err := pc.putter.Put(ctx, job.chunk) if err != nil { - pc.errC <- err + select { + case pc.errC <- err: + case <-pc.quitC: + } } // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) @@ -300,14 +305,14 @@ func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { job.parentWg.Done() } -func (pc *PyramidChunker) loadTree() error { +func (pc *PyramidChunker) loadTree(ctx context.Context) error { log.Debug("pyramid.chunker: loadTree()") // Get the root chunk to get the total size - chunkData, err := pc.getter.Get(context.TODO(), Reference(pc.key)) + chunkData, err := pc.getter.Get(ctx, Reference(pc.key)) if err != nil { return errLoadingTreeRootChunk } - chunkSize := chunkData.Size() + chunkSize := int64(chunkData.Size()) log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize) //if data size is less than a chunk... 
add a parent with update as pending @@ -356,7 +361,7 @@ func (pc *PyramidChunker) loadTree() error { branchCount = int64(len(ent.chunk)-8) / pc.hashSize for i := int64(0); i < branchCount; i++ { key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] - newChunkData, err := pc.getter.Get(context.TODO(), Reference(key)) + newChunkData, err := pc.getter.Get(ctx, Reference(key)) if err != nil { return errLoadingTreeChunk } @@ -365,7 +370,7 @@ func (pc *PyramidChunker) loadTree() error { newEntry := &TreeEntry{ level: lvl - 1, branchCount: bewBranchCount, - subtreeSize: uint64(newChunkSize), + subtreeSize: newChunkSize, chunk: newChunkData, key: key, index: 0, @@ -385,7 +390,7 @@ func (pc *PyramidChunker) loadTree() error { return nil } -func (pc *PyramidChunker) prepareChunks(isAppend bool) { +func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) { log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend) defer pc.wg.Done() @@ -393,11 +398,11 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { pc.incrementWorkerCount() - go pc.processor(pc.workerCount) + go pc.processor(ctx, pc.workerCount) parent := NewTreeEntry(pc) var unfinishedChunkData ChunkData - var unfinishedChunkSize int64 + var unfinishedChunkSize uint64 if isAppend && len(pc.chunkLevel[0]) != 0 { lastIndex := len(pc.chunkLevel[0]) - 1 @@ -415,16 +420,16 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { } lastBranch := parent.branchCount - 1 - lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] + lastAddress := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] var err error - unfinishedChunkData, err = pc.getter.Get(context.TODO(), lastKey) + unfinishedChunkData, err = pc.getter.Get(ctx, lastAddress) if err != nil { pc.errC <- err } unfinishedChunkSize = unfinishedChunkData.Size() - if unfinishedChunkSize < pc.chunkSize { - parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize) + if unfinishedChunkSize < uint64(pc.chunkSize) { + parent.subtreeSize = parent.subtreeSize - unfinishedChunkSize parent.branchCount = parent.branchCount - 1 } else { unfinishedChunkData = nil @@ -468,8 +473,8 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) { // Data is exactly one chunk.. pick the last chunk key as root chunkWG.Wait() - lastChunksKey := parent.chunk[8 : 8+pc.hashSize] - copy(pc.rootKey, lastChunksKey) + lastChunksAddress := parent.chunk[8 : 8+pc.hashSize] + copy(pc.rootAddress, lastChunksAddress) break } } else { @@ -502,7 +507,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { // No need to build the tree if the depth is 0 // or we are appending. // Just use the last key. - copy(pc.rootKey, pkey) + copy(pc.rootAddress, pkey) } else { // We need to build the tree and and provide the lonely // chunk key to replace the last tree chunk key. 
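Both chunkers manipulate the same chunk layout: an 8-byte little-endian span (the number of data bytes covered by the subtree) followed either by raw data (leaf chunk) or by the concatenated addresses of the child chunks, each hashSize bytes long, which is why branchCount is computed as (len(chunk)-8)/hashSize above. A hedged, standalone sketch of decoding an intermediate chunk under that assumption (decodeBranches is an illustrative helper, not part of the package):

package main

import (
	"encoding/binary"
	"fmt"
)

const hashSize = 32 // assumed reference length (a 32-byte hash)

// decodeBranches splits an intermediate chunk into its span and child references,
// mirroring expressions such as chunk[8+i*hashSize : 8+(i+1)*hashSize] above.
func decodeBranches(sdata []byte) (span uint64, refs [][]byte, err error) {
	if len(sdata) < 8 {
		return 0, nil, fmt.Errorf("chunk too short: %d bytes", len(sdata))
	}
	span = binary.LittleEndian.Uint64(sdata[:8]) // 8-byte little-endian span prefix
	payload := sdata[8:]
	if len(payload)%hashSize != 0 {
		return 0, nil, fmt.Errorf("payload is not a whole number of references")
	}
	branchCount := len(payload) / hashSize // same arithmetic as (len(chunk)-8)/hashSize
	for i := 0; i < branchCount; i++ {
		refs = append(refs, payload[i*hashSize:(i+1)*hashSize])
	}
	return span, refs, nil
}

func main() {
	// A fake intermediate chunk: two child references covering 8192 data bytes.
	sdata := make([]byte, 8+2*hashSize)
	binary.LittleEndian.PutUint64(sdata[:8], 8192)
	span, refs, err := decodeBranches(sdata)
	fmt.Println(span, len(refs), err) // prints: 8192 2 <nil>
}

This is the same arithmetic loadTree applies when it walks ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] to fetch each child.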
@@ -525,7 +530,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { workers := pc.getWorkerCount() if int64(len(pc.jobC)) > workers && workers < ChunkProcessors { pc.incrementWorkerCount() - go pc.processor(pc.workerCount) + go pc.processor(ctx, pc.workerCount) } } @@ -558,7 +563,7 @@ func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync lvlCount := int64(len(pc.chunkLevel[lvl])) if lvlCount == 1 && last { - copy(pc.rootKey, pc.chunkLevel[lvl][0].key) + copy(pc.rootAddress, pc.chunkLevel[lvl][0].key) return } diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 53e3af485..bc2af2cd7 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -25,16 +25,16 @@ import ( "fmt" "hash" "io" - "sync" + "io/ioutil" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/swarm/bmt" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" ) const MaxPO = 16 -const KeyLength = 32 +const AddressLength = 32 type Hasher func() hash.Hash type SwarmHasher func() SwarmHash @@ -116,7 +116,7 @@ func MakeHashFunc(hash string) SwarmHasher { return func() SwarmHash { hasher := sha3.NewKeccak256 hasherSize := hasher().Size() - segmentCount := chunk.DefaultSize / hasherSize + segmentCount := ch.DefaultSize / hasherSize pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize) return bmt.New(pool) } @@ -169,88 +169,88 @@ func (c AddressCollection) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -// Chunk also serves as a request object passed to ChunkStores -// in case it is a retrieval request, Data is nil and Size is 0 -// Note that Size is not the size of the data chunk, which is Data.Size() -// but the size of the subtree encoded in the chunk -// 0 if request, to be supplied by the dpa -type Chunk struct { - Addr Address // always - SData []byte // nil if request, to be supplied by dpa - Size int64 // size of the data covered by the subtree encoded in this chunk - //Source Peer // peer - C chan bool // to signal data delivery by the dpa - ReqC chan bool // to signal the request done - dbStoredC chan bool // never remove a chunk from memStore before it is written to dbStore - dbStored bool - dbStoredMu *sync.Mutex - errored error // flag which is set when the chunk request has errored or timeouted - erroredMu sync.Mutex +// Chunk interface implemented by context.Contexts and data chunks +type Chunk interface { + Address() Address + Payload() []byte + SpanBytes() []byte + Span() int64 + Data() []byte } -func (c *Chunk) SetErrored(err error) { - c.erroredMu.Lock() - defer c.erroredMu.Unlock() +type chunk struct { + addr Address + sdata []byte + span int64 +} - c.errored = err +func NewChunk(addr Address, data []byte) *chunk { + return &chunk{ + addr: addr, + sdata: data, + span: -1, + } } -func (c *Chunk) GetErrored() error { - c.erroredMu.Lock() - defer c.erroredMu.Unlock() +func (c *chunk) Address() Address { + return c.addr +} - return c.errored +func (c *chunk) SpanBytes() []byte { + return c.sdata[:8] } -func NewChunk(addr Address, reqC chan bool) *Chunk { - return &Chunk{ - Addr: addr, - ReqC: reqC, - dbStoredC: make(chan bool), - dbStoredMu: &sync.Mutex{}, +func (c *chunk) Span() int64 { + if c.span == -1 { + c.span = int64(binary.LittleEndian.Uint64(c.sdata[:8])) } + return c.span } -func (c *Chunk) markAsStored() { - c.dbStoredMu.Lock() - defer c.dbStoredMu.Unlock() - - if !c.dbStored { - close(c.dbStoredC) - c.dbStored = true - } +func (c *chunk) 
Data() []byte { + return c.sdata } -func (c *Chunk) WaitToStore() error { - <-c.dbStoredC - return c.GetErrored() +func (c *chunk) Payload() []byte { + return c.sdata[8:] } -func GenerateRandomChunk(dataSize int64) *Chunk { - return GenerateRandomChunks(dataSize, 1)[0] +// String() for pretty printing +func (self *chunk) String() string { + return fmt.Sprintf("Address: %v TreeSize: %v Chunksize: %v", self.addr.Log(), self.span, len(self.sdata)) } -func GenerateRandomChunks(dataSize int64, count int) (chunks []*Chunk) { - var i int +func GenerateRandomChunk(dataSize int64) Chunk { hasher := MakeHashFunc(DefaultHash)() - if dataSize > chunk.DefaultSize { - dataSize = chunk.DefaultSize - } + sdata := make([]byte, dataSize+8) + rand.Read(sdata[8:]) + binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize)) + hasher.ResetWithLength(sdata[:8]) + hasher.Write(sdata[8:]) + return NewChunk(hasher.Sum(nil), sdata) +} - for i = 0; i < count; i++ { - chunks = append(chunks, NewChunk(nil, nil)) - chunks[i].SData = make([]byte, dataSize+8) - rand.Read(chunks[i].SData) - binary.LittleEndian.PutUint64(chunks[i].SData[:8], uint64(dataSize)) - hasher.ResetWithLength(chunks[i].SData[:8]) - hasher.Write(chunks[i].SData[8:]) - chunks[i].Addr = make([]byte, 32) - copy(chunks[i].Addr, hasher.Sum(nil)) +func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) { + if dataSize > ch.DefaultSize { + dataSize = ch.DefaultSize + } + for i := 0; i < count; i++ { + ch := GenerateRandomChunk(ch.DefaultSize) + chunks = append(chunks, ch) } - return chunks } +func GenerateRandomData(l int) (r io.Reader, slice []byte) { + slice, err := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(l))) + if err != nil { + panic("rand error") + } + // log.Warn("generate random data", "len", len(slice), "data", common.Bytes2Hex(slice)) + r = io.LimitReader(bytes.NewReader(slice), int64(l)) + return r, slice +} + // Size, Seek, Read, ReadAt type LazySectionReader interface { Context() context.Context @@ -273,18 +273,17 @@ func (r *LazyTestSectionReader) Context() context.Context { } type StoreParams struct { - Hash SwarmHasher `toml:"-"` - DbCapacity uint64 - CacheCapacity uint - ChunkRequestsCacheCapacity uint - BaseKey []byte + Hash SwarmHasher `toml:"-"` + DbCapacity uint64 + CacheCapacity uint + BaseKey []byte } func NewDefaultStoreParams() *StoreParams { - return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, defaultChunkRequestsCacheCapacity, nil, nil) + return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, nil, nil) } -func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHasher, basekey []byte) *StoreParams { +func NewStoreParams(ldbCap uint64, cacheCap uint, hash SwarmHasher, basekey []byte) *StoreParams { if basekey == nil { basekey = make([]byte, 32) } @@ -292,11 +291,10 @@ func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHa hash = MakeHashFunc(DefaultHash) } return &StoreParams{ - Hash: hash, - DbCapacity: ldbCap, - CacheCapacity: cacheCap, - ChunkRequestsCacheCapacity: requestsCap, - BaseKey: basekey, + Hash: hash, + DbCapacity: ldbCap, + CacheCapacity: cacheCap, + BaseKey: basekey, } } @@ -321,8 +319,8 @@ type Getter interface { } // NOTE: this returns invalid data if chunk is encrypted -func (c ChunkData) Size() int64 { - return int64(binary.LittleEndian.Uint64(c[:8])) +func (c ChunkData) Size() uint64 { + return binary.LittleEndian.Uint64(c[:8]) } func (c ChunkData) Data() []byte { @@ -348,7 +346,8 @@ func NewContentAddressValidator(hasher 
SwarmHasher) *ContentAddressValidator { // Validate that the given key is a valid content address for the given data func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool { - if l := len(data); l < 9 || l > chunk.DefaultSize+8 { + if l := len(data); l < 9 || l > ch.DefaultSize+8 { + // log.Error("invalid chunk size", "chunk", addr.Hex(), "size", l) return false } @@ -359,3 +358,37 @@ func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool { return bytes.Equal(hash, addr[:]) } + +type ChunkStore interface { + Put(ctx context.Context, ch Chunk) (err error) + Get(rctx context.Context, ref Address) (ch Chunk, err error) + Close() +} + +// SyncChunkStore is a ChunkStore which supports syncing +type SyncChunkStore interface { + ChunkStore + BinIndex(po uint8) uint64 + Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error + FetchFunc(ctx context.Context, ref Address) func(context.Context) error +} + +// FakeChunkStore doesn't store anything, just implements the ChunkStore interface. +// It can be used to inject into a hasherStore if you don't want to actually store data, just do the +// hashing +type FakeChunkStore struct { +} + +// Put doesn't store anything; it is just here to implement ChunkStore +func (f *FakeChunkStore) Put(_ context.Context, ch Chunk) error { + return nil +} + +// Get doesn't store anything; it is just here to implement ChunkStore +func (f *FakeChunkStore) Get(_ context.Context, ref Address) (Chunk, error) { + panic("FakeChunkStore doesn't support Get") +} + +// Close doesn't store anything; it is just here to implement ChunkStore +func (f *FakeChunkStore) Close() { +} |
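FakeChunkStore above is the degenerate implementation; for contrast, a minimal useful one can be sketched in a few lines. This is a hedged sketch that assumes it sits inside the swarm/storage package next to the declarations above, so Chunk, Address and ErrChunkNotFound are in scope and "context", "encoding/hex" and "sync" are imported; MemChunkStore is an illustrative name, not an existing type:

// MemChunkStore is a minimal in-memory ChunkStore (illustrative, not part of the package).
type MemChunkStore struct {
	mu     sync.RWMutex
	chunks map[string]Chunk
}

func NewMemChunkStore() *MemChunkStore {
	return &MemChunkStore{chunks: make(map[string]Chunk)}
}

// Put stores the chunk in memory, keyed by its hex-encoded address.
func (m *MemChunkStore) Put(_ context.Context, c Chunk) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.chunks[hex.EncodeToString(c.Address())] = c
	return nil
}

// Get returns the chunk if present, otherwise ErrChunkNotFound to signal a local miss.
func (m *MemChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	c, ok := m.chunks[hex.EncodeToString(ref)]
	if !ok {
		return nil, ErrChunkNotFound
	}
	return c, nil
}

// Close has nothing to release for a purely in-memory store.
func (m *MemChunkStore) Close() {}

Returning ErrChunkNotFound on a miss matters: NetStore.get falls back to a fetcher on any error from the local store, but logs anything other than ErrChunkNotFound as unexpected. As a quick check against the helpers above, Put a GenerateRandomChunk(ch.DefaultSize), Get it back by address, and NewContentAddressValidator(MakeHashFunc(DefaultHash)).Validate(c.Address(), c.Data()) should return true.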