aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/highlevel.go
blob: aa022826bb18a515a26da6c90fa6f405d7bddb13 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
package azblob

import (
    "context"
    "encoding/base64"
    "fmt"
    "io"
    "net/http"

    "bytes"
    "os"
    "sync"
    "time"

    "github.com/Azure/azure-pipeline-go/pipeline"
)

// CommonResponse exposes the headers common to all blob REST API responses.
// It is the result type returned by the upload helpers in this file.
type CommonResponse interface {
    // ETag returns the value for header ETag.
    ETag() ETag

    // LastModified returns the value for header Last-Modified.
    LastModified() time.Time

    // RequestID returns the value for header x-ms-request-id.
    RequestID() string

    // Date returns the value for header Date.
    Date() time.Time

    // Version returns the value for header x-ms-version.
    Version() string

    // Response returns the raw HTTP response object.
    Response() *http.Response
}

// UploadToBlockBlobOptions identifies options used by the UploadBufferToBlockBlob and UploadFileToBlockBlob functions.
type UploadToBlockBlobOptions struct {
    // BlockSize specifies the block size to use; 0 selects the default and negative values are rejected.
    // NOTE(review): this comment used to name BlockBlobMaxStageBlockBytes as the default and maximum,
    // but UploadBufferToBlockBlob validates against and defaults to BlockBlobMaxUploadBlobBytes —
    // confirm which limit is intended for staged blocks.
    BlockSize int64

    // Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL.
    // It receives the running total of bytes transferred so far.
    Progress pipeline.ProgressReceiver

    // BlobHTTPHeaders indicates the HTTP headers to be associated with the blob.
    BlobHTTPHeaders BlobHTTPHeaders

    // Metadata indicates the metadata to be associated with the blob when PutBlockList is called.
    Metadata Metadata

    // AccessConditions indicates the access conditions for the block blob.
    // Only its LeaseAccessConditions part is sent with each staged block; the full set is
    // sent with the single-shot upload and with the final block-list commit.
    AccessConditions BlobAccessConditions

    // Parallelism indicates the maximum number of blocks to upload in parallel (0=default)
    Parallelism uint16
}

// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
//
// A buffer of at most BlockBlobMaxUploadBlobBytes bytes is sent with a single
// Upload call. A larger buffer is split into o.BlockSize-byte blocks (the last
// one may be shorter) that are staged in parallel through doBatchTransfer and
// then committed with CommitBlockList.
//
// It panics when o.BlockSize is negative or larger than
// BlockBlobMaxUploadBlobBytes, or when the buffer would need more than
// BlockBlobMaxBlocks blocks. Transfer failures are returned as errors.
//
// NOTE(review): the block size defaults to and is capped at
// BlockBlobMaxUploadBlobBytes, although UploadToBlockBlobOptions documents
// BlockBlobMaxStageBlockBytes as the limit — confirm against the service's
// staged-block size limit.
func UploadBufferToBlockBlob(ctx context.Context, b []byte,
    blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {

    // Validate parameters and set defaults
    if o.BlockSize < 0 || o.BlockSize > BlockBlobMaxUploadBlobBytes {
        panic(fmt.Sprintf("BlockSize option must be > 0 and <= %d", BlockBlobMaxUploadBlobBytes))
    }
    if o.BlockSize == 0 {
        o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
    }
    size := int64(len(b))

    if size <= BlockBlobMaxUploadBlobBytes {
        // If the size can fit in 1 Upload call, do it this way
        var body io.ReadSeeker = bytes.NewReader(b)
        if o.Progress != nil {
            body = pipeline.NewRequestBodyProgress(body, o.Progress)
        }
        return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
    }

    // Count the blocks in int64 and check the limit before using the value.
    // Narrowing to uint16 first would wrap counts above 65535 around to a small
    // number and let an oversized upload slip past this check.
    numBlocks := ((size - 1) / o.BlockSize) + 1
    if numBlocks > BlockBlobMaxBlocks {
        panic(fmt.Sprintf("The buffer's size is too big or the BlockSize is too small; the number of blocks must be <= %d", BlockBlobMaxBlocks))
    }

    blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
    progress := int64(0)
    progressLock := &sync.Mutex{}

    err := doBatchTransfer(ctx, batchTransferOptions{
        operationName: "UploadBufferToBlockBlob",
        transferSize:  size,
        chunkSize:     o.BlockSize,
        parallelism:   o.Parallelism,
        operation: func(offset int64, count int64) error {
            // This function is called once per block.
            // It is passed this block's offset within the buffer and its count of bytes
            // Prepare to read the proper block/section of the buffer
            var body io.ReadSeeker = bytes.NewReader(b[offset : offset+count])
            blockNum := offset / o.BlockSize
            if o.Progress != nil {
                blockProgress := int64(0)
                body = pipeline.NewRequestBodyProgress(body,
                    func(bytesTransferred int64) {
                        // Convert this block's running total into a delta so the
                        // shared counter reflects all blocks together.
                        diff := bytesTransferred - blockProgress
                        blockProgress = bytesTransferred
                        progressLock.Lock() // 1 goroutine at a time gets a progress report
                        progress += diff
                        o.Progress(progress)
                        progressLock.Unlock()
                    })
            }

            // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
            // at the same time causing PutBlockList to get a mix of blocks from all the clients.
            // Each operation writes only its own slot of blockIDList, so no locking is needed.
            blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
            _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions)
            return err
        },
    })
    if err != nil {
        return nil, err
    }
    // All put blocks were successful, call Put Block List to finalize the blob
    return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
}

// UploadFileToBlockBlob uploads a file in blocks to a block blob.
// A non-empty file is memory-mapped through newMMF and handed to
// UploadBufferToBlockBlob as a buffer; the mapping is released when this
// function returns. An empty file is uploaded as an empty buffer.
func UploadFileToBlockBlob(ctx context.Context, file *os.File,
    blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {

    info, err := file.Stat()
    if err != nil {
        return nil, err
    }

    // Starts out as an empty slice, which is what a 0-size file uploads.
    mapped := mmf{}
    if fileSize := info.Size(); fileSize != 0 {
        if mapped, err = newMMF(file, false, 0, int(fileSize)); err != nil {
            return nil, err
        }
        defer mapped.unmap()
    }
    return UploadBufferToBlockBlob(ctx, mapped, blockBlobURL, o)
}

///////////////////////////////////////////////////////////////////////////////


// BlobDefaultDownloadBlockSize is the block size, in bytes, that downloadBlobToBuffer
// uses for each ranged download when DownloadFromBlobOptions.BlockSize is 0.
const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB

// DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions.
type DownloadFromBlobOptions struct {
    // BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize.
    // Negative values are rejected.
    BlockSize int64

    // Progress is a function that is invoked periodically as bytes are received.
    // It receives the running total of bytes received across all blocks.
    Progress pipeline.ProgressReceiver

    // AccessConditions indicates the access conditions used when making HTTP GET requests against the blob.
    // NOTE(review): the download helpers in this file take a separate ac parameter and never
    // read this field — confirm whether it is meant to be honored.
    AccessConditions BlobAccessConditions

    // Parallelism indicates the maximum number of blocks to download in parallel (0=default)
    Parallelism uint16

    // RetryReaderOptionsPerBlock is used when downloading each block.
    RetryReaderOptionsPerBlock RetryReaderOptions
}

// downloadBlobToBuffer downloads count bytes of a blob, starting at offset,
// into b using parallel ranged downloads of o.BlockSize bytes each.
//
// When count is CountToEnd the number of bytes is derived from the blob's
// content length: taken from initialDownloadResponse when it is non-nil,
// otherwise obtained with an extra Download call. All requests use the ac
// access conditions (o.AccessConditions is not consulted here).
//
// It panics when o.BlockSize, offset or count is negative, or when b is
// smaller than the number of bytes to download. Transfer failures are
// returned as errors.
func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
    ac BlobAccessConditions, b []byte, o DownloadFromBlobOptions,
    initialDownloadResponse *DownloadResponse) error {
    // Validate parameters, and set defaults.
    if o.BlockSize < 0 {
        panic("BlockSize option must be >= 0")
    }
    if o.BlockSize == 0 {
        o.BlockSize = BlobDefaultDownloadBlockSize
    }

    if offset < 0 {
        panic("offset option must be >= 0")
    }

    if count < 0 {
        panic("count option must be >= 0")
    }

    if count == CountToEnd { // If size not specified, calculate it
        if initialDownloadResponse != nil {
            count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it
        } else {
            // If we don't have the length at all, get it
            dr, err := blobURL.Download(ctx, 0, CountToEnd, ac, false)
            if err != nil {
                return err
            }
            count = dr.ContentLength() - offset
            // Only the length was needed. Close the body so the underlying
            // response is released; a close error cannot affect the result.
            _ = dr.Body(o.RetryReaderOptionsPerBlock).Close()
        }
    }

    if int64(len(b)) < count {
        panic(fmt.Errorf("the buffer's size should be equal to or larger than the request count of bytes: %d", count))
    }

    // Prepare and do parallel download.
    progress := int64(0)
    progressLock := &sync.Mutex{}

    return doBatchTransfer(ctx, batchTransferOptions{
        operationName: "downloadBlobToBuffer",
        transferSize:  count,
        chunkSize:     o.BlockSize,
        parallelism:   o.Parallelism,
        operation: func(chunkStart int64, chunkCount int64) error {
            // chunkStart is relative to the start of the buffer; the blob range
            // is shifted by the caller's offset.
            dr, err := blobURL.Download(ctx, chunkStart+offset, chunkCount, ac, false)
            if err != nil {
                // The response must not be touched when the request failed.
                return err
            }
            body := dr.Body(o.RetryReaderOptionsPerBlock)
            if o.Progress != nil {
                rangeProgress := int64(0)
                body = pipeline.NewResponseBodyProgress(
                    body,
                    func(bytesTransferred int64) {
                        // Convert this range's running total into a delta so the
                        // shared counter reflects all ranges together.
                        diff := bytesTransferred - rangeProgress
                        rangeProgress = bytesTransferred
                        progressLock.Lock()
                        progress += diff
                        o.Progress(progress)
                        progressLock.Unlock()
                    })
            }
            _, err = io.ReadFull(body, b[chunkStart:chunkStart+chunkCount])
            body.Close()
            return err
        },
    })
}

// DownloadBlobToBuffer downloads a blob to a buffer using parallel ranged downloads.
// Offset and count are optional, pass 0 for both to download the entire blob.
// It delegates to downloadBlobToBuffer without an initial download response.
func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
    ac BlobAccessConditions, b []byte, o DownloadFromBlobOptions) error {
    return downloadBlobToBuffer(ctx, blobURL, offset, count, ac, b, o, nil)
}

// DownloadBlobToFile downloads a blob to a local file.
// The file would be truncated if the size doesn't match.
// Offset and count are optional, pass 0 for both to download the entire blob.
//
// The file is resized to the number of bytes to download, memory-mapped
// through newMMF, and filled by downloadBlobToBuffer. It panics when file is nil.
func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64,
    ac BlobAccessConditions, file *os.File, o DownloadFromBlobOptions) error {
    if file == nil {
        panic("file must not be nil")
    }

    // Work out how many bytes the local file has to hold: the requested count,
    // or everything from offset to the end of the blob.
    size := count
    if count == CountToEnd {
        props, err := blobURL.GetProperties(ctx, ac)
        if err != nil {
            return err
        }
        size = props.ContentLength() - offset
    }

    // Resize the local file when its current length differs.
    info, err := file.Stat()
    if err != nil {
        return err
    }
    if info.Size() != size {
        if err := file.Truncate(size); err != nil {
            return err
        }
    }

    // Nothing to transfer, so skip the mapping and the download entirely.
    if size <= 0 {
        return nil
    }

    // Map the file into memory and download straight into the mapping.
    mapped, err := newMMF(file, true, 0, int(size))
    if err != nil {
        return err
    }
    defer mapped.unmap()
    return downloadBlobToBuffer(ctx, blobURL, offset, size, ac, mapped, o, nil)
}


///////////////////////////////////////////////////////////////////////////////

// batchTransferOptions identifies options used by doBatchTransfer.
type batchTransferOptions struct {
    transferSize  int64                                     // total number of bytes to transfer
    chunkSize     int64                                     // size of every chunk except possibly the last; must be > 0
    parallelism   uint16                                    // number of worker goroutines; 0 selects the default of 5
    operation     func(offset int64, chunkSize int64) error // invoked once per chunk, possibly concurrently
    operationName string                                    // used only in error messages
}

// doBatchTransfer helps to execute operations in a batch manner.
//
// It splits o.transferSize into chunks of o.chunkSize bytes (the last chunk
// holds the remainder) and calls o.operation(offset, size) once per chunk from
// o.parallelism worker goroutines. A transferSize of 0 still yields a single
// call with size 0.
//
// The first error reported by an operation is returned. After a failure, or
// once ctx is done, chunks that have not started yet are skipped. The function
// returns only after every operation that did start has finished, so callers
// may safely release the memory the operations work on as soon as it returns.
//
// An error is returned, and no operation is run, when chunkSize is not
// positive or transferSize is negative.
func doBatchTransfer(ctx context.Context, o batchTransferOptions) error {
    if o.chunkSize <= 0 {
        return fmt.Errorf("%s: chunk size must be > 0, got %d", o.operationName, o.chunkSize)
    }
    if o.transferSize < 0 {
        return fmt.Errorf("%s: transfer size must be >= 0, got %d", o.operationName, o.transferSize)
    }
    // Apply the default before the work channel is sized from it.
    if o.parallelism == 0 {
        o.parallelism = 5 // default parallelism
    }
    // Counted in int64: a narrower counter would wrap for large transfers and
    // silently process only part of the data.
    numChunks := ((o.transferSize - 1) / o.chunkSize) + 1

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var (
        workers  sync.WaitGroup
        errLock  sync.Mutex
        firstErr error
    )
    // fail remembers the first error and cancels ctx so pending chunks are skipped.
    fail := func(err error) {
        errLock.Lock()
        if firstErr == nil {
            firstErr = err
        }
        errLock.Unlock()
        cancel()
    }

    // Create the goroutines that process each operation (in parallel).
    operationChannel := make(chan func() error, o.parallelism)
    for g := uint16(0); g < o.parallelism; g++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for f := range operationChannel {
                if err := ctx.Err(); err != nil {
                    fail(err) // keeps an earlier, more specific error if there is one
                    continue  // keep draining so the producer never blocks
                }
                if err := f(); err != nil {
                    fail(err)
                }
            }
        }()
    }

    // Add each chunk's operation to the channel.
    for chunkNum := int64(0); chunkNum < numChunks; chunkNum++ {
        if err := ctx.Err(); err != nil {
            fail(err)
            break
        }
        offset := chunkNum * o.chunkSize
        curChunkSize := o.chunkSize
        if chunkNum == numChunks-1 { // Last chunk
            curChunkSize = o.transferSize - offset // Remove size of all transferred chunks from total
        }
        operationChannel <- func() error {
            return o.operation(offset, curChunkSize)
        }
    }
    close(operationChannel)

    // Wait for every started operation; the wait also makes firstErr safe to read.
    workers.Wait()
    return firstErr
}

////////////////////////////////////////////////////////////////////////////////////////////////

// UploadStreamToBlockBlobOptions identifies options used by the UploadStreamToBlockBlob function.
type UploadStreamToBlockBlobOptions struct {
    // BufferSize is the size, in bytes, of each buffer the stream is read into.
    // A first chunk shorter than this is uploaded with a single Upload call
    // instead of being staged as a block.
    BufferSize       int
    // MaxBuffers is the maximum number of buffers (and uploading goroutines) created.
    MaxBuffers       int
    // BlobHTTPHeaders indicates the HTTP headers sent with the final Upload or CommitBlockList call.
    BlobHTTPHeaders  BlobHTTPHeaders
    // Metadata indicates the metadata sent with the final Upload or CommitBlockList call.
    Metadata         Metadata
    // AccessConditions indicates the access conditions sent with the final Upload or CommitBlockList call.
    AccessConditions BlobAccessConditions
}

// UploadStreamToBlockBlob reads reader until it is exhausted and uploads its
// contents to blockBlobURL, buffering the stream in o.BufferSize-byte chunks
// with at most o.MaxBuffers buffers in use at once.
//
// A BufferSize that is not positive defaults to 1 MiB and a MaxBuffers that is
// not positive defaults to 1; such values previously made the upload hang.
//
// On failure it returns a nil response together with the error.
func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL,
    o UploadStreamToBlockBlobOptions) (CommonResponse, error) {
    if o.BufferSize <= 0 {
        o.BufferSize = 1024 * 1024
    }
    if o.MaxBuffers <= 0 {
        o.MaxBuffers = 1
    }

    result, err := uploadStream(ctx, reader,
        UploadStreamOptions{BufferSize: o.BufferSize, MaxBuffers: o.MaxBuffers},
        &uploadStreamToBlockBlobOptions{b: blockBlobURL, o: o, blockIDPrefix: newUUID()})
    if err != nil {
        // result may be a nil interface here; asserting on it would panic.
        return nil, err
    }
    resp, ok := result.(CommonResponse)
    if !ok {
        return nil, fmt.Errorf("UploadStreamToBlockBlob: unexpected result type %T", result)
    }
    return resp, nil
}

// uploadStreamToBlockBlobOptions is the transfer state that UploadStreamToBlockBlob
// hands to uploadStream; its start, chunk and end methods perform the block blob upload.
type uploadStreamToBlockBlobOptions struct {
    b             BlockBlobURL                   // destination block blob
    o             UploadStreamToBlockBlobOptions // caller-supplied options
    blockIDPrefix uuid   // UUID used with all blockIDs
    maxBlockNum   uint32 // defaults to 0; highest block number staged so far, updated atomically by chunk
    firstBlock    []byte // Used only if maxBlockNum is 0
}

// start is called by uploadStream before any chunk is read; this transfer
// needs no setup, so it does nothing and reports no error.
func (t *uploadStreamToBlockBlobOptions) start(ctx context.Context) (interface{}, error) {
    return nil, nil
}

// chunk handles one chunk of the stream; num is the chunk's zero-based index.
//
// A first chunk shorter than the configured BufferSize is the whole payload:
// it is only remembered in t.firstBlock so that end can send it with a single
// Upload call. Every other chunk is staged as a block whose ID is derived from
// the block ID prefix and num.
//
// A first chunk of exactly BufferSize bytes may or may not be the whole
// payload, so it is staged and also remembered. Without that, a payload of
// exactly BufferSize bytes left t.firstBlock nil while maxBlockNum stayed 0,
// and end uploaded an empty blob.
func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32, buffer []byte) error {
    if num == 0 {
        if len(buffer) < t.o.BufferSize {
            // If whole payload fits in 1 block, don't stage it; End will upload it with 1 I/O operation
            t.firstBlock = buffer
            return nil
        }
        // Keep a private copy: uploadStream hands this buffer back to the
        // reader for reuse once chunk returns.
        t.firstBlock = append([]byte(nil), buffer...)
    }
    // Else, upload a staged block...
    AtomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) {
        // Atomically remember (in t.maxBlockNum) the maximum block num we've ever seen
        if startVal < num {
            return num, nil
        }
        return startVal, nil
    })
    blockID := newUuidBlockID(t.blockIDPrefix).WithBlockNumber(num).ToBase64()
    // Send the caller's lease conditions with each staged block, as
    // UploadBufferToBlockBlob does, so uploads to a leased blob are accepted.
    _, err := t.b.StageBlock(ctx, blockID, bytes.NewReader(buffer), t.o.AccessConditions.LeaseAccessConditions)
    return err
}

// end finishes the transfer once every chunk has been handled.
// When no block number above 0 was recorded, the remembered first block is sent
// with one Upload call; otherwise the IDs of blocks 0 through maxBlockNum are
// rebuilt from the block ID prefix and committed with CommitBlockList.
func (t *uploadStreamToBlockBlobOptions) end(ctx context.Context) (interface{}, error) {
    lastBlock := t.maxBlockNum
    if lastBlock == 0 {
        // Single-block payload: one I/O operation is enough.
        payload := bytes.NewReader(t.firstBlock)
        return t.b.Upload(ctx, payload, t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions)
    }

    // Several blocks were staged; list their IDs in block-number order.
    idSource := newUuidBlockID(t.blockIDPrefix)
    ids := make([]string, 0, lastBlock+1)
    for n := uint32(0); n <= lastBlock; n++ {
        ids = append(ids, idSource.WithBlockNumber(n).ToBase64())
    }
    return t.b.CommitBlockList(ctx, ids, t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions)
}

////////////////////////////////////////////////////////////////////////////////////////////////////

// iTransfer is the set of callbacks uploadStream drives for one stream upload.
type iTransfer interface {
    // start is called once before any data is read; a non-nil error aborts the upload.
    start(ctx context.Context) (interface{}, error)
    // chunk is called once per non-empty chunk, possibly from several goroutines at a time.
    // num is the chunk's zero-based position in the stream.
    chunk(ctx context.Context, num uint32, buffer []byte) error
    // end is called once after every chunk call has returned successfully;
    // its results become uploadStream's results.
    end(ctx context.Context) (interface{}, error)
}

// UploadStreamOptions controls how uploadStream buffers the stream.
type UploadStreamOptions struct {
    MaxBuffers int // maximum number of buffers (and uploading goroutines) in use; must be > 0
    BufferSize int // size in bytes of each buffer; must be > 0
}

// uploadStream reads reader in o.BufferSize-byte chunks and passes each
// non-empty chunk to t.chunk, using at most o.MaxBuffers buffers and uploading
// goroutines. Buffers are recycled once their chunk has been handled.
//
// When the whole stream was read and every chunk succeeded, it returns the
// results of t.end. Otherwise it returns the first failure and does not call
// t.end: a chunk error, a read error other than end-of-stream, or ctx being
// done. A partially read stream is therefore never finalized as if it were
// complete.
//
// It returns an error without reading anything when o.MaxBuffers or
// o.BufferSize is not positive (such values would otherwise block forever).
func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions, t iTransfer) (interface{}, error) {
    if o.MaxBuffers < 1 || o.BufferSize < 1 {
        return nil, fmt.Errorf("uploadStream: MaxBuffers and BufferSize must be > 0 (got MaxBuffers=%d, BufferSize=%d)",
            o.MaxBuffers, o.BufferSize)
    }

    ctx, cancel := context.WithCancel(ctx) // New context so that any failure cancels everything
    defer cancel()

    var (
        wg       sync.WaitGroup // Used to know when all outgoing messages have finished processing
        errOnce  sync.Once
        firstErr error
    )
    // fail remembers the first failure and cancels every in-flight chunk.
    fail := func(err error) {
        errOnce.Do(func() { firstErr = err })
        cancel()
    }

    type OutgoingMsg struct {
        chunkNum uint32
        buffer   []byte
    }

    // Create a channel to hold the buffers usable for incoming data
    incoming := make(chan []byte, o.MaxBuffers)
    outgoing := make(chan OutgoingMsg, o.MaxBuffers) // Channel holding outgoing buffers
    if result, err := t.start(ctx); err != nil {
        return result, err
    }

    numBuffers := 0 // The number of buffers & outgoing goroutines created so far
    injectBuffer := func() {
        // For each buffer, create it and a goroutine to upload it
        incoming <- make([]byte, o.BufferSize)
        numBuffers++
        go func() {
            for outgoingMsg := range outgoing {
                // Upload the outgoing buffer
                if err := t.chunk(ctx, outgoingMsg.chunkNum, outgoingMsg.buffer); err != nil {
                    fail(err)
                }
                // Record the error before Done so it is visible after wg.Wait.
                wg.Done()
                // The goroutine reading from the stream can reuse this buffer now. This never
                // blocks: incoming has room for every buffer that was ever created.
                incoming <- outgoingMsg.buffer
            }
        }()
    }
    injectBuffer() // Create our 1st buffer & outgoing goroutine

    // This goroutine grabs a buffer, reads from the stream into the buffer,
    // and inserts the buffer into the outgoing channel to be uploaded
    for c := uint32(0); ; c++ { // Iterate once per chunk
        if err := ctx.Err(); err != nil {
            // A chunk failed or the caller gave up; stop reading the stream.
            fail(err)
            break
        }

        var buffer []byte
        if numBuffers < o.MaxBuffers {
            select {
            // We're not at max buffers, see if a previously-created buffer is available
            case buffer = <-incoming:
            default:
                // No buffer available; inject a new buffer & goroutine to process it
                injectBuffer()
                buffer = <-incoming // Grab the just-injected buffer
            }
        } else {
            // We are at max buffers, block until we get to reuse one
            buffer = <-incoming
        }

        n, err := io.ReadFull(reader, buffer)
        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
            // A real read failure, not the end of the stream.
            fail(err)
            break
        }
        if err != nil {
            buffer = buffer[:n] // Make slice match the # of read bytes
        }
        if len(buffer) > 0 {
            // Buffer not empty, upload it
            wg.Add(1) // We're posting a buffer to be sent
            outgoing <- OutgoingMsg{chunkNum: c, buffer: buffer}
        }
        if err != nil { // The reader is done, no more outgoing buffers
            break
        }
    }
    // NOTE: Don't close the incoming channel because the outgoing goroutines post buffers into it when they are done
    close(outgoing) // Make all the outgoing goroutines terminate when this channel is empty
    wg.Wait()       // Wait for all pending outgoing messages to complete

    if firstErr != nil {
        return nil, firstErr
    }
    // After all blocks uploaded, commit them to the blob & return the result
    return t.end(ctx)
}