Multithreading

Why Task Parallel Library (TPL) = Great stuff

I just came back to the office today from a short staycation, and while going through the email backlog I saw a message from a colleague asking about my practical experience with the Task Parallel Library (TPL) so far.

I was quite delighted to share some with him, and thought of dropping some of it here as well ;)

Practical uses:

  • Parallel data retrieval from different sources
    This has saved a significant amount of time. E.g. with 3 data retrievals where the 1st takes 10 seconds, the 2nd takes 7 seconds and the 3rd takes 4 seconds, rather than 21 seconds sequentially we can now get all 3 results in 10 seconds (the longest of the three).
  • Parallel data submission with limited concurrency
    With long running processes we need to be careful not to overload the back end services, so when TPL came along with the LimitedConcurrencyLevelTaskScheduler it was a life saver: we could remove all the code complexity we previously needed to achieve the same thing.

Summary:

  • The Task class has greatly simplified the developer's code for performing an asynchronous operation that returns a value, where previously this was not straightforward to achieve (e.g. creating a wrapper around the thread pool)
  • Out of the box / it ships with .Net 4 (directly from Microsoft), which very much makes TPL available to us (developers) and to the users (clients) without worrying about a separate component to download
  • More good stuff is on the way with the latest .Net 4.5 Framework

The downsides:

  • Potential parallelism -> good stuff + not-so-good stuff. Good stuff: we don't need to worry about it. Not-so-good stuff: there are times we want something to happen in parallel, but it's too clever and decides not to parallelize under heavy usage, especially on a low spec machine (e.g. a client machine that is not as good as our developer PC)
  • Unit testing -> It's not so easy to test our code once we start mixing TPL into it. It's true that we should separate the logic from the concurrency, but in reality we sometimes end up with unnecessary code just to make things testable. I do hope more improvements are coming for this; there's plenty being discussed on Stack Overflow

Controlling the concurrency level using MaxDegreeOfParallelism in Parallel

This is another nice in-the-box feature when using the Parallel class: the ability to limit the concurrency of the actions that we want to execute in parallel.

Why would we ever want to do this? Wouldn't it be best to parallelize as many activities as possible?

For certain cases we might want to be careful not to create too many sudden requests at the same time. A good example is triggering a long running process on our application server: you wouldn't want to spawn too many requests at once, as the server may not be able to handle that many simultaneous requests and may, not surprisingly, go down due to overload. This can be categorized as a DoS attack; your back end guys will hate you for it, trust me :p

Setting the concurrency level of the actions you're going to invoke with the Parallel class is pretty simple.

Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = concurrencyLevel }, actions);

Find out more about the MaxDegreeOfParallelism property on MSDN.
We don't have such a property in the TaskCreationOptions that we can simply pass to a Task, but there's the LimitedConcurrencyLevelTaskScheduler nicely available with which we can achieve the same result; below I basically spawn 10 processes with a max concurrency level of 2.
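As a rough sketch of the Task-based variant (assuming the LimitedConcurrencyLevelTaskScheduler class from the ParallelExtensionsExtras samples is included in the project; it is not part of the framework itself), we can bind a TaskFactory to that scheduler:

```csharp
// Sketch only: LimitedConcurrencyLevelTaskScheduler comes from the
// ParallelExtensionsExtras samples, not from the .Net framework itself.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(2); // at most 2 run concurrently
var factory = new TaskFactory(scheduler);

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    // Each task is queued to the scheduler, which starts at most 2 at a time.
    tasks.Add(factory.StartNew(() => Thread.Sleep(1000)));
}
Task.WaitAll(tasks.ToArray());
```

Every task still gets queued immediately; the scheduler simply refuses to run more than 2 of them at once, which is exactly the behaviour in the log output below.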


15-Apr-2011 21:04:33.948 - Thread#13 - Creating 10 process definitions
15-Apr-2011 21:04:33.948 - Thread#13 - Start queueing and invoking all 10 processes
15-Apr-2011 21:04:33.948 - Thread#13 - Doing something here
15-Apr-2011 21:04:33.948 - Thread#14 - Doing something here
15-Apr-2011 21:04:34.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:34.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:35.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:35.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:36.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:36.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:37.964 - Thread#13 - Doing something here
15-Apr-2011 21:04:37.964 - Thread#14 - Doing something here
15-Apr-2011 21:04:38.964 - Thread#13 - All processes have been completed

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace ParallelTests
{
    [TestClass]
    public class TaskSchedulerConcurrencyTest
    {
        [TestMethod]
        public void TestMethod1()
        {
            var p = new ProcessorUsingParallel();
            p.DoProcess(processToCreate: 10, concurrencyLevel: 2);
        }
    }

    public class ProcessorUsingParallel
    {
        public void DoProcess(int processToCreate, int concurrencyLevel)
        {
            SetTurboMode();

            Debug("Creating {0} process definitions", processToCreate.ToString());

            var actions = new Action[processToCreate];
            for (int i = 0; i < processToCreate; i++)
            {
                actions[i] = () => DoSomething(1000);
            }

            Debug("Start queueing and invoking all {0} processes", processToCreate.ToString());
            var options = new ParallelOptions();
            options.MaxDegreeOfParallelism = concurrencyLevel;
            //options.TaskScheduler = new LimitedConcurrencyLevelTaskScheduler(concurrencyLevel); -- we can achieve the same result with this
            Parallel.Invoke(options, actions);

            Debug("All processes have been completed");
        }

        private void DoSomething(int Sleep)
        {
            Debug("Doing something here");
            Thread.Sleep(Sleep);
        }

        /// <summary>
        /// oh i just wish the framework would have this in place like Console.WriteLine
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private static void Debug(string format, params object[] args)
        {
            System.Diagnostics.Debug.WriteLine(
                string.Format("{0} - Thread#{1} - {2}",
                    DateTime.Now.ToString("dd-MMM-yyyy HH:mm:ss.fff"),
                    Thread.CurrentThread.ManagedThreadId.ToString(),
                    string.Format(format, args)));
        }
        /// <summary>
        /// This is not intended for production purpose
        /// </summary>
        private static void SetTurboMode()
        {
            int t, io;
            ThreadPool.GetMaxThreads(out t, out io);
            Debug("Default Max {0}, I/O: {1}", t, io);

            var success = ThreadPool.SetMinThreads(t, io);
            Debug("SetMinThreads({0}, {1}) returned {2}", t, io, success);
        }
    }
}

Using Parallel in .Net 4.0 without worrying about the WaitHandle.WaitAll 64-handle limitation

As my previous post provided an example of using the Task class from the TPL to overcome the 64-handle limitation of WaitHandle.WaitAll, here's an alternative piece of code which leverages the Parallel class from the same library. Here's the download link for the whole test project.

However, notice one thing below: I'm using the ConcurrentBag&lt;T&gt; type, which is a thread-safe bag implementation, "optimized for scenarios where the same thread will be both producing and consuming data stored in the bag" (taken from the MSDN page).

An interesting fact: this version takes around 74 seconds to complete, whereas if we don't care about return values (remove the usage of ConcurrentBag&lt;T&gt;) it only takes around 28 seconds, and using Task (with or without return values) it only takes 22 seconds. We could probably pass a "state object" into each invocation to hold the values for each process and avoid the locking costs, but since Task already does this well, we'd need a justification for doing it manually ourselves in the Parallel.Invoke solution. Moral of the story: always get the facts for each alternative solution first, then the decision will be easy and justifiable.
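To make the "state object" idea concrete, here's a minimal sketch (the names are mine, not from the test project) where each action writes only into its own slot of a pre-sized array, so no ConcurrentBag&lt;T&gt; and no locking are needed:

```csharp
const int count = 100;
// Pre-sized array: each action is the exclusive writer of its own slot,
// so no locking or concurrent collection is required.
var results = new int[count];

var actions = new Action[count];
for (int i = 0; i < count; i++)
{
    int slot = i; // capture the loop variable per action
    actions[slot] = () =>
    {
        Thread.Sleep(1);   // simulate a little work
        results[slot] = 1; // write to this action's own slot
    };
}

Parallel.Invoke(actions);
Console.WriteLine(results.Sum()); // prints 100
```

Summing at the end is a plain sequential read after Parallel.Invoke has returned, so there is no contention at all during the parallel phase.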

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

namespace ParallelismTests
{
    [TestClass]
    public class ParallelTest
    {
        [TestMethod]
        public void TestMethod1()
        {
            var p = new ProcessorUsingParallel();
            var counts = p.DoProcess(hiLevelParallelism: 1000, loLevelParallelism: 1000);

            Assert.AreEqual(1000 * 1000, counts);
        }
    }

    public class ProcessorUsingParallel
    {
        public int DoProcess(int hiLevelParallelism, int loLevelParallelism)
        {
            Utility.SetTurboMode();

            Utility.Debug("Start queueing {0} high level processes", hiLevelParallelism.ToString());

            var counts = new ConcurrentBag<int>();
            var r = new Random();

            var actions = new Action[hiLevelParallelism];
            for (int i = 0; i < hiLevelParallelism; i++)
            {
                actions[i] = () => counts.Add(DoHighLevelProcess(r, loLevelParallelism));
            }

            Utility.Debug("Invoking all {0} high level tasks", hiLevelParallelism.ToString());
            Parallel.Invoke(actions);

            Utility.Debug("All processes have been completed");
            return counts.Sum(t => t);
        }

        private int DoHighLevelProcess(Random r, int loLevelParallelism)
        {
            var counts = new ConcurrentBag<int>();
            var actions = new Action[loLevelParallelism];
            for (int i = 0; i < loLevelParallelism; i++)
            {
                actions[i] = () => counts.Add(DoLowLevelProcess(r.Next(1, 10)));
            }
            Parallel.Invoke(actions);

            Utility.Debug("DoHighLevelProcess - Completed with {0} LowLevelProcesses", loLevelParallelism);
            return counts.Sum(t => t);
        }
        private int DoLowLevelProcess(int Sleep)
        {
            Thread.Sleep(Sleep);
            return 1;
        }
    }
}

Using Task in .Net 4.0 without worrying about the WaitHandle.WaitAll 64-handle limitation

In my previous post, I was looking into having 2 levels of concurrent processes: creating concurrent processes which will each spawn more concurrent processes.

Now, using WaitHandle.WaitAll we saw there's a limit of 64 wait handles to wait on, but with the new Task Parallel Library in .Net 4.0 we don't have to worry about that kind of limitation anymore.

Using the code from the previous post, I then used the Task class to perform the concurrent processes. Notice in the code below that I'm actually creating 1,000 high level processes which will each create 1,000 low level processes, so 1,000,000 processes in total. Another very nice thing here is that we can return a value from each task very easily, compared to the previous solution using the ThreadPool.

One more interesting thing: notice the two Task.WaitAll(tasks.ToArray()); calls below (each marked with a comment). With those lines in place it only takes 20+ seconds to complete, whereas without them (removed/commented out) it takes 11 minutes. That's a pretty huge difference ;)

Reason: I think it makes sense, because without those lines we end up waiting for task completion sequentially in the SUM operation when Task.Result is called, and the completion order of one task versus the next in the collection can differ. Task.WaitAll, on the other hand, collects the signal from each task as it completes, so by the time the SUM operation is called all the results are already available without any further waiting :)

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
using System.Threading;

namespace ParallelismTests
{
    [TestClass]
    public class TaskTest
    {
        [TestMethod]
        public void TestMethod1()
        {
            var p = new ProcessorUsingTask();
            var counts = p.DoProcess(hiLevelParallelism: 1000, loLevelParallelism: 1000);

            Assert.AreEqual(1000*1000, counts);
        }
    }

    public class ProcessorUsingTask
    {
        public int DoProcess(int hiLevelParallelism, int loLevelParallelism)
        {
            SetTurboMode();

            Debug("Start queueing {0} high level processes", hiLevelParallelism.ToString());

            var tasks = new List<Task<int>>();
            var r = new Random();
            for (int i = 0; i < hiLevelParallelism; i++)
            {
                var task = Task.Factory.StartNew<int>(() => DoHighLevelProcess(r, loLevelParallelism));
                tasks.Add(task);
            }

            Debug("Waiting for all {0} high level tasks to complete", hiLevelParallelism.ToString());
            Task.WaitAll(tasks.ToArray()); // try comment this line out and see the performance impact :)

            Debug("All processes have been completed");
            return tasks.Sum(t => t.Result);
        }

        private int DoHighLevelProcess(Random r, int loLevelParallelism)
        {
            var tasks = new List<Task<int>>();
            for (int i = 0; i < loLevelParallelism; i++)
            {
                var task = Task.Factory.StartNew<int>(() => DoLowLevelProcess(r.Next(1, 10)));
                tasks.Add(task);
            }
            Task.WaitAll(tasks.ToArray()); // try comment this line out and see the performance impact :)
            
            Debug("DoHighLevelProcess - Completed with {0} LowLevelProcesses", loLevelParallelism);
            return tasks.Sum(t => t.Result);
        }
        private int DoLowLevelProcess(int Sleep)
        {
            Thread.Sleep(Sleep);
            //Debug("DoLowLevelProcess - Completed after {0} ms", Sleep.ToString());
            return 1;
        }

        /// <summary>
        /// oh i just wish the framework would have this in place like Console.WriteLine
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private static void Debug(string format, params object[] args)
        {
            System.Diagnostics.Debug.WriteLine(
                string.Format("{0} - Thread#{1} - {2}",
                    DateTime.Now.ToString("dd-MMM-yyyy HH:mm:ss.fff"),
                    Thread.CurrentThread.ManagedThreadId.ToString(),
                    string.Format(format, args)));
        }
        /// <summary>
        /// This is not intended for production purpose
        /// </summary>
        private static void SetTurboMode()
        {
            int t, io;
            ThreadPool.GetMaxThreads(out t, out io);
            Debug("Default Max {0}, I/O: {1}", t, io);

            var success = ThreadPool.SetMinThreads(t, io);
            Debug("SetMinThreads({0}, {1}) returned {2}", t, io, success);
        }
    }
}

Demystifying the 64-handle limit in WaitHandle.WaitAll

I've had doubts about this 64-handle limit for a while now. We have been trying to increase the degree of parallelism in an existing application, both to optimize performance and to fully utilize the scaled-out servers we have on the grid.

In the initial stage of the changes, I hit the following exception: System.NotSupportedException: The number of WaitHandles must be less than or equal to 64. We then introduced a threshold limit on the degree of parallelism to avoid this exception, and everything worked fine afterwards.

Then, as one change led to another, we started to see the opportunity to increase the degree of parallelism further. This left me wondering at what level this 64-handle limitation is applied (process / app domain) and whether a recursive approach would hit the same issue if, across all layers, we end up waiting on more than 64 handles.

Now if we RTFM, it basically says: “The WaitAll method returns when all the handles are signaled. On some implementations, if more than 64 handles are passed, a NotSupportedException is thrown”.

Just to confirm this, I created the test below, which executes x high level parallel processes, each of which then executes x low level parallel processes based on the provided parameters. So the code below will create around 64 * 64 = 4096 handles in total.

var p = new Processor();
p.DoProcess(hiLevelParallelism : 64, loLevelParallelism : 64);

Results

05-Apr-2011 00:42:06.781 - Thread#13 - Start queueing 64 high level processes
05-Apr-2011 00:42:06.815 - Thread#13 - Waiting for all 64 high level handles to complete
05-Apr-2011 00:42:07.049 - Thread#17 - DoHighLevelProcess - Completed with 64 LowLevelProcesses
05-Apr-2011 00:42:07.141 - Thread#15 - DoHighLevelProcess - Completed with 64 LowLevelProcesses
05-Apr-2011 00:42:07.287 - Thread#24 - DoHighLevelProcess - Completed with 64 LowLevelProcesses
..
.. omitted 58 similar lines here
..
05-Apr-2011 00:42:08.087 - Thread#28 - DoHighLevelProcess - Completed with 64 LowLevelProcesses
05-Apr-2011 00:42:08.179 - Thread#27 - DoHighLevelProcess - Completed with 64 LowLevelProcesses
05-Apr-2011 00:42:08.297 - Thread#17 - DoHighLevelProcess - Completed with 64 LowLevelProcesses
05-Apr-2011 00:42:08.455 - Thread#13 - All processes have been completed

It always feels much better when we see it running in code, doesn't it? :)

Source code

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading;
using System.Diagnostics;

namespace ParallelismTests
{
    [TestClass]
    public class WaitHandleTest
    {
        [TestMethod]
        public void TestMethod1()
        {
            var p = new Processor();
            p.DoProcess(hiLevelParallelism : 64, loLevelParallelism : 64);
        }
    }

    public class Processor
    {
        public void DoProcess(int hiLevelParallelism, int loLevelParallelism)
        {
            SetTurboMode();

            Debug("Start queueing {0} high level processes", hiLevelParallelism.ToString());

            var r = new Random();
            var hiLevelHandles = new AutoResetEvent[hiLevelParallelism];
            for (int i = 0; i < hiLevelParallelism; i++)
            {
                var hiLevelHandle = new AutoResetEvent(false);
                hiLevelHandles[i] = hiLevelHandle;
                ThreadPool.QueueUserWorkItem((s) =>
                {
                    DoHighLevelProcess(r, loLevelParallelism);
                    hiLevelHandle.Set();
                });
            }

            Debug("Waiting for all {0} high level handles to complete", hiLevelParallelism.ToString());
            WaitHandle.WaitAll(hiLevelHandles);

            Debug("All processes have been completed");
        }

        private void DoHighLevelProcess(Random r, int loLevelParallelism)
        {
            var loLevelHandles = new AutoResetEvent[loLevelParallelism];
            for (int i = 0; i < loLevelParallelism; i++)
            {
                var loLevelHandle = new AutoResetEvent(false);
                loLevelHandles[i] = loLevelHandle;
                ThreadPool.QueueUserWorkItem((s) =>
                {
                    DoLowLevelProcess(r.Next(1, 10));
                    loLevelHandle.Set();
                });
            }
            WaitHandle.WaitAll(loLevelHandles);

            Debug("DoHighLevelProcess - Completed with {0} LowLevelProcesses", loLevelParallelism);
        }
        private void DoLowLevelProcess(int Sleep)
        {
            Thread.Sleep(Sleep);
            //Debug("DoLowLevelProcess - Completed after {0} ms", Sleep.ToString());
        }

        /// <summary>
        /// oh i just wish the framework would have this in place like Console.WriteLine
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private static void Debug(string format, params object[] args)
        {
            System.Diagnostics.Debug.WriteLine(
                string.Format("{0} - Thread#{1} - {2}",
                    DateTime.Now.ToString("dd-MMM-yyyy HH:mm:ss.fff"),
                    Thread.CurrentThread.ManagedThreadId.ToString(),
                    string.Format(format, args)));
        }
        /// <summary>
        /// This is not intended for production purpose
        /// </summary>
        private static void SetTurboMode()
        {
            int t, io;
            ThreadPool.GetMaxThreads(out t, out io);
            Debug("Default Max {0}, I/O: {1}", t, io);

            var success = ThreadPool.SetMinThreads(t, io);
            Debug("Successfully set Min {0}, I/O: {1}", t, io);
        }
    }
}

Extension over an extension for Dispatcher

I'm working on migrating an existing WinForms control in our WPF app to a shiny new WPF control ;)

Interestingly, it then broke with the exception The calling thread must be STA, because many UI components require this when trying to perform an action on the WPF control, where the existing WinForms version was working fine.

It turned out this was because the code was being invoked from a background thread via the thread pool. It's pretty easy to resolve: we can use the Dispatcher property, inherited from the DispatcherObject abstract class, which is the base class of WPF user controls.

Now, the Dispatcher class itself turns out to have a DispatcherExtensions class as well, which accepts the Action type.

So in order to fix the calling-thread exception, we just need the following code.

void OnSomeRequest()
{
    if (!Dispatcher.CheckAccess())
    {
        Dispatcher.Invoke(() => OnSomeRequest());
    }
    else
    {
        someUserControl.DoSomething();
    }
}

Now, if there's only one place in the code that performs a UI update, this is okay, but imagine if you have many places where you need to repeat the pattern above :p

So we can simplify the above again as below; we have reduced the code from 8 lines to just 1 :)

void OnSomeRequest()
{
    Dispatcher.InvokeIfRequired(() => someUserControl.DoSomething());
}

The new extension over the existing DispatcherExtensions is simple.

using System;
using System.Windows.Threading;

namespace ExtensionTests
{
    public static class AnotherDispatcherExtensions
    {
        public static void InvokeIfRequired(this Dispatcher dispatcher, Action action)
        {
            if (!dispatcher.CheckAccess())
            {
                dispatcher.Invoke(action);
            }
            else
            {
                action();
            }
        }
    }
}

Note that we can always add more extensions to this class; for me, the one above is all I require. Keep it simple and only introduce things when you need them :)

Eric De Carufel has written the more complete extensions as well in his blog post
And if you're wondering why VS IntelliSense gives you a false alarm that CheckAccess is not available on Dispatcher, Claus Konrad explained it in his blog post

Bulk Processing strategy

I've been working on some performance related stuff lately, specifically a page where the user can perform bulk upload in addition to manual entry by row.

From the analysis, we found a couple of hot spots to optimize:

  • No caching is being used; the same calls are repeatedly made, causing unnecessary round trips, which is very costly especially in a 3-tier kind of architecture
  • Validation is being done row by row, so again this causes extra round trips
  • Asynchronous/background processing is in place, but not optimized

By introducing caching and a single bulk validation, the performance gain is quite significant, especially when we're doing bulk processing of > 1000 items.

The interesting part is actually the asynchronous/background processing. The existing code processes the data row by row, but pushes each row to the thread pool to process, so logically each row tries to fetch its own data slice and load it into the data grid. This was quite an interesting model to stumble upon.

When I realized this, I quickly opened up Task Manager and displayed the thread count column. When I started the bulk process, I could see the thread count increasing one by one, at about one per second; this looks like a slow start-up / thread creation issue in the thread pool.

So I added some code during app start up to set the min threads to a higher number. I tried to get the existing min thread value and it was returning only 2 :| So I then put the min at 500 for my machine (2 cores, i.e. 250 threads per core).
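The start-up tweak itself is only a few lines; here's a hedged sketch (the 500 is just the number I used for my 2-core box, so treat it as something to make configurable, not a recommendation):

```csharp
int workers, io;
ThreadPool.GetMinThreads(out workers, out io);
Console.WriteLine("Default min worker threads: {0}", workers); // was only 2 in my case

// Raise the minimum so the pool skips its slow thread-by-thread ramp-up.
bool ok = ThreadPool.SetMinThreads(500, io);
Console.WriteLine("SetMinThreads succeeded: {0}", ok);
```

SetMinThreads returns a bool, so it's worth logging whether the change actually took effect rather than assuming it did.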

As soon as I started the bulk processing, the process which previously took 1-2 minutes completed in less than 10 seconds. How cool is that :D

So in the end everyone was pretty happy with this; imagine that with all the code changes above, we can complete in 10 seconds a process that previously could take 5 minutes.

Now, the not so happy part: we can't really crank the thread pool up to the max (in most situations). Why?

  • Each thread consumes 1 MB of stack by default, so if you set this to the max, be ready for memory usage to spike (500 MB in seconds in my case). Would the user have a machine spec as powerful as us developers? Hhhmmm... I don't think so :p
  • Are your services ready to serve these requests? Imagine if you only have one cashier queue in a store and suddenly 500 customers want to check out at almost the same time; the customers at the back of the queue will be annoyed and leave without buying in the end (timed out, in application terms)
  • ThreadPool is a static class and is per app domain, so the setting effectively applies throughout the app domain. Related to the point above, there are parts of the application where you want low concurrency and parts that need high concurrency

So what's the right magic number then? As every consultant would say, it depends :p But it surely is true: you need to analyse the factors above, make a judgement or assumption, and make sure it's configurable :)

If you're not satisfied with the existing ThreadPool, you might want to try the Smart Thread Pool by Ami Bar, especially if you have low-concurrency and high-concurrency scenarios in different parts of the application; you can set the concurrency level property, which under the covers just sets the max thread property.

I think it was easier for me to do this optimization analysis because the functionality was already there and working. It would have been a different situation if I were the developer of that page, where I'd be more worried about the complexity of the functionality to deliver, and might have assumed a lower number for the bulk processing the page needs to support (an assumption of 100 is very far from 10K).

It's always interesting to discuss functionality vs. performance: which one comes first? For me, it makes more sense to have the functionality first: get it right, then make it fast. That comes with the consequence that the code may be too rigid to change when we need to speed it up, but hey, that's why we get paid, right? ;) Just imagine how excited the users will be when they feel the new speed :)