aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/common_test.go
diff options
context:
space:
mode:
authorBalint Gabor <balint.g@gmail.com>2018-09-13 17:42:19 +0800
committerGitHub <noreply@github.com>2018-09-13 17:42:19 +0800
commit3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch)
tree62a2896b3b824449595272f0b92dda877ba1c58d /swarm/storage/common_test.go
parentff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff)
downloadgo-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.bz2
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.lz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.xz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org> Co-authored-by: Balint Gabor <balint.g@gmail.com> Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com> Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/storage/common_test.go')
-rw-r--r--swarm/storage/common_test.go192
1 files changed, 132 insertions, 60 deletions
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
index dc1a3ab35..33133edd7 100644
--- a/swarm/storage/common_test.go
+++ b/swarm/storage/common_test.go
@@ -23,16 +23,20 @@ import (
"flag"
"fmt"
"io"
+ "io/ioutil"
+ "os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
+ ch "github.com/ethereum/go-ethereum/swarm/chunk"
colorable "github.com/mattn/go-colorable"
)
var (
- loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+ loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+ getTimeout = 30 * time.Second
)
func init() {
@@ -56,47 +60,73 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader
}
}
-func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) {
- return mput(store, processors, n, GenerateRandomChunk)
+func newLDBStore(t *testing.T) (*LDBStore, func()) {
+ dir, err := ioutil.TempDir("", "bzz-storage-test")
+ if err != nil {
+ t.Fatal(err)
+ }
+ log.Trace("memstore.tempdir", "dir", dir)
+
+ ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir)
+ db, err := NewLDBStore(ldbparams)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cleanup := func() {
+ db.Close()
+ err := os.RemoveAll(dir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ return db, cleanup
}
-func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) {
- wg := sync.WaitGroup{}
- wg.Add(processors)
- c := make(chan *Chunk)
- for i := 0; i < processors; i++ {
+func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error) {
+ return mput(store, n, GenerateRandomChunk)
+}
+
+func mputChunks(store ChunkStore, chunks ...Chunk) error {
+ i := 0
+ f := func(n int64) Chunk {
+ chunk := chunks[i]
+ i++
+ return chunk
+ }
+ _, err := mput(store, len(chunks), f)
+ return err
+}
+
+func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) {
+ // put to localstore and wait for stored channel
+ // does not check delivery error state
+ errc := make(chan error)
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+ for i := int64(0); i < int64(n); i++ {
+ chunk := f(ch.DefaultSize)
go func() {
- defer wg.Done()
- for chunk := range c {
- wg.Add(1)
- chunk := chunk
- store.Put(context.TODO(), chunk)
- go func() {
- defer wg.Done()
- <-chunk.dbStoredC
- }()
+ select {
+ case errc <- store.Put(ctx, chunk):
+ case <-ctx.Done():
}
}()
+ hs = append(hs, chunk)
}
- fa := f
- if _, ok := store.(*MemStore); ok {
- fa = func(i int64) *Chunk {
- chunk := f(i)
- chunk.markAsStored()
- return chunk
- }
- }
+
+ // wait for all chunks to be stored
for i := 0; i < n; i++ {
- chunk := fa(int64(i))
- hs = append(hs, chunk.Addr)
- c <- chunk
+ err := <-errc
+ if err != nil {
+ return nil, err
+ }
}
- close(c)
- wg.Wait()
- return hs
+ return hs, nil
}
-func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error {
+func mget(store ChunkStore, hs []Address, f func(h Address, chunk Chunk) error) error {
wg := sync.WaitGroup{}
wg.Add(len(hs))
errc := make(chan error)
@@ -104,6 +134,7 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error)
for _, k := range hs {
go func(h Address) {
defer wg.Done()
+ // TODO: write timeout with context
chunk, err := store.Get(context.TODO(), h)
if err != nil {
errc <- err
@@ -143,57 +174,54 @@ func (r *brokenLimitedReader) Read(buf []byte) (int, error) {
return r.lr.Read(buf)
}
-func generateRandomData(l int) (r io.Reader, slice []byte) {
- slice = make([]byte, l)
- if _, err := rand.Read(slice); err != nil {
- panic("rand error")
+func testStoreRandom(m ChunkStore, n int, chunksize int64, t *testing.T) {
+ chunks, err := mputRandomChunks(m, n, chunksize)
+ if err != nil {
+ t.Fatalf("expected no error, got %v", err)
}
- r = io.LimitReader(bytes.NewReader(slice), int64(l))
- return
-}
-
-func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
- hs := mputRandomChunks(m, processors, n, chunksize)
- err := mget(m, hs, nil)
+ err = mget(m, chunkAddresses(chunks), nil)
if err != nil {
t.Fatalf("testStore failed: %v", err)
}
}
-func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
- hs := mputRandomChunks(m, processors, n, chunksize)
- f := func(h Address, chunk *Chunk) error {
- if !bytes.Equal(h, chunk.Addr) {
- return fmt.Errorf("key does not match retrieved chunk Key")
+func testStoreCorrect(m ChunkStore, n int, chunksize int64, t *testing.T) {
+ chunks, err := mputRandomChunks(m, n, chunksize)
+ if err != nil {
+ t.Fatalf("expected no error, got %v", err)
+ }
+ f := func(h Address, chunk Chunk) error {
+ if !bytes.Equal(h, chunk.Address()) {
+ return fmt.Errorf("key does not match retrieved chunk Address")
}
hasher := MakeHashFunc(DefaultHash)()
- hasher.ResetWithLength(chunk.SData[:8])
- hasher.Write(chunk.SData[8:])
+ hasher.ResetWithLength(chunk.SpanBytes())
+ hasher.Write(chunk.Payload())
exp := hasher.Sum(nil)
if !bytes.Equal(h, exp) {
return fmt.Errorf("key is not hash of chunk data")
}
return nil
}
- err := mget(m, hs, f)
+ err = mget(m, chunkAddresses(chunks), f)
if err != nil {
t.Fatalf("testStore failed: %v", err)
}
}
-func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
- chunks := make([]*Chunk, n)
+func benchmarkStorePut(store ChunkStore, n int, chunksize int64, b *testing.B) {
+ chunks := make([]Chunk, n)
i := 0
- f := func(dataSize int64) *Chunk {
+ f := func(dataSize int64) Chunk {
chunk := GenerateRandomChunk(dataSize)
chunks[i] = chunk
i++
return chunk
}
- mput(store, processors, n, f)
+ mput(store, n, f)
- f = func(dataSize int64) *Chunk {
+ f = func(dataSize int64) Chunk {
chunk := chunks[i]
i++
return chunk
@@ -204,18 +232,62 @@ func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64,
for j := 0; j < b.N; j++ {
i = 0
- mput(store, processors, n, f)
+ mput(store, n, f)
}
}
-func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
- hs := mputRandomChunks(store, processors, n, chunksize)
+func benchmarkStoreGet(store ChunkStore, n int, chunksize int64, b *testing.B) {
+ chunks, err := mputRandomChunks(store, n, chunksize)
+ if err != nil {
+ b.Fatalf("expected no error, got %v", err)
+ }
b.ReportAllocs()
b.ResetTimer()
+ addrs := chunkAddresses(chunks)
for i := 0; i < b.N; i++ {
- err := mget(store, hs, nil)
+ err := mget(store, addrs, nil)
if err != nil {
b.Fatalf("mget failed: %v", err)
}
}
}
+
+// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory.
+type MapChunkStore struct {
+ chunks map[string]Chunk
+ mu sync.RWMutex
+}
+
+func NewMapChunkStore() *MapChunkStore {
+ return &MapChunkStore{
+ chunks: make(map[string]Chunk),
+ }
+}
+
+func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.chunks[ch.Address().Hex()] = ch
+ return nil
+}
+
+func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ chunk := m.chunks[ref.Hex()]
+ if chunk == nil {
+ return nil, ErrChunkNotFound
+ }
+ return chunk, nil
+}
+
+func (m *MapChunkStore) Close() {
+}
+
+func chunkAddresses(chunks []Chunk) []Address {
+ addrs := make([]Address, len(chunks))
+ for i, ch := range chunks {
+ addrs[i] = ch.Address()
+ }
+ return addrs
+}