diff options
author | Anton Evangelatov <anton.evangelatov@gmail.com> | 2019-04-11 16:26:52 +0800 |
---|---|---|
committer | Anton Evangelatov <anton.evangelatov@gmail.com> | 2019-05-10 18:26:30 +0800 |
commit | 993b145f25845e50e8af41ffb1116eaee381d693 (patch) | |
tree | 47a88eec27f66b7237512c862d7ab2f8e9f314d3 /cmd | |
parent | 996755c4a832afce8629a771cab8879c88c98355 (diff) | |
download | go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.gz go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.bz2 go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.lz go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.xz go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.zst go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.zip |
swarm/storage/localstore: fix export db.Put signature
cmd/swarm/swarm-smoke: improve smoke tests (#1337)
swarm/network: remove dead code (#1339)
swarm/network: remove FetchStore and SyncChunkStore in favor of NetStore (#1342)
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 8 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/main.go | 33 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/sliding_window.go | 10 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/upload_and_sync.go | 228 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/upload_speed.go | 8 | ||||
-rw-r--r-- | cmd/swarm/swarm-smoke/util.go | 18 |
6 files changed, 184 insertions, 121 deletions
diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 6b3fed0c7..b5ffc43d2 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -26,11 +26,11 @@ const ( feedRandomDataLength = 8 ) -func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error { +func feedUploadAndSyncCmd(ctx *cli.Context) error { errc := make(chan error) go func() { - errc <- feedUploadAndSync(ctx, tuid) + errc <- feedUploadAndSync(ctx) }() select { @@ -46,7 +46,7 @@ func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error { } } -func feedUploadAndSync(c *cli.Context, tuid string) error { +func feedUploadAndSync(c *cli.Context) error { log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing") // create a random private key to sign updates with and derive the address @@ -272,7 +272,7 @@ func feedUploadAndSync(c *cli.Context, tuid string) error { ruid := uuid.New()[:8] go func(url string, endpoint string, ruid string) { for { - err := fetch(url, endpoint, fileHash, ruid, "") + err := fetch(url, endpoint, fileHash, ruid) if err != nil { continue } diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 860fbcc1d..2c1dd65a0 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -37,17 +37,17 @@ var ( ) var ( - allhosts string - hosts []string - filesize int - inputSeed int - syncDelay int - httpPort int - wsPort int - verbosity int - timeout int - single bool - trackTimeout int + allhosts string + hosts []string + filesize int + syncDelay int + inputSeed int + httpPort int + wsPort int + verbosity int + timeout int + single bool + onlyUpload bool ) func main() { @@ -101,7 +101,7 @@ func main() { }, cli.IntFlag{ Name: "timeout", - Value: 120, + Value: 180, Usage: "timeout in seconds after which kill the process", Destination: &timeout, }, @@ -110,11 +110,10 @@ func main() { Usage: "whether to fetch content from a single node or from all nodes", Destination: &single, }, - cli.IntFlag{ - Name: "track-timeout", - Value: 5, - Usage: "timeout in seconds to wait for GetAllReferences to return", - Destination: &trackTimeout, + cli.BoolFlag{ + Name: "only-upload", + Usage: "whether to only upload content to a single node without fetching", + Destination: &onlyUpload, }, } diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index d589124bd..ab082c543 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -35,11 +35,11 @@ type uploadResult struct { digest []byte } -func slidingWindowCmd(ctx *cli.Context, tuid string) error { +func slidingWindowCmd(ctx *cli.Context) error { errc := make(chan error) go func() { - errc <- slidingWindow(ctx, tuid) + errc <- slidingWindow(ctx) }() err := <-errc @@ -49,10 +49,10 @@ func slidingWindowCmd(ctx *cli.Context, tuid string) error { return err } -func slidingWindow(ctx *cli.Context, tuid string) error { +func slidingWindow(ctx *cli.Context) error { var hashes []uploadResult //swarm hashes of the uploads nodes := len(hosts) - log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) + log.Info("sliding window test started", "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) uploadedBytes := 0 networkDepth := 0 errored := false @@ -107,7 +107,7 @@ outer: start = time.Now() // fetch hangs when swarm dies out, so we have to jump through a bit more hoops to actually // catch the timeout, but also allow this retry logic - err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "") + err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid) if err != nil { log.Error("error fetching hash", "err", err) continue diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 6c20a4fa6..6a434a0b2 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -23,22 +23,20 @@ import ( "io/ioutil" "math/rand" "os" - "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" - "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) -func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { +func uploadAndSyncCmd(ctx *cli.Context) error { // use input seed if it has been set if inputSeed != 0 { seed = inputSeed @@ -49,7 +47,7 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { errc := make(chan error) go func() { - errc <- uploadAndSync(ctx, randomBytes, tuid) + errc <- uploadAndSync(ctx, randomBytes) }() var err error @@ -65,7 +63,7 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { } // trigger debug functionality on randomBytes - e := trackChunks(randomBytes[:]) + e := trackChunks(randomBytes[:], true) if e != nil { log.Error(e.Error()) } @@ -73,50 +71,84 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { return err } -func trackChunks(testData []byte) error { +func trackChunks(testData []byte, submitMetrics bool) error { addrs, err := getAllRefs(testData) if err != nil { return err } for i, ref := range addrs { - log.Trace(fmt.Sprintf("ref %d", i), "ref", ref) + log.Debug(fmt.Sprintf("ref %d", i), "ref", ref) } + var globalYes, globalNo int + var globalMu sync.Mutex + var hasErr bool + + var wg sync.WaitGroup + wg.Add(len(hosts)) + for _, host := range hosts { - httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + host := host + go func() { + defer wg.Done() + httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + rpcClient, err := rpc.DialContext(ctx, httpHost) + if rpcClient != nil { + defer rpcClient.Close() + } + if err != nil { + log.Error("error dialing host", "err", err, "host", httpHost) + hasErr = true + return + } - hostChunks := []string{} + var hostChunks string + err = rpcClient.Call(&hostChunks, "bzz_has", addrs) + if err != nil { + log.Error("error calling rpc client", "err", err, "host", httpHost) + hasErr = true + return + } - rpcClient, err := rpc.Dial(httpHost) - if err != nil { - log.Error("error dialing host", "err", err, "host", httpHost) - continue - } + yes, no := 0, 0 + for _, val := range hostChunks { + if val == '1' { + yes++ + } else { + no++ + } + } - var hasInfo []api.HasInfo - err = rpcClient.Call(&hasInfo, "bzz_has", addrs) - if err != nil { - log.Error("error calling rpc client", "err", err, "host", httpHost) - continue - } + if no == 0 { + log.Info("host reported to have all chunks", "host", host) + } - count := 0 - for _, info := range hasInfo { - if info.Has { - hostChunks = append(hostChunks, "1") - } else { - hostChunks = append(hostChunks, "0") - count++ + log.Debug("chunks", "chunks", hostChunks, "yes", yes, "no", no, "host", host) + + if submitMetrics { + globalMu.Lock() + globalYes += yes + globalNo += no + globalMu.Unlock() } - } + }() + } - if count == 0 { - log.Info("host reported to have all chunks", "host", host) - } + wg.Wait() + + if !hasErr && submitMetrics { + // remove the chunks stored on the uploader node + globalYes -= len(addrs) - log.Trace("chunks", "chunks", strings.Join(hostChunks, ""), "host", host) + metrics.GetOrRegisterCounter("deployment.chunks.yes", nil).Inc(int64(globalYes)) + metrics.GetOrRegisterCounter("deployment.chunks.no", nil).Inc(int64(globalNo)) + metrics.GetOrRegisterCounter("deployment.chunks.refs", nil).Inc(int64(len(addrs))) } + return nil } @@ -130,15 +162,13 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { if err != nil { return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second) - defer cancel() reader := bytes.NewReader(testData) - return fileStore.GetAllReferences(ctx, reader, false) + return fileStore.GetAllReferences(context.Background(), reader, false) } -func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error { - log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed) +func uploadAndSync(c *cli.Context, randomBytes []byte) error { + log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) t1 := time.Now() hash, err := upload(randomBytes, httpEndpoint(hosts[0])) @@ -155,53 +185,91 @@ func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error { return err } - log.Info("uploaded successfully", "tuid", tuid, "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) + log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) - time.Sleep(time.Duration(syncDelay) * time.Second) + waitToSync() - wg := sync.WaitGroup{} - if single { - randIndex := 1 + rand.Intn(len(hosts)-1) - ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - for { - start := time.Now() - err := fetch(hash, endpoint, fhash, ruid, tuid) - if err != nil { - continue - } - ended := time.Since(start) + log.Debug("chunks before fetch attempt", "hash", hash) - metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) - log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) - wg.Done() - return - } - }(httpEndpoint(hosts[randIndex]), ruid) - } else { - for _, endpoint := range hosts[1:] { - ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - for { - start := time.Now() - err := fetch(hash, endpoint, fhash, ruid, tuid) - if err != nil { - continue - } - ended := time.Since(start) - - metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).Update(ended) - log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) - wg.Done() - return - } - }(httpEndpoint(endpoint), ruid) + err = trackChunks(randomBytes, false) + if err != nil { + log.Error(err.Error()) + } + + if onlyUpload { + log.Debug("only-upload is true, stoppping test", "hash", hash) + return nil + } + + randIndex := 1 + rand.Intn(len(hosts)-1) + + for { + start := time.Now() + err := fetch(hash, httpEndpoint(hosts[randIndex]), fhash, "") + if err != nil { + time.Sleep(2 * time.Second) + continue } + ended := time.Since(start) + + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) + log.Info("fetch successful", "took", ended, "endpoint", httpEndpoint(hosts[randIndex])) + break } - wg.Wait() - log.Info("all hosts synced random file successfully") return nil } + +func isSyncing(wsHost string) (bool, error) { + rpcClient, err := rpc.Dial(wsHost) + if rpcClient != nil { + defer rpcClient.Close() + } + + if err != nil { + log.Error("error dialing host", "err", err) + return false, err + } + + var isSyncing bool + err = rpcClient.Call(&isSyncing, "bzz_isSyncing") + if err != nil { + log.Error("error calling host for isSyncing", "err", err) + return false, err + } + + log.Debug("isSyncing result", "host", wsHost, "isSyncing", isSyncing) + + return isSyncing, nil +} + +func waitToSync() { + t1 := time.Now() + + ns := uint64(1) + + for ns > 0 { + time.Sleep(3 * time.Second) + + notSynced := uint64(0) + var wg sync.WaitGroup + wg.Add(len(hosts)) + for i := 0; i < len(hosts); i++ { + i := i + go func(idx int) { + stillSyncing, err := isSyncing(wsEndpoint(hosts[idx])) + + if stillSyncing || err != nil { + atomic.AddUint64(¬Synced, 1) + } + wg.Done() + }(i) + } + wg.Wait() + + ns = atomic.LoadUint64(¬Synced) + } + + t2 := time.Since(t1) + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.wait-for-sync.deployment", nil).Update(t2) +} diff --git a/cmd/swarm/swarm-smoke/upload_speed.go b/cmd/swarm/swarm-smoke/upload_speed.go index 20bf7b86c..047ea0092 100644 --- a/cmd/swarm/swarm-smoke/upload_speed.go +++ b/cmd/swarm/swarm-smoke/upload_speed.go @@ -28,14 +28,14 @@ import ( cli "gopkg.in/urfave/cli.v1" ) -func uploadSpeedCmd(ctx *cli.Context, tuid string) error { - log.Info("uploading to "+hosts[0], "tuid", tuid, "seed", seed) +func uploadSpeedCmd(ctx *cli.Context) error { + log.Info("uploading to "+hosts[0], "seed", seed) randomBytes := testutil.RandomBytes(seed, filesize*1000) errc := make(chan error) go func() { - errc <- uploadSpeed(ctx, tuid, randomBytes) + errc <- uploadSpeed(ctx, randomBytes) }() select { @@ -53,7 +53,7 @@ func uploadSpeedCmd(ctx *cli.Context, tuid string) error { } } -func uploadSpeed(c *cli.Context, tuid string, data []byte) error { +func uploadSpeed(c *cli.Context, data []byte) error { t1 := time.Now() hash, err := upload(data, hosts[0]) if err != nil { diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go index 87abb44b0..b95f993e8 100644 --- a/cmd/swarm/swarm-smoke/util.go +++ b/cmd/swarm/swarm-smoke/util.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" - "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -59,28 +58,25 @@ func wsEndpoint(host string) string { return fmt.Sprintf("ws://%s:%d", host, wsPort) } -func wrapCliCommand(name string, command func(*cli.Context, string) error) func(*cli.Context) error { +func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Context) error { return func(ctx *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false)))) - // test uuid - tuid := uuid.New()[:8] - commandName = name hosts = strings.Split(allhosts, ",") defer func(now time.Time) { totalTime := time.Since(now) - log.Info("total time", "tuid", tuid, "time", totalTime, "kb", filesize) + log.Info("total time", "time", totalTime, "kb", filesize) metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime) }(time.Now()) - log.Info("smoke test starting", "tuid", tuid, "task", name, "timeout", timeout) + log.Info("smoke test starting", "task", name, "timeout", timeout) metrics.GetOrRegisterCounter(name, nil).Inc(1) - return command(ctx, tuid) + return command(ctx) } } @@ -142,11 +138,11 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid } // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file -func fetch(hash string, endpoint string, original []byte, ruid string, tuid string) error { +func fetch(hash string, endpoint string, original []byte, ruid string) error { ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") defer sp.Finish() - log.Info("http get request", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash) + log.Info("http get request", "ruid", ruid, "endpoint", endpoint, "hash", hash) var tn time.Time reqUri := endpoint + "/bzz:/" + hash + "/" @@ -170,7 +166,7 @@ func fetch(hash string, endpoint string, original []byte, ruid string, tuid stri log.Error(err.Error(), "ruid", ruid) return err } - log.Info("http get response", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) + log.Info("http get response", "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) if res.StatusCode != 200 { err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) |