Sunday, April 30, 2017

Parallel.Invoke – Task Parallel Library


In my previous article, I explained about Parallel.For and Parallel.Foreach Loop in detail. In this article I will explain, how to use Parallel.Invoke to do multiple tasks concurrently.

Parallel.For and Parallel.Foreach can be used to loop asynchronously, but Parallel.Invoke can be used to do multiple tasks concurrently. Parallel.Invoke method is part of System.Threading.Tasks and accept array of action delegates as input and run all action delegates in parallel.

You can use Parallel.Invoke method when you want to do multiple tasks in parallel. This will be always faster than calling all the tasks synchronously. Task parallel library internally manages to divide and run multiple task in different threads in parallel. It also manages thread scheduling and scaling automatically as per the number of cores on your computer. There is no guarantee in which order all your tasks are executed and completed. See below example.

Code –

namespace ParallelInvoke
{
using System.Threading;
using System.Threading.Tasks;
class Program
{
    static void Main(string[] args)
    {
        Parallel.Invoke(() =>
        {
            Console.WriteLine("Starting first task");
            Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
            DoFirstTask();
        }, () =>
        {
            Console.WriteLine("Starting second task");
            Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
            DoSecondTask();
        }, () =>
        {
            Console.WriteLine("Starting third task");
            Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
            DoThirdTask();
        }
            );
        Console.ReadLine();
    }
    public static void DoFirstTask()
    {
        List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Console.WriteLine("Total numbers - {0}", numbers.Count());
        Console.WriteLine("First task completed.");
    }
    public static void DoSecondTask()
    {
        List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Console.WriteLine("Sum of all numbers - {0}", numbers.Sum());
        Console.WriteLine("Second task completed.");
    }
    public static void DoThirdTask()
    {
        List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Console.WriteLine("Average of all numbers - {0}", numbers.Average());
        Console.WriteLine("Third task completed.");
    }
}
}

Output –


As you can see in above example, there are three delegates assigned to Parallel.Invoke method to execute. As i mentioned earlier that all delegates runs in different threads concurrently hence the order of task execution is different. You can also check managed thread id for each task on which they are executing.

Cancelling Parallel.Invoke method –


You can cancel any task which is running via Parallel.Invoke method based on certain condition. Parallel.Invoke method has overload which accepts ParallelOptions in which you can specify cancellation token. See Parallel.Invoke overload method which accept ParallelOptions as parameter.

public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);

See below little bit of modified code to accept cancel request.

Code –
namespace ParallelInvoke
{
using System.Threading;
using System.Threading.Tasks;
class Program
{
    static CancellationTokenSource cancelToken = new CancellationTokenSource();
    static void Main(string[] args)
    {
        ParallelOptions options = new ParallelOptions();
        options.CancellationToken = cancelToken.Token;
        try
        {
            Parallel.Invoke(options, () =>
            {
                Console.WriteLine("Starting first task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoFirstTask();
            }, () =>
            {
                Console.WriteLine("Starting second task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoSecondTask();
                options.CancellationToken.ThrowIfCancellationRequested();
            }, () =>
            {
                Console.WriteLine("Starting third task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoThirdTask();
            }
            );
        }
        catch (OperationCanceledException ex)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Error Message - {0}", ex.Message);
        }
           
        Console.ReadLine();
    }
    public static void DoFirstTask()
    {
        List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Console.WriteLine("Total numbers - {0}", numbers.Count());
        Console.WriteLine("First task completed.");
    }
    public static void DoSecondTask()
    {
        List<int> numbers = new List<int>();
        if (numbers.Count <= 0)
        {
            cancelToken.Cancel();
            return;
        }
        Console.WriteLine("Sum of all numbers - {0}", numbers.Sum());
        Console.WriteLine("Second task completed.");
    }
    public static void DoThirdTask()
    {
        List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Console.WriteLine("Average of all numbers - {0}", numbers.Average());
        Console.WriteLine("Third task completed.");
    }
}
}

Output –


As you can see in above example, CancellationTokenSource and ParallelOptions used to cancel Parallel.Invoke method. This cancellation process is similar to the cancellation process for Parallel.For and Parallel.Foreach. In above example, DoSecondTask method has condition to cancel the Parallel.Invoke method. Once this condition is true, the parallel options will throw OperationCanceledException and Parallel.Invoke method will be cancelled.

Exception Handling –


Exception handling in Parallel.Invoke is also similar to Parallel.For and Parallel.Foreach. There is no special mechanism provided by Task Parallel Library to handle exceptions. We can use try catch block to catch exception inside parallel.invoke method. 

See below modified code to handle exceptions.

Code –
namespace ParallelInvoke
{
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class Program
{
    public static ConcurrentQueue<Exception> exceptionQueue = new ConcurrentQueue<Exception>();
    static void Main(string[] args)
    {
        try
        {
            Parallel.Invoke(() =>
            {
                Console.WriteLine("Starting first task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoFirstTask();
            }, () =>
            {
                Console.WriteLine("Starting second task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoSecondTask();
                if (exceptionQueue.Count > 0) throw new AggregateException(exceptionQueue);
            }, () =>
            {
                Console.WriteLine("Starting third task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoThirdTask();
            }
            );
        }
        catch (AggregateException ex)
        {
            foreach (Exception e in ex.InnerExceptions)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Error Message - {0}", e.InnerException.Message);
            }
        }
        Console.ReadLine();
    }
    public static void DoFirstTask()
    {
        List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Console.WriteLine("Total numbers - {0}", numbers.Count());
        Console.WriteLine("First task completed.");
    }
    public static void DoSecondTask()
    {
        List<int> numbers = new List<int>();
        if (numbers.Count <= 0)
        {
            exceptionQueue.Enqueue(new Exception("numbers list is empty"));
        }
        Console.WriteLine("Sum of all numbers - {0}", numbers.Sum());
        Console.WriteLine("Second task completed.");
    }
    public static void DoThirdTask()
    {
        List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Console.WriteLine("Average of all numbers - {0}", numbers.Average());
        Console.WriteLine("Third task completed.");
    }
}
}

Output –


As you can see in above example, exception thrown from DoSecondTask method. ConcurrentQueue has been used to safely add multiple exception thrown from multiple threads. Parallel.Invoke method covered with try catch block. So if any exception thrown from delegate, this will enqueue inside ConcurrentQueue and once all tasks are completed, you can check which exceptions occurred by iterating all the InnerException.

You can download full code from Gist.

I hope this article helps you to know more about Parallel.Invoke and Task Parallel Library. Please leave your feedback in comments below.

References –

See Also –


Sunday, March 26, 2017

Chain of Responsibility Pattern


Chain of Responsibility design pattern can be used to process multiple types of request and each type of request can be handled by multiple handlers. Chain of Responsibility design pattern falls under behavioral pattern of GOF (Gang of Four) pattern.

When to use –


Chain of Responsibility pattern can be used when multiple types of request are handled by multiple handlers. If one handler is not able to process your request, then it will pass to another handler. In this pattern, each handler has reference to next handler. If one handler can’t process request, then it passes it to next handler.

Major components of Chain of Responsibility pattern –


Handler – This is an abstract class. This holds reference of next handler and also has abstract method to handler request.

Concrete Handler – This is concrete class which inherits from Handler class and implements abstract method to handle request.

Client – This is client class and create requests and passes to different handlers in the chain of responsibility pattern.

See below example of Chain of responsibility pattern.

Code –

//Handler class
public abstract class Approver
{
    public abstract void HandleRequest(int requestAmount);
    protected Approver NextApprover;
    public void SetNextApprover(Approver approver)
    {
        NextApprover = approver;
    }
}
//Concrete Handler class
public class Manager : Approver
{
    public override void HandleRequest(int requestAmount)
    {
        if (requestAmount < 100000)
            Console.WriteLine("Manager has approved amount - {0}", requestAmount);
        else if (NextApprover != null)
            NextApprover.HandleRequest(requestAmount);
    }
}
//Concrete Handler class
public class SeniorManager : Approver
{
    public override void HandleRequest(int requestAmount)
    {
        if (requestAmount < 200000)
            Console.WriteLine("Senior Manager has approved amount - {0}", requestAmount);
        else if (NextApprover != null)
            NextApprover.HandleRequest(requestAmount);
    }
}
//Concrete Handler class
public class Director : Approver
{
    public override void HandleRequest(int requestAmount)
    {
        if (requestAmount < 300000)
            Console.WriteLine("Director has approved amount - {0}", requestAmount);
        else if (NextApprover != null)
            NextApprover.HandleRequest(requestAmount);
    }
}

//Client Class
class Program
{
    static void Main(string[] args)
    {
        //Create classes of chain of responsibility
        Approver Manager = new Manager();
        Approver SeniorManager = new SeniorManager();
        Approver Director = new Director();

        //Set next request approver or handler
        Manager.SetNextApprover(SeniorManager);
        SeniorManager.SetNextApprover(Director);

        //Create requests.
        Manager.HandleRequest(75000);
        Manager.HandleRequest(150000);
        Manager.HandleRequest(250000);

        Console.ReadLine();
    }
}

Output –



As you can see in above example, Approver abstract class is inherited to respective approvers. In client class, instance of each concrete handler class created as handler class and set its next approver or handler for each concrete handler class. At last manager class handle request method called all the times. If manager class not able to process request it will pass on to its next approver and so on in chain of responsibilities classes.

You can download full code from Gist.

I hope this article helps you to know more about Chain of Responsibility pattern. Please leave your feedback in comments section below.

References –

See Also –