diff options
author | Péter Szilágyi <peterke@gmail.com> | 2019-02-20 16:48:12 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-20 16:48:12 +0800 |
commit | c942700427557e3ff6de3aaf6b916e2f056c1ec2 (patch) | |
tree | cadf68e7206d6de42b1eefc6967214cf86e35ff2 /cmd/swarm/swarm-smoke | |
parent | 7fa3509e2eaf1a4ebc12344590e5699406690f15 (diff) | |
parent | cde35439e058b4f9579830fec9fb65ae0b998346 (diff) | |
download | go-tangerine-1.8.23.tar go-tangerine-1.8.23.tar.gz go-tangerine-1.8.23.tar.bz2 go-tangerine-1.8.23.tar.lz go-tangerine-1.8.23.tar.xz go-tangerine-1.8.23.tar.zst go-tangerine-1.8.23.zip |
Merge pull request #19029 from holiman/update1.8v1.8.23
Update1.8
Diffstat (limited to 'cmd/swarm/swarm-smoke')
-rw-r--r-- | cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 127 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/main.go | 100 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/sliding_window.go | 131 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/upload_and_sync.go | 245 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/upload_speed.go | 73 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/util.go | 235 |
6 files changed, 606 insertions, 305 deletions
diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 2c5e3fd23..6b3fed0c7 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -2,13 +2,10 @@ package main import ( "bytes" - "context" "crypto/md5" "fmt" "io" "io/ioutil" - "net/http" - "net/http/httptrace" "os" "os/exec" "strings" @@ -19,12 +16,8 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/api/client" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/testutil" - colorable "github.com/mattn/go-colorable" - opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -33,34 +26,28 @@ const ( feedRandomDataLength = 8 ) -func cliFeedUploadAndSync(c *cli.Context) error { - metrics.GetOrRegisterCounter("feed-and-sync", nil).Inc(1) - log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))) - +func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error { errc := make(chan error) + go func() { - errc <- feedUploadAndSync(c) + errc <- feedUploadAndSync(ctx, tuid) }() select { case err := <-errc: if err != nil { - metrics.GetOrRegisterCounter("feed-and-sync.fail", nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1) } return err case <-time.After(time.Duration(timeout) * time.Second): - metrics.GetOrRegisterCounter("feed-and-sync.timeout", nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1) + return fmt.Errorf("timeout after %v sec", timeout) } } -// TODO: retrieve with manifest + extract repeating code -func feedUploadAndSync(c *cli.Context) error { - defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now()) - - generateEndpoints(scheme, cluster, appName, from, to) - - log.Info("generating and uploading feeds to " + endpoints[0] + " and syncing") +func feedUploadAndSync(c *cli.Context, tuid string) error { + log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing") // create a random private key to sign updates with and derive the address pkFile, err := ioutil.TempFile("", "swarm-feed-smoke-test") @@ -114,7 +101,7 @@ func feedUploadAndSync(c *cli.Context) error { // create feed manifest, topic only var out bytes.Buffer - cmd := exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--user", userHex) + cmd := exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--user", userHex) cmd.Stdout = &out log.Debug("create feed manifest topic cmd", "cmd", cmd) err = cmd.Run() @@ -129,7 +116,7 @@ func feedUploadAndSync(c *cli.Context) error { out.Reset() // create feed manifest, subtopic only - cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--name", subTopicHex, "--user", userHex) + cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--name", subTopicHex, "--user", userHex) cmd.Stdout = &out log.Debug("create feed manifest subtopic cmd", "cmd", cmd) err = cmd.Run() @@ -144,7 +131,7 @@ func feedUploadAndSync(c *cli.Context) error { out.Reset() // create feed manifest, merged topic - cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex) + cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex) cmd.Stdout = &out log.Debug("create feed manifest mergetopic cmd", "cmd", cmd) err = cmd.Run() @@ -170,7 +157,7 @@ func feedUploadAndSync(c *cli.Context) error { dataHex := hexutil.Encode(data) // update with topic - cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, dataHex) + cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, dataHex) cmd.Stdout = &out log.Debug("update feed manifest topic cmd", "cmd", cmd) err = cmd.Run() @@ -181,7 +168,7 @@ func feedUploadAndSync(c *cli.Context) error { out.Reset() // update with subtopic - cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, dataHex) + cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, dataHex) cmd.Stdout = &out log.Debug("update feed manifest subtopic cmd", "cmd", cmd) err = cmd.Run() @@ -192,7 +179,7 @@ func feedUploadAndSync(c *cli.Context) error { out.Reset() // update with merged topic - cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex) + cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex) cmd.Stdout = &out log.Debug("update feed manifest merged topic cmd", "cmd", cmd) err = cmd.Run() @@ -206,14 +193,14 @@ func feedUploadAndSync(c *cli.Context) error { // retrieve the data wg := sync.WaitGroup{} - for _, endpoint := range endpoints { + for _, host := range hosts { // raw retrieve, topic only for _, hex := range []string{topicHex, subTopicOnlyHex, mergedSubTopicHex} { wg.Add(1) ruid := uuid.New()[:8] go func(hex string, endpoint string, ruid string) { for { - err := fetchFeed(hex, userHex, endpoint, dataHash, ruid) + err := fetchFeed(hex, userHex, httpEndpoint(host), dataHash, ruid) if err != nil { continue } @@ -221,20 +208,18 @@ func feedUploadAndSync(c *cli.Context) error { wg.Done() return } - }(hex, endpoint, ruid) - + }(hex, httpEndpoint(host), ruid) } } wg.Wait() log.Info("all endpoints synced random data successfully") // upload test file - seed := int(time.Now().UnixNano() / 1e6) - log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed) + log.Info("feed uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) randomBytes := testutil.RandomBytes(seed, filesize*1000) - hash, err := upload(&randomBytes, endpoints[0]) + hash, err := upload(randomBytes, httpEndpoint(hosts[0])) if err != nil { return err } @@ -243,15 +228,12 @@ func feedUploadAndSync(c *cli.Context) error { return err } multihashHex := hexutil.Encode(hashBytes) - fileHash, err := digest(bytes.NewReader(randomBytes)) - if err != nil { - return err - } + fileHash := h.Sum(nil) log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash)) // update file with topic - cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, multihashHex) + cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, multihashHex) cmd.Stdout = &out err = cmd.Run() if err != nil { @@ -261,7 +243,7 @@ func feedUploadAndSync(c *cli.Context) error { out.Reset() // update file with subtopic - cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, multihashHex) + cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, multihashHex) cmd.Stdout = &out err = cmd.Run() if err != nil { @@ -271,7 +253,7 @@ func feedUploadAndSync(c *cli.Context) error { out.Reset() // update file with merged topic - cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex) + cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex) cmd.Stdout = &out err = cmd.Run() if err != nil { @@ -282,7 +264,7 @@ func feedUploadAndSync(c *cli.Context) error { time.Sleep(3 * time.Second) - for _, endpoint := range endpoints { + for _, host := range hosts { // manifest retrieve, topic only for _, url := range []string{manifestWithTopic, manifestWithSubTopic, manifestWithMergedTopic} { @@ -290,7 +272,7 @@ func feedUploadAndSync(c *cli.Context) error { ruid := uuid.New()[:8] go func(url string, endpoint string, ruid string) { for { - err := fetch(url, endpoint, fileHash, ruid) + err := fetch(url, endpoint, fileHash, ruid, "") if err != nil { continue } @@ -298,7 +280,7 @@ func feedUploadAndSync(c *cli.Context) error { wg.Done() return } - }(url, endpoint, ruid) + }(url, httpEndpoint(host), ruid) } } @@ -307,60 +289,3 @@ func feedUploadAndSync(c *cli.Context) error { return nil } - -func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { - ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch") - defer sp.Finish() - - log.Trace("sleeping", "ruid", ruid) - time.Sleep(3 * time.Second) - - log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) - - var tn time.Time - reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user - req, _ := http.NewRequest("GET", reqUri, nil) - - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)) - - trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn) - - req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) - transport := http.DefaultTransport - - //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - - tn = time.Now() - res, err := transport.RoundTrip(req) - if err != nil { - log.Error(err.Error(), "ruid", ruid) - return err - } - - log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength) - - if res.StatusCode != 200 { - return fmt.Errorf("expected status code %d, got %v (ruid %v)", 200, res.StatusCode, ruid) - } - - defer res.Body.Close() - - rdigest, err := digest(res.Body) - if err != nil { - log.Warn(err.Error(), "ruid", ruid) - return err - } - - if !bytes.Equal(rdigest, original) { - err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) - - return nil -} diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 66cecdc5c..43d2c1ff5 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -37,18 +37,16 @@ var ( ) var ( - endpoints []string - includeLocalhost bool - cluster string - appName string - scheme string - filesize int - syncDelay int - from int - to int - verbosity int - timeout int - single bool + allhosts string + hosts []string + filesize int + syncDelay int + httpPort int + wsPort int + verbosity int + timeout int + single bool + trackTimeout int ) func main() { @@ -59,39 +57,22 @@ func main() { app.Flags = []cli.Flag{ cli.StringFlag{ - Name: "cluster-endpoint", - Value: "prod", - Usage: "cluster to point to (prod or a given namespace)", - Destination: &cluster, - }, - cli.StringFlag{ - Name: "app", - Value: "swarm", - Usage: "application to point to (swarm or swarm-private)", - Destination: &appName, + Name: "hosts", + Value: "", + Usage: "comma-separated list of swarm hosts", + Destination: &allhosts, }, cli.IntFlag{ - Name: "cluster-from", - Value: 8501, - Usage: "swarm node (from)", - Destination: &from, + Name: "http-port", + Value: 80, + Usage: "http port", + Destination: &httpPort, }, cli.IntFlag{ - Name: "cluster-to", - Value: 8512, - Usage: "swarm node (to)", - Destination: &to, - }, - cli.StringFlag{ - Name: "cluster-scheme", - Value: "http", - Usage: "http or https", - Destination: &scheme, - }, - cli.BoolFlag{ - Name: "include-localhost", - Usage: "whether to include localhost:8500 as an endpoint", - Destination: &includeLocalhost, + Name: "ws-port", + Value: 8546, + Usage: "ws port", + Destination: &wsPort, }, cli.IntFlag{ Name: "filesize", @@ -122,6 +103,12 @@ func main() { Usage: "whether to fetch content from a single node or from all nodes", Destination: &single, }, + cli.IntFlag{ + Name: "track-timeout", + Value: 5, + Usage: "timeout in seconds to wait for GetAllReferences to return", + Destination: &trackTimeout, + }, } app.Flags = append(app.Flags, []cli.Flag{ @@ -130,7 +117,7 @@ func main() { swarmmetrics.MetricsInfluxDBDatabaseFlag, swarmmetrics.MetricsInfluxDBUsernameFlag, swarmmetrics.MetricsInfluxDBPasswordFlag, - swarmmetrics.MetricsInfluxDBHostTagFlag, + swarmmetrics.MetricsInfluxDBTagsFlag, }...) app.Flags = append(app.Flags, tracing.Flags...) @@ -140,13 +127,25 @@ func main() { Name: "upload_and_sync", Aliases: []string{"c"}, Usage: "upload and sync", - Action: cliUploadAndSync, + Action: wrapCliCommand("upload-and-sync", uploadAndSyncCmd), }, { Name: "feed_sync", Aliases: []string{"f"}, Usage: "feed update generate, upload and sync", - Action: cliFeedUploadAndSync, + Action: wrapCliCommand("feed-and-sync", feedUploadAndSyncCmd), + }, + { + Name: "upload_speed", + Aliases: []string{"u"}, + Usage: "measure upload speed", + Action: wrapCliCommand("upload-speed", uploadSpeedCmd), + }, + { + Name: "sliding_window", + Aliases: []string{"s"}, + Usage: "measure network aggregate capacity", + Action: wrapCliCommand("sliding-window", slidingWindowCmd), }, } @@ -177,13 +176,14 @@ func emitMetrics(ctx *cli.Context) error { database = ctx.GlobalString(swarmmetrics.MetricsInfluxDBDatabaseFlag.Name) username = ctx.GlobalString(swarmmetrics.MetricsInfluxDBUsernameFlag.Name) password = ctx.GlobalString(swarmmetrics.MetricsInfluxDBPasswordFlag.Name) - hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) + tags = ctx.GlobalString(swarmmetrics.MetricsInfluxDBTagsFlag.Name) ) - return influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ - "host": hosttag, - "version": gitCommit, - "filesize": fmt.Sprintf("%v", filesize), - }) + + tagsMap := utils.SplitTagsFlag(tags) + tagsMap["version"] = gitCommit + tagsMap["filesize"] = fmt.Sprintf("%v", filesize) + + return influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", tagsMap) } return nil diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go new file mode 100644 index 000000000..d313bbc37 --- /dev/null +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -0,0 +1,131 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "bytes" + "fmt" + "math/rand" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/testutil" + "github.com/pborman/uuid" + + cli "gopkg.in/urfave/cli.v1" +) + +type uploadResult struct { + hash string + digest []byte +} + +func slidingWindowCmd(ctx *cli.Context, tuid string) error { + errc := make(chan error) + + go func() { + errc <- slidingWindow(ctx, tuid) + }() + + select { + case err := <-errc: + if err != nil { + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1) + } + return err + case <-time.After(time.Duration(timeout) * time.Second): + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1) + + return fmt.Errorf("timeout after %v sec", timeout) + } +} + +func slidingWindow(ctx *cli.Context, tuid string) error { + hashes := []uploadResult{} //swarm hashes of the uploads + nodes := len(hosts) + const iterationTimeout = 30 * time.Second + log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) + uploadedBytes := 0 + networkDepth := 0 + errored := false + +outer: + for { + log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) + + t1 := time.Now() + + randomBytes := testutil.RandomBytes(seed, filesize*1000) + + hash, err := upload(randomBytes, httpEndpoint(hosts[0])) + if err != nil { + log.Error(err.Error()) + return err + } + + metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1) + + fhash, err := digest(bytes.NewReader(randomBytes)) + if err != nil { + log.Error(err.Error()) + return err + } + + log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay) + hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) + time.Sleep(time.Duration(syncDelay) * time.Second) + uploadedBytes += filesize * 1000 + + for i, v := range hashes { + timeout := time.After(time.Duration(timeout) * time.Second) + errored = false + + inner: + for { + select { + case <-timeout: + errored = true + log.Error("error retrieving hash. timeout", "hash idx", i, "err", err) + metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) + break inner + default: + idx := 1 + rand.Intn(len(hosts)-1) + ruid := uuid.New()[:8] + start := time.Now() + err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "") + if err != nil { + continue inner + } + metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start) + break inner + } + } + + if errored { + break outer + } + networkDepth = i + metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(int64(networkDepth)) + } + } + + log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", networkDepth*filesize) + log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize) + + return nil +} diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index d605f79a3..90230df25 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -19,91 +19,122 @@ package main import ( "bytes" "context" - "crypto/md5" - crand "crypto/rand" - "errors" "fmt" - "io" "io/ioutil" "math/rand" - "net/http" - "net/http/httptrace" "os" "sync" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/api" - "github.com/ethereum/go-ethereum/swarm/api/client" - "github.com/ethereum/go-ethereum/swarm/spancontext" + "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" - opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) -func generateEndpoints(scheme string, cluster string, app string, from int, to int) { - if cluster == "prod" { - for port := from; port < to; port++ { - endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) - } - } else { - for port := from; port < to; port++ { - endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) - } - } - - if includeLocalhost { - endpoints = append(endpoints, "http://localhost:8500") - } -} - -func cliUploadAndSync(c *cli.Context) error { - log.PrintOrigins(true) - log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - - metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) +func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { + randomBytes := testutil.RandomBytes(seed, filesize*1000) errc := make(chan error) + go func() { - errc <- uploadAndSync(c) + errc <- uplaodAndSync(ctx, randomBytes, tuid) }() select { case err := <-errc: if err != nil { - metrics.GetOrRegisterCounter("upload-and-sync.fail", nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1) } return err case <-time.After(time.Duration(timeout) * time.Second): - metrics.GetOrRegisterCounter("upload-and-sync.timeout", nil).Inc(1) - return fmt.Errorf("timeout after %v sec", timeout) + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1) + + e := fmt.Errorf("timeout after %v sec", timeout) + // trigger debug functionality on randomBytes + err := trackChunks(randomBytes[:]) + if err != nil { + e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err) + } + + return e } } -func uploadAndSync(c *cli.Context) error { - defer func(now time.Time) { - totalTime := time.Since(now) +func trackChunks(testData []byte) error { + log.Warn("Test timed out; running chunk debug sequence") - log.Info("total time", "time", totalTime, "kb", filesize) - metrics.GetOrRegisterCounter("upload-and-sync.total-time", nil).Inc(int64(totalTime)) - }(time.Now()) + addrs, err := getAllRefs(testData) + if err != nil { + return err + } + log.Trace("All references retrieved") - generateEndpoints(scheme, cluster, appName, from, to) - seed := int(time.Now().UnixNano() / 1e6) - log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) + // has-chunks + for _, host := range hosts { + httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + log.Trace("Calling `Has` on host", "httpHost", httpHost) + rpcClient, err := rpc.Dial(httpHost) + if err != nil { + log.Trace("Error dialing host", "err", err) + return err + } + log.Trace("rpc dial ok") + var hasInfo []api.HasInfo + err = rpcClient.Call(&hasInfo, "bzz_has", addrs) + if err != nil { + log.Trace("Error calling host", "err", err) + return err + } + log.Trace("rpc call ok") + count := 0 + for _, info := range hasInfo { + if !info.Has { + count++ + log.Error("Host does not have chunk", "host", httpHost, "chunk", info.Addr) + } + } + if count == 0 { + log.Info("Host reported to have all chunks", "host", httpHost) + } + } + return nil +} - randomBytes := testutil.RandomBytes(seed, filesize*1000) +func getAllRefs(testData []byte) (storage.AddressCollection, error) { + log.Trace("Getting all references for given root hash") + datadir, err := ioutil.TempDir("", "chunk-debug") + if err != nil { + return nil, fmt.Errorf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(datadir) + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second) + defer cancel() + + reader := bytes.NewReader(testData) + return fileStore.GetAllReferences(ctx, reader, false) +} + +func uplaodAndSync(c *cli.Context, randomBytes []byte, tuid string) error { + log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed) t1 := time.Now() - hash, err := upload(&randomBytes, endpoints[0]) + hash, err := upload(randomBytes, httpEndpoint(hosts[0])) if err != nil { log.Error(err.Error()) return err } - metrics.GetOrRegisterCounter("upload-and-sync.upload-time", nil).Inc(int64(time.Since(t1))) + t2 := time.Since(t1) + metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).Update(t2) fhash, err := digest(bytes.NewReader(randomBytes)) if err != nil { @@ -111,147 +142,53 @@ func uploadAndSync(c *cli.Context) error { return err } - log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) + log.Info("uploaded successfully", "tuid", tuid, "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) time.Sleep(time.Duration(syncDelay) * time.Second) wg := sync.WaitGroup{} if single { - rand.Seed(time.Now().UTC().UnixNano()) - randIndex := 1 + rand.Intn(len(endpoints)-1) + randIndex := 1 + rand.Intn(len(hosts)-1) ruid := uuid.New()[:8] wg.Add(1) go func(endpoint string, ruid string) { for { start := time.Now() - err := fetch(hash, endpoint, fhash, ruid) - fetchTime := time.Since(start) + err := fetch(hash, endpoint, fhash, ruid, tuid) if err != nil { continue } + ended := time.Since(start) - metrics.GetOrRegisterMeter("upload-and-sync.single.fetch-time", nil).Mark(int64(fetchTime)) + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) + log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) wg.Done() return } - }(endpoints[randIndex], ruid) + }(httpEndpoint(hosts[randIndex]), ruid) } else { - for _, endpoint := range endpoints { + for _, endpoint := range hosts[1:] { ruid := uuid.New()[:8] wg.Add(1) go func(endpoint string, ruid string) { for { start := time.Now() - err := fetch(hash, endpoint, fhash, ruid) - fetchTime := time.Since(start) + err := fetch(hash, endpoint, fhash, ruid, tuid) if err != nil { continue } + ended := time.Since(start) - metrics.GetOrRegisterMeter("upload-and-sync.each.fetch-time", nil).Mark(int64(fetchTime)) + metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).Update(ended) + log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) wg.Done() return } - }(endpoint, ruid) + }(httpEndpoint(endpoint), ruid) } } wg.Wait() - log.Info("all endpoints synced random file successfully") + log.Info("all hosts synced random file successfully") return nil } - -// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file -func fetch(hash string, endpoint string, original []byte, ruid string) error { - ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") - defer sp.Finish() - - log.Trace("sleeping", "ruid", ruid) - time.Sleep(3 * time.Second) - log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) - - var tn time.Time - reqUri := endpoint + "/bzz:/" + hash + "/" - req, _ := http.NewRequest("GET", reqUri, nil) - - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)) - - trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn) - - req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) - transport := http.DefaultTransport - - //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - - tn = time.Now() - res, err := transport.RoundTrip(req) - if err != nil { - log.Error(err.Error(), "ruid", ruid) - return err - } - log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) - - if res.StatusCode != 200 { - err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - defer res.Body.Close() - - rdigest, err := digest(res.Body) - if err != nil { - log.Warn(err.Error(), "ruid", ruid) - return err - } - - if !bytes.Equal(rdigest, original) { - err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) - - return nil -} - -// upload is uploading a file `f` to `endpoint` via the `swarm up` cmd -func upload(dataBytes *[]byte, endpoint string) (string, error) { - swarm := client.NewClient(endpoint) - f := &client.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)), - ManifestEntry: api.ManifestEntry{ - ContentType: "text/plain", - Mode: 0660, - Size: int64(len(*dataBytes)), - }, - } - - // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. - return swarm.Upload(f, "", false) -} - -func digest(r io.Reader) ([]byte, error) { - h := md5.New() - _, err := io.Copy(h, r) - if err != nil { - return nil, err - } - return h.Sum(nil), nil -} - -// generates random data in heap buffer -func generateRandomData(datasize int) ([]byte, error) { - b := make([]byte, datasize) - c, err := crand.Read(b) - if err != nil { - return nil, err - } else if c != datasize { - return nil, errors.New("short read") - } - return b, nil -} diff --git a/cmd/swarm/swarm-smoke/upload_speed.go b/cmd/swarm/swarm-smoke/upload_speed.go new file mode 100644 index 000000000..20bf7b86c --- /dev/null +++ b/cmd/swarm/swarm-smoke/upload_speed.go @@ -0,0 +1,73 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "bytes" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/testutil" + + cli "gopkg.in/urfave/cli.v1" +) + +func uploadSpeedCmd(ctx *cli.Context, tuid string) error { + log.Info("uploading to "+hosts[0], "tuid", tuid, "seed", seed) + randomBytes := testutil.RandomBytes(seed, filesize*1000) + + errc := make(chan error) + + go func() { + errc <- uploadSpeed(ctx, tuid, randomBytes) + }() + + select { + case err := <-errc: + if err != nil { + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1) + } + return err + case <-time.After(time.Duration(timeout) * time.Second): + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1) + + // trigger debug functionality on randomBytes + + return fmt.Errorf("timeout after %v sec", timeout) + } +} + +func uploadSpeed(c *cli.Context, tuid string, data []byte) error { + t1 := time.Now() + hash, err := upload(data, hosts[0]) + if err != nil { + log.Error(err.Error()) + return err + } + metrics.GetOrRegisterCounter("upload-speed.upload-time", nil).Inc(int64(time.Since(t1))) + + fhash, err := digest(bytes.NewReader(data)) + if err != nil { + log.Error(err.Error()) + return err + } + + log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) + return nil +} diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go new file mode 100644 index 000000000..87abb44b0 --- /dev/null +++ b/cmd/swarm/swarm-smoke/util.go @@ -0,0 +1,235 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "bytes" + "context" + "crypto/md5" + crand "crypto/rand" + "errors" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/http/httptrace" + "os" + "strings" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/api/client" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pborman/uuid" + cli "gopkg.in/urfave/cli.v1" +) + +var ( + commandName = "" + seed = int(time.Now().UTC().UnixNano()) +) + +func init() { + rand.Seed(int64(seed)) +} + +func httpEndpoint(host string) string { + return fmt.Sprintf("http://%s:%d", host, httpPort) +} + +func wsEndpoint(host string) string { + return fmt.Sprintf("ws://%s:%d", host, wsPort) +} + +func wrapCliCommand(name string, command func(*cli.Context, string) error) func(*cli.Context) error { + return func(ctx *cli.Context) error { + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false)))) + + // test uuid + tuid := uuid.New()[:8] + + commandName = name + + hosts = strings.Split(allhosts, ",") + + defer func(now time.Time) { + totalTime := time.Since(now) + log.Info("total time", "tuid", tuid, "time", totalTime, "kb", filesize) + metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime) + }(time.Now()) + + log.Info("smoke test starting", "tuid", tuid, "task", name, "timeout", timeout) + metrics.GetOrRegisterCounter(name, nil).Inc(1) + + return command(ctx, tuid) + } +} + +func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { + ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch") + defer sp.Finish() + + log.Trace("sleeping", "ruid", ruid) + time.Sleep(3 * time.Second) + + log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) + + var tn time.Time + reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user + req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + tn = time.Now() + res, err := transport.RoundTrip(req) + if err != nil { + log.Error(err.Error(), "ruid", ruid) + return err + } + + log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength) + + if res.StatusCode != 200 { + return fmt.Errorf("expected status code %d, got %v (ruid %v)", 200, res.StatusCode, ruid) + } + + defer res.Body.Close() + + rdigest, err := digest(res.Body) + if err != nil { + log.Warn(err.Error(), "ruid", ruid) + return err + } + + if !bytes.Equal(rdigest, original) { + err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) + log.Warn(err.Error(), "ruid", ruid) + return err + } + + log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) + + return nil +} + +// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file +func fetch(hash string, endpoint string, original []byte, ruid string, tuid string) error { + ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") + defer sp.Finish() + + log.Info("http get request", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash) + + var tn time.Time + reqUri := endpoint + "/bzz:/" + hash + "/" + req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + trace := client.GetClientTrace(commandName+" - http get", commandName, ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + tn = time.Now() + res, err := transport.RoundTrip(req) + if err != nil { + log.Error(err.Error(), "ruid", ruid) + return err + } + log.Info("http get response", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) + + if res.StatusCode != 200 { + err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) + log.Warn(err.Error(), "ruid", ruid) + return err + } + + defer res.Body.Close() + + rdigest, err := digest(res.Body) + if err != nil { + log.Warn(err.Error(), "ruid", ruid) + return err + } + + if !bytes.Equal(rdigest, original) { + err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) + log.Warn(err.Error(), "ruid", ruid) + return err + } + + log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) + + return nil +} + +// upload an arbitrary byte as a plaintext file to `endpoint` using the api client +func upload(data []byte, endpoint string) (string, error) { + swarm := client.NewClient(endpoint) + f := &client.File{ + ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), + ManifestEntry: api.ManifestEntry{ + ContentType: "text/plain", + Mode: 0660, + Size: int64(len(data)), + }, + } + + // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. + return swarm.Upload(f, "", false) +} + +func digest(r io.Reader) ([]byte, error) { + h := md5.New() + _, err := io.Copy(h, r) + if err != nil { + return nil, err + } + return h.Sum(nil), nil +} + +// generates random data in heap buffer +func generateRandomData(datasize int) ([]byte, error) { + b := make([]byte, datasize) + c, err := crand.Read(b) + if err != nil { + return nil, err + } else if c != datasize { + return nil, errors.New("short read") + } + return b, nil +} |