diff options
author | Viktor Trón <viktor.tron@gmail.com> | 2018-06-22 05:00:43 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-22 05:00:43 +0800 |
commit | eaff89291ce998ba4bf9b9816ca8a15c8b85f440 (patch) | |
tree | c77d7a06627a1a7f578d0fec8e39788e66672e53 /swarm/storage/common_test.go | |
parent | d926bf2c7e3182d694c15829a37a0ca7331cd03c (diff) | |
parent | e187711c6545487d4cac3701f0f506bb536234e2 (diff) | |
download | dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.gz dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.bz2 dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.lz dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.xz dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.zst dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.zip |
Merge pull request #17041 from ethersphere/swarm-network-rewrite-merge
Swarm POC3 - happy solstice
Diffstat (limited to 'swarm/storage/common_test.go')
-rw-r--r-- | swarm/storage/common_test.go | 204 |
1 file changed, 154 insertions, 50 deletions
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index cd4c2ef13..c6e97d68f 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -19,14 +19,27 @@ package storage import ( "bytes" "crypto/rand" + "flag" "fmt" "io" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/log" + colorable "github.com/mattn/go-colorable" ) +var ( + loglevel = flag.Int("loglevel", 3, "verbosity of logs") +) + +func init() { + flag.Parse() + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) +} + type brokenLimitedReader struct { lr io.Reader errAt int @@ -42,19 +55,94 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader } } +func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) { + return mput(store, processors, n, GenerateRandomChunk) +} + +func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) { + wg := sync.WaitGroup{} + wg.Add(processors) + c := make(chan *Chunk) + for i := 0; i < processors; i++ { + go func() { + defer wg.Done() + for chunk := range c { + wg.Add(1) + chunk := chunk + store.Put(chunk) + go func() { + defer wg.Done() + <-chunk.dbStoredC + }() + } + }() + } + fa := f + if _, ok := store.(*MemStore); ok { + fa = func(i int64) *Chunk { + chunk := f(i) + chunk.markAsStored() + return chunk + } + } + for i := 0; i < n; i++ { + chunk := fa(int64(i)) + hs = append(hs, chunk.Addr) + c <- chunk + } + close(c) + wg.Wait() + return hs +} + +func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error { + wg := sync.WaitGroup{} + wg.Add(len(hs)) + errc := make(chan error) + + for _, k := range hs { + go func(h Address) { + defer wg.Done() + chunk, err := store.Get(h) + if err != nil { + errc <- err + return + } + if f != nil { + err = f(h, chunk) + if err != nil { + errc <- err + 
return + } + } + }(k) + } + go func() { + wg.Wait() + close(errc) + }() + var err error + select { + case err = <-errc: + case <-time.NewTimer(5 * time.Second).C: + err = fmt.Errorf("timed out after 5 seconds") + } + return err +} + func testDataReader(l int) (r io.Reader) { return io.LimitReader(rand.Reader, int64(l)) } -func (self *brokenLimitedReader) Read(buf []byte) (int, error) { - if self.off+len(buf) > self.errAt { +func (r *brokenLimitedReader) Read(buf []byte) (int, error) { + if r.off+len(buf) > r.errAt { return 0, fmt.Errorf("Broken reader") } - self.off += len(buf) - return self.lr.Read(buf) + r.off += len(buf) + return r.lr.Read(buf) } -func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) { +func generateRandomData(l int) (r io.Reader, slice []byte) { slice = make([]byte, l) if _, err := rand.Read(slice); err != nil { panic("rand error") @@ -63,54 +151,70 @@ func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) { return } -func testStore(m ChunkStore, l int64, branches int64, t *testing.T) { +func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { + hs := mputRandomChunks(m, processors, n, chunksize) + err := mget(m, hs, nil) + if err != nil { + t.Fatalf("testStore failed: %v", err) + } +} - chunkC := make(chan *Chunk) - go func() { - for chunk := range chunkC { - m.Put(chunk) - if chunk.wg != nil { - chunk.wg.Done() - } +func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { + hs := mputRandomChunks(m, processors, n, chunksize) + f := func(h Address, chunk *Chunk) error { + if !bytes.Equal(h, chunk.Addr) { + return fmt.Errorf("key does not match retrieved chunk Key") } - }() - chunker := NewTreeChunker(&ChunkerParams{ - Branches: branches, - Hash: SHA3Hash, - }) - swg := &sync.WaitGroup{} - key, _ := chunker.Split(rand.Reader, l, chunkC, swg, nil) - swg.Wait() - close(chunkC) - chunkC = make(chan *Chunk) - - quit := make(chan bool) - - go func() { - for ch := 
range chunkC { - go func(chunk *Chunk) { - storedChunk, err := m.Get(chunk.Key) - if err == notFound { - log.Trace(fmt.Sprintf("chunk '%v' not found", chunk.Key.Log())) - } else if err != nil { - log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err)) - } else { - chunk.SData = storedChunk.SData - chunk.Size = storedChunk.Size - } - log.Trace(fmt.Sprintf("chunk '%v' not found", chunk.Key.Log())) - close(chunk.C) - }(ch) + hasher := MakeHashFunc(DefaultHash)() + hasher.ResetWithLength(chunk.SData[:8]) + hasher.Write(chunk.SData[8:]) + exp := hasher.Sum(nil) + if !bytes.Equal(h, exp) { + return fmt.Errorf("key is not hash of chunk data") } - close(quit) - }() - r := chunker.Join(key, chunkC) + return nil + } + err := mget(m, hs, f) + if err != nil { + t.Fatalf("testStore failed: %v", err) + } +} + +func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { + chunks := make([]*Chunk, n) + i := 0 + f := func(dataSize int64) *Chunk { + chunk := GenerateRandomChunk(dataSize) + chunks[i] = chunk + i++ + return chunk + } + + mput(store, processors, n, f) + + f = func(dataSize int64) *Chunk { + chunk := chunks[i] + i++ + return chunk + } - b := make([]byte, l) - n, err := r.ReadAt(b, 0) - if err != io.EOF { - t.Fatalf("read error (%v/%v) %v", n, l, err) + b.ReportAllocs() + b.ResetTimer() + + for j := 0; j < b.N; j++ { + i = 0 + mput(store, processors, n, f) + } +} + +func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { + hs := mputRandomChunks(store, processors, n, chunksize) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := mget(store, hs, nil) + if err != nil { + b.Fatalf("mget failed: %v", err) + } } - close(chunkC) - <-quit } |