Diffstat:
 vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/highlevel.go | 510 ++++++++++++++
 1 file changed, 510 insertions(+), 0 deletions(-)
diff --git a/vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/highlevel.go b/vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/highlevel.go
new file mode 100644
index 000000000..aa022826b
--- /dev/null
+++ b/vendor/github.com/Azure/azure-storage-blob-go/2018-03-28/azblob/highlevel.go
@@ -0,0 +1,510 @@
+package azblob
+
+import (
+ "context"
+ "encoding/base64"
+ "fmt"
+ "io"
+ "net/http"
+
+ "bytes"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/Azure/azure-pipeline-go/pipeline"
+)
+
+// CommonResponse exposes the headers common to all blob REST API responses.
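+//
+// Illustrative use (assumes a response from one of the upload helpers below;
+// error handling elided):
+//
+//	resp, err := UploadBufferToBlockBlob(ctx, data, blockBlobURL, UploadToBlockBlobOptions{})
+//	if err == nil {
+//		fmt.Println(resp.ETag(), resp.LastModified(), resp.RequestID())
+//	}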
+type CommonResponse interface {
+ // ETag returns the value for header ETag.
+ ETag() ETag
+
+ // LastModified returns the value for header Last-Modified.
+ LastModified() time.Time
+
+ // RequestID returns the value for header x-ms-request-id.
+ RequestID() string
+
+ // Date returns the value for header Date.
+ Date() time.Time
+
+ // Version returns the value for header x-ms-version.
+ Version() string
+
+ // Response returns the raw HTTP response object.
+ Response() *http.Response
+}
+
+// UploadToBlockBlobOptions identifies options used by the UploadBufferToBlockBlob and UploadFileToBlockBlob functions.
+type UploadToBlockBlobOptions struct {
+ // BlockSize specifies the block size to use; the default (and maximum) is BlockBlobMaxUploadBlobBytes.
+ BlockSize int64
+
+ // Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL.
+ Progress pipeline.ProgressReceiver
+
+ // BlobHTTPHeaders indicates the HTTP headers to be associated with the blob.
+ BlobHTTPHeaders BlobHTTPHeaders
+
+ // Metadata indicates the metadata to be associated with the blob when PutBlockList is called.
+ Metadata Metadata
+
+ // AccessConditions indicates the access conditions for the block blob.
+ AccessConditions BlobAccessConditions
+
+ // Parallelism indicates the maximum number of blocks to upload in parallel (0=default)
+ Parallelism uint16
+}
+
+// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
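+//
+// Usage sketch (the account, container, and blob names are illustrative, and
+// ctx/data are assumed to be in scope; error handling elided):
+//
+//	credential := NewSharedKeyCredential("myaccount", "mykey")
+//	p := NewPipeline(credential, PipelineOptions{})
+//	u, _ := url.Parse("https://myaccount.blob.core.windows.net/mycontainer/myblob")
+//	blobURL := NewBlockBlobURL(*u, p)
+//	_, err := UploadBufferToBlockBlob(ctx, data, blobURL, UploadToBlockBlobOptions{
+//		BlockSize:   4 * 1024 * 1024, // 4MB blocks
+//		Parallelism: 16,
+//	})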
+func UploadBufferToBlockBlob(ctx context.Context, b []byte,
+ blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
+
+ // Validate parameters and set defaults
+ if o.BlockSize < 0 || o.BlockSize > BlockBlobMaxUploadBlobBytes {
+ panic(fmt.Sprintf("BlockSize option must be > 0 and <= %d", BlockBlobMaxUploadBlobBytes))
+ }
+ if o.BlockSize == 0 {
+ o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
+ }
+ size := int64(len(b))
+
+ if size <= BlockBlobMaxUploadBlobBytes {
+ // If the size can fit in 1 Upload call, do it this way
+ var body io.ReadSeeker = bytes.NewReader(b)
+ if o.Progress != nil {
+ body = pipeline.NewRequestBodyProgress(body, o.Progress)
+ }
+ return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
+ }
+
+ var numBlocks = uint16(((size - 1) / o.BlockSize) + 1)
+ if numBlocks > BlockBlobMaxBlocks {
+ panic(fmt.Sprintf("The buffer's size is too big or the BlockSize is too small; the number of blocks must be <= %d", BlockBlobMaxBlocks))
+ }
+
+ blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
+ progress := int64(0)
+ progressLock := &sync.Mutex{}
+
+ err := doBatchTransfer(ctx, batchTransferOptions{
+ operationName: "UploadBufferToBlockBlob",
+ transferSize: size,
+ chunkSize: o.BlockSize,
+ parallelism: o.Parallelism,
+ operation: func(offset int64, count int64) error {
+ // This function is called once per block.
+ // It is passed this block's offset within the buffer and its count of bytes
+ // Prepare to read the proper block/section of the buffer
+ var body io.ReadSeeker = bytes.NewReader(b[offset : offset+count])
+ blockNum := offset / o.BlockSize
+ if o.Progress != nil {
+ blockProgress := int64(0)
+ body = pipeline.NewRequestBodyProgress(body,
+ func(bytesTransferred int64) {
+ diff := bytesTransferred - blockProgress
+ blockProgress = bytesTransferred
+ progressLock.Lock() // 1 goroutine at a time gets a progress report
+ progress += diff
+ o.Progress(progress)
+ progressLock.Unlock()
+ })
+ }
+
+ // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
+ // at the same time causing PutBlockList to get a mix of blocks from all the clients.
+ blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
+ _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions)
+ return err
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ // All put blocks were successful; call Put Block List to finalize the blob
+ return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
+}
+
+// UploadFileToBlockBlob uploads a file in blocks to a block blob.
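+//
+// Usage sketch (the path is illustrative; error handling elided):
+//
+//	file, _ := os.Open("/path/to/local/file")
+//	defer file.Close()
+//	_, err := UploadFileToBlockBlob(ctx, file, blockBlobURL, UploadToBlockBlobOptions{})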
+func UploadFileToBlockBlob(ctx context.Context, file *os.File,
+ blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
+
+ stat, err := file.Stat()
+ if err != nil {
+ return nil, err
+ }
+ m := mmf{} // Default to an empty slice; used for 0-size file
+ if stat.Size() != 0 {
+ m, err = newMMF(file, false, 0, int(stat.Size()))
+ if err != nil {
+ return nil, err
+ }
+ defer m.unmap()
+ }
+ return UploadBufferToBlockBlob(ctx, m, blockBlobURL, o)
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB
+
+// DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions.
+type DownloadFromBlobOptions struct {
+ // BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize.
+ BlockSize int64
+
+ // Progress is a function that is invoked periodically as bytes are received.
+ Progress pipeline.ProgressReceiver
+
+ // AccessConditions indicates the access conditions used when making HTTP GET requests against the blob.
+ AccessConditions BlobAccessConditions
+
+ // Parallelism indicates the maximum number of blocks to download in parallel (0=default)
+ Parallelism uint16
+
+ // RetryReaderOptionsPerBlock is used when downloading each block.
+ RetryReaderOptionsPerBlock RetryReaderOptions
+}
+
+// downloadBlobToBuffer downloads an Azure blob to a buffer in parallel.
+func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
+ ac BlobAccessConditions, b []byte, o DownloadFromBlobOptions,
+ initialDownloadResponse *DownloadResponse) error {
+ // Validate parameters, and set defaults.
+ if o.BlockSize < 0 {
+ panic("BlockSize option must be >= 0")
+ }
+ if o.BlockSize == 0 {
+ o.BlockSize = BlobDefaultDownloadBlockSize
+ }
+
+ if offset < 0 {
+ panic("offset option must be >= 0")
+ }
+
+ if count < 0 {
+ panic("count option must be >= 0")
+ }
+
+ if count == CountToEnd { // If size not specified, calculate it
+ if initialDownloadResponse != nil {
+ count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it
+ } else {
+ // If we don't have the length at all, get it
+ dr, err := blobURL.Download(ctx, 0, CountToEnd, ac, false)
+ if err != nil {
+ return err
+ }
+ count = dr.ContentLength() - offset
+ }
+ }
+
+ if int64(len(b)) < count {
+ panic(fmt.Errorf("the buffer's size should be equal to or larger than the requested count of bytes: %d", count))
+ }
+
+ // Prepare and do parallel download.
+ progress := int64(0)
+ progressLock := &sync.Mutex{}
+
+ err := doBatchTransfer(ctx, batchTransferOptions{
+ operationName: "downloadBlobToBuffer",
+ transferSize: count,
+ chunkSize: o.BlockSize,
+ parallelism: o.Parallelism,
+ operation: func(chunkStart int64, count int64) error {
+ dr, err := blobURL.Download(ctx, chunkStart+offset, count, ac, false)
+ if err != nil {
+ return err
+ }
+ body := dr.Body(o.RetryReaderOptionsPerBlock)
+ if o.Progress != nil {
+ rangeProgress := int64(0)
+ body = pipeline.NewResponseBodyProgress(
+ body,
+ func(bytesTransferred int64) {
+ diff := bytesTransferred - rangeProgress
+ rangeProgress = bytesTransferred
+ progressLock.Lock()
+ progress += diff
+ o.Progress(progress)
+ progressLock.Unlock()
+ })
+ }
+ _, err = io.ReadFull(body, b[chunkStart:chunkStart+count])
+ body.Close()
+ return err
+ },
+ })
+ return err
+}
+
+// DownloadBlobToBuffer downloads an Azure blob to a buffer in parallel.
+// Offset and count are optional; pass 0 for both to download the entire blob.
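+//
+// Usage sketch (blobSize is a hypothetical variable holding the blob's length;
+// error handling elided):
+//
+//	buf := make([]byte, blobSize)
+//	err := DownloadBlobToBuffer(ctx, blobURL, 0, CountToEnd, BlobAccessConditions{}, buf, DownloadFromBlobOptions{})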
+func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
+ ac BlobAccessConditions, b []byte, o DownloadFromBlobOptions) error {
+ return downloadBlobToBuffer(ctx, blobURL, offset, count, ac, b, o, nil)
+}
+
+// DownloadBlobToFile downloads an Azure blob to a local file.
+// The file is truncated if its size doesn't match the blob's size.
+// Offset and count are optional; pass 0 for both to download the entire blob.
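+//
+// Usage sketch (the path is illustrative; error handling elided):
+//
+//	file, _ := os.Create("/path/to/local/file")
+//	defer file.Close()
+//	err := DownloadBlobToFile(ctx, blobURL, 0, CountToEnd, BlobAccessConditions{}, file, DownloadFromBlobOptions{})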
+func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64,
+ ac BlobAccessConditions, file *os.File, o DownloadFromBlobOptions) error {
+ // 1. Validate parameters.
+ if file == nil {
+ panic("file must not be nil")
+ }
+
+ // 2. Calculate the size of the destination file
+ var size int64
+
+ if count == CountToEnd {
+ // Try to get the blob's size
+ props, err := blobURL.GetProperties(ctx, ac)
+ if err != nil {
+ return err
+ }
+ size = props.ContentLength() - offset
+ } else {
+ size = count
+ }
+
+ // 3. Compare the local file's size with the blob's size, and resize the local file if they don't match.
+ stat, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ if stat.Size() != size {
+ if err = file.Truncate(size); err != nil {
+ return err
+ }
+ }
+
+ if size > 0 {
+ // 4. Set mmap and call downloadBlobToBuffer.
+ m, err := newMMF(file, true, 0, int(size))
+ if err != nil {
+ return err
+ }
+ defer m.unmap()
+ return downloadBlobToBuffer(ctx, blobURL, offset, size, ac, m, o, nil)
+ } else { // If the blob's size is 0, there is no need to download it
+ return nil
+ }
+}
+
+
+///////////////////////////////////////////////////////////////////////////////
+
+// batchTransferOptions identifies options used by doBatchTransfer.
+type batchTransferOptions struct {
+ transferSize int64
+ chunkSize int64
+ parallelism uint16
+ operation func(offset int64, chunkSize int64) error
+ operationName string
+}
+
+// doBatchTransfer executes the given operation once per chunk of a transfer, running up to 'parallelism' operations concurrently.
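+//
+// Callers describe the whole transfer and supply an operation closure that
+// handles one chunk; for example (processChunk is hypothetical):
+//
+//	err := doBatchTransfer(ctx, batchTransferOptions{
+//		operationName: "example",
+//		transferSize:  totalBytes,
+//		chunkSize:     4 * 1024 * 1024,
+//		parallelism:   8,
+//		operation: func(offset int64, count int64) error {
+//			return processChunk(offset, count)
+//		},
+//	})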
+func doBatchTransfer(ctx context.Context, o batchTransferOptions) error {
+ // Prepare and do parallel operations.
+ if o.parallelism == 0 {
+ o.parallelism = 5 // default parallelism
+ }
+ numChunks := uint16(((o.transferSize - 1) / o.chunkSize) + 1)
+ operationChannel := make(chan func() error, o.parallelism) // Channel feeding up to 'parallelism' goroutines concurrently
+ operationResponseChannel := make(chan error, numChunks) // Holds each response
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // Create the goroutines that process each operation (in parallel).
+ for g := uint16(0); g < o.parallelism; g++ {
+ go func() {
+ for f := range operationChannel {
+ err := f()
+ operationResponseChannel <- err
+ }
+ }()
+ }
+
+ // Add each chunk's operation to the channel.
+ for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
+ curChunkSize := o.chunkSize
+
+ if chunkNum == numChunks-1 { // Last chunk
+ curChunkSize = o.transferSize - (int64(chunkNum) * o.chunkSize) // Remove size of all transferred chunks from total
+ }
+ offset := int64(chunkNum) * o.chunkSize
+
+ operationChannel <- func() error {
+ return o.operation(offset, curChunkSize)
+ }
+ }
+ close(operationChannel)
+
+ // Wait for the operations to complete.
+ for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
+ responseError := <-operationResponseChannel
+ if responseError != nil {
+ cancel() // As soon as any operation fails, cancel all remaining operation calls
+ return responseError // No need to process any more responses
+ }
+ }
+ return nil
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////
+
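+// UploadStreamToBlockBlobOptions identifies options used by the UploadStreamToBlockBlob function.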
+type UploadStreamToBlockBlobOptions struct {
+ BufferSize int
+ MaxBuffers int
+ BlobHTTPHeaders BlobHTTPHeaders
+ Metadata Metadata
+ AccessConditions BlobAccessConditions
+}
+
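+// UploadStreamToBlockBlob copies data from a reader to a block blob by staging
+// BufferSize-sized blocks on up to MaxBuffers concurrent goroutines and then
+// committing them.
+//
+// Usage sketch (buffer sizing is illustrative; error handling elided):
+//
+//	_, err := UploadStreamToBlockBlob(ctx, reader, blockBlobURL, UploadStreamToBlockBlobOptions{
+//		BufferSize: 2 * 1024 * 1024, // stage 2MB blocks
+//		MaxBuffers: 3,               // at most 3 buffers in flight
+//	})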
+func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL,
+ o UploadStreamToBlockBlobOptions) (CommonResponse, error) {
+ result, err := uploadStream(ctx, reader,
+ UploadStreamOptions{BufferSize: o.BufferSize, MaxBuffers: o.MaxBuffers},
+ &uploadStreamToBlockBlobOptions{b: blockBlobURL, o: o, blockIDPrefix: newUUID()})
+ if err != nil {
+ return nil, err // Don't type-assert a nil interface; that would panic
+ }
+ return result.(CommonResponse), nil
+}
+
+type uploadStreamToBlockBlobOptions struct {
+ b BlockBlobURL
+ o UploadStreamToBlockBlobOptions
+ blockIDPrefix uuid // UUID used with all blockIDs
+ maxBlockNum uint32 // defaults to 0
+ firstBlock []byte // Used only if maxBlockNum is 0
+}
+
+func (t *uploadStreamToBlockBlobOptions) start(ctx context.Context) (interface{}, error) {
+ return nil, nil
+}
+
+func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32, buffer []byte) error {
+ if num == 0 {
+ t.firstBlock = buffer
+ // If the whole payload fits in 1 block, don't stage it; end will upload it with 1 I/O operation.
+ // If this block exactly fills the buffer, more data may follow, so fall through and stage it.
+ if len(buffer) < t.o.BufferSize {
+ return nil
+ }
+ }
+ // Else, upload a staged block...
+ AtomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) {
+ // Atomically remember (in t.maxBlockNum) the maximum block num we've ever seen
+ if startVal < num {
+ return num, nil
+ }
+ return startVal, nil
+ })
+ blockID := newUuidBlockID(t.blockIDPrefix).WithBlockNumber(num).ToBase64()
+ _, err := t.b.StageBlock(ctx, blockID, bytes.NewReader(buffer), LeaseAccessConditions{})
+ return err
+}
+
+func (t *uploadStreamToBlockBlobOptions) end(ctx context.Context) (interface{}, error) {
+ // A first block that exactly filled the buffer was staged (more data might have followed),
+ // so take the single-shot Upload path only if block #0 was smaller than the buffer.
+ if t.maxBlockNum == 0 && len(t.firstBlock) != t.o.BufferSize {
+ // If whole payload fits in 1 block (block #0), upload it with 1 I/O operation
+ return t.b.Upload(ctx, bytes.NewReader(t.firstBlock),
+ t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions)
+ }
+ // Multiple blocks staged, commit them all now
+ blockID := newUuidBlockID(t.blockIDPrefix)
+ blockIDs := make([]string, t.maxBlockNum+1)
+ for bn := uint32(0); bn <= t.maxBlockNum; bn++ {
+ blockIDs[bn] = blockID.WithBlockNumber(bn).ToBase64()
+ }
+ return t.b.CommitBlockList(ctx, blockIDs, t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions)
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
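+// iTransfer abstracts a chunked transfer: start is called once before any
+// chunks, chunk once per buffer (possibly concurrently), and end once after
+// all chunks have completed.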
+type iTransfer interface {
+ start(ctx context.Context) (interface{}, error)
+ chunk(ctx context.Context, num uint32, buffer []byte) error
+ end(ctx context.Context) (interface{}, error)
+}
+
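+// UploadStreamOptions identifies options used by uploadStream.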
+type UploadStreamOptions struct {
+ MaxBuffers int
+ BufferSize int
+}
+
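+// uploadStream reads BufferSize-sized chunks from reader and hands each one to
+// t.chunk on a pool of up to MaxBuffers goroutines; buffers are recycled
+// through the incoming channel once their chunk has been sent.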
+func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions, t iTransfer) (interface{}, error) {
+ ctx, cancel := context.WithCancel(ctx) // New context so that any failure cancels everything
+ defer cancel()
+ wg := sync.WaitGroup{} // Used to know when all outgoing messages have finished processing
+ type OutgoingMsg struct {
+ chunkNum uint32
+ buffer []byte
+ }
+
+ // Create a channel to hold the buffers usable for incoming data
+ incoming := make(chan []byte, o.MaxBuffers)
+ outgoing := make(chan OutgoingMsg, o.MaxBuffers) // Channel holding outgoing buffers
+ if result, err := t.start(ctx); err != nil {
+ return result, err
+ }
+
+ numBuffers := 0 // The number of buffers & outgoing goroutines created so far
+ injectBuffer := func() {
+ // For each Buffer, create it and a goroutine to upload it
+ incoming <- make([]byte, o.BufferSize) // Add the new buffer to the incoming channel so this goroutine can read from the reader into it
+ numBuffers++
+ go func() {
+ for outgoingMsg := range outgoing {
+ // Upload the outgoing buffer
+ err := t.chunk(ctx, outgoingMsg.chunkNum, outgoingMsg.buffer)
+ wg.Done() // Indicate this buffer was sent
+ if err != nil {
+ cancel()
+ }
+ incoming <- outgoingMsg.buffer // The goroutine reading from the stream can reuse this buffer now
+ }
+ }()
+ }
+ injectBuffer() // Create our 1st buffer & outgoing goroutine
+
+ // This goroutine grabs a buffer, reads from the stream into the buffer,
+ // and inserts the buffer into the outgoing channel to be uploaded
+ for c := uint32(0); ; c++ { // Iterate once per chunk
+ var buffer []byte
+ if numBuffers < o.MaxBuffers {
+ select {
+ // We're not at max buffers; see if a previously-created buffer is available
+ case buffer = <-incoming:
+ default:
+ // No buffer available; inject a new buffer & goroutine to process it
+ injectBuffer()
+ buffer = <-incoming // Grab the just-injected buffer
+ }
+ } else {
+ // We are at max buffers, block until we get to reuse one
+ buffer = <-incoming
+ }
+ n, err := io.ReadFull(reader, buffer)
+ if err != nil {
+ buffer = buffer[:n] // Make slice match the # of read bytes
+ }
+ if len(buffer) > 0 {
+ // Buffer not empty, upload it
+ wg.Add(1) // We're posting a buffer to be sent
+ outgoing <- OutgoingMsg{chunkNum: c, buffer: buffer}
+ }
+ if err != nil { // The reader is done, no more outgoing buffers
+ break
+ }
+ }
+ // NOTE: Don't close the incoming channel because the outgoing goroutines post buffers into it when they are done
+ close(outgoing) // Make all the outgoing goroutines terminate when this channel is empty
+ wg.Wait() // Wait for all pending outgoing messages to complete
+ // After all blocks uploaded, commit them to the blob & return the result
+ return t.end(ctx)
+}