WaitAllOneByOne pattern

I’ve just started to explore parallel programming by watching a great course by Dr. Joe Hummel on Pluralsight. He is a brilliant author, and the course is perfect for learning the basics. I decided to practice a little by building on his code snippets (which are far better than the ones provided here). I’ve also added exception handling.

One of the most interesting parts was the second module, which describes the WaitAllOneByOne pattern. I decided to create a generic implementation of it based on the snippets from the course.

This pattern is useful if

  • we would like to wait until all tasks finish, but process each result as soon as its task completes
  • result processing may overlap with the computation
  • some of the tasks may fail

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public class WaitAllOneByOne<TResult> : IDisposable
    {
        private readonly List<Task<TResult>> _tasks;
        private readonly Action<TResult> _processResult;
        private readonly HandleExceptions _handleExceptions;
        private readonly bool _handleUnobservedAggregateExceptions;

        /// <summary>
        /// Delegate for handling exceptions.
        /// </summary>
        /// <param name="flattenedExceptions">The flattened inner exceptions of the aggregate exceptions coming from the task hierarchy.</param>
        /// <param name="observed">True if the exception was observed while processing the task's result; false if it was unexpected and not caught at a Wait or Result usage.</param>
        public delegate void HandleExceptions(List<Exception> flattenedExceptions, bool observed);

        /// <summary>
        /// Initializes a new instance of the WaitAllOneByOne class.
        /// </summary>
        /// <param name="processResult">Action for processing the result.</param>
        /// <param name="handleExceptions">Action to deal with exceptions.</param>
        /// <param name="handleUnobservedAggregateExceptions">Determines whether to handle unobserved task exceptions.
        /// It is generally not a good idea to handle them in too many places of the application, so this should usually be false.
        /// It is here only to show how unobserved task exceptions can be handled.</param>
        /// <param name="tasks">The tasks to run. They must not be started yet, because StartTasks calls Start on each of them.</param>
        public WaitAllOneByOne(Action<TResult> processResult, HandleExceptions handleExceptions, bool handleUnobservedAggregateExceptions, params Task<TResult>[] tasks)
        {
            _handleUnobservedAggregateExceptions = handleUnobservedAggregateExceptions;
            _handleExceptions = handleExceptions;
            _processResult = processResult;
            _tasks = new List<Task<TResult>>(tasks);
            if (handleUnobservedAggregateExceptions)
            {
                HandleUnobservedTaskException();
            }
        }

        /// <summary>
        /// Starts the tasks and waits for all one by one.
        /// </summary>
        /// <returns>True if at least one task completed without an unhandled exception.</returns>
        public bool StartTasks()
        {
            _tasks.ForEach(t => t.Start());
            bool oneSucceeded = false;

            while (_tasks.Count > 0)
            {
                // ReSharper disable once CoVariantArrayConversion 
                // as we know by the generic type parameter that the results are of the same type
                var index = Task.WaitAny(_tasks.ToArray());
                var finishedTask = _tasks[index];

                if (finishedTask.Status == TaskStatus.RanToCompletion)
                {
                    _processResult(finishedTask.Result);
                    oneSucceeded = true;
                }
                // Looking at the Exception property makes it observed so it's not rethrown at GC
                else if (finishedTask.Exception != null)
                {
                    _handleExceptions(FlattenAggregateExceptions(finishedTask.Exception), true);
                }
                // A canceled task has neither a result nor an exception; reading Result would throw, so it is only removed

                _tasks.RemoveAt(index);
            }

            return oneSucceeded;
        }

        private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
        {
            _handleExceptions(FlattenAggregateExceptions(e.Exception), false);

            // Mark the exception as observed to prevent a rethrow on garbage collection
            e.SetObserved();
        }

        private void HandleUnobservedTaskException()
        {
            TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
        }

        /// <summary>
        /// Flattens the aggregate exceptions coming from the task hierarchy into a list.
        /// </summary>
        /// <returns>The flattened list of exceptions.</returns>
        private List<Exception> FlattenAggregateExceptions(AggregateException ae)
        {
            return ae.Flatten().InnerExceptions.ToList();
        }

        public void Dispose()
        {
            if (_handleUnobservedAggregateExceptions)
            {
                TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
            }
        }
    }
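
To see the loop in isolation, here is a minimal standalone sketch of the same idea, without the class wrapper or the unobserved-exception handling. The names WaitAllOneByOneSketch, Run and SlowSquare are made up for this illustration and are not from the course; the sketch assumes the tasks are passed in unstarted, just like in the class above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAllOneByOneSketch
{
    // Hypothetical helper: starts the tasks, then collects results and
    // failures in the order in which the tasks complete.
    public static (List<int> Results, List<Exception> Failures) Run(params Task<int>[] tasks)
    {
        var pending = new List<Task<int>>(tasks);
        var results = new List<int>();
        var failures = new List<Exception>();

        pending.ForEach(t => t.Start());

        while (pending.Count > 0)
        {
            // Blocks until any of the remaining tasks has finished
            int index = Task.WaitAny(pending.ToArray());
            Task<int> finished = pending[index];
            pending.RemoveAt(index);

            if (finished.Status == TaskStatus.RanToCompletion)
            {
                results.Add(finished.Result);
            }
            else if (finished.Exception != null)
            {
                // Reading Exception marks it as observed
                failures.AddRange(finished.Exception.Flatten().InnerExceptions);
            }
        }

        return (results, failures);
    }

    // Hypothetical workload: an unstarted task that sleeps, then squares
    // its input or throws when asked to fail.
    public static Task<int> SlowSquare(int value, int delayMs, bool fail = false) =>
        new Task<int>(() =>
        {
            Thread.Sleep(delayMs);
            if (fail) throw new InvalidOperationException($"Task for {value} failed");
            return value * value;
        });
}
```

Calling Run(SlowSquare(2, 300), SlowSquare(3, 100, fail: true), SlowSquare(4, 200)) should give back the results 4 and 16 in completion order, plus one InvalidOperationException in the failures list.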

The code provided here has not been widely tested yet. It is important to say that I am just practicing with it, so use it as such, and of course watch Joe’s Pluralsight course. He also talks about cancellation and a lot of further caveats…

About Tamas Nemeth

Husband and proud father in Nürnberg. I'm working as a Software Architect and an enthusiastic Clean-Coder. I spend most of my free time with my family (playing, hiking, etc...). I also play table-tennis and badminton sometimes...

1 Response to WaitAllOneByOne pattern

  1. Praveen P R says:

    I found that using await Task.WhenAny() could be more efficient than Task.WaitAny().
    1. Task.WhenAny() can work with a List and doesn’t need a conversion to a task array.
    2. ReSharper doesn’t complain.
    3. It is more async; only “public bool StartTasks()” needs to change to “public async Task StartTasks()”.
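
The suggestion above could look roughly like the following sketch. It is only an illustration under a few assumptions: the names WhenAnySketch and ProcessAsCompletedAsync are hypothetical, the tasks are expected to be already running (for example created with Task.Run), and the method returns Task<bool> rather than plain Task so that the "at least one succeeded" flag of StartTasks is kept.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAnySketch
{
    // Hypothetical async counterpart of StartTasks: awaits the tasks one by
    // one in completion order without blocking the calling thread.
    public static async Task<bool> ProcessAsCompletedAsync<TResult>(
        IEnumerable<Task<TResult>> tasks,
        Action<TResult> processResult,
        Action<IReadOnlyList<Exception>> handleExceptions)
    {
        var pending = new List<Task<TResult>>(tasks);
        bool oneSucceeded = false;

        while (pending.Count > 0)
        {
            // WhenAny accepts the list directly; no array conversion is needed
            Task<TResult> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            if (finished.Status == TaskStatus.RanToCompletion)
            {
                processResult(finished.Result);
                oneSucceeded = true;
            }
            else if (finished.Exception != null)
            {
                // Reading Exception marks it as observed
                handleExceptions(finished.Exception.Flatten().InnerExceptions);
            }
        }

        return oneSucceeded;
    }
}
```

A caller would await ProcessAsCompletedAsync(...) instead of blocking in StartTasks(); canceled tasks are simply dropped here, as in the blocking version.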
