In my previous article, I explained Parallel.For and Parallel.ForEach in detail. In this article I will explain how to use
Parallel.Invoke to run multiple tasks concurrently.
Parallel.For and Parallel.ForEach run the iterations of a loop in parallel,
whereas Parallel.Invoke runs several independent tasks
concurrently. The Parallel.Invoke method is part of the System.Threading.Tasks namespace. It
accepts an array of Action delegates, runs them (potentially in parallel), and returns only when all of them have completed.
You can use Parallel.Invoke when you want to do multiple
independent tasks in parallel. For tasks that do a meaningful amount of work, this is usually
faster than calling them one after another, although very small tasks may not benefit because of the scheduling overhead.
The Task Parallel Library internally divides the work across threads, and manages thread
scheduling and scaling according to the number of cores on your
computer. There is no guarantee about the order in which your tasks are executed and
completed. See the example below.
Code –
namespace ParallelInvoke
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            // Run three independent actions; each may execute on a different thread.
            Parallel.Invoke(() =>
            {
                Console.WriteLine("Starting first task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoFirstTask();
            }, () =>
            {
                Console.WriteLine("Starting second task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoSecondTask();
            }, () =>
            {
                Console.WriteLine("Starting third task");
                Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                DoThirdTask();
            });

            // Parallel.Invoke returns only after all three actions have finished.
            Console.ReadLine();
        }

        // Prints how many numbers are in the list.
        public static void DoFirstTask()
        {
            List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            Console.WriteLine("Total numbers - {0}", numbers.Count());
            Console.WriteLine("First task completed.");
        }

        // Prints the sum of the numbers.
        public static void DoSecondTask()
        {
            List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            Console.WriteLine("Sum of all numbers - {0}", numbers.Sum());
            Console.WriteLine("Second task completed.");
        }

        // Prints the average of the numbers.
        public static void DoThirdTask()
        {
            List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            Console.WriteLine("Average of all numbers - {0}", numbers.Average());
            Console.WriteLine("Third task completed.");
        }
    }
}
Output –
As you can see in the example above, three delegates are passed
to the Parallel.Invoke method for execution. As I mentioned earlier, the delegates may run concurrently on different threads, so the order in which the tasks start and finish can differ from the order in which they were passed and can change from run to run. You
can also check the managed thread ID printed by each task to see which thread it executed on.
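Because the execution order is not guaranteed, it can help to make the results independent of that order. The sketch below is only an illustration, not part of the original sample: the class name InvokeWithResults and the helper Compute are hypothetical. Each action writes to its own slot of an array, so the results come back in a fixed order no matter which action ran first.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class InvokeWithResults
{
    // Hypothetical helper: runs three independent calculations in parallel and
    // returns them in a fixed order, whatever order the actions actually ran in.
    public static double[] Compute(int[] numbers)
    {
        double[] results = new double[3];

        // Each action writes only to its own slot, so no locking is needed.
        Parallel.Invoke(
            () => results[0] = numbers.Length,
            () => results[1] = numbers.Sum(),
            () => results[2] = numbers.Average());

        // Parallel.Invoke has returned, so all three slots are filled.
        return results;
    }

    public static void Main()
    {
        double[] r = Compute(Enumerable.Range(1, 10).ToArray());
        Console.WriteLine("Count - {0}, Sum - {1}, Average - {2}", r[0], r[1], r[2]);
    }
}
```

The design choice here is to avoid shared mutable state between the actions: since no two actions touch the same array element, nothing needs to be synchronized.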
Cancelling Parallel.Invoke method –
You can cancel the work started by the Parallel.Invoke
method based on a certain condition. Cancellation is cooperative: the actions themselves have to check the token. Parallel.Invoke has an overload that
accepts a ParallelOptions object, in which you can specify a cancellation token. This is
the Parallel.Invoke overload that accepts ParallelOptions as a parameter:
public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);
The code below is a slightly modified version of the first example that accepts a cancellation request.
Code –
namespace ParallelInvoke
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        // Shared source used to request cancellation from inside a task.
        static CancellationTokenSource cancelToken = new CancellationTokenSource();

        static void Main(string[] args)
        {
            ParallelOptions options = new ParallelOptions();
            options.CancellationToken = cancelToken.Token;

            try
            {
                Parallel.Invoke(options, () =>
                {
                    Console.WriteLine("Starting first task");
                    Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                    DoFirstTask();
                }, () =>
                {
                    Console.WriteLine("Starting second task");
                    Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                    DoSecondTask();
                    // Throws OperationCanceledException if DoSecondTask requested cancellation.
                    options.CancellationToken.ThrowIfCancellationRequested();
                }, () =>
                {
                    Console.WriteLine("Starting third task");
                    Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                    DoThirdTask();
                });
            }
            catch (OperationCanceledException ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Error Message - {0}", ex.Message);
            }

            Console.ReadLine();
        }

        public static void DoFirstTask()
        {
            List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            Console.WriteLine("Total numbers - {0}", numbers.Count());
            Console.WriteLine("First task completed.");
        }

        public static void DoSecondTask()
        {
            // The list is intentionally empty to trigger the cancellation condition.
            List<int> numbers = new List<int>();
            if (numbers.Count <= 0)
            {
                cancelToken.Cancel();
                return;
            }
            Console.WriteLine("Sum of all numbers - {0}", numbers.Sum());
            Console.WriteLine("Second task completed.");
        }

        public static void DoThirdTask()
        {
            List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            Console.WriteLine("Average of all numbers - {0}", numbers.Average());
            Console.WriteLine("Third task completed.");
        }
    }
}
Output –
As you can see in the example above, CancellationTokenSource and
ParallelOptions are used together to cancel Parallel.Invoke. This cancellation process
is similar to the cancellation process for Parallel.For and Parallel.ForEach.
In the example, the DoSecondTask method contains the condition that requests cancellation: when the list is empty, it calls cancelToken.Cancel().
The second delegate then calls ThrowIfCancellationRequested() on the token, which throws
OperationCanceledException. Parallel.Invoke propagates it to the caller, where the catch block handles it. Actions that are already running are not interrupted unless they check the token themselves.
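The token is also checked by Parallel.Invoke itself before it starts work. The following is a minimal sketch of that behaviour, assuming a token that is already cancelled when Parallel.Invoke is called; the class name PreCancelledInvoke and the helper RunWithCancelledToken are hypothetical and only serve the illustration. On the runtimes I am aware of, the call throws OperationCanceledException before any action runs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PreCancelledInvoke
{
    // Hypothetical helper: returns how many actions ran when the token
    // was already cancelled before Parallel.Invoke was called.
    public static int RunWithCancelledToken()
    {
        int started = 0;

        using (var source = new CancellationTokenSource())
        {
            var options = new ParallelOptions { CancellationToken = source.Token };
            source.Cancel(); // request cancellation before any work is scheduled

            try
            {
                Parallel.Invoke(options,
                    () => Interlocked.Increment(ref started),
                    () => Interlocked.Increment(ref started));
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Parallel.Invoke was cancelled.");
            }
        }

        return started;
    }

    public static void Main()
    {
        Console.WriteLine("Actions started - {0}", RunWithCancelledToken());
    }
}
```

For actions that run for a long time, the same token can be checked periodically inside the action with ThrowIfCancellationRequested(), as the second delegate in the example above does after DoSecondTask returns.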
Exception Handling –
Exception handling in Parallel.Invoke is also similar to
Parallel.For and Parallel.ForEach. Parallel.Invoke does not stop at the first failure: exceptions thrown by the delegates are
collected and rethrown as a single AggregateException once all the actions have finished. We can use a try-catch block around the
Parallel.Invoke call to catch it.
See the modified code below, which handles
exceptions.
Code –
namespace ParallelInvoke
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        // Thread-safe queue that collects exceptions recorded by the tasks.
        public static ConcurrentQueue<Exception> exceptionQueue = new ConcurrentQueue<Exception>();

        static void Main(string[] args)
        {
            try
            {
                Parallel.Invoke(() =>
                {
                    Console.WriteLine("Starting first task");
                    Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                    DoFirstTask();
                }, () =>
                {
                    Console.WriteLine("Starting second task");
                    Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                    DoSecondTask();
                    // Surface everything the task recorded as one AggregateException.
                    if (exceptionQueue.Count > 0) throw new AggregateException(exceptionQueue);
                }, () =>
                {
                    Console.WriteLine("Starting third task");
                    Console.WriteLine("Thread ID - {0}", Thread.CurrentThread.ManagedThreadId);
                    DoThirdTask();
                });
            }
            catch (AggregateException ex)
            {
                // Flatten() unwraps nested AggregateExceptions, so each original
                // exception is visited directly and e.Message is never read from null.
                foreach (Exception e in ex.Flatten().InnerExceptions)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("Error Message - {0}", e.Message);
                }
            }

            Console.ReadLine();
        }

        public static void DoFirstTask()
        {
            List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            Console.WriteLine("Total numbers - {0}", numbers.Count());
            Console.WriteLine("First task completed.");
        }

        public static void DoSecondTask()
        {
            // The list is intentionally empty to trigger the error condition.
            List<int> numbers = new List<int>();
            if (numbers.Count <= 0)
            {
                exceptionQueue.Enqueue(new Exception("numbers list is empty"));
            }
            Console.WriteLine("Sum of all numbers - {0}", numbers.Sum());
            Console.WriteLine("Second task completed.");
        }

        public static void DoThirdTask()
        {
            List<int> numbers = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            Console.WriteLine("Average of all numbers - {0}", numbers.Average());
            Console.WriteLine("Third task completed.");
        }
    }
}
Output –
As you can see in the example above, the error originates in the
DoSecondTask method, which records an exception in the queue when the list is empty. A ConcurrentQueue is used so that
exceptions can be added safely from multiple threads. The second delegate then throws an AggregateException containing the queued exceptions. The Parallel.Invoke call is wrapped in a try-catch block, so once all tasks are
completed, you can check which exceptions occurred by iterating over the InnerExceptions of the AggregateException that was caught.
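A queue is not strictly required for the aggregation itself. The sketch below is an illustration under my own assumptions, not the code from the Gist: the class name InvokeExceptions and the helper CollectMessages are hypothetical. Two actions throw directly, the third succeeds, and Parallel.Invoke gathers both exceptions into one AggregateException after every action has finished.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class InvokeExceptions
{
    // Hypothetical helper: two actions throw and one succeeds. Returns the
    // messages found in the AggregateException, sorted for a stable order.
    public static string[] CollectMessages(out bool thirdRan)
    {
        bool ran = false;
        string[] messages = new string[0];

        try
        {
            Parallel.Invoke(
                () => { throw new InvalidOperationException("first failed"); },
                () => { throw new ArgumentException("second failed"); },
                () => { ran = true; });
        }
        catch (AggregateException ex)
        {
            // The order of inner exceptions is not something to rely on, so sort them.
            messages = ex.Flatten().InnerExceptions
                .Select(e => e.Message)
                .OrderBy(m => m, StringComparer.Ordinal)
                .ToArray();
        }

        thirdRan = ran;
        return messages;
    }

    public static void Main()
    {
        bool thirdRan;
        foreach (string message in CollectMessages(out thirdRan))
        {
            Console.WriteLine("Error Message - {0}", message);
        }
        Console.WriteLine("Third action ran - {0}", thirdRan);
    }
}
```

The ConcurrentQueue approach used in the article is still useful when a task wants to record several problems and keep going instead of stopping at the first one.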
You can download full code from Gist.
I hope this article helps you learn more about
Parallel.Invoke and the Task Parallel Library. Please leave your feedback in the
comments below.
See Also –
Parallel.For and Parallel.Foreach – Task Parallel Library
How to cancel a Parallel.For and Parallel.Foreach loop – Task Parallel Library
How to handle exception in Parallel.For and Parallel.Foreach loop? – Task Parallel Library