by Jos Hickson
20 March, 2017 - 14 minute read

The actor is a widely used pattern in concurrency with strong support in languages such as Erlang and Scala. Rather than sharing data between threads and having to use locking mechanisms to synchronise access, the actor is a single entity that is responsible for a particular unshared set of data. Threads of execution that require work to be performed upon the data pass request messages to the actor. If a response is required then it is sent back by the actor to the requester via another message. These communications should be asynchronous and non-blocking. There are a number of advantages to avoiding sharing data in this way, most notably reducing code complexity and dodging the usual runtime issues such as deadlocks, which can require great care to avoid in other approaches to multi-threading.

This blog looks at issues around implementing an actor pattern that integrates with the .NET Task Parallel Library (TPL) such that the actor behaves as expected when used with C#'s async and await keywords. The aim is to provide an insight into the inner workings of the TPL as much as the actor pattern itself.

Enqueuing work

To start, let’s define an API for an actor. The API allows messages to be sent by "enqueuing work" via one of the Enqueue methods. The "work" takes the form of delegates that either return a result or not. It can also be synchronous or asynchronous, with the intention being that the work is executed on the actor’s thread with any result relayed back to the caller via a returned task object.

Actor Pattern:

using System;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class Actor
    {
        // Enqueue a piece of work that returns no result
        public Task Enqueue(Action work);

        // Enqueue a piece of work that returns a result
        public Task<T> Enqueue<T>(Func<T> work);

        // Enqueue an async piece of work that returns no result
        public Task Enqueue(Func<Task> work);

        // Enqueue an async piece of work that returns a result
        public Task<T> Enqueue<T>(Func<Task<T>> work);
    }
}

An actor should ensure that messages passed to it are handled in the order in which they are received, with no two messages being processed at the same time. Essentially, it should appear that an actor has a single dedicated thread processing messages in its queue. In reality, however, a single dedicated thread would be wasteful of resources given that most of the time it would be quiescent. Therefore it is preferable to use a thread from the .NET thread pool when required. To achieve this using the TPL, all work passed to Enqueue must be scheduled to run as a Task on a suitable implementation of TaskScheduler. Scheduling the task via Task.Factory.StartNew would mean the TaskScheduler instance returned by TaskScheduler.Current would be used. In most circumstances this would be TaskScheduler.Default, which schedules tasks in parallel on the .NET thread pool, but it could be something else entirely. If the simpler Task.Run were used instead, then TaskScheduler.Default would always be used. Thus using either Task.Run or Task.Factory.StartNew would provide no control over when the task was run, and tasks associated with the same actor could be run in parallel, which is not what we want.

Let’s illustrate this with a simple example. The code below prints "START" to the console, then uses an actor to calculate the squares of the numbers 1 to 10 and print each one to the console, and finally prints "END" to the console once all of the tasks returned by the actor have completed.

Example 1:

Console.WriteLine("START");

var actor = new Actor();
var tasks = Enumerable.Range(1, 10)
                      .Select(x => actor.Enqueue(() =>
                                                 {
                                                     var y = x * x;
                                                     Console.WriteLine($"{x} x {x} = {y}");
                                                 }))
                      .ToArray();

await Task.WhenAll(tasks);

Console.WriteLine("END");

We would expect the calculation results to be printed in numerical order. However, if our implementation of Enqueue were simply to start a task on the current scheduler then this would not occur, as the output below shows.

Naive implementation of Enqueue:

public Task Enqueue(Action work)
{
    return Task.Factory.StartNew(work);
}

Sample output - could be any order:

START
1 x 1 = 1
3 x 3 = 9
4 x 4 = 16
7 x 7 = 49
2 x 2 = 4
9 x 9 = 81
10 x 10 = 100
5 x 5 = 25
8 x 8 = 64
6 x 6 = 36
END

Scheduling actor tasks

The first part of the solution is to provide our own implementation of TaskScheduler called ActorTaskScheduler.

ActorTaskScheduler - version 1:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal sealed class ActorTaskScheduler : TaskScheduler
    {
        private readonly Queue<Task> _taskQueue = new Queue<Task>();
        private readonly object _syncObject = new object();
        private bool _isActive = false;

        public override int MaximumConcurrencyLevel => 1;

        protected override IEnumerable<Task> GetScheduledTasks() { throw new NotSupportedException(); }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

        protected override void QueueTask(Task task)
        {
            lock (_syncObject)
            {
                _taskQueue.Enqueue(task);
                if (!_isActive)
                {
                    _isActive = true;
                    ThreadPool.QueueUserWorkItem(
                        _ =>
                        {
                            Task nextTask = null;
                            while ((nextTask = TryGetNextTask()) != null)
                            {
                                TryExecuteTask(nextTask);
                            }
                        });
                }
            }
        }

        private Task TryGetNextTask()
        {
            lock (_syncObject)
            {
                if (_taskQueue.Count > 0)
                {
                    return _taskQueue.Dequeue();
                }
                else
                {
                    _isActive = false;
                    return null;
                }
            }
        }
    }
}

The main work when implementing a TaskScheduler is in providing an implementation of the QueueTask method. In our case, implementing the other members is pretty trivial - the maximum concurrency level for an actor is always going to be one and we don’t want to execute tasks inline. GetScheduledTasks is only present for debugger support and throwing a NotSupportedException as we do here is perfectly acceptable.

The prime objectives of QueueTask are to add new tasks to a queue so they are processed in order and to ensure that a thread is actively processing the queued tasks. The second objective is achieved by getting a thread from the .NET thread pool to start executing the tasks in the queue in turn, unless such a thread is already active. The implementation is fairly straightforward though care is needed to avoid spawning more than one processing thread. For this reason the _isActive flag is important as one can’t simply check if the size of the queue is zero. In reality the implementation can become more complex once other considerations are taken into account. For instance, it’s probably best to not hog a thread from the thread pool for a long time whilst processing a particularly busy queue of actor work as we would do here. However, addressing such issues is beyond the scope of this blog.

Each Actor instance will have its own instance of ActorTaskScheduler and each task created will be scheduled explicitly to run on that scheduler instance. ActorTaskScheduler will do the meat of the work we require, and the implementation of the Actor class itself is mostly straightforward as shown.

Actor implementation using ActorTaskScheduler:

namespace Demo
{
    public sealed class Actor
    {
        private readonly TaskFactory _taskFactory = new TaskFactory(new ActorTaskScheduler());

        public Task Enqueue(Action work)
        {
            return _taskFactory.StartNew(work);
        }

        public Task<T> Enqueue<T>(Func<T> work)
        {
            return _taskFactory.StartNew(work);
        }

        public async Task Enqueue(Func<Task> work)
        {
            await await _taskFactory.StartNew(work).ConfigureAwait(false);
        }

        public async Task<T> Enqueue<T>(Func<Task<T>> work)
        {
            return await await _taskFactory.StartNew(work).ConfigureAwait(false);
        }
    }
}

Output using specialised scheduler:

START
1 x 1 = 1
2 x 2 = 4
3 x 3 = 9
4 x 4 = 16
5 x 5 = 25
6 x 6 = 36
7 x 7 = 49
8 x 8 = 64
9 x 9 = 81
10 x 10 = 100
END

NB: Using ConfigureAwait(false) is not integral to the implementation but is considered good practice in libraries.
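
The ordering and non-overlap guarantees can also be checked in code rather than by reading console output. The block below is a minimal sketch, not the article's exact class: SerialScheduler is a compacted variant of the queueing logic described above (it locks on the queue itself instead of a separate sync object), and the names SerialScheduler and SerialCheck are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Illustrative, compacted variant of the scheduler above so the sketch compiles alone.
internal sealed class SerialScheduler : TaskScheduler
{
    private readonly Queue<Task> _queue = new Queue<Task>();
    private bool _isActive;

    public override int MaximumConcurrencyLevel => 1;
    protected override IEnumerable<Task> GetScheduledTasks() => throw new NotSupportedException();
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override void QueueTask(Task task)
    {
        lock (_queue)
        {
            _queue.Enqueue(task);
            if (_isActive) return;   // a pool thread is already draining the queue
            _isActive = true;
        }
        ThreadPool.QueueUserWorkItem(_ => Drain());
    }

    private void Drain()
    {
        while (true)
        {
            Task next;
            lock (_queue)
            {
                if (_queue.Count == 0) { _isActive = false; return; }
                next = _queue.Dequeue();
            }
            TryExecuteTask(next);
        }
    }
}

public static class SerialCheck
{
    // Returns the order in which the work items ran and the highest
    // number of work items observed running at the same moment.
    public static async Task<(int[] order, int maxConcurrent)> RunAsync()
    {
        var factory = new TaskFactory(new SerialScheduler());
        var order = new List<int>();
        int running = 0, maxConcurrent = 0;

        var tasks = Enumerable.Range(1, 10).Select(x => factory.StartNew(() =>
        {
            int now = Interlocked.Increment(ref running);
            maxConcurrent = Math.Max(maxConcurrent, now);
            order.Add(x);            // safe only because the items never overlap
            Thread.Sleep(5);         // widen the window in which an overlap would show
            Interlocked.Decrement(ref running);
        })).ToArray();

        await Task.WhenAll(tasks);
        return (order.ToArray(), maxConcurrent);
    }
}
```

Calling SerialCheck.RunAsync() should give the order 1 to 10 with a maximum concurrency of one; swapping the factory for Task.Factory would be expected to break one or both properties.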

So what’s with the "await await"?!

You’ll notice that in the Actor implementation the forms of Enqueue taking async delegates have "await await" as part of their implementation. We’ll look further at using these Enqueue methods later as they expose other issues but, for now, let’s look at an example usage of Enqueue(Func<Task<T>> work) to illustrate the need for "await await" as opposed to simply "await":

Example: wait 1000 milliseconds then return a number:

var value =
    await actor.Enqueue(
        async () =>
        {
            await Task.Delay(1000);
            return 47;
        });

From the point of view of the caller awaiting actor.Enqueue, the task of interest is not the one returned by _taskFactory.StartNew(work) but instead the task returned by the async delegate "work". A single await would simply await the completion of the former task and return its result, which is the Task returned by "work". So, a further await on that returned Task is required in order to get the result of interest, that is, the int value of 47 returned by "work".

In the overload of Enqueue discussed above we get some help from the compiler. If a single await were used then the compiler would expect the method’s return type to be Task<Task<T>> rather than Task<T>. However, for the Enqueue(Func<Task> work) overload the method returns a plain Task and the result of the await is simply discarded, so the compiler will happily let us use a single await even though the returned task would then complete before "work" itself has finished.
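
The trap with the Func<Task> case can be shown with a short sketch. It uses Task.Factory directly rather than the actor, and the gate (a TaskCompletionSource) and the class name SingleAwaitDemo are illustrative devices, not part of the article's code.

```csharp
using System;
using System.Threading.Tasks;

public static class SingleAwaitDemo
{
    // Returns whether the work had finished after one await and after two.
    public static async Task<(bool afterSingle, bool afterDouble)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>();
        bool finished = false;

        // Async work that cannot finish until the gate is opened.
        Func<Task> work = async () =>
        {
            await gate.Task;
            finished = true;
        };

        Task<Task> outer = Task.Factory.StartNew(work);

        // First await: completes as soon as the delegate has returned its Task.
        Task inner = await outer;
        bool afterSingle = finished;   // the work is still parked at the gate

        gate.SetResult(true);

        // Second await: completes only when the work itself has finished.
        await inner;
        bool afterDouble = finished;

        return (afterSingle, afterDouble);
    }
}
```

Here afterSingle is false and afterDouble is true, which is exactly the difference between "await" and "await await".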

To achieve the same double unwrapping by hand, without using the await keyword at all, one could implement Enqueue in the following manner:

Actor.Enqueue without the await keyword:

public Task<T> Enqueue<T>(Func<Task<T>> work)
{
    var task1 = _taskFactory.StartNew(work);
    var taskCompletionSource = new TaskCompletionSource<T>();

    task1.ContinueWith(x =>
                      {
                          var task2 = x.Result;
                          task2.ContinueWith(y => taskCompletionSource.SetResult(y.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
                          task2.ContinueWith(y => taskCompletionSource.SetException(y.Exception), TaskContinuationOptions.OnlyOnFaulted);
                          task2.ContinueWith(y => taskCompletionSource.SetCanceled(), TaskContinuationOptions.OnlyOnCanceled);
                      }, TaskContinuationOptions.OnlyOnRanToCompletion);

    task1.ContinueWith(x => taskCompletionSource.SetException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
    task1.ContinueWith(x => taskCompletionSource.SetCanceled(), TaskContinuationOptions.OnlyOnCanceled);

    return taskCompletionSource.Task;
}

I think you’ll agree that the above is somewhat more involved. It does, however, provide a nice illustration of how much the await keyword can improve clarity and maintainability of code even if using "await await" requires some thought.
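
For completeness, the framework also ships an Unwrap extension method on Task<Task<T>> that performs this kind of proxying, including propagation of faults and cancellation. The sketch below is one hedged alternative, not the approach taken in this blog: the class name UnwrapSketch is made up, and TaskScheduler.Default stands in for the actor's scheduler purely so the block is self-contained.

```csharp
using System;
using System.Threading.Tasks;

public sealed class UnwrapSketch
{
    // Assumption: TaskScheduler.Default is a stand-in for the actor's own scheduler.
    private readonly TaskFactory _taskFactory = new TaskFactory(TaskScheduler.Default);

    // StartNew yields Task<Task<T>>; Unwrap turns it into a Task<T> that
    // completes when the inner task completes.
    public Task<T> Enqueue<T>(Func<Task<T>> work)
    {
        return _taskFactory.StartNew(work).Unwrap();
    }
}
```

A call such as new UnwrapSketch().Enqueue(async () => { await Task.Delay(10); return 47; }) then yields a Task<int> whose result is 47.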

A leaky scheduler

There is an issue with the implementation above in that it doesn’t prevent non-actor work from being scheduled on the actor’s instance of the ActorTaskScheduler. This can be illustrated by adding Console.WriteLine("Running actor task.") just before an actor task is executed in ActorTaskScheduler.QueueTask and then running the example code below.

Example of a leaky scheduler:

var actor = new Actor();
await actor.Enqueue(() =>
                    {
                        Console.WriteLine("Hello!");
                        Task.Factory.StartNew(() => Console.WriteLine("Bonjour!"));
                    });

Output showing that scheduler is used twice rather than once:

Running actor task.
Hello!
Running actor task.
Bonjour!

Here, one would hope to see "Running actor task." just the once as Enqueue is only called once. Instead it appears twice because when Task.Factory.StartNew is called, it uses TaskScheduler.Current to schedule the task it creates. But as this is running within a task scheduled on the ActorTaskScheduler instance, TaskScheduler.Current is set to that instance and not TaskScheduler.Default. The solution is simple: when a task is scheduled by Enqueue, an option is set to "hide" the scheduler being used, so that code running inside the task sees TaskScheduler.Default as TaskScheduler.Current rather than the actor’s scheduler.

Actor implementation updated to hide scheduler:

using System;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class Actor
    {
        private const TaskCreationOptions TaskCreationOptions = System.Threading.Tasks.TaskCreationOptions.HideScheduler;
        private readonly TaskFactory _taskFactory = new TaskFactory(new ActorTaskScheduler());

        public Task Enqueue(Action work)
        {
            return _taskFactory.StartNew(work, TaskCreationOptions);
        }

        public Task<T> Enqueue<T>(Func<T> work)
        {
            return _taskFactory.StartNew(work, TaskCreationOptions);
        }

        public async Task Enqueue(Func<Task> work)
        {
            await await _taskFactory.StartNew(work, TaskCreationOptions).ConfigureAwait(false);
        }

        public async Task<T> Enqueue<T>(Func<Task<T>> work)
        {
            return await await _taskFactory.StartNew(work, TaskCreationOptions).ConfigureAwait(false);
        }
    }
}
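
The effect of HideScheduler can be observed in isolation with a small sketch. It does not use the actor's scheduler: as a stand-in (an assumption made only to keep the block self-contained) it uses the exclusive scheduler of the framework's ConcurrentExclusiveSchedulerPair, and the class name HideSchedulerDemo is illustrative.

```csharp
using System;
using System.Threading.Tasks;

public static class HideSchedulerDemo
{
    public static async Task<(bool visible, bool hidden)> RunAsync()
    {
        // Stand-in for a custom scheduler such as the actor's.
        TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var factory = new TaskFactory(scheduler);

        // Without the option, code inside the task sees the custom scheduler as Current.
        bool visible = await factory.StartNew(
            () => ReferenceEquals(TaskScheduler.Current, scheduler));

        // With HideScheduler, code inside the task sees TaskScheduler.Default instead.
        bool hidden = await factory.StartNew(
            () => ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default),
            TaskCreationOptions.HideScheduler);

        return (visible, hidden);
    }
}
```

Both values come back true, which is why a nested Task.Factory.StartNew inside hidden work lands on the thread pool instead of on the enclosing scheduler.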

Correctly handling asynchronous work

The overloads of Enqueue that take asynchronous delegates give us one further issue that is illustrated by another simple example:

Async continuation issue example:

var actor = new Actor();
await actor.Enqueue(async () =>
                    {
                        Console.WriteLine("Should be on actor.");
                        await Task.Run(() => Console.WriteLine("Should be off actor."));
                        Console.WriteLine("Should be on actor.");
                    });

Output showing continuation not scheduled correctly:

Running actor task.
Should be on actor.
Should be off actor.
Should be on actor.

Here we have work that awaits another task partway through. Under the hood, lines 4 and 6 of the example (the two Console.WriteLine calls that print "Should be on actor.") are scheduled as two distinct tasks, with the code on line 6 scheduled as a continuation of the Task.Run. From the natural flow of the code it seems clear the intention is that line 6 would be run on the actor. The flow we would expect is illustrated by the diagram below:

Flow we expect

However, from the output we can see that "Running actor task" is only output once (for line 4) and so line 6 is not executed on the actor. The flow we actually get is illustrated below.

Flow we get

Why isn’t this working? Well, when a task is “awaited” then, unless otherwise configured, the awaiter captures the current "synchronization context" or, if there is none, the current task scheduler. When the time comes for the code after the await to be resumed, whichever of the two was captured is used to schedule it. If we weren’t hiding the actor’s task scheduler to solve the previous problem, then TaskScheduler.Current would be that scheduler and execution would flow back to the actor as desired. However, we need to hide the actor’s scheduler, so TaskScheduler.Current reports TaskScheduler.Default and the resumed code runs on the thread pool. In order to get execution flowing back to the actor when appropriate, the solution is to provide a SynchronizationContext implementation to route things correctly.

Providing a SynchronizationContext

When implementing a SynchronizationContext there are a number of methods that can be overridden but the only one of interest here is Post(SendOrPostCallback callback, object state). This is called when an asynchronous message needs dispatching and is what is called in order to resume work after an await. The code for ActorSynchronizationContext below shows how to ensure that this call to Post can get work scheduled with the actor’s task scheduler:

Implementation of ActorSynchronizationContext:

using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal sealed class ActorSynchronizationContext : SynchronizationContext
    {
        private readonly TaskFactory _scheduler;

        public ActorSynchronizationContext(ActorTaskScheduler scheduler)
        {
            _scheduler = new TaskFactory(scheduler);
        }

        public override void Post(SendOrPostCallback callback, object state)
        {
            _scheduler.StartNew(() => callback(state), TaskCreationOptions.HideScheduler);
        }
    }
}

This is used by amending the implementation of ActorTaskScheduler so that the context is set for the duration of executing tasks in the task queue:

Using ActorSynchronizationContext:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal sealed class ActorTaskScheduler : TaskScheduler
    {
        private readonly Queue<Task> _taskQueue = new Queue<Task>();
        private readonly object _syncObject = new object();
        private readonly ActorSynchronizationContext _synchronizationContext;
        private bool _isActive = false;

        public ActorTaskScheduler()
        {
            _synchronizationContext = new ActorSynchronizationContext(this);
        }

        public override int MaximumConcurrencyLevel => 1;

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            throw new NotSupportedException();
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

        protected override void QueueTask(Task task)
        {
            lock (_syncObject)
            {
                _taskQueue.Enqueue(task);
                if (!_isActive)
                {
                    _isActive = true;
                    ThreadPool.QueueUserWorkItem(
                        _ =>
                        {
                            SynchronizationContext.SetSynchronizationContext(_synchronizationContext);
                            Task nextTask = null;

                            while ((nextTask = TryGetNextTask()) != null)
                            {
                                Console.WriteLine("Running actor task.");
                                TryExecuteTask(nextTask);
                            }

                            SynchronizationContext.SetSynchronizationContext(null);
                        });
                }
            }
        }

        private Task TryGetNextTask()
        {
            lock (_syncObject)
            {
                if (_taskQueue.Count > 0)
                {
                    return _taskQueue.Dequeue();
                }
                else
                {
                    _isActive = false;
                    return null;
                }
            }
        }
    }
}

Now our example above gives the following correct output:

Output:

Running actor task.
Should be on actor.
Should be off actor.
Running actor task.
Should be on actor.

When providing SynchronizationContext doesn’t work

Sadly, providing a custom SynchronizationContext as above won’t always work. If we change the example code so that a call to ConfigureAwait(false) is appended to Task.Run then the implementation is broken again:

Async continuation issue example using ConfigureAwait(false):

var actor = new Actor();
await actor.Enqueue(async () =>
                    {
                        Console.WriteLine("Should be on actor.");
                        await Task.Run(() => Console.WriteLine("Should be off actor.")).ConfigureAwait(false);
                        Console.WriteLine("Should be on actor???");
                    });

Output showing continuation not scheduled correctly:

Running actor task.
Should be on actor.
Should be off actor.
Should be on actor???

Using ConfigureAwait was mentioned briefly above. Basically, passing a value of false to ConfigureAwait tells the execution flow through the await to ignore the current synchronization context and use a thread from the thread pool to execute the code after the await. It is considered good practice to use ConfigureAwait(false) in library code unless you know you need the current synchronization context. However, in the actor you do need the synchronization context most of the time, so simply avoiding the extra typing involved in adding ConfigureAwait(false) is the way to avoid the problem shown above. Of course, if you’re happy for the resumed execution not to be scheduled on the actor, then go ahead and use ConfigureAwait(false).
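
The difference between the two kinds of await can be made visible by counting calls to Post. The following is a minimal sketch under stated assumptions: PostCountingContext is a hypothetical context written for this illustration (it simply forwards callbacks to the thread pool), it is not the ActorSynchronizationContext above, and Task.Yield is used for the first await only because it is guaranteed to suspend.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context that counts Post calls and runs each callback on the thread pool.
public sealed class PostCountingContext : SynchronizationContext
{
    private int _posts;
    public int Posts => Volatile.Read(ref _posts);

    public override void Post(SendOrPostCallback callback, object state)
    {
        Interlocked.Increment(ref _posts);
        ThreadPool.QueueUserWorkItem(_ =>
        {
            // Keep this context current while the resumed code runs.
            SetSynchronizationContext(this);
            try { callback(state); }
            finally { SetSynchronizationContext(null); }
        });
    }
}

public static class ConfigureAwaitDemo
{
    public static Task<(int afterDefault, int afterConfigured)> RunAsync()
    {
        var context = new PostCountingContext();
        return Task.Run(async () =>
        {
            SynchronizationContext.SetSynchronizationContext(context);

            // Default await: the code after it is resumed through context.Post.
            await Task.Yield();
            int afterDefault = context.Posts;

            // ConfigureAwait(false): the context is ignored, so Post is not called.
            await Task.Delay(20).ConfigureAwait(false);
            int afterConfigured = context.Posts;

            return (afterDefault, afterConfigured);
        });
    }
}
```

Both counts are 1: the first await went through Post once and the second never touched the context, which is the same reason the "Should be on actor???" line above does not run on the actor.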

Straightforward but potential for complexity

We have seen here that with a little knowledge of the internals of the TPL and how the async/await mechanism works, one can implement an actor pattern that integrates with async/await in a fashion that is fairly natural. The implementation is relatively straightforward although, as noted, it becomes more complex once further factors are taken into consideration. For a much more thorough explanation of execution and synchronization contexts than I provide here, please see this MSDN article. And for a discussion on whether or not to use ConfigureAwait(false), see this StackOverflow entry and this blog.

Open-source

The code that goes with this blog can be accessed at Winton’s GitHub page in the Winton.Blogs.TPLActor project. In addition, Winton has open-sourced the full version of the actor. This is available in the GitHub project Winton.Extensions.Threading.Actor. The full version adds support for starting and stopping an actor as well as scheduling repeated tasks.