aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/common_test.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/common_test.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloaddexon-e187711c6545487d4cac3701f0f506bb536234e2.tar
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/common_test.go')
-rw-r--r--swarm/storage/common_test.go204
1 files changed, 154 insertions, 50 deletions
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
index cd4c2ef13..c6e97d68f 100644
--- a/swarm/storage/common_test.go
+++ b/swarm/storage/common_test.go
@@ -19,14 +19,27 @@ package storage
import (
"bytes"
"crypto/rand"
+ "flag"
"fmt"
"io"
"sync"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/log"
+ colorable "github.com/mattn/go-colorable"
)
+var (
+ loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+)
+
+func init() {
+ flag.Parse()
+ log.PrintOrigins(true)
+ log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
+}
+
type brokenLimitedReader struct {
lr io.Reader
errAt int
@@ -42,19 +55,94 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader
}
}
+func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) {
+ return mput(store, processors, n, GenerateRandomChunk)
+}
+
+func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) {
+ wg := sync.WaitGroup{}
+ wg.Add(processors)
+ c := make(chan *Chunk)
+ for i := 0; i < processors; i++ {
+ go func() {
+ defer wg.Done()
+ for chunk := range c {
+ wg.Add(1)
+ chunk := chunk
+ store.Put(chunk)
+ go func() {
+ defer wg.Done()
+ <-chunk.dbStoredC
+ }()
+ }
+ }()
+ }
+ fa := f
+ if _, ok := store.(*MemStore); ok {
+ fa = func(i int64) *Chunk {
+ chunk := f(i)
+ chunk.markAsStored()
+ return chunk
+ }
+ }
+ for i := 0; i < n; i++ {
+ chunk := fa(int64(i))
+ hs = append(hs, chunk.Addr)
+ c <- chunk
+ }
+ close(c)
+ wg.Wait()
+ return hs
+}
+
+func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error {
+ wg := sync.WaitGroup{}
+ wg.Add(len(hs))
+ errc := make(chan error)
+
+ for _, k := range hs {
+ go func(h Address) {
+ defer wg.Done()
+ chunk, err := store.Get(h)
+ if err != nil {
+ errc <- err
+ return
+ }
+ if f != nil {
+ err = f(h, chunk)
+ if err != nil {
+ errc <- err
+ return
+ }
+ }
+ }(k)
+ }
+ go func() {
+ wg.Wait()
+ close(errc)
+ }()
+ var err error
+ select {
+ case err = <-errc:
+ case <-time.NewTimer(5 * time.Second).C:
+ err = fmt.Errorf("timed out after 5 seconds")
+ }
+ return err
+}
+
func testDataReader(l int) (r io.Reader) {
return io.LimitReader(rand.Reader, int64(l))
}
-func (self *brokenLimitedReader) Read(buf []byte) (int, error) {
- if self.off+len(buf) > self.errAt {
+func (r *brokenLimitedReader) Read(buf []byte) (int, error) {
+ if r.off+len(buf) > r.errAt {
return 0, fmt.Errorf("Broken reader")
}
- self.off += len(buf)
- return self.lr.Read(buf)
+ r.off += len(buf)
+ return r.lr.Read(buf)
}
-func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) {
+func generateRandomData(l int) (r io.Reader, slice []byte) {
slice = make([]byte, l)
if _, err := rand.Read(slice); err != nil {
panic("rand error")
@@ -63,54 +151,70 @@ func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) {
return
}
-func testStore(m ChunkStore, l int64, branches int64, t *testing.T) {
+func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
+ hs := mputRandomChunks(m, processors, n, chunksize)
+ err := mget(m, hs, nil)
+ if err != nil {
+ t.Fatalf("testStore failed: %v", err)
+ }
+}
- chunkC := make(chan *Chunk)
- go func() {
- for chunk := range chunkC {
- m.Put(chunk)
- if chunk.wg != nil {
- chunk.wg.Done()
- }
+func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
+ hs := mputRandomChunks(m, processors, n, chunksize)
+ f := func(h Address, chunk *Chunk) error {
+ if !bytes.Equal(h, chunk.Addr) {
+ return fmt.Errorf("key does not match retrieved chunk Key")
}
- }()
- chunker := NewTreeChunker(&ChunkerParams{
- Branches: branches,
- Hash: SHA3Hash,
- })
- swg := &sync.WaitGroup{}
- key, _ := chunker.Split(rand.Reader, l, chunkC, swg, nil)
- swg.Wait()
- close(chunkC)
- chunkC = make(chan *Chunk)
-
- quit := make(chan bool)
-
- go func() {
- for ch := range chunkC {
- go func(chunk *Chunk) {
- storedChunk, err := m.Get(chunk.Key)
- if err == notFound {
- log.Trace(fmt.Sprintf("chunk '%v' not found", chunk.Key.Log()))
- } else if err != nil {
- log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err))
- } else {
- chunk.SData = storedChunk.SData
- chunk.Size = storedChunk.Size
- }
- log.Trace(fmt.Sprintf("chunk '%v' not found", chunk.Key.Log()))
- close(chunk.C)
- }(ch)
+ hasher := MakeHashFunc(DefaultHash)()
+ hasher.ResetWithLength(chunk.SData[:8])
+ hasher.Write(chunk.SData[8:])
+ exp := hasher.Sum(nil)
+ if !bytes.Equal(h, exp) {
+ return fmt.Errorf("key is not hash of chunk data")
}
- close(quit)
- }()
- r := chunker.Join(key, chunkC)
+ return nil
+ }
+ err := mget(m, hs, f)
+ if err != nil {
+ t.Fatalf("testStore failed: %v", err)
+ }
+}
+
+func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
+ chunks := make([]*Chunk, n)
+ i := 0
+ f := func(dataSize int64) *Chunk {
+ chunk := GenerateRandomChunk(dataSize)
+ chunks[i] = chunk
+ i++
+ return chunk
+ }
+
+ mput(store, processors, n, f)
+
+ f = func(dataSize int64) *Chunk {
+ chunk := chunks[i]
+ i++
+ return chunk
+ }
- b := make([]byte, l)
- n, err := r.ReadAt(b, 0)
- if err != io.EOF {
- t.Fatalf("read error (%v/%v) %v", n, l, err)
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ for j := 0; j < b.N; j++ {
+ i = 0
+ mput(store, processors, n, f)
+ }
+}
+
+func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
+ hs := mputRandomChunks(store, processors, n, chunksize)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := mget(store, hs, nil)
+ if err != nil {
+ b.Fatalf("mget failed: %v", err)
+ }
}
- close(chunkC)
- <-quit
}