aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/Azure/azure-pipeline-go/pipeline/core.go
blob: 0dde81d728c9938169c70365cddde1d158f0fc64 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
package pipeline

import (
    "context"
    "net"
    "net/http"
    "os"
    "time"
)

// The Factory interface represents an object that can create its Policy object. Each HTTP request sent
// requires that this Factory create a new instance of its Policy object.
type Factory interface {
    New(next Policy, po *PolicyOptions) Policy
}

// FactoryFunc is an adapter that allows the use of an ordinary function as a Factory interface.
type FactoryFunc func(next Policy, po *PolicyOptions) PolicyFunc

// New calls f(next,po).
func (f FactoryFunc) New(next Policy, po *PolicyOptions) Policy {
    return f(next, po)
}

// The Policy interface represents a mutable Policy object created by a Factory. The object can mutate/process
// the HTTP request and then forward it on to the next Policy object in the linked-list. The returned
// Response goes backward through the linked-list for additional processing.
// NOTE: Request is passed by value so changes do not change the caller's version of
// the request. However, Request has some fields that reference mutable objects (not strings).
// These references are copied; a deep copy is not performed. Specifically, this means that
// you should avoid modifying the objects referred to by these fields: URL, Header, Body,
// GetBody, TransferEncoding, Form, MultipartForm, Trailer, TLS, Cancel, and Response.
type Policy interface {
    Do(ctx context.Context, request Request) (Response, error)
}

// PolicyFunc is an adapter that allows the use of an ordinary function as a Policy interface.
type PolicyFunc func(ctx context.Context, request Request) (Response, error)

// Do calls f(ctx, request).
func (f PolicyFunc) Do(ctx context.Context, request Request) (Response, error) {
    return f(ctx, request)
}

// Options configures a Pipeline's behavior.
type Options struct {
    HTTPSender Factory // If sender is nil, then the pipeline's default client is used to send the HTTP requests.
    Log        LogOptions
}

// LogLevel tells a logger the minimum level to log. When code reports a log entry,
// the LogLevel indicates the level of the log entry. The logger only records entries
// whose level is at least the level it was told to log. See the Log* constants.
// For example, if a logger is configured with LogError, then LogError, LogPanic,
// and LogFatal entries will be logged; lower level entries are ignored.
type LogLevel uint32

const (
    // LogNone tells a logger not to log any entries passed to it.
    LogNone LogLevel = iota

    // LogFatal tells a logger to log all LogFatal entries passed to it.
    LogFatal

    // LogPanic tells a logger to log all LogPanic and LogFatal entries passed to it.
    LogPanic

    // LogError tells a logger to log all LogError, LogPanic and LogFatal entries passed to it.
    LogError

    // LogWarning tells a logger to log all LogWarning, LogError, LogPanic and LogFatal entries passed to it.
    LogWarning

    // LogInfo tells a logger to log all LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
    LogInfo

    // LogDebug tells a logger to log all LogDebug, LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
    LogDebug
)

// LogOptions configures the pipeline's logging mechanism & level filtering.
type LogOptions struct {
    Log func(level LogLevel, message string)

    // ShouldLog is called periodically allowing you to return whether the specified LogLevel should be logged or not.
    // An application can return different values over the its lifetime; this allows the application to dynamically
    // alter what is logged. NOTE: This method can be called by multiple goroutines simultaneously so make sure
    // you implement it in a goroutine-safe way. If nil, nothing is logged (the equivalent of returning LogNone).
    // Usually, the function will be implemented simply like this: return level <= LogWarning
    ShouldLog func(level LogLevel) bool
}

type pipeline struct {
    factories []Factory
    options   Options
}

// The Pipeline interface represents an ordered list of Factory objects and an object implementing the HTTPSender interface.
// You construct a Pipeline by calling the pipeline.NewPipeline function. To send an HTTP request, call pipeline.NewRequest
// and then call Pipeline's Do method passing a context, the request, and a method-specific Factory (or nil). Passing a
// method-specific Factory allows this one call to Do to inject a Policy into the linked-list. The policy is injected where
// the MethodFactoryMarker (see the pipeline.MethodFactoryMarker function) is in the slice of Factory objects.
//
// When Do is called, the Pipeline object asks each Factory object to construct its Policy object and adds each Policy to a linked-list.
// THen, Do sends the Context and Request through all the Policy objects. The final Policy object sends the request over the network
// (via the HTTPSender object passed to NewPipeline) and the response is returned backwards through all the Policy objects.
// Since Pipeline and Factory objects are goroutine-safe, you typically create 1 Pipeline object and reuse it to make many HTTP requests.
type Pipeline interface {
    Do(ctx context.Context, methodFactory Factory, request Request) (Response, error)
}

// NewPipeline creates a new goroutine-safe Pipeline object from the slice of Factory objects and the specified options.
func NewPipeline(factories []Factory, o Options) Pipeline {
    if o.HTTPSender == nil {
        o.HTTPSender = newDefaultHTTPClientFactory()
    }
    if o.Log.Log == nil {
        o.Log.Log = func(LogLevel, string) {} // No-op logger
    }
    return &pipeline{factories: factories, options: o}
}

// Do is called for each and every HTTP request. It tells each Factory to create its own (mutable) Policy object
// replacing a MethodFactoryMarker factory (if it exists) with the methodFactory passed in. Then, the Context and Request
// are sent through the pipeline of Policy objects (which can transform the Request's URL/query parameters/headers) and
// ultimately sends the transformed HTTP request over the network.
func (p *pipeline) Do(ctx context.Context, methodFactory Factory, request Request) (Response, error) {
    response, err := p.newPolicies(methodFactory).Do(ctx, request)
    request.close()
    return response, err
}

func (p *pipeline) newPolicies(methodFactory Factory) Policy {
    // The last Policy is the one that actually sends the request over the wire and gets the response.
    // It is overridable via the Options' HTTPSender field.
    po := &PolicyOptions{pipeline: p} // One object shared by all policy objects
    next := p.options.HTTPSender.New(nil, po)

    // Walk over the slice of Factory objects in reverse (from wire to API)
    markers := 0
    for i := len(p.factories) - 1; i >= 0; i-- {
        factory := p.factories[i]
        if _, ok := factory.(methodFactoryMarker); ok {
            markers++
            if markers > 1 {
                panic("MethodFactoryMarker can only appear once in the pipeline")
            }
            if methodFactory != nil {
                // Replace MethodFactoryMarker with passed-in methodFactory
                next = methodFactory.New(next, po)
            }
        } else {
            // Use the slice's Factory to construct its Policy
            next = factory.New(next, po)
        }
    }

    // Each Factory has created its Policy
    if markers == 0 && methodFactory != nil {
        panic("Non-nil methodFactory requires MethodFactoryMarker in the pipeline")
    }
    return next // Return head of the Policy object linked-list
}

// A PolicyOptions represents optional information that can be used by a node in the
// linked-list of Policy objects. A PolicyOptions is passed to the Factory's New method
// which passes it (if desired) to the Policy object it creates. Today, the Policy object
// uses the options to perform logging. But, in the future, this could be used for more.
type PolicyOptions struct {
    pipeline *pipeline
}

// ShouldLog returns true if the specified log level should be logged.
func (po *PolicyOptions) ShouldLog(level LogLevel) bool {
    if po.pipeline.options.Log.ShouldLog != nil {
        return po.pipeline.options.Log.ShouldLog(level)
    }
    return false
}

// Log logs a string to the Pipeline's Logger.
func (po *PolicyOptions) Log(level LogLevel, msg string) {
    if !po.ShouldLog(level) {
        return // Short circuit message formatting if we're not logging it
    }

    // We are logging it, ensure trailing newline
    if len(msg) == 0 || msg[len(msg)-1] != '\n' {
        msg += "\n" // Ensure trailing newline
    }
    po.pipeline.options.Log.Log(level, msg)

    // If logger doesn't handle fatal/panic, we'll do it here.
    if level == LogFatal {
        os.Exit(1)
    } else if level == LogPanic {
        panic(msg)
    }
}

var pipelineHTTPClient = newDefaultHTTPClient()

func newDefaultHTTPClient() *http.Client {
    // We want the Transport to have a large connection pool
    return &http.Client{
        Transport: &http.Transport{
            Proxy: http.ProxyFromEnvironment,
            // We use Dial instead of DialContext as DialContext has been reported to cause slower performance.
            Dial /*Context*/ : (&net.Dialer{
                Timeout:   30 * time.Second,
                KeepAlive: 30 * time.Second,
                DualStack: true,
            }).Dial, /*Context*/
            MaxIdleConns:           0, // No limit
            MaxIdleConnsPerHost:    100,
            IdleConnTimeout:        90 * time.Second,
            TLSHandshakeTimeout:    10 * time.Second,
            ExpectContinueTimeout:  1 * time.Second,
            DisableKeepAlives:      false,
            DisableCompression:     false,
            MaxResponseHeaderBytes: 0,
            //ResponseHeaderTimeout:  time.Duration{},
            //ExpectContinueTimeout:  time.Duration{},
        },
    }
}

// newDefaultHTTPClientFactory creates a DefaultHTTPClientPolicyFactory object that sends HTTP requests to a Go's default http.Client.
func newDefaultHTTPClientFactory() Factory {
    return FactoryFunc(func(next Policy, po *PolicyOptions) PolicyFunc {
        return func(ctx context.Context, request Request) (Response, error) {
            r, err := pipelineHTTPClient.Do(request.WithContext(ctx))
            if err != nil {
                err = NewError(err, "HTTP request failed")
            }
            return NewHTTPResponse(r), err
        }
    })
}

var mfm = methodFactoryMarker{} // Singleton

// MethodFactoryMarker returns a special marker Factory object. When Pipeline's Do method is called, any
// MethodMarkerFactory object is replaced with the specified methodFactory object. If nil is passed fro Do's
// methodFactory parameter, then the MethodFactoryMarker is ignored as the linked-list of Policy objects is created.
func MethodFactoryMarker() Factory {
    return mfm
}

type methodFactoryMarker struct {
}

func (methodFactoryMarker) New(next Policy, po *PolicyOptions) Policy {
    panic("methodFactoryMarker policy should have been replaced with a method policy")
}