Reporting events from F# Agents
Over the last year, I wrote quite a lot of articles about agent-based programming in F#. Agents (inspired by Erlang) provide a great abstraction for writing concurrent and scalable systems. They are a great fit for both server-side development (for example, handling a large number of concurrent requests), but also for user interface (for example, keeping state in an application with background tasks and interactive interface).
When writing reusable agents, we usually encapsulate agent in an F# object type. The type provides methods for sending messages to the agent. However, sometimes the agent also needs to report some state change that can be handled by another interested agent. This is done using F# events. However, F# events do not specify threading behaviour, so there is a number of options.
In this article (inspired by a recent email discussion), I describe three ways of reporting events from an agent. The options differ in what thread is used to report the event. Choosing the right option is important as it affects scalability and simplicity of your agent-based code.
Looking at batch processing agent
As an example, I'll use an agent that aggregates incoming messages into batches of specified size. When it receives given number of messages, it reports them (as an array) using an event. This is a simplified version of an MSDN example that I wrote some time ago - the functionality of this example is quite limited, but it is powerful enough to demonstrate different approaches to events:
/// Type alias that gives convenient name to F# agent type type Agent<'T> = MailboxProcessor<'T> /// Agent that implements batch processing type BatchProcessor<'T>(count) = // Event used to report aggregated batches to the user let batchEvent = new Event<'T[]>() let reportBatch batch = // TODO: Here we need to trigger the event printfn "Not implemented!" // Start an agent that implements the batching let agent = Agent<'T>.Start(fun inbox -> async { while true do // Repeatedly allocate a new queue let queue = new ResizeArray<_>() // Add specified number of messages to the queue for i in 1 .. count do let! msg = inbox.Receive() queue.Add(msg) // Report the batch as an array reportBatch (queue.ToArray()) }) /// Event that is triggered when a batch is collected member x.BatchProduced = batchEvent.Publish /// The method adds one object to the agent member x.Post(value) = agent.Post(value)
The above code implements a simple behaviour where the agent always waits
for a specified number of messages (without any timeouts). This can be
easily implemented using for
and while
loops. We use for
to repeatedly
receive message, before sending a batch, and while
to repeat the whole process
while the agent is running.
The above code misses the implementation of the reportBatch
function.
This is the interesting part where we need to trigger the event. Let's look
at the available options...
Reporting batches from the agent
Using the right threading model, when triggering an event, is important for a number of reasons. Code that throws exception or hangs can block or kill the running thread, which is a problem if the thread is in the middle of some important processing. On the other hand, triggering event on a different thread might have an unnecessary overhead.
F# events do not support any built-in mechanism for controlling where an event is triggered. By default, they execute handlers on the same thread on which they are triggered. In most of the cases, this is the right default, but it is easy to implement other behaviour.
Using agent's thread
The easiest option (on the implementation side) is to use the agent's current thread.
This is generally going to be some thread pool thread, although the threads where the
agent's body is running can change (for example, using the Async.SwitchToContext
operation).
The reportBatch
function would look like this:
let reportBatch batch = try batchEvent.Trigger(batch) with e -> printfn "Event handler failed: %A" e
I wrapped the Trigger
call in an event handler - if the event handler throws an exception,
it would terminate the agent. This is difficult to debug, because the agent just stops
responding. However, you can use agent.Error
event to catch unhandled exceptions.
More importantly, the handler could also block forever, in which case the agent would get
stuck.
On the other hand, calling Trigger
directly is the most efficient option. In most of the
cases, this is what you need. A typical event handler should be simple. It may just post a
message to another agent, which cannot block or fail, so sending a message via an additional
thread would be a notable overhead. Assuming you have another agent worker
with a
Post
method, you can just write batcher.BatchProduced.Add(worker.Post)
.
Nevertheless, it might be a good idea to document the threading behaviour and double-check that your event handlers do not throw and that they complete soon. If you need to perform some more complex processing (or run some operation on the user-interface thread), it is always possible to do that on the side of the handler. The techniques for doing that are explained in the next two sections.
Using thread pool
If you want to protect the body of your agent more, you can trigger the event in a thread pool.
Doing that will add some overhead - if this is a problem depends on what application you're writing
and how often the event is triggered. The easiest way to implement this behaviour is to create a
simple asynchronous workflow and start it using Async.Start
:
let reportBatch batch = async { batchEvent.Trigger(batch) } |> Async.Start
On the handler side, you'll know that the handler always runs in a thread pool. However, this still does not mean that the handler can do anything. Code running on a thread pool cannot access user-interface elements and blocking thread in a thread pool is bad, because the thread pool has a limit on maximal number of threads used (if you block all of them, the program will hang).
Using synchronization context
Finally, the third option is to give the user of the agent a way to control where the event
will be triggered. In .NET, this can be achieved using SynchronizationContext
class from
System.Threading
(MSDN). The class represents a context that can execute code - common examples
include thread pool and the main user-interface thread. The latter might be useful if you
always want to update the UI in response to an event.
There are various options for passing the SynchronizationContext
to the agent. If you want
to use a single mechanism during the whole agent lifetime, then you can pass it as an argument
to the constructor. To make the agent more versatile, it can be an optional argument:
type BatchProcessor<'T>(count, ?eventContext:SynchronizationContext) = /// Event used to report aggregated batches to the user let batchEvent = new Event<'T[]>() /// Triggers event using the specified synchronization context /// (or directly, if no synchronization context is specified) let reportBatch batch = match eventContext with | None -> // No synchronization context - trigger as in the first case batchEvent.Trigger(batch) | Some ctx -> // Use the 'Post' method of the context to trigger the event ctx.Post((fun _ -> batchEvent.Trigger(batch)), null) (unchanged agent body) /// Event that is triggered when a batch is collected member x.BatchProduced = batchEvent.Publish /// The method adds one object to the agent member x.Post(value) = agent.Post(value)
Optional parameters in F# are written using the ?eventContext
syntax and are passed as
option
values. This means that the reportBatch
function can use pattern matching to
choose different triggering mechanism. When the synchronization context is Some ctx
, we
use the Post
method to run a specified function on the thread selected by the synchronization
context. The method takes a delegate and an argument that is passed to the delegate - this
is not needed in F#, so I simply used null
as an additional argument and the delegate
ignores it.
If you wanted to use the agent in C#, it is a good idea to replace optional parameters with overloaded constructor. The F# mechanism behaves differently than optional parameters in C# and so it would not be convenient to use from C#.
How can you use the new version of the agent? If you construct the agent on a GUI thread (after the application loads), it is quite easy to capture the synchronization context representing the main thread and create the agent:
// Agent that will trigger events on the current (GUI) thread let sync = SynchronizationContext.Current let proc = BatchProcessor<_>(10, sync) // Start some background work that will report batches to GUI thread async { for i in 0 .. 1000 do proc.Post(i) } |> Async.Start
After creating the agent, the snippet starts a simple asynchronous workflow that generates
some values and sends them to the agent. The workflow is started using Async.Start
and
will run asynchronously on the background thread. However, all the events will be
reported on the thread where the agent was created - if that was the GUI thread, the
handler can safely access user interface elements.
Summary
This article gives a brief overview of different options for reporting events from an F# agent. I did not spent a long time introducing agents, because this has already been done elsewhere. My two articles for developerFusion provide a good introduction and a MSDN chapter has more details.
When reporting an event, the default option is to just call the Trigger
method. This is the
most efficient option and it is appropriate in most of the situations. However, if you want to
separate agent's body from caller's code, you can trigger the event in a thread pool. Finally,
if you want to provide finer control, you can parameterize the agent by SynchronizationContext
.
Then the user can, for example, automatically marshall all events to the user interface.
References & Source code
- Reporting Events from Agents - source code for this blog post (F# Snippet)
- The
SynchronizationContext
class (MSDN) - Server-Side Functional Programming - Describes how to write an F# agent and more (MSDN)
- An Introduction to F# Agents - Brief summary introducing F# agents (developerFusion)
Writing Concurrent Applications Using F# Agents - A higher-level perspective (developerFusion)
Full name: Blog.Template.Agent<_>
type: Agent<'T>
implements: System.IDisposable
Type alias that gives convenient name to F# agent type
class
interface System.IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
member TryReceive : ?timeout:int -> Async<'Msg option>
member TryScan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T option>
member add_Error : Handler<System.Exception> -> unit
member CurrentQueueLength : int
member DefaultTimeout : int
member Error : IEvent<System.Exception>
member remove_Error : Handler<System.Exception> -> unit
member DefaultTimeout : int with set
static member Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
end
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
type: MailboxProcessor<'Msg>
implements: System.IDisposable
class
new : count:int -> BatchProcessor<'T>
member Post : value:'T -> unit
member BatchProduced : IEvent<'T []>
end
Full name: Blog.Template.BatchProcessor<_>
Agent that implements batch processing
type: int
implements: System.IComparable
implements: System.IFormattable
implements: System.IConvertible
implements: System.IComparable<int>
implements: System.IEquatable<int>
inherits: System.ValueType
module Event
from Microsoft.FSharp.Control
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> System.Delegate)> =
class
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
end
Full name: Microsoft.FSharp.Control.Event<_,_>
--------------------
type Event<'T> =
class
new : unit -> Event<'T>
member Trigger : arg:'T -> unit
member Publish : IEvent<'T>
end
Full name: Microsoft.FSharp.Control.Event<_>
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
type: MailboxProcessor<'T>
implements: System.IDisposable
type: MailboxProcessor<'T>
implements: System.IDisposable
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
type: ResizeArray<'T>
implements: System.Collections.Generic.IList<'T>
implements: System.Collections.Generic.ICollection<'T>
implements: seq<'T>
implements: System.Collections.IList
implements: System.Collections.ICollection
implements: System.Collections.IEnumerable
Full name: Microsoft.FSharp.Collections.ResizeArray<_>
type: ResizeArray<'T>
implements: System.Collections.Generic.IList<'T>
implements: System.Collections.Generic.ICollection<'T>
implements: seq<'T>
implements: System.Collections.IList
implements: System.Collections.ICollection
implements: System.Collections.IEnumerable
type: int
implements: System.IComparable
implements: System.IFormattable
implements: System.IConvertible
implements: System.IComparable<int>
implements: System.IEquatable<int>
inherits: System.ValueType
Full name: Blog.Template.BatchProcessor`1.BatchProduced
Event that is triggered when a batch is collected
Full name: Blog.Template.BatchProcessor`1.Post
The method adds one object to the agent
Full name: Blog.Agent<_>
type: Agent<'T>
implements: System.IDisposable
Type alias that gives convenient name to F# agent type
class
interface System.IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
member TryReceive : ?timeout:int -> Async<'Msg option>
member TryScan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T option>
member add_Error : Handler<System.Exception> -> unit
member CurrentQueueLength : int
member DefaultTimeout : int
member Error : IEvent<System.Exception>
member remove_Error : Handler<System.Exception> -> unit
member DefaultTimeout : int with set
static member Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
end
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
type: MailboxProcessor<'Msg>
implements: System.IDisposable
Full name: Blog.batchEvent
Full name: Blog.AgentThread.reportBatch
type: 'T []
implements: System.ICloneable
implements: System.Collections.IList
implements: System.Collections.ICollection
implements: System.Collections.IStructuralComparable
implements: System.Collections.IStructuralEquatable
implements: System.Collections.Generic.IList<'T>
implements: System.Collections.Generic.ICollection<'T>
implements: seq<'T>
implements: System.Collections.IEnumerable
inherits: System.Array
val e : exn
type: exn
implements: System.Runtime.Serialization.ISerializable
implements: System.Runtime.InteropServices._Exception
--------------------
val e : exn
type: exn
implements: System.Runtime.Serialization.ISerializable
implements: System.Runtime.InteropServices._Exception
type: exn
implements: System.Runtime.Serialization.ISerializable
implements: System.Runtime.InteropServices._Exception
Full name: Blog.ThreadPool.reportBatch
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
--------------------
type Async
with
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * System.AsyncCallback * obj -> System.IAsyncResult) * (System.IAsyncResult -> 'T) * (System.IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> System.Delegate)
static member AwaitIAsyncResult : iar:System.IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Tasks.Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (System.OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<System.IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:Tasks.TaskCreationOptions * ?cancellationToken:CancellationToken -> Tasks.Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:Tasks.TaskCreationOptions -> Async<Tasks.Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(System.OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(System.OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
end
Full name: Microsoft.FSharp.Control.Async
class
new : count:int * ?eventContext:SynchronizationContext -> BatchProcessor<'T>
member Post : value:'T -> unit
member BatchProduced : IEvent<'T []>
end
Full name: Blog.BatchProcessor<_>
type: SynchronizationContext option
implements: System.Collections.IStructuralEquatable
implements: System.IComparable<Option<SynchronizationContext>>
implements: System.IComparable
implements: System.Collections.IStructuralComparable
class
new : unit -> System.Threading.SynchronizationContext
member CreateCopy : unit -> System.Threading.SynchronizationContext
member IsWaitNotificationRequired : unit -> bool
member OperationCompleted : unit -> unit
member OperationStarted : unit -> unit
member Post : System.Threading.SendOrPostCallback * obj -> unit
member Send : System.Threading.SendOrPostCallback * obj -> unit
member Wait : System.IntPtr [] * bool * int -> int
static member Current : System.Threading.SynchronizationContext
static member SetSynchronizationContext : System.Threading.SynchronizationContext -> unit
end
Full name: System.Threading.SynchronizationContext
Event used to report aggregated batches to the user
Triggers event using the specified synchronization context
(or directly, if no synchronization context is specified)
let agent = Agent<'T>.Start(fun inbox -> async {
while true do
// Repeatedly allocate a new queue
let queue = new ResizeArray<_>()
// Add specified number of messages to the queue
for i in 1 .. count do
let! msg = inbox.Receive()
queue.Add(msg)
// Report the batch as an array
reportBatch (queue.ToArray()) })
Full name: Blog.BatchProcessor`1.BatchProduced
Event that is triggered when a batch is collected
Full name: Blog.BatchProcessor`1.Post
The method adds one object to the agent
Full name: Blog.sync
Full name: Blog.proc
The method adds one object to the agent
Published: Saturday, 16 June 2012, 12:23 AM
Author: Tomas Petricek
Typos: Send me a pull request!
Tags: f#, asynchronous