diff options
author | Elad <theman@elad.im> | 2019-05-06 02:34:22 +0800 |
---|---|---|
committer | Anton Evangelatov <anton.evangelatov@gmail.com> | 2019-05-10 18:26:52 +0800 |
commit | ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1 (patch) | |
tree | 396077f7f33307321110f17bc3a19abe115e5d47 /swarm/api | |
parent | 3030893a21b17a0e90ddd0047d0f310fee8335a0 (diff) | |
download | go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.gz go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.bz2 go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.lz go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.xz go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.zst go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.zip |
swarm: push tags integration - request flow
swarm/api: integrate tags to count chunks being split and stored
swarm/api/http: integrate tags in middleware for HTTP `POST` calls and assert chunks being calculated and counted correctly
swarm: remove deprecated and unused code, add swarm hash to DoneSplit signature, remove calls to the api client from the http package
Diffstat (limited to 'swarm/api')
-rw-r--r-- | swarm/api/api.go | 32 | ||||
-rw-r--r-- | swarm/api/api_test.go | 108 | ||||
-rw-r--r-- | swarm/api/client/client.go | 32 | ||||
-rw-r--r-- | swarm/api/client/client_test.go | 21 | ||||
-rw-r--r-- | swarm/api/filesystem_test.go | 3 | ||||
-rw-r--r-- | swarm/api/http/middleware.go | 49 | ||||
-rw-r--r-- | swarm/api/http/response.go | 2 | ||||
-rw-r--r-- | swarm/api/http/server.go | 60 | ||||
-rw-r--r-- | swarm/api/http/server_test.go | 152 | ||||
-rw-r--r-- | swarm/api/http/test_server.go | 9 | ||||
-rw-r--r-- | swarm/api/manifest_test.go | 7 | ||||
-rw-r--r-- | swarm/api/storage.go | 85 | ||||
-rw-r--r-- | swarm/api/storage_test.go | 56 |
13 files changed, 385 insertions, 231 deletions
diff --git a/swarm/api/api.go b/swarm/api/api.go index 86c111923..96fb86e1c 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/contracts/ens" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" @@ -53,8 +54,6 @@ import ( var ( apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil) apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil) - apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil) - apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil) apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil) apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil) apiGetHTTP300 = metrics.NewRegisteredCounter("api.get.http.300", nil) @@ -188,15 +187,17 @@ type API struct { feed *feed.Handler fileStore *storage.FileStore dns Resolver + Tags *chunk.Tags Decryptor func(context.Context, string) DecryptFunc } // NewAPI the api constructor initialises a new API instance. -func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) { +func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) { self = &API{ fileStore: fileStore, dns: dns, feed: feedHandler, + Tags: tags, Decryptor: func(ctx context.Context, credentials string) DecryptFunc { return self.doDecrypt(ctx, credentials, pk) }, @@ -297,31 +298,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto return addr, nil } -// Put provides singleton manifest creation on top of FileStore store -func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { - apiPutCount.Inc(1) - r := strings.NewReader(content) - key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt) - if err != nil { - apiPutFail.Inc(1) - return nil, nil, err - } - manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) - r = strings.NewReader(manifest) - key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt) - if err != nil { - apiPutFail.Inc(1) - return nil, nil, err - } - return key, func(ctx context.Context) error { - err := waitContent(ctx) - if err != nil { - return err - } - return waitManifest(ctx) - }, nil -} - // Get uses iterative manifest retrieval and prefix matching // to resolve basePath to content using FileStore retrieve // it returns a section reader, mimeType, status, the key of the actual content and an error diff --git a/swarm/api/api_test.go b/swarm/api/api_test.go index eb896f32a..4a5f92362 100644 --- a/swarm/api/api_test.go +++ b/swarm/api/api_test.go @@ -19,6 +19,7 @@ package api import ( "bytes" "context" + crand "crypto/rand" "errors" "flag" "fmt" @@ -26,13 +27,16 @@ import ( "io/ioutil" "math/big" "os" + "strings" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/testutil" ) func init() { @@ -41,19 +45,21 @@ func init() { log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true))))) } -func testAPI(t *testing.T, f func(*API, bool)) { - datadir, err := ioutil.TempDir("", "bzz-test") - if err != nil { - t.Fatalf("unable to create temp dir: %v", err) - } - defer os.RemoveAll(datadir) - fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) - if err != nil { - return +func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) { + for _, v := range []bool{true, false} { + datadir, err := ioutil.TempDir("", "bzz-test") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(datadir) + tags := chunk.NewTags() + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags) + if err != nil { + return + } + api := NewAPI(fileStore, nil, nil, nil, tags) + f(api, tags, v) } - api := NewAPI(fileStore, nil, nil, nil) - f(api, false) - f(api, true) } type testResponse struct { @@ -61,6 +67,13 @@ type testResponse struct { *Response } +type Response struct { + MimeType string + Status int + Size int64 + Content string +} + func checkResponse(t *testing.T, resp *testResponse, exp *Response) { if resp.MimeType != exp.MimeType { @@ -111,15 +124,14 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse { } reader.Seek(0, 0) return &testResponse{reader, &Response{mimeType, status, size, string(s)}} - // return &testResponse{reader, &Response{mimeType, status, reader.Size(), nil}} } func TestApiPut(t *testing.T) { - testAPI(t, func(api *API, toEncrypt bool) { + testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) { content := "hello" exp := expResponse(content, "text/plain", 0) ctx := context.TODO() - addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt) + addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -129,6 +141,40 @@ func TestApiPut(t *testing.T) { } resp := testGet(t, api, addr.Hex(), "") checkResponse(t, resp, exp) + tag := tags.All()[0] + testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest + }) +} + +// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input +func TestApiTagLarge(t *testing.T) { + const contentLength = 4096 * 4095 + testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) { + randomContentReader := io.LimitReader(crand.Reader, int64(contentLength)) + tag, err := api.Tags.New("unnamed-tag", 0) + if err != nil { + t.Fatal(err) + } + ctx := sctx.SetTag(context.Background(), tag.Uid) + key, waitContent, err := api.Store(ctx, randomContentReader, int64(contentLength), toEncrypt) + if err != nil { + t.Fatal(err) + } + err = waitContent(ctx) + if err != nil { + t.Fatal(err) + } + tag.DoneSplit(key) + + if toEncrypt { + tag := tags.All()[0] + expect := int64(4095 + 64 + 1) + testutil.CheckTag(t, tag, expect, expect, 0, expect) + } else { + tag := tags.All()[0] + expect := int64(4095 + 32 + 1) + testutil.CheckTag(t, tag, expect, expect, 0, expect) + } }) } @@ -391,7 +437,7 @@ func TestDecryptOriginForbidden(t *testing.T) { Access: &AccessEntry{Type: AccessTypePass}, } - api := NewAPI(nil, nil, nil, nil) + api := NewAPI(nil, nil, nil, nil, chunk.NewTags()) f := api.Decryptor(ctx, "") err := f(me) @@ -425,7 +471,7 @@ func TestDecryptOrigin(t *testing.T) { Access: &AccessEntry{Type: AccessTypePass}, } - api := NewAPI(nil, nil, nil, nil) + api := NewAPI(nil, nil, nil, nil, chunk.NewTags()) f := api.Decryptor(ctx, "") err := f(me) @@ -500,3 +546,31 @@ func TestDetectContentType(t *testing.T) { }) } } + +// putString provides singleton manifest creation on top of api.API +func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { + r := strings.NewReader(content) + tag, err := a.Tags.New("unnamed-tag", 0) + + log.Trace("created new tag", "uid", tag.Uid) + + cCtx := sctx.SetTag(ctx, tag.Uid) + key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt) + if err != nil { + return nil, nil, err + } + manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) + r = strings.NewReader(manifest) + key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt) + if err != nil { + return nil, nil, err + } + tag.DoneSplit(key) + return key, func(ctx context.Context) error { + err := waitContent(ctx) + if err != nil { + return err + } + return waitManifest(ctx) + }, nil +} diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index 5e293cca7..9ad0948f4 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -40,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/pborman/uuid" @@ -75,6 +76,8 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err return "", err } req.ContentLength = size + req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix())) + res, err := http.DefaultClient.Do(req) if err != nil { return "", err @@ -111,6 +114,7 @@ func (c *Client) DownloadRaw(hash string) (io.ReadCloser, bool, error) { type File struct { io.ReadCloser api.ManifestEntry + Tag string } // Open opens a local file which can then be passed to client.Upload to upload @@ -139,6 +143,7 @@ func Open(path string) (*File, error) { Size: stat.Size(), ModTime: stat.ModTime(), }, + Tag: filepath.Base(path), }, nil } @@ -422,6 +427,7 @@ func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, erro // Uploader uploads files to swarm using a provided UploadFn type Uploader interface { Upload(UploadFn) error + Tag() string } type UploaderFunc func(UploadFn) error @@ -430,12 +436,23 @@ func (u UploaderFunc) Upload(upload UploadFn) error { return u(upload) } +func (u UploaderFunc) Tag() string { + return fmt.Sprintf("multipart_upload_%d", time.Now().Unix()) +} + +// DirectoryUploader implements Uploader +var _ Uploader = &DirectoryUploader{} + // DirectoryUploader uploads all files in a directory, optionally uploading // a file to the default path type DirectoryUploader struct { Dir string } +func (d *DirectoryUploader) Tag() string { + return filepath.Base(d.Dir) +} + // Upload performs the upload of the directory and default path func (d *DirectoryUploader) Upload(upload UploadFn) error { return filepath.Walk(d.Dir, func(path string, f os.FileInfo, err error) error { @@ -458,11 +475,17 @@ func (d *DirectoryUploader) Upload(upload UploadFn) error { }) } +var _ Uploader = &FileUploader{} + // FileUploader uploads a single file type FileUploader struct { File *File } +func (f *FileUploader) Tag() string { + return f.File.Tag +} + // Upload performs the upload of the file func (f *FileUploader) Upload(upload UploadFn) error { return upload(f.File) @@ -509,6 +532,14 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t req.URL.RawQuery = q.Encode() } + tag := uploader.Tag() + if tag == "" { + tag = "unnamed_tag_" + fmt.Sprintf("%d", time.Now().Unix()) + } + log.Trace("setting upload tag", "tag", tag) + + req.Header.Set(swarmhttp.SwarmTagHeaderName, tag) + // use 'Expect: 100-continue' so we don't send the request body if // the server refuses the request req.Header.Set("Expect", "100-continue") @@ -574,6 +605,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error) mw := multipart.NewWriter(reqW) req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary())) + req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix())) // define an UploadFn which adds files to the multipart form uploadFn := func(file *File) error { diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go index 9c9bde5d6..92489849c 100644 --- a/swarm/api/client/client_test.go +++ b/swarm/api/client/client_test.go @@ -25,16 +25,14 @@ import ( "sort" "testing" - "github.com/ethereum/go-ethereum/swarm/testutil" - - "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/swarm/api" swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http" + "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" + "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" + "github.com/ethereum/go-ethereum/swarm/testutil" ) func serverFunc(api *api.API) swarmhttp.TestServer { @@ -68,6 +66,10 @@ func testClientUploadDownloadRaw(toEncrypt bool, t *testing.T) { t.Fatal(err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 1, 1, 0, 1) + // check we can download the same data res, isEncrypted, err := client.DownloadRaw(hash) if err != nil { @@ -209,6 +211,10 @@ func TestClientUploadDownloadDirectory(t *testing.T) { t.Fatalf("error uploading directory: %s", err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 9, 9, 0, 9) + // check we can download the individual files checkDownloadFile := func(path string, expected []byte) { file, err := client.Download(hash, path) @@ -323,6 +329,7 @@ func TestClientMultipartUpload(t *testing.T) { defer srv.Close() // define an uploader which uploads testDirFiles with some data + // note: this test should result in SEEN chunks. assert accordingly data := []byte("some-data") uploader := UploaderFunc(func(upload UploadFn) error { for _, name := range testDirFiles { @@ -348,6 +355,10 @@ func TestClientMultipartUpload(t *testing.T) { t.Fatal(err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 9, 9, 7, 9) + // check we can download the individual files checkDownloadFile := func(path string) { file, err := client.Download(hash, path) diff --git a/swarm/api/filesystem_test.go b/swarm/api/filesystem_test.go index 02f5bff65..b8f37fdd5 100644 --- a/swarm/api/filesystem_test.go +++ b/swarm/api/filesystem_test.go @@ -25,13 +25,14 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" ) var testDownloadDir, _ = ioutil.TempDir(os.TempDir(), "bzz-test") func testFileSystem(t *testing.T, f func(*FileSystem, bool)) { - testAPI(t, func(api *API, toEncrypt bool) { + testAPI(t, func(api *API, _ *chunk.Tags, toEncrypt bool) { f(NewFileSystem(api), toEncrypt) }) } diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 320da3046..e6e263f4c 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -86,6 +87,54 @@ func InitLoggingResponseWriter(h http.Handler) http.Handler { }) } +// InitUploadTag creates a new tag for an upload to the local HTTP proxy +// if a tag is not named using the SwarmTagHeaderName, a fallback name will be used +// when the Content-Length header is set, an ETA on chunking will be available since the +// number of chunks to be split is known in advance (not including enclosing manifest chunks) +// the tag can later be accessed using the appropriate identifier in the request context +func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var ( + tagName string + err error + estimatedTotal int64 = 0 + contentType = r.Header.Get("Content-Type") + headerTag = r.Header.Get(SwarmTagHeaderName) + ) + if headerTag != "" { + tagName = headerTag + log.Trace("got tag name from http header", "tagName", tagName) + } else { + tagName = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix()) + } + + if !strings.Contains(contentType, "multipart") && r.ContentLength > 0 { + log.Trace("calculating tag size", "contentType", contentType, "contentLength", r.ContentLength) + uri := GetURI(r.Context()) + if uri != nil { + log.Debug("got uri from context") + if uri.Addr == "encrypt" { + estimatedTotal = calculateNumberOfChunks(r.ContentLength, true) + } else { + estimatedTotal = calculateNumberOfChunks(r.ContentLength, false) + } + } + } + + log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal) + + t, err := tags.New(tagName, estimatedTotal) + if err != nil { + log.Error("error creating tag", "err", err, "tagName", tagName) + } + + log.Trace("setting tag id to context", "uid", t.Uid) + ctx := sctx.SetTag(r.Context(), t.Uid) + + h.ServeHTTP(w, r.WithContext(ctx)) + }) +} + func InstrumentOpenTracing(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { uri := GetURI(r.Context()) diff --git a/swarm/api/http/response.go b/swarm/api/http/response.go index d4e81d7f6..c851a3992 100644 --- a/swarm/api/http/response.go +++ b/swarm/api/http/response.go @@ -79,7 +79,7 @@ func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s } func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) { - log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code) + log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code, "msg", msg) respondTemplate(w, r, "error", msg, code) } diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index 3c6735a73..a336bd82f 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -26,6 +26,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "mime" "mime/multipart" "net/http" @@ -38,7 +39,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/rs/cors" @@ -60,6 +63,8 @@ var ( getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil) ) +const SwarmTagHeaderName = "x-swarm-tag" + type methodHandler map[string]http.Handler func (m methodHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { @@ -94,6 +99,12 @@ func NewServer(api *api.API, corsString string) *Server { InstrumentOpenTracing, } + tagAdapter := Adapter(func(h http.Handler) http.Handler { + return InitUploadTag(h, api.Tags) + }) + + defaultPostMiddlewares := append(defaultMiddlewares, tagAdapter) + mux := http.NewServeMux() mux.Handle("/bzz:/", methodHandler{ "GET": Adapt( @@ -102,7 +113,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostFiles), - defaultMiddlewares..., + defaultPostMiddlewares..., ), "DELETE": Adapt( http.HandlerFunc(server.HandleDelete), @@ -116,7 +127,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostRaw), - defaultMiddlewares..., + defaultPostMiddlewares..., ), }) mux.Handle("/bzz-immutable:/", methodHandler{ @@ -230,6 +241,12 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { ruid := GetRUID(r.Context()) log.Debug("handle.post.raw", "ruid", ruid) + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + postRawCount.Inc(1) toEncrypt := false @@ -256,13 +273,16 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { return } - addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) + addr, wait, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) if err != nil { postRawFail.Inc(1) respondError(w, r, err.Error(), http.StatusInternalServerError) return } + wait(r.Context()) + tag.DoneSplit(addr) + log.Debug("stored content", "ruid", ruid, "key", addr) w.Header().Set("Content-Type", "text/plain") @@ -311,7 +331,6 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } log.Debug("new manifest", "ruid", ruid, "key", addr) } - newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error { switch contentType { case "application/x-tar": @@ -334,6 +353,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { return } + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + + log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total()) + tag.DoneSplit(newAddr) + log.Debug("stored content", "ruid", ruid, "key", newAddr) w.Header().Set("Content-Type", "text/plain") @@ -342,7 +370,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (storage.Address, error) { - log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context())) + log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context())) defaultPath := r.URL.Query().Get("defaultpath") @@ -837,6 +865,28 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize)) } +// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length +func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 { + if contentLength < 4096 { + return 1 + } + branchingFactor := 128 + if isEncrypted { + branchingFactor = 64 + } + + dataChunks := math.Ceil(float64(contentLength) / float64(4096)) + totalChunks := dataChunks + intermediate := dataChunks / float64(branchingFactor) + + for intermediate > 1 { + totalChunks += math.Ceil(intermediate) + intermediate = intermediate / float64(branchingFactor) + } + + return int64(totalChunks) + 1 +} + // The size of buffer used for bufio.Reader on LazyChunkReader passed to // http.ServeContent in HandleGetFile. // Warning: This value influences the number of chunk requests and chunker join goroutines diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index e82762ce0..1de41d18d 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -44,7 +44,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/api" - swarm "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/testutil" @@ -755,6 +754,7 @@ func testBzzTar(encrypted bool, t *testing.T) { t.Fatal(err) } req.Header.Add("Content-Type", "application/x-tar") + req.Header.Add(SwarmTagHeaderName, "test-upload") client := &http.Client{} resp2, err := client.Do(req) if err != nil { @@ -763,6 +763,11 @@ func testBzzTar(encrypted bool, t *testing.T) { if resp2.StatusCode != http.StatusOK { t.Fatalf("err %s", resp2.Status) } + + // check that the tag was written correctly + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 4, 4, 0, 4) + swarmHash, err := ioutil.ReadAll(resp2.Body) resp2.Body.Close() if err != nil { @@ -834,6 +839,75 @@ func testBzzTar(encrypted bool, t *testing.T) { t.Fatalf("file %s did not pass content assertion", hdr.Name) } } + + // now check the tags endpoint +} + +// TestBzzCorrectTagEstimate checks that the HTTP middleware sets the total number of chunks +// in the tag according to an estimate from the HTTP request Content-Length header divided +// by chunk size (4096). It is needed to be checked BEFORE chunking is done, therefore +// concurrency was introduced to slow down the HTTP request +func TestBzzCorrectTagEstimate(t *testing.T) { + srv := NewTestSwarmServer(t, serverFunc, nil) + defer srv.Close() + + for _, v := range []struct { + toEncrypt bool + expChunks int64 + }{ + {toEncrypt: false, expChunks: 248}, + {toEncrypt: true, expChunks: 250}, + } { + pr, pw := io.Pipe() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addr := "" + if v.toEncrypt { + addr = "encrypt" + } + req, err := http.NewRequest("POST", srv.URL+"/bzz:/"+addr, pr) + if err != nil { + t.Fatal(err) + } + + req = req.WithContext(ctx) + req.ContentLength = 1000000 + req.Header.Add(SwarmTagHeaderName, "1000000") + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Millisecond): + _, err := pw.Write([]byte{0}) + if err != nil { + t.Error(err) + } + } + } + }() + go func() { + transport := http.DefaultTransport + _, err := transport.RoundTrip(req) + if err != nil { + t.Error(err) + } + }() + done := false + for !done { + switch len(srv.Tags.All()) { + case 0: + <-time.After(10 * time.Millisecond) + case 1: + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 0, 0, 0, v.expChunks) + srv.Tags.Delete(tag.Uid) + done = true + } + } + } } // TestBzzRootRedirect tests that getting the root path of a manifest without @@ -851,19 +925,11 @@ func testBzzRootRedirect(toEncrypt bool, t *testing.T) { defer srv.Close() // create a manifest with some data at the root path - client := swarm.NewClient(srv.URL) data := []byte("data") - file := &swarm.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), - ManifestEntry: api.ManifestEntry{ - Path: "", - ContentType: "text/plain", - Size: int64(len(data)), - }, - } - hash, err := client.Upload(file, "", toEncrypt) - if err != nil { - t.Fatal(err) + headers := map[string]string{"Content-Type": "text/plain"} + res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader(data), headers, false, t) + if res.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK) } // define a CheckRedirect hook which ensures there is only a single @@ -1046,21 +1112,10 @@ func TestGet(t *testing.T) { func TestModify(t *testing.T) { srv := NewTestSwarmServer(t, serverFunc, nil) defer srv.Close() - - swarmClient := swarm.NewClient(srv.URL) - data := []byte("data") - file := &swarm.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), - ManifestEntry: api.ManifestEntry{ - Path: "", - ContentType: "text/plain", - Size: int64(len(data)), - }, - } - - hash, err := swarmClient.Upload(file, "", false) - if err != nil { - t.Fatal(err) + headers := map[string]string{"Content-Type": "text/plain"} + res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader([]byte("data")), headers, false, t) + if res.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK) } for _, testCase := range []struct { @@ -1283,6 +1338,46 @@ func TestBzzGetFileWithResolver(t *testing.T) { } } +// TestCalculateNumberOfChunks is a unit test for the chunk-number-according-to-content-length +// calculation +func TestCalculateNumberOfChunks(t *testing.T) { + + //test cases: + for _, tc := range []struct{ len, chunks int64 }{ + {len: 1000, chunks: 1}, + {len: 5000, chunks: 3}, + {len: 10000, chunks: 4}, + {len: 100000, chunks: 26}, + {len: 1000000, chunks: 248}, + {len: 325839339210, chunks: 79550620 + 621490 + 4856 + 38 + 1}, + } { + res := calculateNumberOfChunks(tc.len, false) + if res != tc.chunks { + t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res) + } + } +} + +// TestCalculateNumberOfChunksEncrypted is a unit test for the chunk-number-according-to-content-length +// calculation with encryption (branching factor=64) +func TestCalculateNumberOfChunksEncrypted(t *testing.T) { + + //test cases: + for _, tc := range []struct{ len, chunks int64 }{ + {len: 1000, chunks: 1}, + {len: 5000, chunks: 3}, + {len: 10000, chunks: 4}, + {len: 100000, chunks: 26}, + {len: 1000000, chunks: 245 + 4 + 1}, + {len: 325839339210, chunks: 79550620 + 1242979 + 19422 + 304 + 5 + 1}, + } { + res := calculateNumberOfChunks(tc.len, true) + if res != tc.chunks { + t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res) + } + } +} + // testResolver implements the Resolver interface and either returns the given // hash if it is set, or returns a "name not found" error type testResolveValidator struct { @@ -1308,6 +1403,7 @@ func (t *testResolveValidator) Resolve(addr string) (common.Hash, error) { func (t *testResolveValidator) Owner(node [32]byte) (addr common.Address, err error) { return } + func (t *testResolveValidator) HeaderByNumber(context.Context, *big.Int) (header *types.Header, err error) { return } diff --git a/swarm/api/http/test_server.go b/swarm/api/http/test_server.go index 928a6e972..fbb3366e2 100644 --- a/swarm/api/http/test_server.go +++ b/swarm/api/http/test_server.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/storage/localstore" @@ -44,7 +45,9 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso t.Fatal(err) } - fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) + tags := chunk.NewTags() + fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams(), tags) + // Swarm feeds test setup feedsDir, err := ioutil.TempDir("", "swarm-feeds-test") if err != nil { @@ -56,12 +59,13 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso t.Fatal(err) } - swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil) + swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil, tags) apiServer := httptest.NewServer(serverFunc(swarmApi)) tss := &TestSwarmServer{ Server: apiServer, FileStore: fileStore, + Tags: tags, dir: swarmDir, Hasher: storage.MakeHashFunc(storage.DefaultHash)(), cleanup: func() { @@ -81,6 +85,7 @@ type TestSwarmServer struct { *httptest.Server Hasher storage.SwarmHash FileStore *storage.FileStore + Tags *chunk.Tags dir string cleanup func() CurrentTime uint64 diff --git a/swarm/api/manifest_test.go b/swarm/api/manifest_test.go index 1c8e53c43..c193ebcb4 100644 --- a/swarm/api/manifest_test.go +++ b/swarm/api/manifest_test.go @@ -25,6 +25,7 @@ import ( "strings" "testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -42,7 +43,7 @@ func manifest(paths ...string) (manifestReader storage.LazySectionReader) { func testGetEntry(t *testing.T, path, match string, multiple bool, paths ...string) *manifestTrie { quitC := make(chan bool) - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(manifest(paths...), ref, fileStore, false, quitC, NOOPDecrypt) if err != nil { @@ -99,7 +100,7 @@ func TestGetEntry(t *testing.T) { func TestExactMatch(t *testing.T) { quitC := make(chan bool) mf := manifest("shouldBeExactMatch.css", "shouldBeExactMatch.css.map") - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(mf, ref, fileStore, false, quitC, nil) if err != nil { @@ -132,7 +133,7 @@ func TestAddFileWithManifestPath(t *testing.T) { reader := &storage.LazyTestSectionReader{ SectionReader: io.NewSectionReader(bytes.NewReader(manifest), 0, int64(len(manifest))), } - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(reader, ref, fileStore, false, nil, NOOPDecrypt) if err != nil { diff --git a/swarm/api/storage.go b/swarm/api/storage.go deleted file mode 100644 index 254375b77..000000000 --- a/swarm/api/storage.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package api - -import ( - "context" - "path" - - "github.com/ethereum/go-ethereum/swarm/storage" -) - -type Response struct { - MimeType string - Status int - Size int64 - // Content []byte - Content string -} - -// implements a service -// -// DEPRECATED: Use the HTTP API instead -type Storage struct { - api *API -} - -func NewStorage(api *API) *Storage { - return &Storage{api} -} - -// Put uploads the content to the swarm with a simple manifest speficying -// its content type -// -// DEPRECATED: Use the HTTP API instead -func (s *Storage) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (storage.Address, func(context.Context) error, error) { - return s.api.Put(ctx, content, contentType, toEncrypt) -} - -// Get retrieves the content from bzzpath and reads the response in full -// It returns the Response object, which serialises containing the -// response body as the value of the Content field -// NOTE: if error is non-nil, sResponse may still have partial content -// the actual size of which is given in len(resp.Content), while the expected -// size is resp.Size -// -// DEPRECATED: Use the HTTP API instead -func (s *Storage) Get(ctx context.Context, bzzpath string) (*Response, error) { - uri, err := Parse(path.Join("bzz:/", bzzpath)) - if err != nil { - return nil, err - } - addr, err := s.api.Resolve(ctx, uri.Addr) - if err != nil { - return nil, err - } - reader, mimeType, status, _, err := s.api.Get(ctx, nil, addr, uri.Path) - if err != nil { - return nil, err - } - quitC := make(chan bool) - expsize, err := reader.Size(ctx, quitC) - if err != nil { - return nil, err - } - body := make([]byte, expsize) - size, err := reader.Read(body) - if int64(size) == expsize { - err = nil - } - return &Response{mimeType, status, expsize, string(body[:size])}, err -} diff --git a/swarm/api/storage_test.go b/swarm/api/storage_test.go deleted file mode 100644 index ef96972b6..000000000 --- a/swarm/api/storage_test.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package api - -import ( - "context" - "testing" -) - -func testStorage(t *testing.T, f func(*Storage, bool)) { - testAPI(t, func(api *API, toEncrypt bool) { - f(NewStorage(api), toEncrypt) - }) -} - -func TestStoragePutGet(t *testing.T) { - testStorage(t, func(api *Storage, toEncrypt bool) { - content := "hello" - exp := expResponse(content, "text/plain", 0) - // exp := expResponse([]byte(content), "text/plain", 0) - ctx := context.TODO() - bzzkey, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - err = wait(ctx) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - bzzhash := bzzkey.Hex() - // to check put against the API#Get - resp0 := testGet(t, api.api, bzzhash, "") - checkResponse(t, resp0, exp) - - // check storage#Get - resp, err := api.Get(context.TODO(), bzzhash) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - checkResponse(t, &testResponse{nil, resp}, exp) - }) -} |