aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/swarm/swarm-smoke/main.go7
-rw-r--r--cmd/swarm/swarm-smoke/sliding_window.go8
-rw-r--r--cmd/swarm/swarm-smoke/upload_and_sync.go16
-rw-r--r--swarm/network/stream/messages.go16
-rw-r--r--swarm/network/stream/peer.go5
5 files changed, 40 insertions, 12 deletions
diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go
index 2c1dd65a0..03e2cc2c4 100644
--- a/cmd/swarm/swarm-smoke/main.go
+++ b/cmd/swarm/swarm-smoke/main.go
@@ -40,7 +40,7 @@ var (
allhosts string
hosts []string
filesize int
- syncDelay int
+ syncDelay bool
inputSeed int
httpPort int
wsPort int
@@ -87,10 +87,9 @@ func main() {
Usage: "file size for generated random file in KB",
Destination: &filesize,
},
- cli.IntFlag{
+ cli.BoolFlag{
Name: "sync-delay",
- Value: 5,
- Usage: "duration of delay in seconds to wait for content to be synced",
+ Usage: "wait for content to be synced",
Destination: &syncDelay,
},
cli.IntFlag{
diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go
index ab082c543..6ca3d3947 100644
--- a/cmd/swarm/swarm-smoke/sliding_window.go
+++ b/cmd/swarm/swarm-smoke/sliding_window.go
@@ -81,9 +81,13 @@ outer:
return err
}
- log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay)
+ log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "wait for sync", syncDelay)
hashes = append(hashes, uploadResult{hash: hash, digest: fhash})
- time.Sleep(time.Duration(syncDelay) * time.Second)
+
+ if syncDelay {
+ waitToSync()
+ }
+
uploadedBytes += filesize * 1000
q := make(chan struct{}, 1)
d := make(chan struct{})
diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go
index d6eb87ace..7338e3473 100644
--- a/cmd/swarm/swarm-smoke/upload_and_sync.go
+++ b/cmd/swarm/swarm-smoke/upload_and_sync.go
@@ -197,7 +197,8 @@ func getBzzAddrFromHost(client *rpc.Client) (string, error) {
// we make an ugly assumption about the output format of the hive.String() method
// ideally we should replace this with an API call that returns the bzz addr for a given host,
// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time
- return strings.Split(strings.Split(hive, "\n")[3], " ")[10], nil
+ ss := strings.Split(strings.Split(hive, "\n")[3], " ")
+ return ss[len(ss)-1], nil
}
// checkChunksVsMostProxHosts is checking:
@@ -284,13 +285,16 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash))
- waitToSync()
+ // wait to sync and log chunks before fetch attempt, only if syncDelay is set to true
+ if syncDelay {
+ waitToSync()
- log.Debug("chunks before fetch attempt", "hash", hash)
+ log.Debug("chunks before fetch attempt", "hash", hash)
- err = trackChunks(randomBytes, false)
- if err != nil {
- log.Error(err.Error())
+ err = trackChunks(randomBytes, false)
+ if err != nil {
+ log.Error(err.Error())
+ }
}
if onlyUpload {
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index 821cdaa9a..b43fdeee2 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -223,6 +223,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err)
}
+ var wantDelaySet bool
+ var wantDelay time.Time
+
ctr := 0
errC := make(chan error)
ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
@@ -234,6 +237,13 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if wait := c.NeedData(ctx, hash); wait != nil {
ctr++
want.Set(i/HashSize, true)
+
+ // measure how long it takes before we mark chunks for retrieval, and actually send the request
+ if !wantDelaySet {
+ wantDelaySet = true
+ wantDelay = time.Now()
+ }
+
// create request and wait until the chunk data arrives and is stored
go func(w func(context.Context) error) {
select {
@@ -304,6 +314,12 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
+
+ // record want delay
+ if wantDelaySet {
+ metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay)
+ }
+
err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
log.Warn("SendPriority error", "err", err)
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 17ce0d798..28fd06e4d 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -415,9 +415,14 @@ func (p *Peer) removeClientParams(s Stream) error {
}
func (p *Peer) close() {
+ p.serverMu.Lock()
+ defer p.serverMu.Unlock()
+
for _, s := range p.servers {
s.Close()
}
+
+ p.servers = nil
}
// runUpdateSyncing is a long running function that creates the initial