
# TryJoinads (III.): Agent-based programming

Another area where the match! syntax can be used is programming with F# agents, implemented by the MailboxProcessor type. Formally, agents do not form a monad in a useful way - when programming with agents, we do not compose new agents, but instead write code that (imperatively) receives messages from the agent's mailbox and handles them.

This article demonstrates an agent { ... } computation builder that can be used for implementing the body of an agent. Normally, the body of an agent is an asynchronous workflow. The code in the body uses let! to perform asynchronous operations, most importantly to call inbox.Receive to get the next message from the inbox. When the agent intends to handle only certain kinds of messages, it can use inbox.Scan. When using the agent builder, pattern matching on messages can be written using match!, and code that ignores certain kinds of messages (leaving them in the inbox) can be written simply as an incomplete pattern match.

Note: This blog post is a re-publication of a tutorial from the TryJoinads.org web page. If you read the article there, you can run the examples interactively and experiment with them: view the article on TryJoinads.

## Implementing a pausable agent

As an example, let's implement a simple agent that handles Print messages by printing a received string to the console. In addition, the agent also supports Pause and Resume messages - when it receives Pause, it stops accepting Print messages until it receives Resume and so all Print messages will be queued for later processing.

First of all, we need to reference the namespace with the agent builder and declare the type of messages, which is a simple discriminated union:

    open FSharp.Extensions.Joinads

    type Message =
      | Print of string
      | Pause
      | Resume

The body of the agent consists of two functions. The working function represents the state in which the agent can handle all messages, and the paused function represents the state in which the agent waits for Resume (and also discards Pause messages, because it is already paused). Using match!, this can be implemented as follows:

    let printer = MailboxProcessor.Start(fun inbox ->

      /// Agent can receive and handle any message; after processing,
      /// it continues in 'working' state unless it receives 'Pause'
      let rec working() = agent {
        match! inbox with
        | Print msg ->
            printfn "%s" msg
            return! working()
        | Pause -> return! paused()
        | Resume -> return! working() }

      /// Agent is paused and waits for 'Resume' (and then continues in
      /// 'working' state). If it receives 'Pause', it remains paused
      and paused() = agent {
        match! inbox with
        | Pause -> return! paused()
        | Resume ->
            printfn "Resuming..."
            return! working() }

      // Start the agent in the working state
      working() )

The snippet creates an agent named printer and starts it. Both of the functions that implement the body of the agent use match! to receive the next message from the inbox. Inside the agent { ... } computation, the match! keyword can be used to match on messages in the agent's inbox (of type MailboxProcessor<'T>). Otherwise, the computation builder behaves just like async { ... }, so the body can call asynchronous operations using let! as usual (in theory, it would be cleaner to return async { ... } blocks from the body of match!, but the agent builder uses overloading to make the syntax nicer).

As already discussed, the working state accepts all messages - when implementing F# agents in the usual way, this would be done by calling inbox.Receive() asynchronously and then pattern matching on the received message. With the match! construct, we can do both operations at the same time.

However, the main benefit of using match! is apparent when implementing states that only accept certain messages. In the paused state, the pattern matching handles only Pause and Resume. In response to Resume, the agent also prints "Resuming..." to make the code slightly more interesting. The match! operation implemented by the agent workflow automatically leaves all unhandled messages in the inbox, so the pattern matching does not have to be complete. In a normal implementation (shown below), this would have to be done using inbox.Scan.

To test the agent, you can try running the following commands. If you execute them one by one, you can see that the second Print message gets handled only after the Resume message is received:

    printer.Post(Print "hello world")
    printer.Post(Pause)
    printer.Post(Print "hello again!")
    printer.Post(Resume)

The agent discussed in this section is fairly simple, but it shows that handling messages is largely simplified using match!. The next section compares the previous code with a version written using inbox.Scan.

### Comparison with standard style

When implementing agents without using match!, states that do not handle all possible messages need to be implemented using Scan. The operation takes a function that returns option<Async<unit>>. The result is None for messages that should be left in the queue and Some(async { ... }) when a message can be handled.

In the above example, this only applies to the paused state:

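    let printer = MailboxProcessor.Start(fun inbox ->

      // Working state: receive any message, then pattern match on it
      let rec working() = agent {
        let! msg = inbox.Receive()
        match msg with
        | Print msg ->
            printfn "%s" msg
            return! working()
        | Pause -> return! paused()
        | Resume -> return! working() }

      // Use Scan to handle only Pause or Resume messages
      and paused() = inbox.Scan(function
        | Pause -> Some(paused())
        | Resume -> Some(async {
            printfn "Resuming..."
            return! working() })
        | _ -> None)

      // Start the agent in the working state
      working() )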

The syntax is definitely less obvious, especially in the Resume case, where we need to perform some operation before returning. This requires writing an asynchronous workflow that is wrapped in Some (the value returned from a pattern match inside a function).
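
To see the None/Some behaviour of Scan in isolation, here is a minimal sketch in plain F# that does not need the Joinads extension. The DemoMessage type and the demo agent are hypothetical names made up for this illustration and do not appear in the original tutorial. The agent starts in a state that only accepts Go; any Data messages posted earlier stay in the inbox and are handled, in their original order, once the agent switches to a state that accepts everything.

```fsharp
// Hypothetical message type, used only for this illustration
type DemoMessage =
  | Data of int
  | Go
  | Report of AsyncReplyChannel<string list>

let demo = MailboxProcessor<DemoMessage>.Start(fun inbox ->
  // Accept every message and record the order in which it was handled
  let rec running (seen: string list) = async {
    let! msg = inbox.Receive()
    match msg with
    | Data n -> return! running (sprintf "Data %d" n :: seen)
    | Go -> return! running ("Go" :: seen)
    | Report reply ->
        reply.Reply(List.rev seen)
        return! running seen }

  // Initial state: only 'Go' is handled. Returning None leaves the
  // message in the inbox for a later Receive or Scan.
  let waiting () =
    inbox.Scan(function
      | Go -> Some(running ["Go"])
      | _ -> None)

  waiting ())

demo.Post(Data 1)
demo.Post(Data 2)
demo.Post(Go)

// Ask the agent for the order in which it handled the messages
let order = demo.PostAndReply(fun reply -> Report reply)
printfn "%A" order   // ["Go"; "Data 1"; "Data 2"]
```

Although Go was posted last, it is handled first, because the scanner function returned None for both Data messages and so skipped over them.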

## Simplifying a blocking queue

To give a more complex example of an agent using match!, we can re-implement the BlockingQueueAgent from the MSDN tutorial. The agent implements an asynchronous blocking queue similar to BlockingCollection from .NET. It supports two messages - one for adding values to the queue and another for removing values from the queue. Both of the messages should be sent asynchronously, because their processing can be blocked. The Add message cannot be processed when the queue is full and the Get message cannot be processed when the queue is empty.

We first reference a namespace that we'll need later and define the message type:

    open System.Collections.Generic

    type BlockingAgentMessage<'T> =
      | Add of 'T * AsyncReplyChannel<unit>
      | Get of AsyncReplyChannel<'T>

Both messages also carry AsyncReplyChannel, because the agent needs to reply to both of them. The reply to Get carries the obtained value, while a reply to Add is just a notification that the value was added to the queue.

The implementation of the queue uses a mutable Queue<'T> to keep the items. This is perfectly fine when writing an F# agent, because its body is not executed concurrently and the blocking queue agent does not need to expose the entire queue of messages.
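
The claim that mutable state is safe inside an agent can be checked with a small sketch in plain F#. The CollectorMessage type and the collector agent below are hypothetical and serve only as an illustration. Several threads post concurrently, but only the agent body, which processes one message at a time, ever touches the ResizeArray (which is not thread-safe).

```fsharp
// Hypothetical message type, used only for this illustration
type CollectorMessage =
  | Store of int
  | Count of AsyncReplyChannel<int>

let collector = MailboxProcessor<CollectorMessage>.Start(fun inbox ->
  // Mutable collection that is not thread-safe; it is private to the
  // agent body, which handles one message at a time
  let items = ResizeArray<int>()
  let rec loop () = async {
    let! msg = inbox.Receive()
    match msg with
    | Store n -> items.Add(n)
    | Count reply -> reply.Reply(items.Count)
    return! loop () }
  loop ())

// Post from four workers in parallel; Post itself is thread-safe
[ for worker in 1 .. 4 ->
    async {
      for i in 1 .. 1000 do
        collector.Post(Store(worker * 1000 + i)) } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// 'Count' is queued behind all 'Store' messages, so it sees every item
let total = collector.PostAndReply(fun reply -> Count reply)
printfn "%d" total   // 4000
```

The blocking queue agent relies on the same property for its Queue<'T>: callers only ever send messages, and the queue itself never leaves the agent body.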

The body of the agent consists of three functions that implement three states. The agent can be empty (waiting for the Add message), full (waiting for the Get message), or able to handle both messages:

    let createQueue maxLength = MailboxProcessor.Start(fun inbox ->
      // Private queue that stores the values
      let items = new Queue<_>()
      // Continue in one of the states, depending on the queue
      let rec chooseState() =
        if items.Count = 0 then emptyQueue()
        elif items.Count < maxLength then runningQueue()
        else fullQueue()

      // When the agent is empty, it can only handle 'Add'
      and emptyQueue() = agent {
        match! inbox with
        | Add(value, reply) ->
            items.Enqueue(value)
            reply.Reply()
            return! chooseState() }
      // When the agent is full, it can only handle 'Get'
      and fullQueue() = agent {
        match! inbox with
        | Get(reply) ->
            reply.Reply(items.Dequeue())
            return! chooseState() }
      // A state in which the agent can handle both messages
      and runningQueue() = agent {
        match! inbox with
        | Add(value, reply) ->
            items.Enqueue(value)
            reply.Reply()
            return! chooseState()
        | Get(reply) ->
            reply.Reply(items.Dequeue())
            return! chooseState() }

      emptyQueue() )

Compared with the original version of the agent in the MSDN tutorial, the code is more consistent. We don't need to use a different programming style for functions that can handle all messages (runningQueue) and for functions that only handle certain messages (emptyQueue and fullQueue).
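
For comparison, the following is a rough sketch of how the same three states might look in plain F# using Scan and Receive. It is not the MSDN code verbatim; the names createScanQueue, enqueue and dequeue are assumptions introduced here. Note the mix of styles: the two restricted states go through Scan and return options, while the unrestricted state uses Receive inside an async block.

```fsharp
open System.Collections.Generic

type BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>

// Hypothetical Scan-based counterpart of 'createQueue'
let createScanQueue<'T> (maxLength: int) =
  MailboxProcessor<BlockingAgentMessage<'T>>.Start(fun inbox ->
    let items = Queue<'T>()

    // Continue in one of the states, depending on the queue
    let rec chooseState () =
      if items.Count = 0 then emptyQueue ()
      elif items.Count < maxLength then runningQueue ()
      else fullQueue ()

    // Empty: only 'Add' is handled, 'Get' stays in the inbox
    and emptyQueue () =
      inbox.Scan(function
        | Add(value, reply) -> Some(enqueue value reply)
        | _ -> None)

    // Full: only 'Get' is handled, 'Add' stays in the inbox
    and fullQueue () =
      inbox.Scan(function
        | Get reply -> Some(dequeue reply)
        | _ -> None)

    // Neither empty nor full: any message can be handled
    and runningQueue () = async {
      let! msg = inbox.Receive()
      match msg with
      | Add(value, reply) -> return! enqueue value reply
      | Get reply -> return! dequeue reply }

    // Store the value, notify the sender, pick the next state
    and enqueue value (reply: AsyncReplyChannel<unit>) = async {
      items.Enqueue(value)
      reply.Reply()
      return! chooseState () }

    // Send the oldest value back, pick the next state
    and dequeue (reply: AsyncReplyChannel<'T>) = async {
      reply.Reply(items.Dequeue())
      return! chooseState () }

    emptyQueue ())

let queue = createScanQueue<int> 2

// Fill the queue to capacity; each call returns once the value is stored
queue.PostAndReply(fun reply -> Add(1, reply))
queue.PostAndReply(fun reply -> Add(2, reply))

// The queue is full, so this 'Add' stays pending until a 'Get' frees a slot
let pendingAdd =
  queue.PostAndAsyncReply(fun reply -> Add(3, reply)) |> Async.StartAsTask

let first = queue.PostAndReply(fun reply -> Get reply)
pendingAdd.Wait()
let second = queue.PostAndReply(fun reply -> Get reply)
let third = queue.PostAndReply(fun reply -> Get reply)
printfn "%d %d %d" first second third   // 1 2 3
```

The usage at the end shows the blocking behaviour: the third Add completes only after the first Get has removed an item from the full queue.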

## Summary

This article demonstrated how to use the match! construct when implementing agents using the MailboxProcessor type in F#. This use of match! does not follow the usual formal notion of a joinad as explained in the recent publications, because the agent { ... } computation does not implement parallel composition (it is not possible to match on multiple mailboxes) and the monadic bind works over two different types (normal asynchronous workflows and the agent's inbox). However, from a practical point of view, this use definitely simplifies programming with agents.
