Sunday, February 11, 2018

Download large files as chunks and upload them into BLOB

Introduction

In real case scenarios there are time where we need to download large files from from a Web resource and save it to Azure BLOB storage. Even though there are few articles over Web which helped, I'm not able to get a end to end working solution for files with size ranging from 20GB to 30GB. Have detailed below challenges and related solutions for your convenience.

Problem statement

Download large files from the Web resource and upload them into Azure BLOB storage.

Challenges and Solutions

  1. First we'll hit memory issue when we try to read the full stream and load all bytes into memory. An article here explains well how to avoid this Memory issue.
  2. Another article here detailed how to read FileStream  as chunks and upload them into BLOB. Using the details from this and #1 we can try to achieve he solution. 
    1. Use Stream.Read by passing respective parameters. Points to note here are, In that one of the parameter is maximum number of bytes to read from the current stream. And the Stream.Read method returns  But that also got into issue as the Stream.Read method returns total number of bytes read which can be less than the maximum count parameter. Detailed documentation of Stream.Read can be found here
    2.  Along with that, we can use CloudBlockBlob.PutBlock method to upload read chunks into BLOB. Points to note here are, each block (chunk) in BLOB can be maximum of 100MB in size and at the max you can have 50000 blocks (100MB x 50000 blocks = 4.75TB). Detailed documentation can be found here
    3. Even though #2.1 and #2.2 looks straight forward, there might be issues because Read can read less number of bytes which in turn increases the number of blocks (more than 50000 limit) in the Azure BLOB storage.
    4. In order to avoid issue mentioned above in #2.3, we need to accumulate the resultant of  Stream.Read till it reaches the expected size (100MB in our case).
  3. Even though you fix the issues specified in #1 and #2 above, you might face exception stating An existing connection was forcibly closed by the remote host. . Though I'm not able to identify the solution for this, have found workaround using the articles here and here. Use Http1.0 instead of Http1.1.
Code samples can be found in my github repo here.

Snippet 

public class LargeFileProcessor
    {
        /// <summary>
        /// Logger instance.
        /// </summary>
        private ILogger logger = new Logger();

        /// <summary>
        /// Retry count.
        /// </summary>
        private int retryCount = 5;

        /// <summary>
        /// Time delay for retry.
        /// </summary>
        private TimeSpan delay = TimeSpan.FromSeconds(10);

        /// <summary>
        /// Download Large File as chunk and upload as chunk into BLOB.
        /// </summary>
        public async Task ProcessLargeFile()
        {
            // Create Storage account reference.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageAccount"));

            // Create the blob client.
            CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

            // Retrieve reference to a container.
            CloudBlobContainer container = blobClient.GetContainerReference(CloudConfigurationManager.GetSetting("ContainerName"));
            container.CreateIfNotExists();

            // Create Blob reference.
            CloudBlockBlob blob = container.GetBlockBlobReference(CloudConfigurationManager.GetSetting("BlobFileName"));

            string urlToDownload = CloudConfigurationManager.GetSetting("DownloadURL"); // Provide valid URL from where the large file can be downloaded.

            Stopwatch stopwatch = Stopwatch.StartNew();

            try
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    var httpRequestMessage = new HttpRequestMessage(HttpMethod.Get, new Uri(urlToDownload))
                    {
                        // To avoid error related to 'An existing connection was forcibly closed by the remote host'. Use Http1.0 instead of Http1.1.
                        Version = HttpVersion.Version10
                    };

                    using (HttpResponseMessage response = await httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false))
                    {
                        using (Stream stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                        {
                            const int pageSizeInBytes = 104857600; // 100MB. As Blob chunk max size is 100MB as of now.

                            var blockIds = new List<string>();
                            var sha256 = new SHA256Managed();

                            var bytesRemaing = response.Content.Headers.ContentLength.Value; // Read Total file size from the header.
                            int blockIdentifier = 0;

                            while (bytesRemaing > 0)
                            {
                                blockIdentifier++;
                                var bytesToCopy = (int)Math.Min(bytesRemaing, pageSizeInBytes);
                                var bytesToSend = new byte[bytesToCopy];

                                var bytesCountRead = await ReadStreamAndAccumulate(stream, bytesToSend, bytesToCopy);

                                // Instead of calculating bytes remaining to exit the While loop,  we can use bytesCountRead as bytesCountRead will be 0 when there are no more bytes to read form the stream.   
                                bytesRemaing -= bytesCountRead;

                                this.logger.WriteLine($"bytes read: {bytesCountRead}");
                                this.logger.WriteLine($"bytes remaining: {bytesRemaing}");

                                string base64BlockId = Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(string.Format("largefile1BlockId{0}", blockIdentifier.ToString("0000000"))));
                                blockIds.Add(base64BlockId);

                                // Calculate the checksum value.
                                if (bytesRemaing <= 0)
                                {
                                    sha256.TransformFinalBlock(bytesToSend, 0, bytesCountRead);
                                }
                                else
                                {
                                    sha256.TransformBlock(bytesToSend, 0, bytesCountRead, bytesToSend, 0);
                                }

                                await blob.PutBlockAsync(base64BlockId, new MemoryStream(bytesToSend), null);
                            }

                            var checksum = BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);
                            this.logger.WriteLine($"Hash value is : {checksum}");
                            await blob.PutBlockListAsync(blockIds);

                            await Task.FromResult(0);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                this.logger.WriteLine(ex.Message);
                throw;
            }
            finally
            {
                stopwatch.Stop();
                this.logger.WriteLine($"Execution time in mins: {stopwatch.Elapsed.TotalMinutes}");
            }
        }

        /// <summary>
        /// Read the stream and accumulate till it reaches the number of bytes specified to copy.
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamAndAccumulate(Stream stream, byte[] bytesToSend, int bytesCountToCopy)
        {
            int bytesReadSoFar = 0;

            while (bytesReadSoFar < bytesCountToCopy)
            {
                var currentBytesCountRead = await ReadStreamWithRetry(stream, bytesToSend, bytesCountToCopy - bytesReadSoFar, bytesReadSoFar).ConfigureAwait(false);
                bytesReadSoFar += currentBytesCountRead;
            }

            return bytesReadSoFar;
        }

        /// <summary>
        /// Reads the stream with retry when failed. 
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamWithRetry(Stream stream, byte[] bytesToSend, int bytesCountToCopy, int offset)
        {
            int currentRetry = 0;
            for (; ; )
            {
                try
                {
                    var bytesRead = await stream.ReadAsync(bytesToSend, offset, bytesCountToCopy);
                    return bytesRead;
                }
                catch (Exception ex)
                {
                    this.logger.WriteLine($"Operation Exception : {ex.Message}");

                    currentRetry++;

                    // Check if it is within the retry count specified.
                    if (currentRetry > this.retryCount)
                    {
                        // Rethrow the exception if it more than the retry attempt.
                        throw;
                    }
                }

                // Wait to retry the operation.
                await Task.Delay(delay);
            }

        }
    }

Full source code has been provided here.

References



 


No comments:

Post a Comment

Creative Commons License
This work by Tito is licensed under a Creative Commons Attribution 3.0 Unported License.