aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2018-07-09 20:11:49 +0800
committerBalint Gabor <balint.g@gmail.com>2018-07-09 20:11:49 +0800
commitb3711af05176f446fad5ee90e2be4bd09c4086a2 (patch)
tree036eb23e423c385c0be00e3f8d3d97dea7040f8c /swarm/storage
parent30bdf817a0d0afb33f3635f1de877f9caf09be05 (diff)
downloaddexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.gz
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.bz2
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.lz
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.xz
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.zst
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.zip
swarm: ctx propagation; bmt fixes; pss generic notification framework (#17150)
* cmd/swarm: minor cli flag text adjustments * swarm/api/http: sticky footer for swarm landing page using flex * swarm/api/http: sticky footer for error pages and fix for multiple choices * cmd/swarm, swarm/storage, swarm: fix mingw on windows test issues * cmd/swarm: update description of swarm cmd * swarm: added network ID test * cmd/swarm: support for smoke tests on the production swarm cluster * cmd/swarm/swarm-smoke: simplify cluster logic as per suggestion * swarm: propagate ctx to internal apis (#754) * swarm/metrics: collect disk measurements * swarm/bmt: fix io.Writer interface * Write now tolerates arbitrary variable buffers * added variable buffer tests * Write loop and finalise optimisation * refactor / rename * add tests for empty input * swarm/pss: (UPDATE) Generic notifications package (#744) swarm/pss: Generic package for creating pss notification svcs * swarm: Adding context to more functions * swarm/api: change colour of landing page in templates * swarm/api: change landing page to react to enter keypress
Diffstat (limited to 'swarm/storage')
-rw-r--r--swarm/storage/chunker.go13
-rw-r--r--swarm/storage/chunker_test.go67
-rw-r--r--swarm/storage/filestore.go9
-rw-r--r--swarm/storage/filestore_test.go27
-rw-r--r--swarm/storage/hasherstore.go4
-rw-r--r--swarm/storage/hasherstore_test.go6
-rw-r--r--swarm/storage/ldbstore_test.go4
-rw-r--r--swarm/storage/pyramid.go13
-rw-r--r--swarm/storage/types.go3
9 files changed, 94 insertions, 52 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index 5780742e3..2d197fefa 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -16,6 +16,7 @@
package storage
import (
+ "context"
"encoding/binary"
"errors"
"fmt"
@@ -126,7 +127,7 @@ type TreeChunker struct {
The chunks are not meant to be validated by the chunker when joining. This
is because it is left to the DPA to decide which sources are trusted.
*/
-func TreeJoin(addr Address, getter Getter, depth int) *LazyChunkReader {
+func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *LazyChunkReader {
jp := &JoinerParams{
ChunkerParams: ChunkerParams{
chunkSize: DefaultChunkSize,
@@ -137,14 +138,14 @@ func TreeJoin(addr Address, getter Getter, depth int) *LazyChunkReader {
depth: depth,
}
- return NewTreeJoiner(jp).Join()
+ return NewTreeJoiner(jp).Join(ctx)
}
/*
When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
New chunks to store are store using the putter which the caller provides.
*/
-func TreeSplit(data io.Reader, size int64, putter Putter) (k Address, wait func(), err error) {
+func TreeSplit(ctx context.Context, data io.Reader, size int64, putter Putter) (k Address, wait func(context.Context) error, err error) {
tsp := &TreeSplitterParams{
SplitterParams: SplitterParams{
ChunkerParams: ChunkerParams{
@@ -156,7 +157,7 @@ func TreeSplit(data io.Reader, size int64, putter Putter) (k Address, wait func(
},
size: size,
}
- return NewTreeSplitter(tsp).Split()
+ return NewTreeSplitter(tsp).Split(ctx)
}
func NewTreeJoiner(params *JoinerParams) *TreeChunker {
@@ -224,7 +225,7 @@ func (tc *TreeChunker) decrementWorkerCount() {
tc.workerCount -= 1
}
-func (tc *TreeChunker) Split() (k Address, wait func(), err error) {
+func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
if tc.chunkSize <= 0 {
panic("chunker must be initialised")
}
@@ -380,7 +381,7 @@ type LazyChunkReader struct {
getter Getter
}
-func (tc *TreeChunker) Join() *LazyChunkReader {
+func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader {
return &LazyChunkReader{
key: tc.addr,
chunkSize: tc.chunkSize,
diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go
index d8be13ef6..69c388b39 100644
--- a/swarm/storage/chunker_test.go
+++ b/swarm/storage/chunker_test.go
@@ -18,6 +18,7 @@ package storage
import (
"bytes"
+ "context"
"crypto/rand"
"encoding/binary"
"errors"
@@ -81,7 +82,7 @@ func testRandomBrokenData(n int, tester *chunkerTester) {
putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
expectedError := fmt.Errorf("Broken reader")
- addr, _, err := TreeSplit(brokendata, int64(n), putGetter)
+ addr, _, err := TreeSplit(context.TODO(), brokendata, int64(n), putGetter)
if err == nil || err.Error() != expectedError.Error() {
tester.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err)
}
@@ -104,20 +105,24 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester)
putGetter := newTestHasherStore(NewMapChunkStore(), hash)
var addr Address
- var wait func()
+ var wait func(context.Context) error
var err error
+ ctx := context.TODO()
if usePyramid {
- addr, wait, err = PyramidSplit(data, putGetter, putGetter)
+ addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter)
} else {
- addr, wait, err = TreeSplit(data, int64(n), putGetter)
+ addr, wait, err = TreeSplit(ctx, data, int64(n), putGetter)
}
if err != nil {
tester.t.Fatalf(err.Error())
}
tester.t.Logf(" Key = %v\n", addr)
- wait()
+ err = wait(ctx)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
- reader := TreeJoin(addr, putGetter, 0)
+ reader := TreeJoin(context.TODO(), addr, putGetter, 0)
output := make([]byte, n)
r, err := reader.Read(output)
if r != n || err != io.EOF {
@@ -200,11 +205,15 @@ func TestDataAppend(t *testing.T) {
chunkStore := NewMapChunkStore()
putGetter := newTestHasherStore(chunkStore, SHA3Hash)
- addr, wait, err := PyramidSplit(data, putGetter, putGetter)
+ ctx := context.TODO()
+ addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+ err = wait(ctx)
if err != nil {
tester.t.Fatalf(err.Error())
}
- wait()
//create a append data stream
appendInput, found := tester.inputs[uint64(m)]
@@ -217,13 +226,16 @@ func TestDataAppend(t *testing.T) {
}
putGetter = newTestHasherStore(chunkStore, SHA3Hash)
- newAddr, wait, err := PyramidAppend(addr, appendData, putGetter, putGetter)
+ newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+ err = wait(ctx)
if err != nil {
tester.t.Fatalf(err.Error())
}
- wait()
- reader := TreeJoin(newAddr, putGetter, 0)
+ reader := TreeJoin(ctx, newAddr, putGetter, 0)
newOutput := make([]byte, n+m)
r, err := reader.Read(newOutput)
if r != (n + m) {
@@ -282,12 +294,16 @@ func benchmarkSplitJoin(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
- key, wait, err := PyramidSplit(data, putGetter, putGetter)
+ ctx := context.TODO()
+ key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
- wait()
- reader := TreeJoin(key, putGetter, 0)
+ err = wait(ctx)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ reader := TreeJoin(ctx, key, putGetter, 0)
benchReadAll(reader)
}
}
@@ -298,7 +314,7 @@ func benchmarkSplitTreeSHA3(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash)
- _, _, err := TreeSplit(data, int64(n), putGetter)
+ _, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@@ -311,7 +327,7 @@ func benchmarkSplitTreeBMT(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash)
- _, _, err := TreeSplit(data, int64(n), putGetter)
+ _, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@@ -324,7 +340,7 @@ func benchmarkSplitPyramidSHA3(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash)
- _, _, err := PyramidSplit(data, putGetter, putGetter)
+ _, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@@ -338,7 +354,7 @@ func benchmarkSplitPyramidBMT(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash)
- _, _, err := PyramidSplit(data, putGetter, putGetter)
+ _, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@@ -354,18 +370,25 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
chunkStore := NewMapChunkStore()
putGetter := newTestHasherStore(chunkStore, SHA3Hash)
- key, wait, err := PyramidSplit(data, putGetter, putGetter)
+ ctx := context.TODO()
+ key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
- wait()
putGetter = newTestHasherStore(chunkStore, SHA3Hash)
- _, wait, err = PyramidAppend(key, data1, putGetter, putGetter)
+ _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
- wait()
}
}
diff --git a/swarm/storage/filestore.go b/swarm/storage/filestore.go
index c0b463deb..2d8d82d95 100644
--- a/swarm/storage/filestore.go
+++ b/swarm/storage/filestore.go
@@ -17,6 +17,7 @@
package storage
import (
+ "context"
"io"
)
@@ -78,18 +79,18 @@ func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
// Chunk retrieval blocks on netStore requests with a timeout so reader will
// report error if retrieval of chunks within requested range time out.
// It returns a reader with the chunk data and whether the content was encrypted
-func (f *FileStore) Retrieve(addr Address) (reader *LazyChunkReader, isEncrypted bool) {
+func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) {
isEncrypted = len(addr) > f.hashFunc().Size()
getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted)
- reader = TreeJoin(addr, getter, 0)
+ reader = TreeJoin(ctx, addr, getter, 0)
return
}
// Public API. Main entry point for document storage directly. Used by the
// FS-aware API and httpaccess
-func (f *FileStore) Store(data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(), err error) {
+func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) {
putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt)
- return PyramidSplit(data, putter, putter)
+ return PyramidSplit(ctx, data, putter, putter)
}
func (f *FileStore) HashSize() int {
diff --git a/swarm/storage/filestore_test.go b/swarm/storage/filestore_test.go
index 1aaec5e5c..f3f597255 100644
--- a/swarm/storage/filestore_test.go
+++ b/swarm/storage/filestore_test.go
@@ -18,6 +18,7 @@ package storage
import (
"bytes"
+ "context"
"io"
"io/ioutil"
"os"
@@ -49,12 +50,16 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
defer os.RemoveAll("/tmp/bzz")
reader, slice := generateRandomData(testDataSize)
- key, wait, err := fileStore.Store(reader, testDataSize, toEncrypt)
+ ctx := context.TODO()
+ key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt)
if err != nil {
t.Errorf("Store error: %v", err)
}
- wait()
- resultReader, isEncrypted := fileStore.Retrieve(key)
+ err = wait(ctx)
+ if err != nil {
+ t.Fatalf("Store waitt error: %v", err.Error())
+ }
+ resultReader, isEncrypted := fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@@ -72,7 +77,7 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666)
ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666)
localStore.memStore = NewMemStore(NewDefaultStoreParams(), db)
- resultReader, isEncrypted = fileStore.Retrieve(key)
+ resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@@ -110,12 +115,16 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
}
fileStore := NewFileStore(localStore, NewFileStoreParams())
reader, slice := generateRandomData(testDataSize)
- key, wait, err := fileStore.Store(reader, testDataSize, toEncrypt)
+ ctx := context.TODO()
+ key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt)
+ if err != nil {
+ t.Errorf("Store error: %v", err)
+ }
+ err = wait(ctx)
if err != nil {
t.Errorf("Store error: %v", err)
}
- wait()
- resultReader, isEncrypted := fileStore.Retrieve(key)
+ resultReader, isEncrypted := fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@@ -134,7 +143,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
memStore.setCapacity(0)
// check whether it is, indeed, empty
fileStore.ChunkStore = memStore
- resultReader, isEncrypted = fileStore.Retrieve(key)
+ resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@@ -144,7 +153,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
// check how it works with localStore
fileStore.ChunkStore = localStore
// localStore.dbStore.setCapacity(0)
- resultReader, isEncrypted = fileStore.Retrieve(key)
+ resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go
index e659c3681..e18b66ddc 100644
--- a/swarm/storage/hasherstore.go
+++ b/swarm/storage/hasherstore.go
@@ -17,6 +17,7 @@
package storage
import (
+ "context"
"fmt"
"sync"
@@ -126,9 +127,10 @@ func (h *hasherStore) Close() {
// Wait returns when
// 1) the Close() function has been called and
// 2) all the chunks which has been Put has been stored
-func (h *hasherStore) Wait() {
+func (h *hasherStore) Wait(ctx context.Context) error {
<-h.closed
h.wg.Wait()
+ return nil
}
func (h *hasherStore) createHash(chunkData ChunkData) Address {
diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go
index ccb37524a..cf7b0dcc3 100644
--- a/swarm/storage/hasherstore_test.go
+++ b/swarm/storage/hasherstore_test.go
@@ -18,6 +18,7 @@ package storage
import (
"bytes"
+ "context"
"testing"
"github.com/ethereum/go-ethereum/swarm/storage/encryption"
@@ -60,7 +61,10 @@ func TestHasherStore(t *testing.T) {
hasherStore.Close()
// Wait until chunks are really stored
- hasherStore.Wait()
+ err = hasherStore.Wait(context.TODO())
+ if err != nil {
+ t.Fatalf("Expected no error got \"%v\"", err)
+ }
// Get the first chunk
retrievedChunkData1, err := hasherStore.Get(key1)
diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go
index 2c706a75b..2453d2f30 100644
--- a/swarm/storage/ldbstore_test.go
+++ b/swarm/storage/ldbstore_test.go
@@ -59,12 +59,12 @@ func newTestDbStore(mock bool, trusted bool) (*testDbStore, func(), error) {
}
cleanup := func() {
- if err != nil {
+ if db != nil {
db.Close()
}
err = os.RemoveAll(dir)
if err != nil {
- panic("db cleanup failed")
+ panic(fmt.Sprintf("db cleanup failed: %v", err))
}
}
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
index 01172cb77..6643e989a 100644
--- a/swarm/storage/pyramid.go
+++ b/swarm/storage/pyramid.go
@@ -17,6 +17,7 @@
package storage
import (
+ "context"
"encoding/binary"
"errors"
"io"
@@ -99,12 +100,12 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get
When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
New chunks to store are store using the putter which the caller provides.
*/
-func PyramidSplit(reader io.Reader, putter Putter, getter Getter) (Address, func(), error) {
- return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split()
+func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
+ return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split(ctx)
}
-func PyramidAppend(addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(), error) {
- return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, DefaultChunkSize)).Append()
+func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
+ return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, DefaultChunkSize)).Append(ctx)
}
// Entry to create a tree node
@@ -203,7 +204,7 @@ func (pc *PyramidChunker) decrementWorkerCount() {
pc.workerCount -= 1
}
-func (pc *PyramidChunker) Split() (k Address, wait func(), err error) {
+func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
log.Debug("pyramid.chunker: Split()")
pc.wg.Add(1)
@@ -235,7 +236,7 @@ func (pc *PyramidChunker) Split() (k Address, wait func(), err error) {
}
-func (pc *PyramidChunker) Append() (k Address, wait func(), err error) {
+func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
log.Debug("pyramid.chunker: Append()")
// Load the right most unfinished tree chunks in every level
pc.loadTree()
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index b75f64205..32880ead7 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -18,6 +18,7 @@ package storage
import (
"bytes"
+ "context"
"crypto"
"crypto/rand"
"encoding/binary"
@@ -303,7 +304,7 @@ type Putter interface {
// Close is to indicate that no more chunk data will be Put on this Putter
Close()
// Wait returns if all data has been store and the Close() was called.
- Wait()
+ Wait(context.Context) error
}
// Getter is an interface to retrieve a chunk's data by its reference