Diffstat (limited to 'vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/zc_policy_retry.go')
-rw-r--r-- vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/zc_policy_retry.go 318
1 file changed, 318 insertions, 0 deletions
diff --git a/vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/zc_policy_retry.go b/vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/zc_policy_retry.go
new file mode 100644
index 000000000..4c885ea1a
--- /dev/null
+++ b/vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/zc_policy_retry.go
@@ -0,0 +1,318 @@
+package azblob
+
+import (
+ "context"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "net"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/Azure/azure-pipeline-go/pipeline"
+)
+
+// RetryPolicy tells the pipeline what kind of retry policy to use. See the RetryPolicy* constants.
+type RetryPolicy int32
+
+const (
+ // RetryPolicyExponential tells the pipeline to use an exponential back-off retry policy
+ RetryPolicyExponential RetryPolicy = 0
+
+ // RetryPolicyFixed tells the pipeline to use a fixed back-off retry policy
+ RetryPolicyFixed RetryPolicy = 1
+)
+
+// RetryOptions configures the retry policy's behavior.
+type RetryOptions struct {
+ // Policy tells the pipeline what kind of retry policy to use. See the RetryPolicy* constants.
+ // A value of zero means that you accept our default policy.
+ Policy RetryPolicy
+
+ // MaxTries specifies the maximum number of times an operation is attempted before producing an error (0=default).
+ // A value of zero means that you accept our default policy. A value of 1 means 1 try and no retries.
+ MaxTries int32
+
+ // TryTimeout indicates the maximum time allowed for any single try of an HTTP request.
+ // A value of zero means that you accept our default timeout. NOTE: When transferring large amounts
+ // of data, the default TryTimeout will probably not be sufficient. You should override this value
+ // based on the bandwidth available to the host machine and proximity to the Storage service. A good
+ // starting point may be something like 60 seconds per MB of anticipated payload size.
+ TryTimeout time.Duration
+
+ // RetryDelay specifies the amount of delay to use before retrying an operation (0=default).
+ // When RetryPolicy is specified as RetryPolicyExponential, the delay increases exponentially
+ // with each retry up to a maximum specified by MaxRetryDelay.
+ // If you specify 0, then you must also specify 0 for MaxRetryDelay.
+ // If you specify RetryDelay, then you must also specify MaxRetryDelay, and MaxRetryDelay should be
+ // equal to or greater than RetryDelay.
+ RetryDelay time.Duration
+
+ // MaxRetryDelay specifies the maximum delay allowed before retrying an operation (0=default).
+ // If you specify 0, then you must also specify 0 for RetryDelay.
+ MaxRetryDelay time.Duration
+
+ // RetryReadsFromSecondaryHost specifies whether the retry policy should retry a read operation against another host.
+ // If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host.
+ // NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent
+ // data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs
+ RetryReadsFromSecondaryHost string // Comment this out for non-Blob SDKs
+}
+
+func (o RetryOptions) retryReadsFromSecondaryHost() string {
+ return o.RetryReadsFromSecondaryHost // This is for the Blob SDK only
+ //return "" // This is for non-blob SDKs
+}
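+
+// Usage sketch (illustrative): NewRetryPolicyFactory (defined below) turns these options into a
+// pipeline.Factory. Assuming the azure-pipeline-go NewPipeline helper, wiring it into a pipeline
+// might look like:
+//
+//   retryFactory := NewRetryPolicyFactory(RetryOptions{
+//       Policy:        RetryPolicyExponential,
+//       MaxTries:      5,
+//       TryTimeout:    2 * time.Minute,
+//       RetryDelay:    4 * time.Second,
+//       MaxRetryDelay: 2 * time.Minute,
+//   })
+//   p := pipeline.NewPipeline([]pipeline.Factory{retryFactory}, pipeline.Options{})
+//   _ = p // requests sent through p are retried according to the options above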
+
+func (o RetryOptions) defaults() RetryOptions {
+ if o.Policy != RetryPolicyExponential && o.Policy != RetryPolicyFixed {
+ panic("RetryPolicy must be RetryPolicyExponential or RetryPolicyFixed")
+ }
+ if o.MaxTries < 0 {
+ panic("MaxTries must be >= 0")
+ }
+ if o.TryTimeout < 0 || o.RetryDelay < 0 || o.MaxRetryDelay < 0 {
+ panic("TryTimeout, RetryDelay, and MaxRetryDelay must all be >= 0")
+ }
+ if o.RetryDelay > o.MaxRetryDelay {
+ panic("RetryDelay must be <= MaxRetryDelay")
+ }
+ if (o.RetryDelay == 0 && o.MaxRetryDelay != 0) || (o.RetryDelay != 0 && o.MaxRetryDelay == 0) {
+ panic("Both RetryDelay and MaxRetryDelay must be 0 or neither can be 0")
+ }
+
+ IfDefault := func(current *time.Duration, desired time.Duration) {
+ if *current == time.Duration(0) {
+ *current = desired
+ }
+ }
+
+ // Set defaults if unspecified
+ if o.MaxTries == 0 {
+ o.MaxTries = 4
+ }
+ switch o.Policy {
+ case RetryPolicyExponential:
+ IfDefault(&o.TryTimeout, 1*time.Minute)
+ IfDefault(&o.RetryDelay, 4*time.Second)
+ IfDefault(&o.MaxRetryDelay, 120*time.Second)
+
+ case RetryPolicyFixed:
+ IfDefault(&o.TryTimeout, 1*time.Minute)
+ IfDefault(&o.RetryDelay, 30*time.Second)
+ IfDefault(&o.MaxRetryDelay, 120*time.Second)
+ }
+ return o
+}
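+
+// For reference, a zero-value RetryOptions therefore resolves to:
+// Policy=RetryPolicyExponential, MaxTries=4, TryTimeout=1m, RetryDelay=4s, MaxRetryDelay=120s.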
+
+func (o RetryOptions) calcDelay(try int32) time.Duration { // try is >=1; never 0
+ pow := func(number int64, exponent int32) int64 { // pow is a nested helper function
+ var result int64 = 1
+ for n := int32(0); n < exponent; n++ {
+ result *= number
+ }
+ return result
+ }
+
+ delay := time.Duration(0)
+ switch o.Policy {
+ case RetryPolicyExponential:
+ delay = time.Duration(pow(2, try-1)-1) * o.RetryDelay
+
+ case RetryPolicyFixed:
+ if try > 1 { // Any try after the 1st uses the fixed delay
+ delay = o.RetryDelay
+ }
+ }
+
+ // Introduce some jitter: rand.Float64() returns [0.0, 1.0); dividing by 2 gives [0.0, 0.5); adding 0.8 gives a factor in [0.8, 1.3)
+ delay = time.Duration(delay.Seconds() * (rand.Float64()/2 + 0.8) * float64(time.Second)) // NOTE: We want math/rand; not crypto/rand
+ if delay > o.MaxRetryDelay {
+ delay = o.MaxRetryDelay
+ }
+ return delay
+}
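+
+// Worked example (derived from calcDelay above): with the exponential defaults
+// (RetryDelay=4s, MaxRetryDelay=120s) and jitter ignored, the pre-jitter delays are:
+//   try=1: (2^0 - 1) * 4s =  0s  (the first try is immediate)
+//   try=2: (2^1 - 1) * 4s =  4s
+//   try=3: (2^2 - 1) * 4s = 12s
+//   try=4: (2^3 - 1) * 4s = 28s
+// Each value is then multiplied by a random factor in [0.8, 1.3) and capped at MaxRetryDelay.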
+
+// NewRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.
+func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory {
+ o = o.defaults() // Force defaults to be calculated
+ return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
+ return func(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
+ // Before each try, we'll select either the primary or secondary URL.
+ primaryTry := int32(0) // This indicates how many tries we've attempted against the primary DC
+
+ // We only consider retrying against a secondary if we have a read request (GET/HEAD) AND this policy has a Secondary URL it can use
+ considerSecondary := (request.Method == http.MethodGet || request.Method == http.MethodHead) && o.retryReadsFromSecondaryHost() != ""
+
+ // Exponential retry algorithm: ((2 ^ (try - 1)) - 1) * delay * random(0.8, 1.3)
+ // When to retry: connection failure or temporary/timeout. NOTE: StorageError considers HTTP 500/503 as temporary & is therefore retryable
+ // If using a secondary:
+ // Odd tries go against the primary; even tries go against the secondary
+ // For a primary try, wait ((2 ^ (primaryTry - 1)) - 1) * delay * random(0.8, 1.3)
+ // If the secondary gets a 404, don't fail; retry, but future retries are only against the primary
+ // When retrying against a secondary, ignore the retry count and wait (~1 second * random(0.8, 1.3))
+ for try := int32(1); try <= o.MaxTries; try++ {
+ logf("\n=====> Try=%d\n", try)
+
+ // Determine which endpoint to try. It's primary if there is no secondary or if it is an odd-numbered attempt.
+ tryingPrimary := !considerSecondary || (try%2 == 1)
+ // Select the correct host and delay
+ if tryingPrimary {
+ primaryTry++
+ delay := o.calcDelay(primaryTry)
+ logf("Primary try=%d, Delay=%v\n", primaryTry, delay)
+ time.Sleep(delay) // The 1st try returns 0 delay
+ } else {
+ delay := time.Duration((rand.Float64()/2 + 0.8) * float64(time.Second)) // ~[0.8s, 1.3s) of jitter, matching calcDelay's jitter pattern
+ logf("Secondary try=%d, Delay=%v\n", try-primaryTry, delay)
+ time.Sleep(delay) // Delay with some jitter before trying secondary
+ }
+
+ // Clone the original request to ensure that each try starts with the original (unmutated) request.
+ requestCopy := request.Copy()
+
+ // For each try, seek to the beginning of the Body stream. We do this even for the 1st try because
+ // the stream may not be at offset 0 when we first get it and we want the same behavior for the
+ // 1st try as for additional tries.
+ if err = requestCopy.RewindBody(); err != nil {
+ panic(err)
+ }
+ if !tryingPrimary {
+ requestCopy.Request.URL.Host = o.retryReadsFromSecondaryHost()
+ }
+
+ // Set the server-side timeout query parameter "timeout=[seconds]"
+ timeout := int32(o.TryTimeout.Seconds()) // Max seconds per try
+ if deadline, ok := ctx.Deadline(); ok { // If user's ctx has a deadline, make the timeout the smaller of the two
+ t := int32(deadline.Sub(time.Now()).Seconds()) // Duration from now until user's ctx reaches its deadline
+ logf("MaxTryTimeout=%d secs, TimeTilDeadline=%d sec\n", timeout, t)
+ if t < timeout {
+ timeout = t
+ }
+ if timeout < 0 {
+ timeout = 0 // If timeout ever goes negative, set it to zero; this can happen while debugging
+ }
+ logf("TryTimeout adjusted to=%d sec\n", timeout)
+ }
+ q := requestCopy.Request.URL.Query()
+ q.Set("timeout", strconv.Itoa(int(timeout+1))) // Add 1 to "round up"
+ requestCopy.Request.URL.RawQuery = q.Encode()
+ logf("Url=%s\n", requestCopy.Request.URL.String())
+
+ // Set the time for this particular retry operation and then Do the operation.
+ tryCtx, tryCancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
+ //requestCopy.Body = &deadlineExceededReadCloser{r: requestCopy.Request.Body}
+ response, err = next.Do(tryCtx, requestCopy) // Make the request
+ /*err = improveDeadlineExceeded(err)
+ if err == nil {
+ response.Response().Body = &deadlineExceededReadCloser{r: response.Response().Body}
+ }*/
+ logf("Err=%v, response=%v\n", err, response)
+
+ action := "" // This MUST get changed within the switch code below
+ switch {
+ case ctx.Err() != nil:
+ action = "NoRetry: Op timeout"
+ case !tryingPrimary && response != nil && response.Response().StatusCode == http.StatusNotFound:
+ // If attempt was against the secondary & it returned a StatusNotFound (404), then
+ // the resource was not found. This may be due to replication delay. So, in this
+ // case, we'll never try the secondary again for this operation.
+ considerSecondary = false
+ action = "Retry: Secondary URL returned 404"
+ case err != nil:
+ // NOTE: Protocol Responder returns non-nil if REST API returns invalid status code for the invoked operation
+ if netErr, ok := err.(net.Error); ok && (netErr.Temporary() || netErr.Timeout()) {
+ action = "Retry: net.Error and Temporary() or Timeout()"
+ } else {
+ action = "NoRetry: unrecognized error"
+ }
+ default:
+ action = "NoRetry: successful HTTP request" // no error
+ }
+
+ logf("Action=%s\n", action)
+ // fmt.Println(action + "\n") // This is where we could log the retry operation; action is why we're retrying
+ if action[0] != 'R' { // Retry only if action starts with 'R'
+ if err != nil {
+ tryCancel() // If we're returning an error, cancel this current/last per-retry timeout context
+ } else {
+ // TODO: Right now, we've decided to leak the per-try Context until the user's Context is canceled.
+ // Another option is to overwrite the Response's Body field with a wrapper that owns the last per-try context.
+ // So, when the user closes the Body, our per-try context gets canceled too.
+ // Another option is for the last policy to do this wrapping for a per-try context (not for the user's context).
+ _ = tryCancel // So, for now, we don't call cancel: cancel()
+ }
+ break // Don't retry
+ }
+ if response != nil && response.Response() != nil && response.Response().Body != nil {
+ // If we're going to retry and we got a previous response, then flush its body to avoid leaking its TCP connection
+ body := response.Response().Body
+ io.Copy(ioutil.Discard, body)
+ body.Close()
+ }
+ // If retrying, cancel the current per-try timeout context
+ tryCancel()
+ }
+ return response, err // Not retryable or too many retries; return the last response/error
+ }
+ })
+}
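+
+// Sketch (illustrative): to let GET/HEAD retries alternate to an RA-GRS secondary endpoint,
+// set RetryReadsFromSecondaryHost. The host name below is a hypothetical example; read the
+// documentation linked on that field first, since reads from the secondary may return stale data.
+//
+//   factory := NewRetryPolicyFactory(RetryOptions{
+//       RetryReadsFromSecondaryHost: "myaccount-secondary.blob.core.windows.net",
+//   })
+//   // Odd-numbered tries go to the primary host; even-numbered tries rewrite the request's
+//   // URL.Host to the secondary (see the loop above).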
+
+// According to https://github.com/golang/go/wiki/CompilerOptimizations, the compiler will inline this method and hopefully optimize all calls to it away
+var logf = func(format string, a ...interface{}) {}
+
+// Use this version to see the retry method's code path (import "fmt")
+//var logf = fmt.Printf
+
+/*
+type deadlineExceededReadCloser struct {
+ r io.ReadCloser
+}
+
+func (r *deadlineExceededReadCloser) Read(p []byte) (int, error) {
+ n, err := 0, io.EOF
+ if r.r != nil {
+ n, err = r.r.Read(p)
+ }
+ return n, improveDeadlineExceeded(err)
+}
+func (r *deadlineExceededReadCloser) Seek(offset int64, whence int) (int64, error) {
+ // For an HTTP request, the ReadCloser MUST also implement seek
+ // For an HTTP response, Seek MUST not be called (or this will panic)
+ o, err := r.r.(io.Seeker).Seek(offset, whence)
+ return o, improveDeadlineExceeded(err)
+}
+func (r *deadlineExceededReadCloser) Close() error {
+ if c, ok := r.r.(io.Closer); ok {
+ c.Close()
+ }
+ return nil
+}
+
+// deadlineExceeded is the internal struct that implements our richer timeout error.
+type deadlineExceeded struct {
+ responseError
+}
+
+var _ net.Error = (*deadlineExceeded)(nil) // Ensure deadlineExceeded implements the net.Error interface at compile time
+
+// improveDeadlineExceeded creates a deadlineExceeded object that implements the error interface IF cause is a context.DeadlineExceeded error.
+func improveDeadlineExceeded(cause error) error {
+ // If cause is not DeadlineExceeded, return the same error passed in.
+ if cause != context.DeadlineExceeded {
+ return cause
+ }
+ // Else, convert DeadlineExceeded to our timeoutError which gives a richer string message
+ return &deadlineExceeded{
+ responseError: responseError{
+ ErrorNode: pipeline.ErrorNode{}.Initialize(cause, 3),
+ },
+ }
+}
+
+// Error implements the error interface's Error method to return a string representation of the error.
+func (e *deadlineExceeded) Error() string {
+ return e.ErrorNode.Error("context deadline exceeded; when creating a pipeline, consider increasing RetryOptions' TryTimeout field")
+}
+*/