Saturday, March 31, 2018

Copy large data objects from Azure BLOB to Amazon S3

Scenario

This article provides guidance and a small library for copying Azure BLOB data to Amazon S3. Using .NET (C#), I have created a few classes that help copy large BLOBs into Amazon S3.

Points to be considered

  1. As we're targeting large data objects, we need to perform a Multipart upload. 
  2. To download data from an Azure BLOB in parts, we can use the DownloadRangeToStream method. 
  3. Amazon S3 also lets us perform a Multipart upload through the APIs exposed by the AWS .NET SDK.
    1. But first we need to understand the different options the SDK provides and select the one that best suits our needs.
    2. For Multipart upload, the AWS .NET SDK provides a High-Level API and a Low-Level API.
    3. The High-Level API offers a convenient way of performing a few common operations on S3 by encapsulating the core-level implementation.
    4. The Low-Level API, as you might have guessed, exposes the core Multipart operations on S3. 
    5. For our scenario, the Low-Level API is the best fit, as we need finer control over the operations.
    6. A maximum of 10,000 parts is allowed per upload.
    7. A part can range from 5MB to 5GB in size, with the exception of the last part, which can be smaller than 5MB.
  4. To handle our scenario, we're going to stick to 100MB as the part size in our library (a quick sanity check on these limits follows below).
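
As a quick, illustrative sanity check on these limits (a sketch, not part of the library): with a 100MB part size and the 10,000-part cap, a single Multipart upload covers objects up to roughly 976GB, which is plenty for our scenario.

const long PartSize = 104857600;      // 100 MB, the same value the library uses below.
const int MaxPartsPerUpload = 10000;  // S3 limit on parts per Multipart upload.
long maxObjectSizeInBytes = PartSize * (long)MaxPartsPerUpload;
Console.WriteLine($"Max object size at 100 MB parts: {maxObjectSizeInBytes / (1024L * 1024 * 1024)} GB"); // ~976 GB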

Library Source code

The following classes have been created as a small library that encapsulates the implementation logic of the copy operation and provides Request and Response objects for ease of use and code management.
  1. BlobToS3Manager: Holds the implementation logic for copying BLOB data into S3.
  2. IBlobToS3Manager: The interface implemented by BlobToS3Manager.
  3. BlobToS3Request: Holds the BLOB source and S3 target details needed for the copy operation.
  4. BlobToS3Response: Holds the response of the copy operation.
You can find the above 4 library files in my github directory here.

Snippet 1 (BlobToS3Manager)

namespace AWSSampleConsoleApp1.BlobToS3
{
    using Amazon.S3.Model;
    using Microsoft.WindowsAzure.Storage.Blob;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Security.Cryptography;
    using System.Threading.Tasks;

    /// <summary>
    /// Handles the copy of BLOB data into S3.
    /// </summary>
    public class BlobToS3Manager : IBlobToS3Manager
    {
        /// <summary>
        /// Error message when the container doesn't exist.
        /// </summary>
        private const string ContainerNotExists = "BLOB Container doesn't exist.";

        /// <summary>
        /// Error message when the BLOB doesn't exist.
        /// </summary>
        private const string BlobNotExists = "BLOB doesn't exist.";

        /// <summary>
        /// Part size to read from BLOB and upload to S3.
        /// </summary>
        private const long PartSize = 104857600; // 100 MB.

        /// <summary>
        /// The BLOB S3 request.
        /// </summary>
        private readonly BlobToS3Request blobToS3Request;

        /// <summary>
        /// Initializes a new instance of the BlobToS3Manager class.
        /// </summary>
        /// <param name="blobToS3Request">The BLOB to S3 request.</param>
        public BlobToS3Manager(BlobToS3Request blobToS3Request)
        {
            this.blobToS3Request = blobToS3Request;
        }

        /// <summary>
        /// Copies BLOB to S3.
        /// </summary>
        /// <returns>The BLOB to S3 response.</returns>
        public async Task<BlobToS3Response> CopyFromBlobToS3Async()
        {
            BlobToS3Response blobToS3Response = new BlobToS3Response();
            var validation = await this.Validate();

            if (!validation.Item1)
            {
                return validation.Item2;
            }

            var sourceBlob = validation.Item3;
            await sourceBlob.FetchAttributesAsync();
            var remainingBytes = sourceBlob.Properties.Length;
            long readPosition = 0; // Offset from which to start reading the next part from the BLOB.

            InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest
            {
                BucketName = this.blobToS3Request.TargetS3Bucket,
                Key = this.blobToS3Request.TargetS3File
            };

            // Will use UploadId from this response.
            InitiateMultipartUploadResponse initiateMultipartUploadResponse = this.blobToS3Request.S3Client.InitiateMultipartUpload(initiateMultipartUploadRequest);
            List<UploadPartResponse> uploadPartResponses = new List<UploadPartResponse>();

            try
            {
                int partCounter = 0; // To increment on each read of parts and use it as part number.
                var sha256 = new SHA256Managed();

                while (remainingBytes > 0)
                {
                    // Determine the number of bytes to copy; it equals PartSize for every part except possibly the final one, which can be smaller.
                    long bytesToCopy = Math.Min(PartSize, remainingBytes);

                    using (MemoryStream memoryStream = new MemoryStream())
                    {
                        // To download part from BLOB.
                        await sourceBlob.DownloadRangeToStreamAsync(memoryStream, readPosition, bytesToCopy).ConfigureAwait(false);
                        memoryStream.Position = 0;
                        partCounter++;

                        UploadPartRequest uploadRequest = new UploadPartRequest
                        {
                            BucketName = this.blobToS3Request.TargetS3Bucket,
                            Key = this.blobToS3Request.TargetS3File,
                            UploadId = initiateMultipartUploadResponse.UploadId,
                            PartNumber = partCounter,
                            PartSize = bytesToCopy,
                            InputStream = memoryStream
                        };

                        UploadPartResponse uploadPartResponse = this.blobToS3Request.S3Client.UploadPart(uploadRequest);
                        uploadPartResponses.Add(uploadPartResponse);

                        remainingBytes -= bytesToCopy;
                        readPosition += bytesToCopy;

                        // $"Uploaded part with part number {partCounter}, size {bytesToCopy}bytes and remaining {remainingBytes}bytes to read.")

                        // Calculate the checksum value.
                        if (remainingBytes <= 0)
                        {
                            sha256.TransformFinalBlock(memoryStream.ToArray(), 0, (int)bytesToCopy);
                        }
                        else
                        {
                            byte[] bytesToSend = memoryStream.ToArray();
                            sha256.TransformBlock(bytesToSend, 0, (int)bytesToCopy, bytesToSend, 0);
                        }
                    }
                }

                blobToS3Response.Sha256CheckSum = BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);

                CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest
                {
                    BucketName = this.blobToS3Request.TargetS3Bucket,
                    Key = this.blobToS3Request.TargetS3File,
                    UploadId = initiateMultipartUploadResponse.UploadId
                };

                completeMultipartUploadRequest.AddPartETags(uploadPartResponses);

                CompleteMultipartUploadResponse completeMultipartUploadResponse = await this.blobToS3Request.S3Client.CompleteMultipartUploadAsync(completeMultipartUploadRequest).ConfigureAwait(false);

                blobToS3Response.IsSuccess = true;
                blobToS3Response.S3Path = completeMultipartUploadResponse.Location;
            }
            catch (Exception exception)
            {
                blobToS3Response.IsSuccess = false;
                blobToS3Response.Message = exception.Message;

                AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest
                {
                    BucketName = this.blobToS3Request.TargetS3Bucket,
                    Key = this.blobToS3Request.TargetS3File,
                    UploadId = initiateMultipartUploadResponse.UploadId
                };

                await this.blobToS3Request.S3Client.AbortMultipartUploadAsync(abortMultipartUploadRequest).ConfigureAwait(false);
            }

            return blobToS3Response;
        }

        /// <summary>
        /// Validates that the source container and BLOB exist.
        /// </summary>
        /// <returns>A tuple of: whether validation passed, the failure response (if any) and the source BLOB reference.</returns>
        private async Task<Tuple<bool, BlobToS3Response, CloudBlockBlob>> Validate()
        {
            BlobToS3Response blobToS3Response = null;
            CloudBlobContainer cloudBlobContainer = this.blobToS3Request.BlobClient.GetContainerReference(this.blobToS3Request.SourceBlobContainer);

            if (!await cloudBlobContainer.ExistsAsync())
            {
                blobToS3Response = new BlobToS3Response
                {
                    IsSuccess = false,
                    Message = ContainerNotExists
                };

                return new Tuple<bool, BlobToS3Response, CloudBlockBlob>(false, blobToS3Response, null);
            }

            CloudBlockBlob cloudBlockBlob = cloudBlobContainer.GetBlockBlobReference(this.blobToS3Request.SourceBlob);

            if (await cloudBlockBlob.ExistsAsync())
            {
                return new Tuple<bool, BlobToS3Response, CloudBlockBlob>(true, null, cloudBlockBlob);
            }

            blobToS3Response = new BlobToS3Response
            {
                IsSuccess = false,
                Message = BlobNotExists
            };

            return new Tuple<bool, BlobToS3Response, CloudBlockBlob>(false, blobToS3Response, null);
        }

    }
}

Snippet 2 (CopyFromBlobToS3: Sample on how to create instances and call the library)
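
A minimal sketch of the calling code, assuming BlobToS3Request exposes settable members matching those used in Snippet 1 (S3Client, BlobClient, SourceBlobContainer, SourceBlob, TargetS3Bucket and TargetS3File); the connection string, credentials, region and names below are placeholders.

namespace AWSSampleConsoleApp1.BlobToS3
{
    using System;
    using System.Threading.Tasks;
    using Amazon;
    using Amazon.S3;
    using Microsoft.WindowsAzure.Storage;

    public class CopyFromBlobToS3
    {
        public static async Task RunAsync()
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse("<azure-storage-connection-string>");

            BlobToS3Request blobToS3Request = new BlobToS3Request
            {
                // Azure source.
                BlobClient = storageAccount.CreateCloudBlobClient(),
                SourceBlobContainer = "source-container",
                SourceBlob = "large-file.dat",

                // S3 target.
                S3Client = new AmazonS3Client("<access-key>", "<secret-key>", RegionEndpoint.USEast1),
                TargetS3Bucket = "target-bucket",
                TargetS3File = "large-file.dat"
            };

            IBlobToS3Manager blobToS3Manager = new BlobToS3Manager(blobToS3Request);
            BlobToS3Response blobToS3Response = await blobToS3Manager.CopyFromBlobToS3Async();

            Console.WriteLine(blobToS3Response.IsSuccess
                ? $"Copied to {blobToS3Response.S3Path} with SHA256 checksum {blobToS3Response.Sha256CheckSum}."
                : $"Copy failed: {blobToS3Response.Message}");
        }
    }
}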


Full source code can be found in my github repo here.

Sunday, February 18, 2018

Calculating Hash values / checksum for files while we read them as Streams in chunks

Introduction

You might encounter situations where you need to calculate the checksum of a file / stream while transmitting it across the wire. Nowadays file streams are transmitted frequently, and you need to ensure the data is not corrupted during transmission. To verify this, the receiving end uses the same algorithm to recalculate the checksum and confirm the transmitted data is intact.

Scenario

Let me take the same scenario I explained in my previous post, Download large files as chunks and upload them into BLOB, in which we downloaded and transmitted a large file as a stream in chunks. There are a lot of articles over the web explaining how to calculate a checksum for a full file stream. Here we'll see a snippet for calculating the checksum chunk by chunk and accumulating the result at the end.


Tips

HashAlgorithm.TransformBlock and HashAlgorithm.TransformFinalBlock will help you achieve this: feed every chunk except the last to TransformBlock, feed the last chunk to TransformFinalBlock, and then read the accumulated value from the Hash property.
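
Before the full snippet, here is a minimal, self-contained illustration of the pattern (assuming System.IO and System.Security.Cryptography; the file path is just a placeholder):

public static string ComputeSha256InChunks(string filePath)
{
    using (var sha256 = new SHA256Managed())
    using (var stream = File.OpenRead(filePath))
    {
        var buffer = new byte[8192];
        int bytesRead;

        // Feed each chunk into the running hash as it is read.
        while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
        {
            sha256.TransformBlock(buffer, 0, bytesRead, buffer, 0);
        }

        // An empty final block completes the computation once the stream is exhausted.
        sha256.TransformFinalBlock(buffer, 0, 0);
        return BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);
    }
}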

Snippet

public class LargeFileProcessor
    {       
        /// <summary>
        /// Logger instance.
        /// </summary>
        private ILogger logger = new Logger();

        /// <summary>
        /// Download Large File as chunk and upload as chunk into BLOB.
        /// </summary>
        public async Task ProcessLargeFile()
        {
            // Trimmed for brevity.

            string urlToDownload = CloudConfigurationManager.GetSetting("DownloadURL"); // Provide valid URL from where the large file can be downloaded.

            Stopwatch stopwatch = Stopwatch.StartNew();

            try
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    var httpRequestMessage = new HttpRequestMessage(HttpMethod.Get, new Uri(urlToDownload))
                    {
                        // To avoid error related to 'An existing connection was forcibly closed by the remote host'. Use Http1.0 instead of Http1.1.
                        Version = HttpVersion.Version10
                    };

                    using (HttpResponseMessage response = await httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false))
                    {
                        using (Stream stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                        {
                            const int pageSizeInBytes = 104857600; // 100MB. As Blob chunk max size is 100MB as of now.

                            var sha256 = new SHA256Managed();

                            var bytesRemaining = response.Content.Headers.ContentLength.Value; // Total file size, read from the response header.

                            while (bytesRemaining > 0)
                            {
                                var bytesToCopy = (int)Math.Min(bytesRemaining, pageSizeInBytes);
                                var bytesToSend = new byte[bytesToCopy];

                                var bytesCountRead = await ReadStreamAndAccumulate(stream, bytesToSend, bytesToCopy);

                                // Alternatively, instead of tracking the bytes remaining to exit the while loop, we could rely on bytesCountRead, as it will be 0 when there are no more bytes to read from the stream.
                                bytesRemaining -= bytesCountRead;

                                // Calculate the checksum value.
                                if (bytesRemaining <= 0)
                                {
                                    sha256.TransformFinalBlock(bytesToSend, 0, bytesCountRead);
                                }
                                else
                                {
                                    sha256.TransformBlock(bytesToSend, 0, bytesCountRead, bytesToSend, 0);
                                }
                            }

                            var checksum = BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);
                            this.logger.WriteLine($"Hash value is : {checksum}");

                        }
                    }
                }
            }
            catch (Exception ex)
            {
                this.logger.WriteLine(ex.Message);
                throw;
            }
            finally
            {
                stopwatch.Stop();
                this.logger.WriteLine($"Execution time in mins: {stopwatch.Elapsed.TotalMinutes}");
            }
        }

        /// <summary>
        /// Read the stream and accumulate till it reaches the number of bytes specified to copy.
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamAndAccumulate(Stream stream, byte[] bytesToSend, int bytesCountToCopy)
        {
                        // Trimmed for brevity.
        }

        /// <summary>
        /// Reads the stream with retry when failed. 
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamWithRetry(Stream stream, byte[] bytesToSend, int bytesCountToCopy, int offset)
        {
                        // Trimmed for brevity.
        }
    }


In the above snippet I've trimmed the code down to a minimal form that focuses on calculating the checksum for chunked reads. You can find the full source code in my github repo here.

Sunday, February 11, 2018

Download large files as chunks and upload them into BLOB

Introduction

In real-world scenarios there are times when we need to download large files from a Web resource and save them to Azure BLOB storage. Even though a few articles over the web helped, I wasn't able to get an end-to-end working solution for files ranging from 20GB to 30GB in size. I've detailed the challenges and their solutions below for your convenience.

Problem statement

Download large files from the Web resource and upload them into Azure BLOB storage.

Challenges and Solutions

  1. First, we'll hit a memory issue if we try to read the full stream and load all the bytes into memory. An article here explains well how to avoid this memory issue.
  2. Another article here details how to read a FileStream as chunks and upload them into BLOB. Using the details from this and #1, we can try to achieve the solution. 
    1. Use Stream.Read, passing the respective parameters; one of them is the maximum number of bytes to read from the current stream, and the method returns the number of bytes read. This also ran into an issue: Stream.Read can return fewer bytes than the maximum count requested. Detailed documentation of Stream.Read can be found here.
    2. Along with that, we can use the CloudBlockBlob.PutBlock method to upload the chunks we read into BLOB. Points to note here: each block (chunk) in BLOB can be a maximum of 100MB in size, and you can have at most 50,000 blocks (100MB x 50,000 blocks = 4.75TB). Detailed documentation can be found here. See also the note on block IDs after this list.
    3. Even though #2.1 and #2.2 look straightforward, issues can arise because Read may return fewer bytes per call, which in turn inflates the block count past the 50,000 limit in Azure BLOB storage. For example, if each Read returned only ~64KB and each result were uploaded as its own block, a 30GB file would need roughly 491,520 blocks, nearly ten times the limit.
    4. To avoid the issue mentioned above in #2.3, we need to accumulate the results of Stream.Read until we reach the expected size (100MB in our case).
  3. Even after you fix the issues specified in #1 and #2 above, you might face an exception stating "An existing connection was forcibly closed by the remote host". Though I wasn't able to identify the root cause, I found a workaround using the articles here and here: use Http1.0 instead of Http1.1.
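One detail worth calling out before the snippet: Azure requires block IDs to be Base64-encoded, and every block ID within a blob must have the same encoded length, which is why the snippet below zero-pads the block counter. A minimal sketch of that convention:

// Zero-padding the counter keeps every encoded block ID the same length, as Azure requires.
string base64BlockId = Convert.ToBase64String(
    ASCIIEncoding.ASCII.GetBytes(string.Format("largefile1BlockId{0}", blockIdentifier.ToString("0000000"))));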
Code samples can be found in my github repo here.

Snippet 

public class LargeFileProcessor
    {
        /// <summary>
        /// Logger instance.
        /// </summary>
        private ILogger logger = new Logger();

        /// <summary>
        /// Retry count.
        /// </summary>
        private int retryCount = 5;

        /// <summary>
        /// Time delay for retry.
        /// </summary>
        private TimeSpan delay = TimeSpan.FromSeconds(10);

        /// <summary>
        /// Download Large File as chunk and upload as chunk into BLOB.
        /// </summary>
        public async Task ProcessLargeFile()
        {
            // Create Storage account reference.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageAccount"));

            // Create the blob client.
            CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

            // Retrieve reference to a container.
            CloudBlobContainer container = blobClient.GetContainerReference(CloudConfigurationManager.GetSetting("ContainerName"));
            container.CreateIfNotExists();

            // Create Blob reference.
            CloudBlockBlob blob = container.GetBlockBlobReference(CloudConfigurationManager.GetSetting("BlobFileName"));

            string urlToDownload = CloudConfigurationManager.GetSetting("DownloadURL"); // Provide valid URL from where the large file can be downloaded.

            Stopwatch stopwatch = Stopwatch.StartNew();

            try
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    var httpRequestMessage = new HttpRequestMessage(HttpMethod.Get, new Uri(urlToDownload))
                    {
                        // To avoid error related to 'An existing connection was forcibly closed by the remote host'. Use Http1.0 instead of Http1.1.
                        Version = HttpVersion.Version10
                    };

                    using (HttpResponseMessage response = await httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false))
                    {
                        using (Stream stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                        {
                            const int pageSizeInBytes = 104857600; // 100MB. As Blob chunk max size is 100MB as of now.

                            var blockIds = new List<string>();
                            var sha256 = new SHA256Managed();

                            var bytesRemaining = response.Content.Headers.ContentLength.Value; // Total file size, read from the response header.
                            int blockIdentifier = 0;

                            while (bytesRemaining > 0)
                            {
                                blockIdentifier++;
                                var bytesToCopy = (int)Math.Min(bytesRemaining, pageSizeInBytes);
                                var bytesToSend = new byte[bytesToCopy];

                                var bytesCountRead = await ReadStreamAndAccumulate(stream, bytesToSend, bytesToCopy);

                                // Alternatively, instead of tracking the bytes remaining to exit the while loop, we could rely on bytesCountRead, as it will be 0 when there are no more bytes to read from the stream.
                                bytesRemaining -= bytesCountRead;

                                this.logger.WriteLine($"bytes read: {bytesCountRead}");
                                this.logger.WriteLine($"bytes remaining: {bytesRemaining}");

                                string base64BlockId = Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(string.Format("largefile1BlockId{0}", blockIdentifier.ToString("0000000"))));
                                blockIds.Add(base64BlockId);

                                // Calculate the checksum value.
                                if (bytesRemaining <= 0)
                                {
                                    sha256.TransformFinalBlock(bytesToSend, 0, bytesCountRead);
                                }
                                else
                                {
                                    sha256.TransformBlock(bytesToSend, 0, bytesCountRead, bytesToSend, 0);
                                }

                                await blob.PutBlockAsync(base64BlockId, new MemoryStream(bytesToSend), null);
                            }

                            var checksum = BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);
                            this.logger.WriteLine($"Hash value is : {checksum}");
                            await blob.PutBlockListAsync(blockIds);

                        }
                    }
                }
            }
            catch (Exception ex)
            {
                this.logger.WriteLine(ex.Message);
                throw;
            }
            finally
            {
                stopwatch.Stop();
                this.logger.WriteLine($"Execution time in mins: {stopwatch.Elapsed.TotalMinutes}");
            }
        }

        /// <summary>
        /// Read the stream and accumulate till it reaches the number of bytes specified to copy.
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamAndAccumulate(Stream stream, byte[] bytesToSend, int bytesCountToCopy)
        {
            int bytesReadSoFar = 0;

            while (bytesReadSoFar < bytesCountToCopy)
            {
                var currentBytesCountRead = await ReadStreamWithRetry(stream, bytesToSend, bytesCountToCopy - bytesReadSoFar, bytesReadSoFar).ConfigureAwait(false);
                bytesReadSoFar += currentBytesCountRead;
            }

            return bytesReadSoFar;
        }

        /// <summary>
        /// Reads the stream with retry when failed. 
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamWithRetry(Stream stream, byte[] bytesToSend, int bytesCountToCopy, int offset)
        {
            int currentRetry = 0;
            for (; ; )
            {
                try
                {
                    var bytesRead = await stream.ReadAsync(bytesToSend, offset, bytesCountToCopy);
                    return bytesRead;
                }
                catch (Exception ex)
                {
                    this.logger.WriteLine($"Operation Exception : {ex.Message}");

                    currentRetry++;

                    // Check whether we're still within the specified retry count.
                    if (currentRetry > this.retryCount)
                    {
                        // Rethrow the exception once the retry attempts are exhausted.
                        throw;
                    }
                }

                // Wait to retry the operation.
                await Task.Delay(delay);
            }

        }
    }

Full source code has been provided here.

Creative Commons License
This work by Tito is licensed under a Creative Commons Attribution 3.0 Unported License.