aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/api
diff options
context:
space:
mode:
authorElad <theman@elad.im>2019-05-06 02:34:22 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2019-05-10 18:26:52 +0800
commitad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1 (patch)
tree396077f7f33307321110f17bc3a19abe115e5d47 /swarm/api
parent3030893a21b17a0e90ddd0047d0f310fee8335a0 (diff)
downloadgo-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.gz
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.bz2
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.lz
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.xz
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.zst
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.zip
swarm: push tags integration - request flow
swarm/api: integrate tags to count chunks being split and stored swarm/api/http: integrate tags in middleware for HTTP `POST` calls and assert chunks being calculated and counted correctly swarm: remove deprecated and unused code, add swarm hash to DoneSplit signature, remove calls to the api client from the http package
Diffstat (limited to 'swarm/api')
-rw-r--r--swarm/api/api.go32
-rw-r--r--swarm/api/api_test.go108
-rw-r--r--swarm/api/client/client.go32
-rw-r--r--swarm/api/client/client_test.go21
-rw-r--r--swarm/api/filesystem_test.go3
-rw-r--r--swarm/api/http/middleware.go49
-rw-r--r--swarm/api/http/response.go2
-rw-r--r--swarm/api/http/server.go60
-rw-r--r--swarm/api/http/server_test.go152
-rw-r--r--swarm/api/http/test_server.go9
-rw-r--r--swarm/api/manifest_test.go7
-rw-r--r--swarm/api/storage.go85
-rw-r--r--swarm/api/storage_test.go56
13 files changed, 385 insertions, 231 deletions
diff --git a/swarm/api/api.go b/swarm/api/api.go
index 86c111923..96fb86e1c 100644
--- a/swarm/api/api.go
+++ b/swarm/api/api.go
@@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -53,8 +54,6 @@ import (
var (
apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil)
apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil)
- apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil)
- apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil)
apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil)
apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil)
apiGetHTTP300 = metrics.NewRegisteredCounter("api.get.http.300", nil)
@@ -188,15 +187,17 @@ type API struct {
feed *feed.Handler
fileStore *storage.FileStore
dns Resolver
+ Tags *chunk.Tags
Decryptor func(context.Context, string) DecryptFunc
}
// NewAPI the api constructor initialises a new API instance.
-func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) {
+func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) {
self = &API{
fileStore: fileStore,
dns: dns,
feed: feedHandler,
+ Tags: tags,
Decryptor: func(ctx context.Context, credentials string) DecryptFunc {
return self.doDecrypt(ctx, credentials, pk)
},
@@ -297,31 +298,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto
return addr, nil
}
-// Put provides singleton manifest creation on top of FileStore store
-func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
- apiPutCount.Inc(1)
- r := strings.NewReader(content)
- key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt)
- if err != nil {
- apiPutFail.Inc(1)
- return nil, nil, err
- }
- manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
- r = strings.NewReader(manifest)
- key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt)
- if err != nil {
- apiPutFail.Inc(1)
- return nil, nil, err
- }
- return key, func(ctx context.Context) error {
- err := waitContent(ctx)
- if err != nil {
- return err
- }
- return waitManifest(ctx)
- }, nil
-}
-
// Get uses iterative manifest retrieval and prefix matching
// to resolve basePath to content using FileStore retrieve
// it returns a section reader, mimeType, status, the key of the actual content and an error
diff --git a/swarm/api/api_test.go b/swarm/api/api_test.go
index eb896f32a..4a5f92362 100644
--- a/swarm/api/api_test.go
+++ b/swarm/api/api_test.go
@@ -19,6 +19,7 @@ package api
import (
"bytes"
"context"
+ crand "crypto/rand"
"errors"
"flag"
"fmt"
@@ -26,13 +27,16 @@ import (
"io/ioutil"
"math/big"
"os"
+ "strings"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/testutil"
)
func init() {
@@ -41,19 +45,21 @@ func init() {
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true)))))
}
-func testAPI(t *testing.T, f func(*API, bool)) {
- datadir, err := ioutil.TempDir("", "bzz-test")
- if err != nil {
- t.Fatalf("unable to create temp dir: %v", err)
- }
- defer os.RemoveAll(datadir)
- fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
- if err != nil {
- return
+func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
+ for _, v := range []bool{true, false} {
+ datadir, err := ioutil.TempDir("", "bzz-test")
+ if err != nil {
+ t.Fatalf("unable to create temp dir: %v", err)
+ }
+ defer os.RemoveAll(datadir)
+ tags := chunk.NewTags()
+ fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
+ if err != nil {
+ return
+ }
+ api := NewAPI(fileStore, nil, nil, nil, tags)
+ f(api, tags, v)
}
- api := NewAPI(fileStore, nil, nil, nil)
- f(api, false)
- f(api, true)
}
type testResponse struct {
@@ -61,6 +67,13 @@ type testResponse struct {
*Response
}
+type Response struct {
+ MimeType string
+ Status int
+ Size int64
+ Content string
+}
+
func checkResponse(t *testing.T, resp *testResponse, exp *Response) {
if resp.MimeType != exp.MimeType {
@@ -111,15 +124,14 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
}
reader.Seek(0, 0)
return &testResponse{reader, &Response{mimeType, status, size, string(s)}}
- // return &testResponse{reader, &Response{mimeType, status, reader.Size(), nil}}
}
func TestApiPut(t *testing.T) {
- testAPI(t, func(api *API, toEncrypt bool) {
+ testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
ctx := context.TODO()
- addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
+ addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -129,6 +141,40 @@ func TestApiPut(t *testing.T) {
}
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
+ tag := tags.All()[0]
+ testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
+ })
+}
+
+// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input
+func TestApiTagLarge(t *testing.T) {
+ const contentLength = 4096 * 4095
+ testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
+ randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
+ tag, err := api.Tags.New("unnamed-tag", 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ctx := sctx.SetTag(context.Background(), tag.Uid)
+ key, waitContent, err := api.Store(ctx, randomContentReader, int64(contentLength), toEncrypt)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = waitContent(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ tag.DoneSplit(key)
+
+ if toEncrypt {
+ tag := tags.All()[0]
+ expect := int64(4095 + 64 + 1)
+ testutil.CheckTag(t, tag, expect, expect, 0, expect)
+ } else {
+ tag := tags.All()[0]
+ expect := int64(4095 + 32 + 1)
+ testutil.CheckTag(t, tag, expect, expect, 0, expect)
+ }
})
}
@@ -391,7 +437,7 @@ func TestDecryptOriginForbidden(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}
- api := NewAPI(nil, nil, nil, nil)
+ api := NewAPI(nil, nil, nil, nil, chunk.NewTags())
f := api.Decryptor(ctx, "")
err := f(me)
@@ -425,7 +471,7 @@ func TestDecryptOrigin(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}
- api := NewAPI(nil, nil, nil, nil)
+ api := NewAPI(nil, nil, nil, nil, chunk.NewTags())
f := api.Decryptor(ctx, "")
err := f(me)
@@ -500,3 +546,31 @@ func TestDetectContentType(t *testing.T) {
})
}
}
+
+// putString provides singleton manifest creation on top of api.API
+func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
+ r := strings.NewReader(content)
+ tag, err := a.Tags.New("unnamed-tag", 0)
+
+ log.Trace("created new tag", "uid", tag.Uid)
+
+ cCtx := sctx.SetTag(ctx, tag.Uid)
+ key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
+ if err != nil {
+ return nil, nil, err
+ }
+ manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
+ r = strings.NewReader(manifest)
+ key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
+ if err != nil {
+ return nil, nil, err
+ }
+ tag.DoneSplit(key)
+ return key, func(ctx context.Context) error {
+ err := waitContent(ctx)
+ if err != nil {
+ return err
+ }
+ return waitManifest(ctx)
+ }, nil
+}
diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go
index 5e293cca7..9ad0948f4 100644
--- a/swarm/api/client/client.go
+++ b/swarm/api/client/client.go
@@ -40,6 +40,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
+ swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/pborman/uuid"
@@ -75,6 +76,8 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err
return "", err
}
req.ContentLength = size
+ req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))
+
res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
@@ -111,6 +114,7 @@ func (c *Client) DownloadRaw(hash string) (io.ReadCloser, bool, error) {
type File struct {
io.ReadCloser
api.ManifestEntry
+ Tag string
}
// Open opens a local file which can then be passed to client.Upload to upload
@@ -139,6 +143,7 @@ func Open(path string) (*File, error) {
Size: stat.Size(),
ModTime: stat.ModTime(),
},
+ Tag: filepath.Base(path),
}, nil
}
@@ -422,6 +427,7 @@ func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, erro
// Uploader uploads files to swarm using a provided UploadFn
type Uploader interface {
Upload(UploadFn) error
+ Tag() string
}
type UploaderFunc func(UploadFn) error
@@ -430,12 +436,23 @@ func (u UploaderFunc) Upload(upload UploadFn) error {
return u(upload)
}
+func (u UploaderFunc) Tag() string {
+ return fmt.Sprintf("multipart_upload_%d", time.Now().Unix())
+}
+
+// DirectoryUploader implements Uploader
+var _ Uploader = &DirectoryUploader{}
+
// DirectoryUploader uploads all files in a directory, optionally uploading
// a file to the default path
type DirectoryUploader struct {
Dir string
}
+func (d *DirectoryUploader) Tag() string {
+ return filepath.Base(d.Dir)
+}
+
// Upload performs the upload of the directory and default path
func (d *DirectoryUploader) Upload(upload UploadFn) error {
return filepath.Walk(d.Dir, func(path string, f os.FileInfo, err error) error {
@@ -458,11 +475,17 @@ func (d *DirectoryUploader) Upload(upload UploadFn) error {
})
}
+var _ Uploader = &FileUploader{}
+
// FileUploader uploads a single file
type FileUploader struct {
File *File
}
+func (f *FileUploader) Tag() string {
+ return f.File.Tag
+}
+
// Upload performs the upload of the file
func (f *FileUploader) Upload(upload UploadFn) error {
return upload(f.File)
@@ -509,6 +532,14 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
req.URL.RawQuery = q.Encode()
}
+ tag := uploader.Tag()
+ if tag == "" {
+ tag = "unnamed_tag_" + fmt.Sprintf("%d", time.Now().Unix())
+ }
+ log.Trace("setting upload tag", "tag", tag)
+
+ req.Header.Set(swarmhttp.SwarmTagHeaderName, tag)
+
// use 'Expect: 100-continue' so we don't send the request body if
// the server refuses the request
req.Header.Set("Expect", "100-continue")
@@ -574,6 +605,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error)
mw := multipart.NewWriter(reqW)
req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary()))
+ req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))
// define an UploadFn which adds files to the multipart form
uploadFn := func(file *File) error {
diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go
index 9c9bde5d6..92489849c 100644
--- a/swarm/api/client/client_test.go
+++ b/swarm/api/client/client_test.go
@@ -25,16 +25,14 @@ import (
"sort"
"testing"
- "github.com/ethereum/go-ethereum/swarm/testutil"
-
- "github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
-
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/swarm/api"
swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http"
+ "github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
+ "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
+ "github.com/ethereum/go-ethereum/swarm/testutil"
)
func serverFunc(api *api.API) swarmhttp.TestServer {
@@ -68,6 +66,10 @@ func testClientUploadDownloadRaw(toEncrypt bool, t *testing.T) {
t.Fatal(err)
}
+ // check the tag was created successfully
+ tag := srv.Tags.All()[0]
+ testutil.CheckTag(t, tag, 1, 1, 0, 1)
+
// check we can download the same data
res, isEncrypted, err := client.DownloadRaw(hash)
if err != nil {
@@ -209,6 +211,10 @@ func TestClientUploadDownloadDirectory(t *testing.T) {
t.Fatalf("error uploading directory: %s", err)
}
+ // check the tag was created successfully
+ tag := srv.Tags.All()[0]
+ testutil.CheckTag(t, tag, 9, 9, 0, 9)
+
// check we can download the individual files
checkDownloadFile := func(path string, expected []byte) {
file, err := client.Download(hash, path)
@@ -323,6 +329,7 @@ func TestClientMultipartUpload(t *testing.T) {
defer srv.Close()
// define an uploader which uploads testDirFiles with some data
+ // note: this test should result in SEEN chunks. assert accordingly
data := []byte("some-data")
uploader := UploaderFunc(func(upload UploadFn) error {
for _, name := range testDirFiles {
@@ -348,6 +355,10 @@ func TestClientMultipartUpload(t *testing.T) {
t.Fatal(err)
}
+ // check the tag was created successfully
+ tag := srv.Tags.All()[0]
+ testutil.CheckTag(t, tag, 9, 9, 7, 9)
+
// check we can download the individual files
checkDownloadFile := func(path string) {
file, err := client.Download(hash, path)
diff --git a/swarm/api/filesystem_test.go b/swarm/api/filesystem_test.go
index 02f5bff65..b8f37fdd5 100644
--- a/swarm/api/filesystem_test.go
+++ b/swarm/api/filesystem_test.go
@@ -25,13 +25,14 @@ import (
"testing"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
)
var testDownloadDir, _ = ioutil.TempDir(os.TempDir(), "bzz-test")
func testFileSystem(t *testing.T, f func(*FileSystem, bool)) {
- testAPI(t, func(api *API, toEncrypt bool) {
+ testAPI(t, func(api *API, _ *chunk.Tags, toEncrypt bool) {
f(NewFileSystem(api), toEncrypt)
})
}
diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go
index 320da3046..e6e263f4c 100644
--- a/swarm/api/http/middleware.go
+++ b/swarm/api/http/middleware.go
@@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -86,6 +87,54 @@ func InitLoggingResponseWriter(h http.Handler) http.Handler {
})
}
+// InitUploadTag creates a new tag for an upload to the local HTTP proxy
+// if a tag is not named using the SwarmTagHeaderName, a fallback name will be used
+// when the Content-Length header is set, an ETA on chunking will be available since the
+// number of chunks to be split is known in advance (not including enclosing manifest chunks)
+// the tag can later be accessed using the appropriate identifier in the request context
+func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var (
+ tagName string
+ err error
+ estimatedTotal int64 = 0
+ contentType = r.Header.Get("Content-Type")
+ headerTag = r.Header.Get(SwarmTagHeaderName)
+ )
+ if headerTag != "" {
+ tagName = headerTag
+ log.Trace("got tag name from http header", "tagName", tagName)
+ } else {
+ tagName = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
+ }
+
+ if !strings.Contains(contentType, "multipart") && r.ContentLength > 0 {
+ log.Trace("calculating tag size", "contentType", contentType, "contentLength", r.ContentLength)
+ uri := GetURI(r.Context())
+ if uri != nil {
+ log.Debug("got uri from context")
+ if uri.Addr == "encrypt" {
+ estimatedTotal = calculateNumberOfChunks(r.ContentLength, true)
+ } else {
+ estimatedTotal = calculateNumberOfChunks(r.ContentLength, false)
+ }
+ }
+ }
+
+ log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)
+
+ t, err := tags.New(tagName, estimatedTotal)
+ if err != nil {
+ log.Error("error creating tag", "err", err, "tagName", tagName)
+ }
+
+ log.Trace("setting tag id to context", "uid", t.Uid)
+ ctx := sctx.SetTag(r.Context(), t.Uid)
+
+ h.ServeHTTP(w, r.WithContext(ctx))
+ })
+}
+
func InstrumentOpenTracing(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
uri := GetURI(r.Context())
diff --git a/swarm/api/http/response.go b/swarm/api/http/response.go
index d4e81d7f6..c851a3992 100644
--- a/swarm/api/http/response.go
+++ b/swarm/api/http/response.go
@@ -79,7 +79,7 @@ func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s
}
func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) {
- log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code)
+ log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code, "msg", msg)
respondTemplate(w, r, "error", msg, code)
}
diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go
index 3c6735a73..a336bd82f 100644
--- a/swarm/api/http/server.go
+++ b/swarm/api/http/server.go
@@ -26,6 +26,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "math"
"mime"
"mime/multipart"
"net/http"
@@ -38,7 +39,9 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/rs/cors"
@@ -60,6 +63,8 @@ var (
getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil)
)
+const SwarmTagHeaderName = "x-swarm-tag"
+
type methodHandler map[string]http.Handler
func (m methodHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
@@ -94,6 +99,12 @@ func NewServer(api *api.API, corsString string) *Server {
InstrumentOpenTracing,
}
+ tagAdapter := Adapter(func(h http.Handler) http.Handler {
+ return InitUploadTag(h, api.Tags)
+ })
+
+ defaultPostMiddlewares := append(defaultMiddlewares, tagAdapter)
+
mux := http.NewServeMux()
mux.Handle("/bzz:/", methodHandler{
"GET": Adapt(
@@ -102,7 +113,7 @@ func NewServer(api *api.API, corsString string) *Server {
),
"POST": Adapt(
http.HandlerFunc(server.HandlePostFiles),
- defaultMiddlewares...,
+ defaultPostMiddlewares...,
),
"DELETE": Adapt(
http.HandlerFunc(server.HandleDelete),
@@ -116,7 +127,7 @@ func NewServer(api *api.API, corsString string) *Server {
),
"POST": Adapt(
http.HandlerFunc(server.HandlePostRaw),
- defaultMiddlewares...,
+ defaultPostMiddlewares...,
),
})
mux.Handle("/bzz-immutable:/", methodHandler{
@@ -230,6 +241,12 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
ruid := GetRUID(r.Context())
log.Debug("handle.post.raw", "ruid", ruid)
+ tagUid := sctx.GetTag(r.Context())
+ tag, err := s.api.Tags.Get(tagUid)
+ if err != nil {
+ log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
+ }
+
postRawCount.Inc(1)
toEncrypt := false
@@ -256,13 +273,16 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
return
}
- addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt)
+ addr, wait, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt)
if err != nil {
postRawFail.Inc(1)
respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
+ wait(r.Context())
+ tag.DoneSplit(addr)
+
log.Debug("stored content", "ruid", ruid, "key", addr)
w.Header().Set("Content-Type", "text/plain")
@@ -311,7 +331,6 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
log.Debug("new manifest", "ruid", ruid, "key", addr)
}
-
newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
switch contentType {
case "application/x-tar":
@@ -334,6 +353,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
return
}
+ tagUid := sctx.GetTag(r.Context())
+ tag, err := s.api.Tags.Get(tagUid)
+ if err != nil {
+ log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
+ }
+
+ log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total())
+ tag.DoneSplit(newAddr)
+
log.Debug("stored content", "ruid", ruid, "key", newAddr)
w.Header().Set("Content-Type", "text/plain")
@@ -342,7 +370,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (storage.Address, error) {
- log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()))
+ log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context()))
defaultPath := r.URL.Query().Get("defaultpath")
@@ -837,6 +865,28 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize))
}
+// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length
+func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
+ if contentLength < 4096 {
+ return 1
+ }
+ branchingFactor := 128
+ if isEncrypted {
+ branchingFactor = 64
+ }
+
+ dataChunks := math.Ceil(float64(contentLength) / float64(4096))
+ totalChunks := dataChunks
+ intermediate := dataChunks / float64(branchingFactor)
+
+ for intermediate > 1 {
+ totalChunks += math.Ceil(intermediate)
+ intermediate = intermediate / float64(branchingFactor)
+ }
+
+ return int64(totalChunks) + 1
+}
+
// The size of buffer used for bufio.Reader on LazyChunkReader passed to
// http.ServeContent in HandleGetFile.
// Warning: This value influences the number of chunk requests and chunker join goroutines
diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go
index e82762ce0..1de41d18d 100644
--- a/swarm/api/http/server_test.go
+++ b/swarm/api/http/server_test.go
@@ -44,7 +44,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/api"
- swarm "github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/testutil"
@@ -755,6 +754,7 @@ func testBzzTar(encrypted bool, t *testing.T) {
t.Fatal(err)
}
req.Header.Add("Content-Type", "application/x-tar")
+ req.Header.Add(SwarmTagHeaderName, "test-upload")
client := &http.Client{}
resp2, err := client.Do(req)
if err != nil {
@@ -763,6 +763,11 @@ func testBzzTar(encrypted bool, t *testing.T) {
if resp2.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp2.Status)
}
+
+ // check that the tag was written correctly
+ tag := srv.Tags.All()[0]
+ testutil.CheckTag(t, tag, 4, 4, 0, 4)
+
swarmHash, err := ioutil.ReadAll(resp2.Body)
resp2.Body.Close()
if err != nil {
@@ -834,6 +839,75 @@ func testBzzTar(encrypted bool, t *testing.T) {
t.Fatalf("file %s did not pass content assertion", hdr.Name)
}
}
+
+ // now check the tags endpoint
+}
+
+// TestBzzCorrectTagEstimate checks that the HTTP middleware sets the total number of chunks
+// in the tag according to an estimate from the HTTP request Content-Length header divided
+// by chunk size (4096). It is needed to be checked BEFORE chunking is done, therefore
+// concurrency was introduced to slow down the HTTP request
+func TestBzzCorrectTagEstimate(t *testing.T) {
+ srv := NewTestSwarmServer(t, serverFunc, nil)
+ defer srv.Close()
+
+ for _, v := range []struct {
+ toEncrypt bool
+ expChunks int64
+ }{
+ {toEncrypt: false, expChunks: 248},
+ {toEncrypt: true, expChunks: 250},
+ } {
+ pr, pw := io.Pipe()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ addr := ""
+ if v.toEncrypt {
+ addr = "encrypt"
+ }
+ req, err := http.NewRequest("POST", srv.URL+"/bzz:/"+addr, pr)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ req = req.WithContext(ctx)
+ req.ContentLength = 1000000
+ req.Header.Add(SwarmTagHeaderName, "1000000")
+
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(1 * time.Millisecond):
+ _, err := pw.Write([]byte{0})
+ if err != nil {
+ t.Error(err)
+ }
+ }
+ }
+ }()
+ go func() {
+ transport := http.DefaultTransport
+ _, err := transport.RoundTrip(req)
+ if err != nil {
+ t.Error(err)
+ }
+ }()
+ done := false
+ for !done {
+ switch len(srv.Tags.All()) {
+ case 0:
+ <-time.After(10 * time.Millisecond)
+ case 1:
+ tag := srv.Tags.All()[0]
+ testutil.CheckTag(t, tag, 0, 0, 0, v.expChunks)
+ srv.Tags.Delete(tag.Uid)
+ done = true
+ }
+ }
+ }
}
// TestBzzRootRedirect tests that getting the root path of a manifest without
@@ -851,19 +925,11 @@ func testBzzRootRedirect(toEncrypt bool, t *testing.T) {
defer srv.Close()
// create a manifest with some data at the root path
- client := swarm.NewClient(srv.URL)
data := []byte("data")
- file := &swarm.File{
- ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
- ManifestEntry: api.ManifestEntry{
- Path: "",
- ContentType: "text/plain",
- Size: int64(len(data)),
- },
- }
- hash, err := client.Upload(file, "", toEncrypt)
- if err != nil {
- t.Fatal(err)
+ headers := map[string]string{"Content-Type": "text/plain"}
+ res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader(data), headers, false, t)
+ if res.StatusCode != http.StatusOK {
+ t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK)
}
// define a CheckRedirect hook which ensures there is only a single
@@ -1046,21 +1112,10 @@ func TestGet(t *testing.T) {
func TestModify(t *testing.T) {
srv := NewTestSwarmServer(t, serverFunc, nil)
defer srv.Close()
-
- swarmClient := swarm.NewClient(srv.URL)
- data := []byte("data")
- file := &swarm.File{
- ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
- ManifestEntry: api.ManifestEntry{
- Path: "",
- ContentType: "text/plain",
- Size: int64(len(data)),
- },
- }
-
- hash, err := swarmClient.Upload(file, "", false)
- if err != nil {
- t.Fatal(err)
+ headers := map[string]string{"Content-Type": "text/plain"}
+ res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader([]byte("data")), headers, false, t)
+ if res.StatusCode != http.StatusOK {
+ t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK)
}
for _, testCase := range []struct {
@@ -1283,6 +1338,46 @@ func TestBzzGetFileWithResolver(t *testing.T) {
}
}
+// TestCalculateNumberOfChunks is a unit test for the chunk-number-according-to-content-length
+// calculation
+func TestCalculateNumberOfChunks(t *testing.T) {
+
+ //test cases:
+ for _, tc := range []struct{ len, chunks int64 }{
+ {len: 1000, chunks: 1},
+ {len: 5000, chunks: 3},
+ {len: 10000, chunks: 4},
+ {len: 100000, chunks: 26},
+ {len: 1000000, chunks: 248},
+ {len: 325839339210, chunks: 79550620 + 621490 + 4856 + 38 + 1},
+ } {
+ res := calculateNumberOfChunks(tc.len, false)
+ if res != tc.chunks {
+ t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
+ }
+ }
+}
+
+// TestCalculateNumberOfChunksEncrypted is a unit test for the chunk-number-according-to-content-length
+// calculation with encryption (branching factor=64)
+func TestCalculateNumberOfChunksEncrypted(t *testing.T) {
+
+ //test cases:
+ for _, tc := range []struct{ len, chunks int64 }{
+ {len: 1000, chunks: 1},
+ {len: 5000, chunks: 3},
+ {len: 10000, chunks: 4},
+ {len: 100000, chunks: 26},
+ {len: 1000000, chunks: 245 + 4 + 1},
+ {len: 325839339210, chunks: 79550620 + 1242979 + 19422 + 304 + 5 + 1},
+ } {
+ res := calculateNumberOfChunks(tc.len, true)
+ if res != tc.chunks {
+ t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
+ }
+ }
+}
+
// testResolver implements the Resolver interface and either returns the given
// hash if it is set, or returns a "name not found" error
type testResolveValidator struct {
@@ -1308,6 +1403,7 @@ func (t *testResolveValidator) Resolve(addr string) (common.Hash, error) {
func (t *testResolveValidator) Owner(node [32]byte) (addr common.Address, err error) {
return
}
+
func (t *testResolveValidator) HeaderByNumber(context.Context, *big.Int) (header *types.Header, err error) {
return
}
diff --git a/swarm/api/http/test_server.go b/swarm/api/http/test_server.go
index 928a6e972..fbb3366e2 100644
--- a/swarm/api/http/test_server.go
+++ b/swarm/api/http/test_server.go
@@ -24,6 +24,7 @@ import (
"testing"
"github.com/ethereum/go-ethereum/swarm/api"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
@@ -44,7 +45,9 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso
t.Fatal(err)
}
- fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
+ tags := chunk.NewTags()
+ fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams(), tags)
+
// Swarm feeds test setup
feedsDir, err := ioutil.TempDir("", "swarm-feeds-test")
if err != nil {
@@ -56,12 +59,13 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso
t.Fatal(err)
}
- swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil)
+ swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil, tags)
apiServer := httptest.NewServer(serverFunc(swarmApi))
tss := &TestSwarmServer{
Server: apiServer,
FileStore: fileStore,
+ Tags: tags,
dir: swarmDir,
Hasher: storage.MakeHashFunc(storage.DefaultHash)(),
cleanup: func() {
@@ -81,6 +85,7 @@ type TestSwarmServer struct {
*httptest.Server
Hasher storage.SwarmHash
FileStore *storage.FileStore
+ Tags *chunk.Tags
dir string
cleanup func()
CurrentTime uint64
diff --git a/swarm/api/manifest_test.go b/swarm/api/manifest_test.go
index 1c8e53c43..c193ebcb4 100644
--- a/swarm/api/manifest_test.go
+++ b/swarm/api/manifest_test.go
@@ -25,6 +25,7 @@ import (
"strings"
"testing"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -42,7 +43,7 @@ func manifest(paths ...string) (manifestReader storage.LazySectionReader) {
func testGetEntry(t *testing.T, path, match string, multiple bool, paths ...string) *manifestTrie {
quitC := make(chan bool)
- fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags())
ref := make([]byte, fileStore.HashSize())
trie, err := readManifest(manifest(paths...), ref, fileStore, false, quitC, NOOPDecrypt)
if err != nil {
@@ -99,7 +100,7 @@ func TestGetEntry(t *testing.T) {
func TestExactMatch(t *testing.T) {
quitC := make(chan bool)
mf := manifest("shouldBeExactMatch.css", "shouldBeExactMatch.css.map")
- fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags())
ref := make([]byte, fileStore.HashSize())
trie, err := readManifest(mf, ref, fileStore, false, quitC, nil)
if err != nil {
@@ -132,7 +133,7 @@ func TestAddFileWithManifestPath(t *testing.T) {
reader := &storage.LazyTestSectionReader{
SectionReader: io.NewSectionReader(bytes.NewReader(manifest), 0, int64(len(manifest))),
}
- fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags())
ref := make([]byte, fileStore.HashSize())
trie, err := readManifest(reader, ref, fileStore, false, nil, NOOPDecrypt)
if err != nil {
diff --git a/swarm/api/storage.go b/swarm/api/storage.go
deleted file mode 100644
index 254375b77..000000000
--- a/swarm/api/storage.go
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package api
-
-import (
- "context"
- "path"
-
- "github.com/ethereum/go-ethereum/swarm/storage"
-)
-
-type Response struct {
- MimeType string
- Status int
- Size int64
- // Content []byte
- Content string
-}
-
-// implements a service
-//
-// DEPRECATED: Use the HTTP API instead
-type Storage struct {
- api *API
-}
-
-func NewStorage(api *API) *Storage {
- return &Storage{api}
-}
-
-// Put uploads the content to the swarm with a simple manifest speficying
-// its content type
-//
-// DEPRECATED: Use the HTTP API instead
-func (s *Storage) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (storage.Address, func(context.Context) error, error) {
- return s.api.Put(ctx, content, contentType, toEncrypt)
-}
-
-// Get retrieves the content from bzzpath and reads the response in full
-// It returns the Response object, which serialises containing the
-// response body as the value of the Content field
-// NOTE: if error is non-nil, sResponse may still have partial content
-// the actual size of which is given in len(resp.Content), while the expected
-// size is resp.Size
-//
-// DEPRECATED: Use the HTTP API instead
-func (s *Storage) Get(ctx context.Context, bzzpath string) (*Response, error) {
- uri, err := Parse(path.Join("bzz:/", bzzpath))
- if err != nil {
- return nil, err
- }
- addr, err := s.api.Resolve(ctx, uri.Addr)
- if err != nil {
- return nil, err
- }
- reader, mimeType, status, _, err := s.api.Get(ctx, nil, addr, uri.Path)
- if err != nil {
- return nil, err
- }
- quitC := make(chan bool)
- expsize, err := reader.Size(ctx, quitC)
- if err != nil {
- return nil, err
- }
- body := make([]byte, expsize)
- size, err := reader.Read(body)
- if int64(size) == expsize {
- err = nil
- }
- return &Response{mimeType, status, expsize, string(body[:size])}, err
-}
diff --git a/swarm/api/storage_test.go b/swarm/api/storage_test.go
deleted file mode 100644
index ef96972b6..000000000
--- a/swarm/api/storage_test.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package api
-
-import (
- "context"
- "testing"
-)
-
-func testStorage(t *testing.T, f func(*Storage, bool)) {
- testAPI(t, func(api *API, toEncrypt bool) {
- f(NewStorage(api), toEncrypt)
- })
-}
-
-func TestStoragePutGet(t *testing.T) {
- testStorage(t, func(api *Storage, toEncrypt bool) {
- content := "hello"
- exp := expResponse(content, "text/plain", 0)
- // exp := expResponse([]byte(content), "text/plain", 0)
- ctx := context.TODO()
- bzzkey, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- err = wait(ctx)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- bzzhash := bzzkey.Hex()
- // to check put against the API#Get
- resp0 := testGet(t, api.api, bzzhash, "")
- checkResponse(t, resp0, exp)
-
- // check storage#Get
- resp, err := api.Get(context.TODO(), bzzhash)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- checkResponse(t, &testResponse{nil, resp}, exp)
- })
-}