Saturday, March 31, 2018

Copy large data objects from Azure BLOB to Amazon S3

Scenario

This article provides guidance and a small library for copying an Azure BLOB to Amazon S3. Using .NET (C#), I have created a few classes that help copy large BLOBs into Amazon S3.

Points to be considered

  1. As we're targeting large data objects, we need to perform a Multipart upload.
  2. To download data from Azure BLOB in parts, we can use the DownloadRangeToStream method.
  3. Amazon S3 also lets us perform Multipart upload through the APIs exposed by the AWS .NET SDK.
    1. But first we need to understand the options the SDK provides and select the one that best suits our needs.
    2. For Multipart upload, the AWS .NET SDK provides a High-Level API and a Low-Level API.
    3. The High-Level API offers a simpler way of performing a few common operations on S3 by encapsulating the core implementation.
    4. The Low-Level API, as you might have guessed, exposes the core Multipart operations on S3.
    5. For our scenario, the Low-Level API is the best fit as we need finer control over the operations.
    6. A maximum of 10000 parts is allowed per upload.
    7. Part size can range from 5MB to 5GB, except for the last part, which can be smaller than 5MB.
  4. To handle our scenario, we're going to stick to 100MB as the part size in our library.
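The limits above also bound how large a BLOB this approach can copy. As a rough sketch (the MultipartPlan class and its members are illustrative names, not part of the library), the part count is a ceiling division, and 10000 parts of 100MB cap a single copy at roughly 976GB:

```csharp
using System;

public static class MultipartPlan
{
    // Limit stated above: at most 10000 parts per upload.
    public const int MaxParts = 10000;

    // 100 MB part size, the value the library uses.
    public const long PartSize = 100L * 1024 * 1024;

    // Largest BLOB that fits in MaxParts parts of PartSize bytes each.
    public const long MaxBlobLength = MaxParts * PartSize;

    // Number of parts needed for a BLOB of the given length (ceiling division).
    public static long PartCount(long blobLength)
    {
        if (blobLength <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(blobLength));
        }

        return (blobLength + PartSize - 1) / PartSize;
    }

    // True when the BLOB can be uploaded without exceeding the part limit.
    public static bool Fits(long blobLength)
    {
        return PartCount(blobLength) <= MaxParts;
    }
}
```

For larger BLOBs, the part size would have to be raised (up to the 5GB limit) so that the part count stays within 10000.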

Library Source code

The following classes have been created so that developers can use them as a library. They encapsulate the implementation logic of the copy operation and provide Request and Response objects for ease of use and code management.
  1. BlobToS3Manager: Holds the implementation logic of copying BLOB data into S3.
  2. IBlobToS3Manager: Interface implemented by BlobToS3Manager.
  3. BlobToS3Request: Holds the BLOB source and S3 target data needed for the copy operation.
  4. BlobToS3Response: Holds the response of the copy operation.
You can find the above 4 library files in my GitHub directory here.

Snippet 1 (BlobToS3Manager)

namespace AWSSampleConsoleApp1.BlobToS3
{
    using Amazon.S3.Model;
    using Microsoft.WindowsAzure.Storage.Blob;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Security.Cryptography;
    using System.Threading.Tasks;

    /// <summary>
    /// Handles copying a BLOB to S3.
    /// </summary>
    public class BlobToS3Manager : IBlobToS3Manager
    {
        /// <summary>
        /// Message returned when the BLOB container does not exist.
        /// </summary>
        private const string ContainerNotExists = "BLOB container doesn't exist.";

        /// <summary>
        /// Message returned when the BLOB does not exist.
        /// </summary>
        private const string BlobNotExists = "BLOB doesn't exist.";

        /// <summary>
        /// Part size to read from BLOB and upload to S3.
        /// </summary>
        private const long PartSize = 104857600; // 100 MB.

        /// <summary>
        /// The BLOB S3 request.
        /// </summary>
        private readonly BlobToS3Request blobToS3Request;
        /// <summary>
        /// Initializes new instance of BLOB to S3 Manager.
        /// </summary>
        /// <param name="blobToS3Request">The BLOB to S3 request.</param>
        public BlobToS3Manager(BlobToS3Request blobToS3Request)
        {
            this.blobToS3Request = blobToS3Request;
        }

        /// <summary>
        /// Copies BLOB to S3.
        /// </summary>
        /// <returns>The BLOB to S3 response.</returns>
        public async Task<BlobToS3Response> CopyFromBlobToS3Async()
        {
            BlobToS3Response blobToS3Response = new BlobToS3Response();
            var validation = await this.Validate();

            if (!validation.Item1)
            {
                return validation.Item2;
            }

            var sourceBlob = validation.Item3;
            await sourceBlob.FetchAttributesAsync();
            var remainingBytes = sourceBlob.Properties.Length;
            long readPosition = 0; // Offset from which to start reading the next range of the BLOB.

            InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest
            {
                BucketName = this.blobToS3Request.TargetS3Bucket,
                Key = this.blobToS3Request.TargetS3File
            };

            // The UploadId from this response identifies the upload in every subsequent call.
            InitiateMultipartUploadResponse initiateMultipartUploadResponse = await this.blobToS3Request.S3Client.InitiateMultipartUploadAsync(initiateMultipartUploadRequest).ConfigureAwait(false);
            List<UploadPartResponse> uploadPartResponses = new List<UploadPartResponse>();

            try
            {
                int partCounter = 0; // To increment on each read of parts and use it as part number.
                var sha256 = new SHA256Managed();

                while (remainingBytes > 0)
                {
                    // Determine the size when final block reached as it might be less than Part size. 
                    // Will be PartSize except final block.
                    long bytesToCopy = Math.Min(PartSize, remainingBytes);

                    using (MemoryStream memoryStream = new MemoryStream())
                    {
                        // To download part from BLOB.
                        await sourceBlob.DownloadRangeToStreamAsync(memoryStream, readPosition, bytesToCopy).ConfigureAwait(false);
                        memoryStream.Position = 0;
                        partCounter++;

                        UploadPartRequest uploadRequest = new UploadPartRequest
                        {
                            BucketName = this.blobToS3Request.TargetS3Bucket,
                            Key = this.blobToS3Request.TargetS3File,
                            UploadId = initiateMultipartUploadResponse.UploadId,
                            PartNumber = partCounter,
                            PartSize = bytesToCopy,
                            InputStream = memoryStream
                        };

                        UploadPartResponse uploadPartResponse = await this.blobToS3Request.S3Client.UploadPartAsync(uploadRequest).ConfigureAwait(false);
                        uploadPartResponses.Add(uploadPartResponse);

                        remainingBytes -= bytesToCopy;
                        readPosition += bytesToCopy;

                        // Log progress here if needed, e.g. the part number, the bytes uploaded and the bytes remaining.

                        // Feed this part into the running SHA-256 hash.
                        // TransformFinalBlock must be used for the last part so that the hash is finalized.
                        byte[] partBytes = memoryStream.ToArray();

                        if (remainingBytes <= 0)
                        {
                            sha256.TransformFinalBlock(partBytes, 0, (int)bytesToCopy);
                        }
                        else
                        {
                            sha256.TransformBlock(partBytes, 0, (int)bytesToCopy, null, 0);
                        }
                    }
                }

                blobToS3Response.Sha256CheckSum = BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);

                CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest
                {
                    BucketName = this.blobToS3Request.TargetS3Bucket,
                    Key = this.blobToS3Request.TargetS3File,
                    UploadId = initiateMultipartUploadResponse.UploadId
                };

                completeMultipartUploadRequest.AddPartETags(uploadPartResponses);

                CompleteMultipartUploadResponse completeMultipartUploadResponse = await this.blobToS3Request.S3Client.CompleteMultipartUploadAsync(completeMultipartUploadRequest).ConfigureAwait(false);

                blobToS3Response.IsSuccess = true;
                blobToS3Response.S3Path = completeMultipartUploadResponse.Location;
            }
            catch (Exception exception)
            {
                blobToS3Response.IsSuccess = false;
                blobToS3Response.Message = exception.Message;

                AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest
                {
                    BucketName = this.blobToS3Request.TargetS3Bucket,
                    Key = this.blobToS3Request.TargetS3File,
                    UploadId = initiateMultipartUploadResponse.UploadId
                };

                await this.blobToS3Request.S3Client.AbortMultipartUploadAsync(abortMultipartUploadRequest).ConfigureAwait(false);
            }

            return blobToS3Response;
        }

        /// <summary>
        /// Validates that the source container and BLOB exist.
        /// </summary>
        /// <returns>The resultant tuple.</returns>
        private async Task<Tuple<bool, BlobToS3Response, CloudBlockBlob>> Validate()
        {
            BlobToS3Response blobToS3Response = null;
            CloudBlobContainer cloudBlobContainer = this.blobToS3Request.BlobClient.GetContainerReference(this.blobToS3Request.SourceBlobContainer);

            if (!await cloudBlobContainer.ExistsAsync())
            {
                blobToS3Response = new BlobToS3Response
                {
                    IsSuccess = false,
                    Message = ContainerNotExists
                };

                return new Tuple<bool, BlobToS3Response, CloudBlockBlob>(false, blobToS3Response, null);
            }

            CloudBlockBlob cloudBlockBlob = cloudBlobContainer.GetBlockBlobReference(this.blobToS3Request.SourceBlob);

            if (await cloudBlockBlob.ExistsAsync())
            {
                return new Tuple<bool, BlobToS3Response, CloudBlockBlob>(true, null, cloudBlockBlob);
            }

            blobToS3Response = new BlobToS3Response
            {
                IsSuccess = false,
                Message = BlobNotExists
            };

            return new Tuple<bool, BlobToS3Response, CloudBlockBlob>(false, blobToS3Response, null);
        }

    }
}

Snippet 2 (CopyFromBlobToS3: Sample on how to create instances and call the library)


Full source code can be found in my github repo here.

Sunday, February 18, 2018

Calculating Hash values / checksum for files while we read them as Streams in chunks

Introduction

You may encounter situations where you need to calculate the checksum of a file / stream while transmitting it across the wire. Transmitting file streams is common nowadays, and in that scenario you need to ensure the data has not been corrupted in transit. To do so, the receiving end uses the same algorithm to recalculate the checksum and compares it with the one calculated by the sender.

Scenario

Let me take the same scenario I explained in my previous post, Download large files as chunks and upload them into BLOB, in which we downloaded and transmitted a large file as a stream in chunks. There are a lot of articles on the Web explaining how to calculate the checksum for a full file stream. The snippet below covers the case of calculating the checksum chunk by chunk and accumulating it at the end.


Tips

HashAlgorithm.TransformBlock and HashAlgorithm.TransformFinalBlock will help you achieve this.
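To see why these two methods are enough, here is a minimal, self-contained sketch (the ChunkedHash class and its method names are illustrative, not taken from the repo). It hashes an in-memory byte array chunk by chunk and, for comparison, in one go; both paths should produce the same SHA-256 value regardless of the chunk size:

```csharp
using System;
using System.Security.Cryptography;

public static class ChunkedHash
{
    // Hashes the data chunk by chunk: TransformBlock for every chunk
    // except the last, TransformFinalBlock for the last one.
    public static string Sha256InChunks(byte[] data, int chunkSize)
    {
        if (data == null) throw new ArgumentNullException(nameof(data));
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        using (var sha256 = SHA256.Create())
        {
            int offset = 0;

            while (data.Length - offset > chunkSize)
            {
                // The output buffer is not needed for hashing, so null is passed.
                sha256.TransformBlock(data, offset, chunkSize, null, 0);
                offset += chunkSize;
            }

            // Final chunk; it may be shorter than chunkSize (or empty for empty input).
            sha256.TransformFinalBlock(data, offset, data.Length - offset);
            return BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);
        }
    }

    // Hashes the whole array in a single call, for comparison.
    public static string Sha256Whole(byte[] data)
    {
        using (var sha256 = SHA256.Create())
        {
            return BitConverter.ToString(sha256.ComputeHash(data)).Replace("-", string.Empty);
        }
    }
}
```

Note that the Hash property is only valid after TransformFinalBlock has been called, which is why the last chunk needs special handling.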

Snippet

public class LargeFileProcessor
    {       
        /// <summary>
        /// Logger instance.
        /// </summary>
        private ILogger logger = new Logger();

        /// <summary>
        /// Download Large File as chunk and upload as chunk into BLOB.
        /// </summary>
        public async Task ProcessLargeFile()
        {
            // Trimmed for brevity.

            string urlToDownload = CloudConfigurationManager.GetSetting("DownloadURL"); // Provide valid URL from where the large file can be downloaded.

            Stopwatch stopwatch = Stopwatch.StartNew();

            try
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    var httpRequestMessage = new HttpRequestMessage(HttpMethod.Get, new Uri(urlToDownload))
                    {
                        // To avoid error related to 'An existing connection was forcibly closed by the remote host'. Use Http1.0 instead of Http1.1.
                        Version = HttpVersion.Version10
                    };

                    using (HttpResponseMessage response = await httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false))
                    {
                        using (Stream stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                        {
                            const int pageSizeInBytes = 104857600; // 100MB. As Blob chunk max size is 100MB as of now.

                            var sha256 = new SHA256Managed();

                            var bytesRemaing = response.Content.Headers.ContentLength.Value; // Read Total file size from the header.

                            while (bytesRemaing > 0)
                            {
                                var bytesToCopy = (int)Math.Min(bytesRemaing, pageSizeInBytes);
                                var bytesToSend = new byte[bytesToCopy];

                                var bytesCountRead = await ReadStreamAndAccumulate(stream, bytesToSend, bytesToCopy);

                                // Alternatively, the loop could exit when bytesCountRead is 0, which indicates there are no more bytes to read from the stream.
                                bytesRemaing -= bytesCountRead;

                                // Calculate the checksum value.
                                if (bytesRemaing <= 0)
                                {
                                    sha256.TransformFinalBlock(bytesToSend, 0, bytesCountRead);
                                }
                                else
                                {
                                    sha256.TransformBlock(bytesToSend, 0, bytesCountRead, bytesToSend, 0);
                                }
                            }

                            var checksum = BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);
                            this.logger.WriteLine($"Hash value is : {checksum}");

                            await Task.FromResult(0);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                this.logger.WriteLine(ex.Message);
                throw;
            }
            finally
            {
                stopwatch.Stop();
                this.logger.WriteLine($"Execution time in mins: {stopwatch.Elapsed.TotalMinutes}");
            }
        }

        /// <summary>
        /// Read the stream and accumulate till it reaches the number of bytes specified to copy.
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamAndAccumulate(Stream stream, byte[] bytesToSend, int bytesCountToCopy)
        {
                        // Trimmed for brevity.
        }

        /// <summary>
        /// Reads the stream with retry when failed. 
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamWithRetry(Stream stream, byte[] bytesToSend, int bytesCountToCopy, int offset)
        {
                        // Trimmed for brevity.
        }
    }


In the above snippet I've trimmed the code down to the minimum needed to show how the checksum is calculated while reading in chunks. You can find the full source code in my GitHub repo here.

Sunday, February 11, 2018

Download large files as chunks and upload them into BLOB

Introduction

In real-world scenarios there are times when we need to download large files from a Web resource and save them to Azure BLOB storage. Although a few articles on the Web helped, I was not able to find an end-to-end working solution for files ranging from 20GB to 30GB in size. I have detailed the challenges and related solutions below for your convenience.

Problem statement

Download large files from the Web resource and upload them into Azure BLOB storage.

Challenges and Solutions

  1. First, we'll hit a memory issue if we try to read the full stream and load all bytes into memory. An article here explains well how to avoid this memory issue.
  2. Another article here details how to read a FileStream in chunks and upload them into BLOB. Using the details from this and #1 we can work towards the solution.
    1. Use Stream.Read, one of whose parameters is the maximum number of bytes to read from the current stream. The method returns the number of bytes actually read, which can be less than the maximum count parameter, and that causes a problem (see #2.3). Detailed documentation of Stream.Read can be found here.
    2. Along with that, we can use the CloudBlockBlob.PutBlock method to upload the chunks read into BLOB. Points to note here: each block (chunk) in a BLOB can be at most 100MB in size, and a BLOB can have at most 50000 blocks (100MB x 50000 blocks = 4.75TB). Detailed documentation can be found here.
    3. Even though #2.1 and #2.2 look straightforward, there can be issues because Read can return fewer bytes than requested, which in turn increases the number of blocks (beyond the 50000 limit) in Azure BLOB storage.
    4. To avoid the issue mentioned in #2.3, we need to accumulate the results of Stream.Read until they reach the expected size (100MB in our case).
  3. Even after fixing the issues specified in #1 and #2, you might face an exception stating "An existing connection was forcibly closed by the remote host". Though I have not been able to identify a proper solution for this, I found a workaround using the articles here and here: use HTTP 1.0 instead of HTTP 1.1.
Code samples can be found in my GitHub repo here.
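The short-read behaviour described in #2.1 to #2.4 is easy to reproduce without a network. The sketch below is only an illustration: TrickleStream is a hypothetical wrapper that returns at most a few bytes per Read call (much like a network stream might), and ChunkReader.ReadFully is an assumed helper name that keeps reading until the requested count is reached or the stream ends.

```csharp
using System;
using System.IO;

// Hypothetical stream that returns at most maxPerRead bytes per Read call.
public sealed class TrickleStream : Stream
{
    private readonly Stream inner;
    private readonly int maxPerRead;

    public TrickleStream(Stream inner, int maxPerRead)
    {
        this.inner = inner;
        this.maxPerRead = maxPerRead;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Deliberately read less than requested to mimic a short read.
        return this.inner.Read(buffer, offset, Math.Min(count, this.maxPerRead));
    }

    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class ChunkReader
{
    // Accumulates reads until 'count' bytes are in the buffer or the stream ends.
    // Returns the number of bytes actually read.
    public static int ReadFully(Stream stream, byte[] buffer, int count)
    {
        int total = 0;

        while (total < count)
        {
            int read = stream.Read(buffer, total, count - total);

            if (read == 0)
            {
                break; // End of stream reached before 'count' bytes were available.
            }

            total += read;
        }

        return total;
    }
}
```

With a 10-byte source and maxPerRead set to 3, a single Read call asking for 10 bytes returns only 3, while ReadFully returns all 10. Uploading one block per raw Read call would therefore create far more blocks than uploading one block per accumulated chunk.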

Snippet 

public class LargeFileProcessor
    {
        /// <summary>
        /// Logger instance.
        /// </summary>
        private ILogger logger = new Logger();

        /// <summary>
        /// Retry count.
        /// </summary>
        private int retryCount = 5;

        /// <summary>
        /// Time delay for retry.
        /// </summary>
        private TimeSpan delay = TimeSpan.FromSeconds(10);

        /// <summary>
        /// Download Large File as chunk and upload as chunk into BLOB.
        /// </summary>
        public async Task ProcessLargeFile()
        {
            // Create Storage account reference.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageAccount"));

            // Create the blob client.
            CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

            // Retrieve reference to a container.
            CloudBlobContainer container = blobClient.GetContainerReference(CloudConfigurationManager.GetSetting("ContainerName"));
            await container.CreateIfNotExistsAsync().ConfigureAwait(false);

            // Create Blob reference.
            CloudBlockBlob blob = container.GetBlockBlobReference(CloudConfigurationManager.GetSetting("BlobFileName"));

            string urlToDownload = CloudConfigurationManager.GetSetting("DownloadURL"); // Provide valid URL from where the large file can be downloaded.

            Stopwatch stopwatch = Stopwatch.StartNew();

            try
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    var httpRequestMessage = new HttpRequestMessage(HttpMethod.Get, new Uri(urlToDownload))
                    {
                        // To avoid error related to 'An existing connection was forcibly closed by the remote host'. Use Http1.0 instead of Http1.1.
                        Version = HttpVersion.Version10
                    };

                    using (HttpResponseMessage response = await httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false))
                    {
                        using (Stream stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                        {
                            const int pageSizeInBytes = 104857600; // 100MB. As Blob chunk max size is 100MB as of now.

                            var blockIds = new List<string>();
                            var sha256 = new SHA256Managed();

                            var bytesRemaing = response.Content.Headers.ContentLength.Value; // Read Total file size from the header.
                            int blockIdentifier = 0;

                            while (bytesRemaing > 0)
                            {
                                blockIdentifier++;
                                var bytesToCopy = (int)Math.Min(bytesRemaing, pageSizeInBytes);
                                var bytesToSend = new byte[bytesToCopy];

                                var bytesCountRead = await ReadStreamAndAccumulate(stream, bytesToSend, bytesToCopy);

                                // Alternatively, the loop could exit when bytesCountRead is 0, which indicates there are no more bytes to read from the stream.
                                bytesRemaing -= bytesCountRead;

                                this.logger.WriteLine($"bytes read: {bytesCountRead}");
                                this.logger.WriteLine($"bytes remaining: {bytesRemaing}");

                                string base64BlockId = Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(string.Format("largefile1BlockId{0}", blockIdentifier.ToString("0000000"))));
                                blockIds.Add(base64BlockId);

                                // Calculate the checksum value.
                                if (bytesRemaing <= 0)
                                {
                                    sha256.TransformFinalBlock(bytesToSend, 0, bytesCountRead);
                                }
                                else
                                {
                                    sha256.TransformBlock(bytesToSend, 0, bytesCountRead, bytesToSend, 0);
                                }

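                                // Upload only the bytes actually read, and dispose the stream once the block is uploaded.
                                using (var blockStream = new MemoryStream(bytesToSend, 0, bytesCountRead))
                                {
                                    await blob.PutBlockAsync(base64BlockId, blockStream, null);
                                }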
                            }

                            var checksum = BitConverter.ToString(sha256.Hash).Replace("-", string.Empty);
                            this.logger.WriteLine($"Hash value is : {checksum}");
                            await blob.PutBlockListAsync(blockIds);

                            await Task.FromResult(0);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                this.logger.WriteLine(ex.Message);
                throw;
            }
            finally
            {
                stopwatch.Stop();
                this.logger.WriteLine($"Execution time in mins: {stopwatch.Elapsed.TotalMinutes}");
            }
        }

        /// <summary>
        /// Read the stream and accumulate till it reaches the number of bytes specified to copy.
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamAndAccumulate(Stream stream, byte[] bytesToSend, int bytesCountToCopy)
        {
            int bytesReadSoFar = 0;

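            while (bytesReadSoFar < bytesCountToCopy)
            {
                var currentBytesCountRead = await ReadStreamWithRetry(stream, bytesToSend, bytesCountToCopy - bytesReadSoFar, bytesReadSoFar).ConfigureAwait(false);

                // ReadAsync returns 0 at the end of the stream. Fail fast instead of looping forever if the stream ends early.
                if (currentBytesCountRead == 0)
                {
                    throw new EndOfStreamException("The stream ended before the expected number of bytes was read.");
                }

                bytesReadSoFar += currentBytesCountRead;
            }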

            return bytesReadSoFar;
        }

        /// <summary>
        /// Reads the stream with retry when failed. 
        /// </summary>
        /// <param name="stream">Stream to be read from.</param>
        /// <param name="bytesToSend">Target byte array that holds the bytes read.</param>
        /// <param name="bytesCountToCopy">The number of bytes to be copied.</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream.</param>
        /// <returns>The number of bytes read.</returns>
        private async Task<int> ReadStreamWithRetry(Stream stream, byte[] bytesToSend, int bytesCountToCopy, int offset)
        {
            int currentRetry = 0;
            for (; ; )
            {
                try
                {
                    var bytesRead = await stream.ReadAsync(bytesToSend, offset, bytesCountToCopy);
                    return bytesRead;
                }
                catch (Exception ex)
                {
                    this.logger.WriteLine($"Operation Exception : {ex.Message}");

                    currentRetry++;

                    // Check if it is within the retry count specified.
                    if (currentRetry > this.retryCount)
                    {
                        // Rethrow the exception once the retry attempts are exhausted.
                        throw;
                    }
                }

                // Wait to retry the operation.
                await Task.Delay(delay);
            }

        }
    }

Full source code has been provided here.
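
A note on the block IDs used in the snippet above: Azure requires every block ID within a BLOB to be Base64-encoded and of the same length, which is why the block number is zero-padded to a fixed width. A minimal sketch of that idea follows; the BlockIds class name is illustrative, and the prefix is the one used above.

```csharp
using System;
using System.Text;

public static class BlockIds
{
    // Builds a Base64 block ID from a prefix and a zero-padded block number.
    // The fixed-width padding keeps every ID in the BLOB the same length.
    public static string Create(string prefix, int blockNumber)
    {
        string raw = prefix + blockNumber.ToString("0000000");
        return Convert.ToBase64String(Encoding.ASCII.GetBytes(raw));
    }
}
```

For example, BlockIds.Create("largefile1BlockId", 1) and BlockIds.Create("largefile1BlockId", 50000) have the same length, so both are valid IDs for the same BLOB.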

Monday, October 30, 2017

Time triggered / Scheduler service with Azure Service Fabric

Introduction

Azure Service Fabric services can be configured to exhibit trigger-based behavior similar to WebJobs and Azure Functions.

Time triggered / Scheduler Service in Azure Service Fabric

To create a time triggered (scheduler) service in Azure Service Fabric, I can think of the following two quick options among the possible ways.
  1. Create a time triggered WebJob and add/deploy it as a guest executable in Azure Service Fabric.
  2. Create an Azure Service Fabric Stateless Service and implement a listener to handle jobs.
Here I'm taking the second approach, which requires creating the respective listener. Rather than writing a new scheduling framework, I'm using the Quartz.Net framework. I'm using CRON expressions to keep the behavior in line with the time trigger behavior of WebJobs and Azure Functions.

I'll be explaining two approaches below.
  1. Simple Time trigger Service
  2. Time trigger Service using Dependency Injection

Simple Time trigger Service

  • Uses Quartz.Net framework.
  • Without Dependency Injection.
  • The listener gets all the IJob types in the current assembly that are decorated with JobInfoAttribute.
  • JobInfoAttribute holds the name and CRON expression of the job.
  • Code samples can be found in my github repo here.
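
JobInfoAttribute itself is not shown in this post, so the following is only a sketch of what such an attribute and the reflection-based discovery could look like. The constructor shape, the ISampleJob interface (a stand-in for Quartz's IJob so that the sketch compiles without the Quartz package), the sample job classes and the JobDiscovery helper are all assumptions; only the Name and CronExpression properties come from the snippets in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Assumed shape of the attribute: a job name plus a Quartz CRON expression.
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class JobInfoAttribute : Attribute
{
    public JobInfoAttribute(string name, string cronExpression)
    {
        this.Name = name;
        this.CronExpression = cronExpression;
    }

    public string Name { get; }

    public string CronExpression { get; }
}

// Stand-in for Quartz's IJob, used only to keep this sketch self-contained.
public interface ISampleJob
{
}

// Quartz CRON fields: seconds, minutes, hours, day-of-month, month, day-of-week.
// "0 0/5 * * * ?" fires every 5 minutes.
[JobInfo("HeartbeatJob", "0 0/5 * * * ?")]
public class HeartbeatJob : ISampleJob
{
}

// Not decorated, so the discovery below skips it.
public class UndecoratedJob : ISampleJob
{
}

public static class JobDiscovery
{
    // Returns every job type in the assembly that carries a JobInfoAttribute,
    // paired with the attribute instance.
    public static IEnumerable<KeyValuePair<Type, JobInfoAttribute>> Find(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => typeof(ISampleJob).IsAssignableFrom(t) && Attribute.IsDefined(t, typeof(JobInfoAttribute)))
            .Select(t => new KeyValuePair<Type, JobInfoAttribute>(
                t,
                (JobInfoAttribute)Attribute.GetCustomAttribute(t, typeof(JobInfoAttribute))));
    }
}
```

Keeping the schedule in an attribute means a new job only needs to implement the job interface and declare its CRON expression; the listener picks it up without any registration code.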

Snippet 1 (SimpleTimeTriggerListener)

using Microsoft.ServiceFabric.Services.Communication.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Reflection;
using Quartz;
using TimeTriggerService;
using Quartz.Impl;

namespace SimpleTimeTriggerService
{
    public class SimpleTimeTriggerListener : ICommunicationListener
    {
        private IScheduler scheduler;

        public void Abort()
        {
            scheduler.Shutdown(false);
        }

        public Task CloseAsync(CancellationToken cancellationToken)
        {
            scheduler.Shutdown(true);
            return Task.FromResult(true);
        }

        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            var tasks =
                Assembly.GetExecutingAssembly()
                    .GetTypes()
                    .Where(
                        t =>
                            typeof(IJob).IsAssignableFrom(t) &&
                            Attribute.IsDefined(t, typeof(JobInfoAttribute)));

            // Create the scheduler once, before the loop, so that it is never null when Start() is called below.
            var schedulerFactory = new StdSchedulerFactory();
            scheduler = schedulerFactory.GetScheduler();

            foreach (var task in tasks)
            {
                var jobInfo = Attribute.GetCustomAttribute(task, typeof(JobInfoAttribute)) as JobInfoAttribute;
                var jobName = jobInfo == null ? task.Name : jobInfo.Name;
                var job = new JobDetailImpl(jobName, null, task);

                var trigger =
                    TriggerBuilder.Create()
                        .WithIdentity($"{jobName}Trigger", null)
                        .WithCronSchedule(jobInfo.CronExpression)
                        .ForJob(job)
                        .Build();
                scheduler.ScheduleJob(job, trigger);
            }

            scheduler.Start();

            return Task.FromResult("Sample Simple Job scheduler");
        }
    }
}

Time trigger Service using Dependency Injection

  • The listener gets all the IJob types in the current assembly that are decorated with JobInfoAttribute.
  • JobInfoAttribute holds the name and CRON expression of the job.
  • Code samples can be found in my github repo here.



Snippet 2 (TimeTriggerListener with Autofac)

using Microsoft.ServiceFabric.Services.Communication.Runtime;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using Autofac;
using Quartz;
using Autofac.Extras.Quartz;
using System.Reflection;

namespace TimeTriggerService
{
    internal class TimeTriggerListener : ICommunicationListener
    {
        private IScheduler scheduler;

        public void Abort()
        {
            scheduler.Shutdown(false);
        }

        public Task CloseAsync(CancellationToken cancellationToken)
        {
            scheduler.Shutdown(true);
            return Task.FromResult(true);
        }

        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            ContainerBuilder builder = new ContainerBuilder();
            builder.RegisterModule(new QuartzAutofacFactoryModule());
            builder.RegisterModule(new QuartzAutofacJobsModule(Assembly.GetExecutingAssembly()));
            builder.RegisterAssemblyTypes(Assembly.GetExecutingAssembly()).Where(x => typeof(IJob).IsAssignableFrom(x)).As<IJob>();
            builder.RegisterType<Logger>().As<ILogger>();
            builder.RegisterType<JobScheduler>().AsSelf();
            IContainer container = builder.Build();
            ConfigureScheduler(container);

            return Task.FromResult("Sample Job scheduler");
        }
        
        private void ConfigureScheduler(IContainer container)
        {
            IEnumerable<IJob> jobList = container.Resolve<IEnumerable<IJob>>();
            var jobScheduler = container.Resolve<JobScheduler>();
            this.scheduler = jobScheduler.Start();
        }
    }
}



Snippet 3 (JobScheduler)

using Quartz;
using Quartz.Impl;
using System;
using System.Collections.Generic;

namespace TimeTriggerService
{
    public class JobScheduler
    {
        private IScheduler scheduler;
        private IEnumerable<IJob> jobs;
        public JobScheduler(IScheduler schedulerParam, IEnumerable<IJob> jobsParam)
        {
            this.scheduler = schedulerParam;
            this.jobs = jobsParam;
        }

        public IScheduler Start()
        {
            foreach (var job in jobs)
            {
                var task = job.GetType();
                var jobInfo = Attribute.GetCustomAttribute(task, typeof(JobInfoAttribute)) as JobInfoAttribute;
                if (jobInfo == null)
                {
                    // Without a JobInfoAttribute there is no CRON expression to schedule with.
                    continue;
                }
                var jobName = jobInfo.Name ?? task.Name;
                var jobDetail = new JobDetailImpl(jobName, null, task);
                var trigger =
                    TriggerBuilder.Create()
                        .WithIdentity($"{jobName}Trigger", null)
                        .WithCronSchedule(jobInfo.CronExpression)
                        .ForJob(jobDetail)
                        .Build();
                scheduler.ScheduleJob(jobDetail, trigger);
            }
            scheduler.Start();

            return scheduler;
        }
    }
}


Full source code has been provided here.

Applying strict boolean behavior in ASP.NET WEB API JSON request model

Introduction

In ASP.NET WEB API / MVC, when the request input model has a bool property and the content type is application/json, the property also accepts numbers, which are converted to bool. A number sent as input for a bool is converted to false for 0 and to true for everything else (with one observed exception: 08 and 09 are bound as null).

Problem statement

Prevent numbers from being accepted as input for bool properties when the content type is JSON.

Solution

  1. Create a converter that inherits from Newtonsoft.Json.JsonConverter.
  2. Override the ReadJson method, check that the reader's ValueType is bool, and throw a JsonSerializationException if it is not.

Snippet (Converter)

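using System;
using Newtonsoft.Json;

public class BooleanConverter : JsonConverter
{
    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        // CanConvert only matches bool, so the value is written as a plain JSON boolean.
        writer.WriteValue((bool)value);
    }

    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        // Reject numbers, strings and null; only a JSON true/false token is accepted.
        if (reader.ValueType != typeof(bool))
        {
            throw new JsonSerializationException("Invalid data type");
        }
        return reader.Value;
    }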

    public override bool CanConvert(Type objectType)
    {
        return objectType == typeof(bool);
    }
}
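
The post does not show how the converter is wired in, so here is one possible way, sketched under assumptions: the converter is attached to a single property with the JsonConverter attribute. FeatureToggleRequest, BooleanConverterDemo and TryParse are hypothetical names for illustration only, and the converter is repeated in condensed form so that the block compiles on its own.

```csharp
using System;
using Newtonsoft.Json;

// Condensed copy of the converter from the snippet above.
public class BooleanConverter : JsonConverter
{
    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        writer.WriteValue((bool)value);
    }

    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        if (reader.ValueType != typeof(bool))
        {
            throw new JsonSerializationException("Invalid data type");
        }
        return reader.Value;
    }

    public override bool CanConvert(Type objectType)
    {
        return objectType == typeof(bool);
    }
}

// Hypothetical request model used only for this illustration.
public class FeatureToggleRequest
{
    // Only this property is forced to accept a real JSON boolean.
    [JsonConverter(typeof(BooleanConverter))]
    public bool IsEnabled { get; set; }
}

public static class BooleanConverterDemo
{
    // Returns false when the converter rejects the payload.
    public static bool TryParse(string json, out FeatureToggleRequest request)
    {
        try
        {
            request = JsonConvert.DeserializeObject<FeatureToggleRequest>(json);
            return true;
        }
        catch (JsonSerializationException)
        {
            request = null;
            return false;
        }
    }
}
```

With this sketch, a payload such as {"IsEnabled": true} should bind normally, while {"IsEnabled": 1} should be rejected. To cover every bool property instead of a single one, the converter can be added to config.Formatters.JsonFormatter.SerializerSettings.Converters in WebApiConfig.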

Saturday, October 28, 2017

Calculate execution time for your ASP.NET WEB API controller actions using Autofac action filter

Requirement

  • Calculate the execution time of ASP.NET WEB API actions.
  • Create common code that calculates and logs the details.
  • The project uses Autofac for Dependency Injection.
If you have integrated Azure Application Insights with your WEB API, it already captures the execution time of each request automatically. The approach here is helpful if you need additional logging, or in places where Application Insights is not available. Let us get straight into the code.

Approach 

  1. We will use an action filter to hold the logic that calculates the execution time.
  2. For a WEB API that uses Autofac as its dependency resolver, we can implement IAutofacActionFilter to achieve the same.
  3. Register the action filter globally so that it runs for all controllers and actions; you then don't need to remember it when you add new controllers/actions.
  4. The example below shows the Autofac-based solution. Ignore the custom logger in it, as it is only there to illustrate the sample.

Snippet 1: (Action Filter that calculates the execution time and logs)

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Web;
using System.Web.Http.Controllers;
using System.Web.Http.Filters;
using Autofac.Integration.WebApi;

namespace TestWebAPI1.Utils
{
    public class ExecutionTimeActionFilterAttribute : IAutofacActionFilter
    {
        private const string StopwatchInstanceKey = "StopwatchInstance";

        private readonly ICustomLogger1 logger;

        public ExecutionTimeActionFilterAttribute(ICustomLogger1 loggerParam)
        {
            this.logger = loggerParam;
        }


        public void OnActionExecuting(HttpActionContext actionContext)
        {
            Stopwatch stopwatchInstance = new Stopwatch();
            actionContext.Request.Properties[StopwatchInstanceKey] = stopwatchInstance;
            stopwatchInstance.Start();
        }

        public void OnActionExecuted(HttpActionExecutedContext actionExecutedContext)
        {
            if (!actionExecutedContext.Request.Properties.ContainsKey(StopwatchInstanceKey))
            {
                return;
            }

            Stopwatch stopwatchInstance = actionExecutedContext.Request.Properties[StopwatchInstanceKey] as Stopwatch;
            if (stopwatchInstance == null)
            {
                return;
            }

            stopwatchInstance.Stop();
            string actionName = actionExecutedContext.ActionContext.ActionDescriptor.ActionName;

            this.logger.WriteLine($"Execution time of action : {actionName} in controller : {actionExecutedContext.ActionContext.ControllerContext.ControllerDescriptor.ControllerName} is {stopwatchInstance.ElapsedMilliseconds} milliseconds");
        }
    }
}

Snippet 2: (Autofac registration in WebApiConfig)

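using System.Reflection;
using System.Web.Http;
using Autofac;
using Autofac.Integration.WebApi;
using Microsoft.Owin.Security.OAuth;

namespace TestWebAPI1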
{
    public static class WebApiConfig
    {
        public static void Register(HttpConfiguration config)
        {
            // Web API configuration and services
            // Configure Web API to use only bearer token authentication.
            config.SuppressDefaultHostAuthentication();
            config.Filters.Add(new HostAuthenticationFilter(OAuthDefaults.AuthenticationType));

            // Web API routes
            config.MapHttpAttributeRoutes();

            config.Routes.MapHttpRoute(
                name: "DefaultApi",
                routeTemplate: "api/{controller}/{id}",
                defaults: new { id = RouteParameter.Optional }
            );

            var builder = new ContainerBuilder();
            
            builder.RegisterType<CustomLogger1>().As<ICustomLogger1>().InstancePerDependency();
            
            builder.RegisterApiControllers(Assembly.GetExecutingAssembly());
            
            builder.RegisterWebApiFilterProvider(config);
            builder.Register(c => new ExecutionTimeActionFilterAttribute(c.Resolve<ICustomLogger1>()))
                .AsWebApiActionFilterFor<ApiController>()
                .InstancePerRequest();

            var container = builder.Build();
            config.DependencyResolver = new AutofacWebApiDependencyResolver(container);
        }
    }
}

References

The concept is based on suggestions from a few blogs and Stack Overflow threads (I am unable to recall the exact articles).

Web Deployment package Zip not getting created even after specifying required MSBuild args in Visual Studio Build task

Introduction

Visual Studio Team Services (VSTS) provides the facilities to set up Continuous Integration and Continuous Deployment. For Azure WebApp/WebJobs, documentation and blogs are available (like this and this) to guide you through it. In this post we're going to look at a scenario where the Web deployment package itself might not get created, and at the solution for that.

Scenario


  • The Visual Studio build task has been added and configured appropriately.
  • The respective MSBuild arguments "/p:DeployOnBuild=true /p:WebPublishMethod=Package /p:PackageAsSingleFile=true /p:SkipInvalidConfigurations=true /p:PackageLocation=$(Build.BinariesDirectory)" have been provided.
But even then, the Web deployment package zip file is not created.

Possible Cause and Solution

Ensure the Microsoft.Web.WebJobs.Publish NuGet package is added to the respective project. If the package is missing, the build step in your CI execution won't throw any exception; it simply won't create the web deployment zip package.
This work by Tito is licensed under a Creative Commons Attribution 3.0 Unported License.