by Jos Hickson
on 20th Mar 2017
Estimated reading time: 20 minutes
The actor is a widely used pattern in concurrency with strong support in languages such as Erlang and Scala. Rather than sharing data between threads and having to use locking mechanisms to synchronise access, the actor is a single entity that is responsible for a particular unshared set of data. Threads of execution that require work to be performed upon the data pass request messages to the actor. If a response is required then it is sent back by the actor to the requester via another message. These communications should be asynchronous and non-blocking. There are a number of advantages to avoiding sharing data in this way most notably, reducing code complexity and dodging the usual runtime issues such as deadlocks, which can require great care to avoid in other approaches to multi-threading.
This blog looks at issues around implementing an actor pattern that integrates with the .NET Task Parallel Library (TPL) such that the actor behaves as expected when used with C#'s async and await keywords. The aim is to provide an insight to the inner workings of the TPL as much as the actor pattern itself.
To start, let's define an API for an actor. The API allows messages to be sent by "enqueuing work" via one of the Enqueue methods. The "work" takes the form of delegates that either return a result or not. It can also be synchronous or asynchronous, with the intention being that the work is executed on the actor's thread with any result relayed back to the caller via a returned task object.
An actor should ensure that messages passed to it are handled in the order in which they are received, with no two messages being processed at the same time. Essentially, it should appear that an actor has a single dedicated thread processing messages in its queue. In reality, however, a single dedicated thread would be wasteful of resources given that most of the time it would be quiescent. Therefore it is preferable to use a thread from the .NET thread pool when required. To achieve this using the TPL it is necessary that all work passed to Enqueue is scheduled to run as a Task with an implementation of TaskScheduler. Scheduling the task via Task.Factory.StartNew would mean the TaskScheduler instance returned by TaskScheduler.Current would be used. In most circumstances this would be TaskScheduler.Default which schedules tasks in parallel on the .NET thread pool but could be something else entirely. If the simpler Task.Run were used instead, then TaskScheduler.Default would always be used. Thus using either Task.Run or Task.Factory.StartNew would provide no control over when the task were run and thus tasks associated with the same actor could be run in parallel not what we want.
Let's illustrate this with a simple example. The code below prints "START" to console then uses an actor to calculate the square of a number and print it to the console and finally prints "END" to the console once all of the tasks returned by the actor have completed.
We would expect the calculation results to be printed in numerical order. However, if our implementation of Enqueue were simply to start a task on the current scheduler then this would not occur, as the output below shows.
The first part of the solution is to provide our own implementation of TaskScheduler called ActorTaskScheduler:
The main work when implementing a TaskScheduler is in providing an implementation of the QueueTask method. In our case, implementing the other members is pretty trivial - the maximum concurrency level for an actor is always going to be one and we don't want to execute tasks inline. GetScheduledTasks is only present for debugger support and throwing a NotSupportedException as we do here is perfectly acceptable.
The prime objectives of QueueTask are to add new tasks to a queue so they are processed in order and to ensure that a thread is actively processing the queued tasks. The second objective is achieved by getting a thread from the .NET thread pool to start executing the tasks in the queue in turn, unless such a thread is already active. The implementation is fairly straightforward though care is needed to avoid spawning more than one processing thread. For this reason the _isActive flag is important as one can't simply check if the size of the queue is zero. In reality the implementation can become more complex once other considerations are taken into account. For instance, it's probably best to not hog a thread from the thread pool for a long time whilst processing a particularly busy queue of actor work as we would do here. However, addressing such issues is beyond the scope of this blog.
Each Actor instance will have its own instance of ActorTaskScheduler and each task created will be scheduled explicitly to run on that scheduler instance. ActorTaskScheduler will do the meat of the work we require, and the implementation of the Actor class itself is mostly straightforward as shown.
NB: Using ConfigureAwait(false) is not integral to the implementation but is considered good practice in libraries.
You'll notice that in the Actor implementation the forms of Enqueue taking async delegates have "await await" as part of their implementation. We'll look further at using these Enqueue methods later as they expose other issues but, for now, let's look at an example usage of Enqueue(Func<Task<T>> work) to illustrate the need for "await await" as opposed to simply "await":
From the point-of-view of the caller awaiting actor.Enqueue, the task of interest is not the one returned by _taskFactory.StartNew(work) but instead the task returned by the async delegate "work". A single "await" would simply await the completion of the former task and return its result which is the Task returned by "work". So, a further await on that returned Task is required in order to get the result of interest that is, the int value of 47 returned by "work".
In the overload of Enqueue discussed above we get some help from the compiler. If a single await were used then the compiler would expect the method's return type to be Task<T> rather than Task. However, for the Enqueue(Func<Task> work) overload both _taskFactory.StartNew(work) and "work" have a return type of Task and so the compiler will happily let us use a single await.
To achieve this without using the await keyword twice, one would need to implement Enqueue in the following manner:
I think you'll agree that the above is somewhat more involved. It does, however, provide a nice illustration of how much the await keyword can improve clarity and maintainability of code even if using await await requires some thought.
There is an issue with the implementation above in that it doesn't prevent non-actor work from being scheduled on the actor's instance of the ActorTaskScheduler. This can be illustrated by adding Console.WriteLine("Running actor task.") just before an actor task is executed in ActorTaskScheduler.QueueTask and then running the example code below.
Here, one would hope to see "Running actor task." just the once as Enqueue is only called once. Instead it appears twice because when Task.Factory.StartNew is called, it uses TaskScheduler.Current to schedule the task it creates. But as this is running within a task scheduled on the ActorTaskScheduler instance TaskScheduler.Current is set to that instance and not TaskScheduler.Default. The solution is simple and involves ensuring that when a task is scheduled by Enqueue then an option is set to "hide" the scheduler being used so that TaskScheduler.Current is not set to the actor's scheduler.
The overloads of Enqueue that take asynchronous delegates give us one further issue that is illustrated by another simple example:
Here we have work that awaits another task partway through. Under-the-hood the lines 4 and 6 are scheduled as two distinct tasks with the code on line 6 scheduled as a continuation of the Task.Run. From the natural flow of the code it seems clear the intention is that line 6 would be run on the actor. The flow we would expect is illustrated by the diagram below:
However, from the output we can see that "Running actor task" is only output once (for line 4) and so line 6 is not executed on the actor. The flow we actually get is illustrated below.
Why isn't this working? Well, when a task is “awaited” then unless otherwise configured the current "execution context" is captured. When the time comes for the code after the await to be resumed, this execution context is used to obtain a "synchronization context" which is used, in turn, to schedule the code to be resumed. If we weren't hiding the actor's task scheduler to solve the previous problem, then the execution context captured would have a reference to that scheduler and execution would flow back to the actor's scheduler as desired. However, we need to hide the actor's scheduler. In order to get execution flowing back to the actor when appropriate the solution is to provide a SynchronizationContext implementation to route things correctly.
When implementing a SynchronizationContext there are a number of methods that can be overridden but the only one of interest here is Post(SendOrPostCallback callback, object state). This is called when an asynchronous message needs dispatching and is what is called in order to resume work after an await. The code for ActorSynchronisationContext below shows how to ensure that this call to Post can get work scheduled with the actor's task scheduler:
This is used by amending the implementation of ActorTaskScheduler so that the context is set for the duration of executing tasks in the task queue:
Now our example above gives the following correct output:
Sadly, providing a custom SynchronizationContext as above won't always work. If we change the example code so that a call to ConfigureAwait(false) is appended to Task.Run then the implementation is broken again:
Using ConfigureAwait was mentioned briefly above. Basically, passing a value of false to ConfigureAwait tells the execution flow through the await to ignore the current synchronization context and use a thread from the thread pool to execute the code after the await. It is considered good practice to use ConfigureAwait(false) in library code unless you know you need the current synchronization context. However, in the actor you do need the synchronization context most of the time, so simply avoiding the extra typing involved in adding ConfigureAwait(false) is the way to avoid the problem shown above. Of course, if you're happy for the resumed execution not to be scheduled on the actor, then go ahead and use ConfigureAwait(false).
We have seen here that with a little knowledge of the internals of the TPL and how the async/await mechanism works, one can implement an actor pattern that integrates with async/await in a fashion that is fairly natural. The implementation is relatively straightforward; although, as noted, it becomes more complex once further factors are taken into consideration. For a much more thorough explanation of execution and synchronization contexts than I provide here, please see this MSDN article. And for a discussion on whether or not to use ConfigureAwait(false), see this this StackOverflow entry and this blog.
The code that goes with this blog can be accessed at Winton's GitHub page in the Winton.Blogs.TPLActor. In addition, Winton has open-sourced the full version of the actor. This is available in the GitHub project Winton.Extensions.Threading.Actor.The full version adds support for starting and stopping an actor as well as scheduling repeated tasks.
Tags: .NET actor-pattern async async/await await C# concurrency dotnet task-parallel-library TPL