TPL

Why Task Parallel Library (TPL) = great stuff


I just came back to the office today from a short staycation, and while going through the email backlog I saw an email from a colleague asking about my practical experience with the Task Parallel Library (TPL) so far.

I was quite delighted to share some with him and thought I’d drop a few here as well ;)

Practical usages:

  • Parallel data retrieval from different sources
    This has saved us a significant amount of time. For example, with 3 data retrievals where the 1st takes 10 seconds, the 2nd takes 7 seconds and the 3rd takes 4 seconds, instead of 21 seconds sequentially we can now get all 3 results in about 10 seconds (the slowest of the three); see the sketch right after this list.
  • Parallel data submission with limited concurrency
    With long-running processes we need to be careful not to overload the back-end services, so the Limited Concurrency Task Scheduler that comes along with TPL is a life saver: it lets us remove all the code complexity we previously had in place to achieve the same thing.
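
As a rough illustration of the first point, here is a minimal sketch; GetFromServiceA/B/C and Process are hypothetical placeholders for the real retrieval and merge calls:

// Sketch only - GetFromServiceA/B/C and Process are placeholders, not real APIs.
var t1 = Task.Factory.StartNew(() => GetFromServiceA()); // ~10 seconds
var t2 = Task.Factory.StartNew(() => GetFromServiceB()); // ~7 seconds
var t3 = Task.Factory.StartNew(() => GetFromServiceC()); // ~4 seconds

Task.WaitAll(t1, t2, t3);                                // waits roughly 10 seconds (the slowest), not 21

var combined = Process(t1.Result, t2.Result, t3.Result); // all three results are already available here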

Summary:

  • The Task class has greatly simplified the developer’s code for performing asynchronous operations that return values, where previously it was not straightforward to achieve the same thing (e.g. creating a wrapper around the thread pool); a rough before/after sketch follows this list.
  • Out of the box: it ships with .NET 4 (directly from Microsoft), which makes TPL available to us (developers) and to the users (clients) without worrying about a separate component to download
  • More good stuff is on the way with the latest .NET 4.5 Framework
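
A rough before/after sketch of that first point (ComputeSomething is just a placeholder, and this is a simplification rather than the exact pattern we used):

// Before: returning a value from the ThreadPool needs manual plumbing (sketch only).
int result = 0;
using (var done = new ManualResetEvent(false))
{
    ThreadPool.QueueUserWorkItem(_ => { result = ComputeSomething(); done.Set(); });
    done.WaitOne();
}

// After: Task<int> carries the result (and any exception) back for us.
var task = Task.Factory.StartNew(() => ComputeSomething());
int answer = task.Result; // blocks until the task has completed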

The downsides:

  • Potential parallelism -> good stuff + not-so-good stuff. The good: we don’t need to worry about picking the degree of parallelism ourselves. The not so good: there are times when we want work to happen in parallel, but the scheduler is too clever and decides not to parallelize under heavy usage, especially on a low-spec machine (e.g. a client machine that is not as good as our developer PC).
  • Unit testing -> it’s not so easy to test our code once we start mixing TPL into it. It’s true that we should separate the logic from the concurrency, but in reality we sometimes end up with unnecessary code just to make things testable. I do hope more improvements are coming here; there are plenty of discussions about it on Stack Overflow.

Controlling the concurrency level using MaxDegreeOfParallelism in Parallel

Posted on

Another nice out-of-the-box feature of the Parallel class is the ability to limit the concurrency of the actions we want to execute in parallel.

Why would we ever want to do this? Wouldn’t it be best to run as many activities in parallel as possible?

For certain cases we need to be careful not to create too many sudden requests at the same time. A good example is triggering a long-running process on our application server: you wouldn’t want to spawn too many simultaneous requests, as the server may not be able to handle them all and, not surprisingly, could go down due to overload. That is effectively a self-inflicted DoS attack, and your back-end guys will hate you for it, trust me :p

Setting the concurrency level of the actions you’re going to invoke with the Parallel class is pretty simple:

Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = concurrencyLevel }, actions);

Find out more about the MaxDegreeOfParallelism property on MSDN.
We don’t have such a property in TaskCreationOptions that we can simply pass to a Task, but there’s a LimitedConcurrencyLevelTaskScheduler nicely available with which we can achieve the same result. Below I basically spawn 10 processes with a maximum concurrency level of 2; a sketch of the Task-based route follows the full listing.


15-Apr-2011 21:04:33.948 - Thread#13 - Creating 10 process definitions
15-Apr-2011 21:04:33.948 - Thread#13 - Start queueing and invoking all 10 processes
15-Apr-2011 21:04:33.948 - Thread#13 - Doing something here
15-Apr-2011 21:04:33.948 - Thread#14 - Doing something here
15-Apr-2011 21:04:34.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:34.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:35.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:35.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:36.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:36.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:37.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:37.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:38.964 - Thread#13 - All processes have been completed

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace ParallelTests
{
    [TestClass]
    public class TaskSchedulerConcurrencyTest
    {
        [TestMethod]
        public void TestMethod1()
        {
            var p = new ProcessorUsingParallel();
            p.DoProcess(processToCreate: 10, concurrencyLevel: 2);
        }
    }

    public class ProcessorUsingParallel
    {
        public void DoProcess(int processToCreate, int concurrencyLevel)
        {
            SetTurboMode();

            Debug("Creating {0} process definitions", processToCreate.ToString());

            var actions = new Action[processToCreate];
            for (int i = 0; i < processToCreate; i++)
            {
                actions[i] = () => DoSomething(1000);
            }

            Debug("Start queueing and invoking all {0} processes", processToCreate.ToString());
            var options = new ParallelOptions();
            options.MaxDegreeOfParallelism = concurrencyLevel;
            //options.TaskScheduler = new LimitedConcurrencyLevelTaskScheduler(concurrencyLevel); -- we can achieve the same result with this
            Parallel.Invoke(options, actions);

            Debug("All processes have been completed");
        }

        private void DoSomething(int sleepMilliseconds)
        {
            Debug("Doing something here");
            Thread.Sleep(sleepMilliseconds);
        }

        /// <summary>
        /// oh i just wish the framework would have this in place like Console.WriteLine
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private static void Debug(string format, params object[] args)
        {
            System.Diagnostics.Debug.WriteLine(
                string.Format("{0} - Thread#{1} - {2}",
                    DateTime.Now.ToString("dd-MMM-yyyy HH:mm:ss.fff"),
                    Thread.CurrentThread.ManagedThreadId.ToString(),
                    string.Format(format, args)));
        }
        /// <summary>
        /// This is not intended for production purpose
        /// </summary>
        private static void SetTurboMode()
        {
            int t, io;
            ThreadPool.GetMaxThreads(out t, out io);
            Debug("Default Max {0}, I/O: {1}", t, io);

            var success = ThreadPool.SetMinThreads(t, io);
            Debug("SetMinThreads to worker: {0}, I/O: {1} returned {2}", t, io, success);
        }
    }
}
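
For completeness, here is a rough sketch of the Task-based route mentioned earlier. It assumes the LimitedConcurrencyLevelTaskScheduler sample class has been added to the project, and it reuses the DoSomething helper from the listing above:

// Sketch only - assumes LimitedConcurrencyLevelTaskScheduler is available in the project.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(2); // at most 2 tasks running at a time
var factory = new TaskFactory(scheduler);

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    tasks.Add(factory.StartNew(() => DoSomething(1000)));
}
Task.WaitAll(tasks.ToArray());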

Using Parallel in .NET 4.0 without worrying about the WaitHandle.WaitAll 64-handle limitation


My previous post provided an example of using the Task class from the TPL to overcome the 64-waithandle limitation of WaitHandle.WaitAll; here’s an alternative piece of code which leverages the Parallel class from the same library. Here’s the download link for the whole test project.

However, notice one thing below: I’m using the ConcurrentBag<T> type, which is “a thread-safe bag implementation, optimized for scenarios where the same thread will be both producing and consuming data stored in the bag” (taken from the MSDN page).

An interesting fact: this takes around 74 seconds to complete, whereas if we don’t care about return values (remove the usage of ConcurrentBag<T>) it only takes around 28 seconds, and the Task version (with or without return values) takes only around 22 seconds. We could probably pass a “state object” into the invocation to hold the value for each process and avoid the locking costs, but since Task already does this well, we would need a justification for doing it manually in the Parallel.Invoke solution. Moral of the story: always get the facts for each alternative first, and the decision becomes easy and justifiable.
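
For illustration only, here is a minimal sketch of that “state object” idea, reusing the names from the listing below: give each action its own slot in a plain array, so the actions never contend on a shared collection.

// Sketch only - an alternative to ConcurrentBag<T>, not what the measured test project uses.
var results = new int[hiLevelParallelism];
var actions = new Action[hiLevelParallelism];
for (int i = 0; i < hiLevelParallelism; i++)
{
    int slot = i; // copy the loop variable so each closure captures its own slot
    actions[slot] = () => results[slot] = DoHighLevelProcess(r, loLevelParallelism);
}
Parallel.Invoke(actions);
var total = results.Sum(); // plain LINQ Sum over the array, no locking involved

The actual ConcurrentBag<T>-based test is below.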

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

namespace ParallelismTests
{
    [TestClass]
    public class ParallelTest
    {
        [TestMethod]
        public void TestMethod1()
        {
            var p = new ProcessorUsingParallel();
            var counts = p.DoProcess(hiLevelParallelism: 1000, loLevelParallelism: 1000);

            Assert.AreEqual(1000 * 1000, counts);
        }
    }

    public class ProcessorUsingParallel
    {
        public int DoProcess(int hiLevelParallelism, int loLevelParallelism)
        {
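            // Utility.Debug / Utility.SetTurboMode live in the test project's shared Utility class;
            // they match the Debug and SetTurboMode helpers shown in the Task-based listing further below.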
            Utility.SetTurboMode();

            Utility.Debug("Start queueing {0} high level processes", hiLevelParallelism.ToString());

            var counts = new ConcurrentBag<int>();
            var r = new Random();

            var actions = new Action[hiLevelParallelism];
            for (int i = 0; i < hiLevelParallelism; i++)
            {
                actions[i] = () => counts.Add(DoHighLevelProcess(r, loLevelParallelism));
            }

            Utility.Debug("Invoking all {0} high level tasks", hiLevelParallelism.ToString());
            Parallel.Invoke(actions);

            Utility.Debug("All processes have been completed");
            return counts.Sum(t => t);
        }

        private int DoHighLevelProcess(Random r, int loLevelParallelism)
        {
            var counts = new ConcurrentBag<int>();
            var actions = new Action[loLevelParallelism];
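            // note: the shared System.Random instance is called from multiple threads at once here;
            // Random is not thread-safe, so the generated sleep durations are best-effort only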
            for (int i = 0; i < loLevelParallelism; i++)
            {
                actions[i] = () => counts.Add(DoLowLevelProcess(r.Next(1, 10)));
            }
            Parallel.Invoke(actions);

            Utility.Debug("DoHighLevelProcess - Completed with {0} LowLevelProcesses", loLevelParallelism);
            return counts.Sum(t => t);
        }
        private int DoLowLevelProcess(int sleepMilliseconds)
        {
            Thread.Sleep(sleepMilliseconds);
            return 1;
        }
    }
}

Using Task in .NET 4.0 without worrying about the WaitHandle.WaitAll 64-handle limitation


In my previous post, I was looking into having 2 levels of concurrent processes: creating concurrent processes which each spawn another set of concurrent processes.

With WaitHandle.WaitAll there is a limit of 64 wait handles we can wait on, but with the new Task Parallel Library in .NET 4.0 we don’t have to worry about that kind of limitation anymore.

Using the code from the previous post, I then used the Task class to perform the concurrent processes. Notice in the code below that I actually create 1,000 high-level processes which each create 1,000 low-level processes, so 1,000,000 processes in total. Another very nice thing here is that we can return a value from each task very easily compared to the previous ThreadPool-based solution.

One more interesting thing: notice the two Task.WaitAll(tasks.ToArray()); calls in the listing below. With those lines in place it only takes 20+ seconds to complete, whereas without them (remove/comment those lines) it takes around 11 minutes. That’s a pretty huge difference ;)

Reason: I think it makes sense. Without those lines we end up waiting for the tasks sequentially inside the SUM operation, because each Task.Result call blocks until that particular task has finished, and the order in which tasks actually complete rarely matches the order in which we read their results. Task.WaitAll, on the other hand, simply collects the signal from each task as it completes, so by the time the SUM operation runs all the results are already available and nothing has to wait anymore :)

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
using System.Threading;

namespace ParallelismTests
{
    [TestClass]
    public class TaskTest
    {
        [TestMethod]
        public void TestMethod1()
        {
            var p = new ProcessorUsingTask();
            var counts = p.DoProcess(hiLevelParallelism: 1000, loLevelParallelism: 1000);

            Assert.AreEqual(1000*1000, counts);
        }
    }

    public class ProcessorUsingTask
    {
        public int DoProcess(int hiLevelParallelism, int loLevelParallelism)
        {
            SetTurboMode();

            Debug("Start queueing {0} high level processes", hiLevelParallelism.ToString());

            var tasks = new List<Task<int>>();
            var r = new Random();
            for (int i = 0; i < hiLevelParallelism; i++)
            {
                var task = Task.Factory.StartNew<int>(() => DoHighLevelProcess(r, loLevelParallelism));
                tasks.Add(task);
            }

            Debug("Waiting for all {0} high level tasks to complete", hiLevelParallelism.ToString());
            Task.WaitAll(tasks.ToArray()); // try comment this line out and see the performance impact :)

            Debug("All processes have been completed");
            return tasks.Sum(t => t.Result);
        }

        private int DoHighLevelProcess(Random r, int loLevelParallelism)
        {
            var tasks = new List<Task<int>>();
            for (int i = 0; i < loLevelParallelism; i++)
            {
                var task = Task.Factory.StartNew<int>(() => DoLowLevelProcess(r.Next(1, 10)));
                tasks.Add(task);
            }
            Task.WaitAll(tasks.ToArray()); // try comment this line out and see the performance impact :)
            
            Debug("DoHighLevelProcess - Completed with {0} LowLevelProcesses", loLevelParallelism);
            return tasks.Sum(t => t.Result);
        }
        private int DoLowLevelProcess(int sleepMilliseconds)
        {
            Thread.Sleep(sleepMilliseconds);
            //Debug("DoLowLevelProcess - Completed after {0} ms", sleepMilliseconds.ToString());
            return 1;
        }

        /// <summary>
        /// oh i just wish the framework would have this in place like Console.WriteLine
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private static void Debug(string format, params object[] args)
        {
            System.Diagnostics.Debug.WriteLine(
                string.Format("{0} - Thread#{1} - {2}",
                    DateTime.Now.ToString("dd-MMM-yyyy HH:mm:ss.fff"),
                    Thread.CurrentThread.ManagedThreadId.ToString(),
                    string.Format(format, args)));
        }
        /// <summary>
        /// This is not intended for production purpose
        /// </summary>
        private static void SetTurboMode()
        {
            int t, io;
            ThreadPool.GetMaxThreads(out t, out io);
            Debug("Default Max {0}, I/O: {1}", t, io);

            var success = ThreadPool.SetMinThreads(t, io);
            Debug("SetMinThreads to worker: {0}, I/O: {1} returned {2}", t, io, success);
        }
    }
}