
Reporting events from F# Agents

Over the last year, I wrote quite a lot of articles about agent-based programming in F#. Agents (inspired by Erlang) provide a great abstraction for writing concurrent and scalable systems. They are a great fit both for server-side development (for example, handling a large number of concurrent requests) and for user interfaces (for example, keeping state in an application with background tasks and an interactive interface).

When writing reusable agents, we usually encapsulate the agent in an F# object type. The type provides methods for sending messages to the agent. However, sometimes the agent also needs to report a state change that can be handled by another interested agent. This is done using F# events. However, F# events do not specify threading behaviour, so there are a number of options.

In this article (inspired by a recent email discussion), I describe three ways of reporting events from an agent. The options differ in what thread is used to report the event. Choosing the right option is important as it affects scalability and simplicity of your agent-based code.

Looking at a batch processing agent

As an example, I'll use an agent that aggregates incoming messages into batches of a specified size. When it receives the given number of messages, it reports them (as an array) using an event. This is a simplified version of an MSDN example that I wrote some time ago - the functionality of this example is quite limited, but it is powerful enough to demonstrate different approaches to events:

/// Type alias that gives convenient name to F# agent type
type Agent<'T> = MailboxProcessor<'T>

/// Agent that implements batch processing
type BatchProcessor<'T>(count) =
  // Event used to report aggregated batches to the user
  let batchEvent = new Event<'T[]>()
  let reportBatch batch =
    // TODO: Here we need to trigger the event
    printfn "Not implemented!"

  // Start an agent that implements the batching
  let agent = Agent<'T>.Start(fun inbox -> async {
    while true do
      // Repeatedly allocate a new queue 
      let queue = new ResizeArray<_>()
      // Add specified number of messages to the queue
      for i in 1 .. count do
        let! msg = inbox.Receive()
        queue.Add(msg)
      // Report the batch as an array
      reportBatch (queue.ToArray()) })

  /// Event that is triggered when a batch is collected
  member x.BatchProduced = batchEvent.Publish
  /// The method adds one object to the agent
  member x.Post(value) = agent.Post(value)

The above code implements a simple behaviour where the agent always waits for a specified number of messages (without any timeouts). This can be easily implemented using for and while loops. We use for to receive the required number of messages before sending a batch, and while to repeat the whole process for as long as the agent is running.

The above code is missing the implementation of the reportBatch function. This is the interesting part where we need to trigger the event. Let's look at the available options...

Reporting batches from the agent

Using the right threading model when triggering an event is important for a number of reasons. A handler that throws an exception or hangs can kill or block the thread it runs on, which is a problem if that thread is in the middle of some important processing. On the other hand, triggering the event on a different thread might add unnecessary overhead.

F# events do not support any built-in mechanism for controlling where an event is triggered. By default, they execute handlers on the same thread on which they are triggered. In most cases, this is the right default, but it is easy to implement other behaviour.
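The default behaviour can be checked with a small experiment. The following is only a sketch (the names demoEvent, sameThread and worker are made up for this illustration): it triggers an event from a dedicated thread and lets the handler compare its own thread ID with the ID of the triggering thread.

```fsharp
open System.Threading

// Event that carries the ID of the thread that triggered it
let demoEvent = new Event<int>()

// Set by the handler: did it run on the triggering thread?
let sameThread = ref false
demoEvent.Publish.Add(fun triggeringId ->
  sameThread.Value <- (Thread.CurrentThread.ManagedThreadId = triggeringId))

// Trigger the event from a dedicated thread and wait for it to finish
let worker = new Thread(ThreadStart(fun () ->
  demoEvent.Trigger(Thread.CurrentThread.ManagedThreadId)))
worker.Start()
worker.Join()

printfn "Handler ran on the triggering thread: %b" sameThread.Value
```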

Using agent's thread

The easiest option (on the implementation side) is to use the agent's current thread. This is generally going to be some thread pool thread, although the threads where the agent's body is running can change (for example, using the Async.SwitchToContext operation). The reportBatch function would look like this:

let reportBatch batch =
  try
    batchEvent.Trigger(batch)
  with e ->
    printfn "Event handler failed: %A" e

I wrapped the Trigger call in an exception handler. Without it, an exception thrown by an event handler would terminate the agent. This is difficult to debug, because the agent just stops responding (although you can use the agent.Error event to catch unhandled exceptions). More importantly, the handler could also block forever, in which case the agent would get stuck.
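To illustrate the Error event mentioned above, here is a minimal sketch. The agent faulty and its "negative input" failure are hypothetical and exist only to make the body throw. The handler is registered before the agent is started, so that no failure is missed.

```fsharp
open System.Threading

// Signalled once the Error event has been raised
let failed = new ManualResetEvent(false)
let lastError = ref ""

// Hypothetical agent whose body throws on a negative number
let faulty = new MailboxProcessor<int>(fun inbox -> async {
  while true do
    let! n = inbox.Receive()
    if n < 0 then failwith "negative input"
    printfn "Processed %d" n })

// Subscribe before starting the agent
faulty.Error.Add(fun e ->
  lastError.Value <- e.Message
  failed.Set() |> ignore)

faulty.Start()
faulty.Post(1)
faulty.Post(-1)

// Wait (with a timeout) until the failure is reported
let reported = failed.WaitOne(5000)
printfn "Failure reported: %b (%s)" reported lastError.Value
```

Note that the agent does not restart after the failure. The Error event only tells you that the body has terminated.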

On the other hand, calling Trigger directly is the most efficient option. In most cases, this is what you need. A typical event handler should be simple. It may just post a message to another agent, which cannot block or fail, so sending the message via an additional thread would only add overhead. Assuming you have another agent worker with a Post method, you can just write batcher.BatchProduced.Add(worker.Post).
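The following sketch puts the pieces together. The BatchProcessor type is the one from this article with the direct reportBatch. The worker agent, its WorkerMessage type and the allBatches countdown are assumptions added for illustration, so that the snippet can wait for the result deterministically.

```fsharp
open System.Threading

type Agent<'T> = MailboxProcessor<'T>

/// Batch processor that triggers the event on the agent's thread
type BatchProcessor<'T>(count) =
  let batchEvent = new Event<'T[]>()
  let reportBatch batch =
    try batchEvent.Trigger(batch)
    with e -> printfn "Event handler failed: %A" e
  let agent = Agent<'T>.Start(fun inbox -> async {
    while true do
      let queue = new ResizeArray<_>()
      for i in 1 .. count do
        let! msg = inbox.Receive()
        queue.Add(msg)
      reportBatch (queue.ToArray()) })
  member x.BatchProduced = batchEvent.Publish
  member x.Post(value) = agent.Post(value)

/// Messages understood by the (hypothetical) worker agent
type WorkerMessage =
  | Batch of int[]
  | GetTotal of AsyncReplyChannel<int>

// Reaches zero after the worker has seen two batches
let allBatches = new CountdownEvent(2)

// Worker that keeps a running total of all received numbers
let worker = Agent<WorkerMessage>.Start(fun inbox ->
  let rec loop total = async {
    let! msg = inbox.Receive()
    match msg with
    | Batch items ->
        allBatches.Signal() |> ignore
        return! loop (total + Array.sum items)
    | GetTotal reply ->
        reply.Reply(total)
        return! loop total }
  loop 0)

// The handler only posts a message, so it is safe on the agent's thread
let batcher = BatchProcessor<int>(3)
batcher.BatchProduced.Add(fun batch -> worker.Post(Batch batch))
for i in 1 .. 6 do batcher.Post(i)

let arrived = allBatches.Wait(5000)
let total = worker.PostAndReply(fun reply -> GetTotal reply)
printfn "Total of all batches: %d" total
```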

Nevertheless, it might be a good idea to document the threading behaviour and double-check that your event handlers do not throw and that they complete quickly. If you need to perform some more complex processing (or run some operation on the user-interface thread), it is always possible to do that on the side of the handler. The techniques for doing that are explained in the next two sections.

Using thread pool

If you want to protect the body of your agent more, you can trigger the event in a thread pool. Doing that will add some overhead - whether this is a problem depends on what application you're writing and how often the event is triggered. The easiest way to implement this behaviour is to create a simple asynchronous workflow and start it using Async.Start:

let reportBatch batch =
  async { batchEvent.Trigger(batch) } 
  |> Async.Start

On the handler side, you'll know that the handler always runs in a thread pool. However, this still does not mean that the handler can do anything. Code running on a thread pool cannot access user-interface elements, and blocking a thread pool thread is bad, because the thread pool limits the maximal number of threads (if you block all of them, the program will hang).
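One more thing to consider is what happens when a handler throws. The agent is no longer affected, but as far as I am aware, an exception that escapes a workflow started with Async.Start is raised on a thread pool thread, where nothing handles it. A possible variation, shown here only as a sketch, is to keep the try ... with from the first version inside the workflow. The handlerFailed flag and the failing handler are added just for the demonstration.

```fsharp
open System.Threading

let batchEvent = new Event<int[]>()

// Demonstration only: signalled when a handler failure is caught
let handlerFailed = new ManualResetEvent(false)

/// Triggers the event in the thread pool and logs handler failures
let reportBatch batch =
  async {
    try batchEvent.Trigger(batch)
    with e ->
      printfn "Event handler failed: %s" e.Message
      handlerFailed.Set() |> ignore }
  |> Async.Start

// A deliberately broken handler
batchEvent.Publish.Add(fun _ -> failwith "handler bug")

// Returns immediately; the handler runs on a thread pool thread
reportBatch [| 1; 2; 3 |]
let caught = handlerFailed.WaitOne(5000)
printfn "Failure was caught: %b" caught
```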

Using synchronization context

Finally, the third option is to give the user of the agent a way to control where the event will be triggered. In .NET, this can be achieved using the SynchronizationContext class from System.Threading (MSDN). The class represents a context that can execute code - common examples include the thread pool and the main user-interface thread. The latter might be useful if you always want to update the UI in response to an event.

There are various options for passing the SynchronizationContext to the agent. If you want to use a single mechanism during the whole agent lifetime, then you can pass it as an argument to the constructor. To make the agent more versatile, it can be an optional argument:

open System.Threading

type BatchProcessor<'T>(count, ?eventContext:SynchronizationContext) =
  /// Event used to report aggregated batches to the user
  let batchEvent = new Event<'T[]>()

  /// Triggers event using the specified synchronization context
  /// (or directly, if no synchronization context is specified)
  let reportBatch batch =
    match eventContext with 
    | None -> 
        // No synchronization context - trigger as in the first case
        batchEvent.Trigger(batch)
    | Some ctx ->
        // Use the 'Post' method of the context to trigger the event
        ctx.Post((fun _ -> batchEvent.Trigger(batch)), null)

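  // Start an agent that implements the batching (unchanged)
  let agent = Agent<'T>.Start(fun inbox -> async {
    while true do
      // Repeatedly allocate a new queue
      let queue = new ResizeArray<_>()
      // Add specified number of messages to the queue
      for i in 1 .. count do
        let! msg = inbox.Receive()
        queue.Add(msg)
      // Report the batch as an array
      reportBatch (queue.ToArray()) })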

  /// Event that is triggered when a batch is collected
  member x.BatchProduced = batchEvent.Publish
  /// The method adds one object to the agent
  member x.Post(value) = agent.Post(value)

Optional parameters in F# are written using the ?eventContext syntax and are passed as option values. This means that the reportBatch function can use pattern matching to choose between the triggering mechanisms. When the synchronization context is Some ctx, we use the Post method to run a specified function on the thread selected by the synchronization context. The method takes a delegate and an argument that is passed to the delegate - this is not needed in F#, so I simply used null as an additional argument and the delegate ignores it.
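If optional parameters are new to you, the mechanism can be seen in isolation in the following sketch. The Logger type is made up for this illustration and has nothing to do with agents. Inside the type, the optional parameter has the type string option, so it can be pattern matched just like eventContext above.

```fsharp
/// Hypothetical type with one optional constructor parameter
type Logger(?prefix: string) =
  member x.Format(message: string) =
    // 'prefix' has type 'string option' here
    match prefix with
    | None -> message
    | Some p -> sprintf "[%s] %s" p message

// The caller can omit the argument or pass a plain string
let plain = Logger().Format("batch ready")
let tagged = Logger("agent").Format("batch ready")
printfn "%s / %s" plain tagged
```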

If you want to use the agent from C#, it is a good idea to replace the optional parameter with an overloaded constructor. The F# mechanism behaves differently from optional parameters in C#, so it would not be convenient to use from C#.
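One possible shape of such a C#-friendly variant is sketched below. It is an assumption about how the type could be restructured, not code from the original sample: the primary constructor takes the context directly, a null context means "trigger on the agent's thread", and a second constructor covers the case without a context.

```fsharp
open System.Threading

type Agent<'T> = MailboxProcessor<'T>

/// Variant of the batch processor with overloaded constructors
type BatchProcessor<'T>(count: int, eventContext: SynchronizationContext) =
  let batchEvent = new Event<'T[]>()

  /// Triggers directly when no context was given (null)
  let reportBatch batch =
    match eventContext with
    | null -> batchEvent.Trigger(batch)
    | ctx -> ctx.Post((fun _ -> batchEvent.Trigger(batch)), null)

  let agent = Agent<'T>.Start(fun inbox -> async {
    while true do
      let queue = new ResizeArray<_>()
      for i in 1 .. count do
        let! msg = inbox.Receive()
        queue.Add(msg)
      reportBatch (queue.ToArray()) })

  /// Overload for callers that do not need a synchronization context
  new (count: int) = BatchProcessor<'T>(count, null)

  member x.BatchProduced = batchEvent.Publish
  member x.Post(value) = agent.Post(value)

// Using the single-argument overload
let firstBatch = new ManualResetEvent(false)
let batchSize = ref 0
let proc = BatchProcessor<int>(2)
proc.BatchProduced.Add(fun batch ->
  batchSize.Value <- batch.Length
  firstBatch.Set() |> ignore)
proc.Post(1)
proc.Post(2)
let gotBatch = firstBatch.WaitOne(5000)
printfn "Received a batch: %b (size %d)" gotBatch batchSize.Value
```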

How can you use the new version of the agent? If you construct the agent on a GUI thread (after the application loads), it is quite easy to capture the synchronization context representing the main thread and create the agent:

// Agent that will trigger events on the current (GUI) thread
let sync = SynchronizationContext.Current
let proc = BatchProcessor<_>(10, sync)

// Start some background work that will report batches to GUI thread
async {
  for i in 0 .. 1000 do 
    proc.Post(i) } |> Async.Start

After creating the agent, the snippet starts a simple asynchronous workflow that generates some values and sends them to the agent. The workflow is started using Async.Start and will run asynchronously on a background thread. However, all the events will be reported using the synchronization context that was captured when the agent was created - if that was the GUI thread, the handler can safely access user interface elements.
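If you want to try this without a GUI, you can pass in your own synchronization context. The sketch below uses a hypothetical CountingContext that simply counts the posted callbacks and runs them inline, which is enough to see that every batch goes through the Post method of the context. A real GUI context would instead queue the callback on the main thread.

```fsharp
open System.Threading

type Agent<'T> = MailboxProcessor<'T>

/// Batch processor from the article, with an optional context
type BatchProcessor<'T>(count, ?eventContext: SynchronizationContext) =
  let batchEvent = new Event<'T[]>()
  let reportBatch batch =
    match eventContext with
    | None -> batchEvent.Trigger(batch)
    | Some ctx -> ctx.Post((fun _ -> batchEvent.Trigger(batch)), null)
  let agent = Agent<'T>.Start(fun inbox -> async {
    while true do
      let queue = new ResizeArray<_>()
      for i in 1 .. count do
        let! msg = inbox.Receive()
        queue.Add(msg)
      reportBatch (queue.ToArray()) })
  member x.BatchProduced = batchEvent.Publish
  member x.Post(value) = agent.Post(value)

/// Hypothetical context: counts callbacks and runs them inline
type CountingContext() =
  inherit SynchronizationContext()
  let mutable posted = 0
  member x.Posted = posted
  override x.Post(callback: SendOrPostCallback, state: obj) =
    // Only the agent calls this, one batch at a time
    posted <- posted + 1
    callback.Invoke(state)

let ctx = CountingContext()
let batches = new CountdownEvent(3)
let proc = BatchProcessor<int>(10, ctx :> SynchronizationContext)
proc.BatchProduced.Add(fun _ -> batches.Signal() |> ignore)

// 30 values produce three batches of 10
for i in 1 .. 30 do proc.Post(i)
let allArrived = batches.Wait(5000)
printfn "All batches arrived: %b, callbacks posted: %d" allArrived ctx.Posted
```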

Summary

This article gives a brief overview of different options for reporting events from an F# agent. I did not spend a long time introducing agents, because this has already been done elsewhere. My two articles for developerFusion provide a good introduction and an MSDN chapter has more details.

When reporting an event, the default option is to just call the Trigger method. This is the most efficient option and it is appropriate in most situations. However, if you want to separate the agent's body from the caller's code, you can trigger the event in a thread pool. Finally, if you want to provide finer control, you can parameterize the agent by SynchronizationContext. Then the user can, for example, automatically marshal all events to the user interface thread.

Published: Saturday, 16 June 2012, 12:23 AM
Author: Tomas Petricek
Tags: f#, asynchronous