TryJoinads (III.): Agent-based programming
Another area where the match! syntax can be used is programming with F# agents, implemented by the MailboxProcessor type. Formally, agents do not form a monad structure in a useful way: when programming with agents, we do not compose new agents, but instead write code that (imperatively) receives messages from the agent's mailbox and handles them.
This article demonstrates an agent { ... } computation builder that can be used for implementing the body of an agent. Normally, the body of an agent is an asynchronous workflow. The code in the body uses let! to perform asynchronous operations, most importantly to call inbox.Receive to get the next message from the inbox. When the agent intends to handle only certain kinds of messages, it can use inbox.Scan. When using the agent builder, pattern matching on messages can be written using match!, and it is possible to write code that ignores certain kinds of messages simply by writing an incomplete pattern match.
Note: This blog post is a re-publication of a tutorial from the TryJoinads.org web page. If you read the article there, you can run the examples interactively and experiment with them: view the article on TryJoinads.
Implementing pausable agent
As an example, let's implement a simple agent that handles Print messages by printing a received string to the console. In addition, the agent also supports Pause and Resume messages: when it receives Pause, it stops accepting Print messages until it receives Resume, so all Print messages are queued for later processing.
First of all, we need to reference the namespace with the agent builder and declare the type of messages, which is a simple discriminated union:

open FSharp.Extensions.Joinads

type Message =
  | Print of string
  | Pause
  | Resume

The body of the agent consists of two functions. The working function represents the agent in a state where it can handle all messages, and the paused function represents the state in which the agent waits for Resume (and also discards Pause messages, because it is already paused). Using match!, this can be implemented as follows:

let printer = MailboxProcessor.Start(fun inbox ->
  /// Agent can receive and handle any message; after processing,
  /// it continues in 'working' state unless it receives 'Pause'
  let rec working() = agent {
    match! inbox with
    | Print msg ->
        printfn "%s" msg
        return! working()
    | Pause -> return! paused()
    | Resume -> return! working() }

  /// Agent is paused and waits for 'Resume' (and then continues in
  /// 'working' state). If it receives 'Pause', it remains paused
  and paused() = agent {
    match! inbox with
    | Pause -> return! paused()
    | Resume ->
        printfn "Resuming..."
        return! working() }

  // Start the agent in the working state
  working() )

The snippet creates an agent printer and starts it. Both of the functions that implement the body of the agent use match! to receive the next message from the inbox. Inside the agent { ... } computation, the match! keyword can be used to match on messages in the agent's inbox (of type MailboxProcessor<'T>). Otherwise, the computation builder behaves just like async { ... }, so the body can call asynchronous operations using let! as usual (in theory, it would be cleaner to return async { ... } blocks from the body of match!, but the agent builder uses overloading to make the syntax nicer).
As already discussed, the working state accepts all messages. When implementing F# agents in the usual way, this would be done by calling inbox.Receive() asynchronously and then pattern matching on the received message. With the match! construct, we can do both operations at the same time.
However, the main benefit of using match! is apparent when implementing states that only accept certain messages. In the paused state, the pattern matching handles only Pause and Resume. In response to Resume, the agent also prints "Resuming..." to make the code slightly more interesting. The match! operation implemented by the agent workflow automatically leaves all unhandled messages in the inbox, so the pattern matching does not have to be complete. In a normal implementation (shown below), this would have to be done using inbox.Scan.
To test the agent, you can try running the following commands. If you execute them one by one, you can see that the second Print message gets handled only after the Resume message is received:

printer.Post(Print "hello world")
printer.Post(Pause)
printer.Post(Print "hello again!")
printer.Post(Resume)

The agent discussed in this section is fairly simple, but it shows that match! considerably simplifies message handling. The next section compares the previous code with a version written using inbox.Scan.
Comparison with standard style
When implementing agents without using match!, states that do not handle all possible messages need to be implemented using Scan. The operation takes a function that returns option<Async<unit>>. The result is None for messages that should be left in the queue and Some(async { ... }) when a message can be handled. In the above example, this only applies to the paused state:
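
let printer = MailboxProcessor.Start(fun inbox ->
  // Working state: receive any message and pattern match on it
  let rec working() = async {
    let! msg = inbox.Receive()
    match msg with
    | Print msg ->
        printfn "%s" msg
        return! working()
    | Pause -> return! paused()
    | Resume -> return! working() }

  // Use Scan to handle only Pause or Resume messages
  and paused() =
    inbox.Scan(function
      | Pause -> Some(paused())
      | Resume ->
          Some(async {
            printfn "Resuming..."
            return! working() })
      | _ -> None)

  // Start the agent in the working state
  working() )
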
The syntax is definitely less obvious, especially in the Resume case, where we need to perform some operation before returning. This requires writing an asynchronous workflow that is wrapped in Some (the value returned from a pattern match inside a function).
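To check the queueing behaviour of Scan without watching the console, the standard-style agent can be adapted so that it records printed strings instead of writing them out. The following is only a sketch under stated assumptions: the GetLog message, the createLoggingPrinter function and the internal log are hypothetical additions for illustration and are not part of the original tutorial. It uses only the standard MailboxProcessor API, so it runs without the joinads extension.

```fsharp
type Message =
  | Print of string
  | Pause
  | Resume
  // Hypothetical extra message (not in the article): asks the agent
  // for the list of strings it has handled so far
  | GetLog of AsyncReplyChannel<string list>

let createLoggingPrinter () =
  MailboxProcessor.Start(fun inbox ->
    // Handled 'Print' messages in the order they were processed
    let log = ResizeArray<string>()

    // Working state: Receive accepts every message
    let rec working () = async {
      let! msg = inbox.Receive()
      match msg with
      | Print text ->
          log.Add(text)
          return! working ()
      | Pause -> return! paused ()
      | Resume -> return! working ()
      | GetLog reply ->
          reply.Reply(List.ofSeq log)
          return! working () }

    // Paused state: Scan returns None for 'Print', which leaves
    // those messages in the mailbox until the agent is resumed
    and paused () =
      inbox.Scan(function
        | Pause -> Some(paused ())
        | Resume -> Some(working ())
        | GetLog reply ->
            Some(async {
              reply.Reply(List.ofSeq log)
              return! paused () })
        | Print _ -> None)

    working ())

let loggingPrinter = createLoggingPrinter ()
loggingPrinter.Post(Print "hello world")
loggingPrinter.Post(Pause)
loggingPrinter.Post(Print "hello again!")

// The agent is paused, so only the first string has been handled
let whilePaused = loggingPrinter.PostAndReply(fun ch -> GetLog ch)

loggingPrinter.Post(Resume)

// After 'Resume', the queued 'Print' is handled before this request
let afterResume = loggingPrinter.PostAndReply(fun ch -> GetLog ch)
```

Here whilePaused contains only "hello world", while afterResume contains both strings in the order they were posted, because the skipped Print message stays ahead of the later messages in the mailbox.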
Simplifying blocking queue
To give a more complex example of an agent using match!, we can re-implement the BlockingQueueAgent from the MSDN tutorial. The agent implements an asynchronous blocking queue similar to BlockingCollection from .NET. It supports two messages: one for adding values to the queue and another for removing values from the queue. Both of the messages should be sent asynchronously, because their processing can be blocked. The Add message cannot be processed when the queue is full and the Get message cannot be processed when the queue is empty.
We first reference a namespace that we'll need later and define the message type:

open System.Collections.Generic

type BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>

Both messages also carry AsyncReplyChannel, because the agent needs to reply to both of them. The reply to Get carries the obtained value, while a reply to Add is just a notification that the value was added to the queue.
The implementation of the queue uses a mutable Queue<'T> to keep the items. This is perfectly fine when writing an F# agent, because its body is not executed concurrently and the blocking queue agent does not need to expose the entire queue of messages.
The body of the agent consists of three functions that implement three states. The agent can be empty (waiting for the Add message), full (waiting for the Get message), or able to handle both messages:

let createQueue maxLength = MailboxProcessor.Start(fun inbox ->
  // Private queue that stores the values
  let items = new Queue<_>()

  // Continue in one of the states, depending on the queue
  let rec chooseState() =
    if items.Count = 0 then emptyQueue()
    elif items.Count < maxLength then runningQueue()
    else fullQueue()

  // When the agent is empty, it can only handle 'Add'
  and emptyQueue() = agent {
    match! inbox with
    | Add(value, reply) ->
        items.Enqueue(value)
        reply.Reply()
        return! chooseState() }

  // When the agent is full, it can only handle 'Get'
  and fullQueue() = agent {
    match! inbox with
    | Get(reply) ->
        reply.Reply(items.Dequeue())
        return! chooseState() }

  // A state in which the agent can handle both messages
  and runningQueue() = agent {
    match! inbox with
    | Add(value, reply) ->
        reply.Reply()
        items.Enqueue(value)
        return! chooseState()
    | Get(reply) ->
        reply.Reply(items.Dequeue())
        return! chooseState() }

  // Start with an empty queue
  emptyQueue() )

Compared with the original version of the agent in the MSDN tutorial, the code is more consistent. We don't need to use a different programming style for functions that can handle all messages (runningQueue) and for functions that only handle certain messages (emptyQueue and fullQueue).
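For comparison, the same three states can be written in the standard style, where the restricted states use Scan and the unrestricted state uses Receive. This is a sketch written for this comparison rather than the exact code from the MSDN tutorial; the name createQueueStandard is an assumption chosen here to avoid clashing with createQueue above.

```fsharp
open System.Collections.Generic

type BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>

// Hypothetical name; standard-style counterpart of 'createQueue'
let createQueueStandard<'T> (maxLength: int) =
  MailboxProcessor<BlockingAgentMessage<'T>>.Start(fun inbox ->
    // Private queue that stores the values
    let items = Queue<'T>()

    // Continue in one of the states, depending on the queue
    let rec chooseState () =
      if items.Count = 0 then emptyQueue ()
      elif items.Count < maxLength then runningQueue ()
      else fullQueue ()

    // Empty: only 'Add' is handled, 'Get' stays in the mailbox
    and emptyQueue () =
      inbox.Scan(function
        | Add(value, reply) ->
            Some(async {
              items.Enqueue(value)
              reply.Reply()
              return! chooseState () })
        | _ -> None)

    // Full: only 'Get' is handled, 'Add' stays in the mailbox
    and fullQueue () =
      inbox.Scan(function
        | Get reply ->
            Some(async {
              reply.Reply(items.Dequeue())
              return! chooseState () })
        | _ -> None)

    // Running: both messages can be handled, so Receive is enough
    and runningQueue () = async {
      let! msg = inbox.Receive()
      match msg with
      | Add(value, reply) ->
          items.Enqueue(value)
          reply.Reply()
          return! chooseState ()
      | Get reply ->
          reply.Reply(items.Dequeue())
          return! chooseState () }

    // Start with an empty queue
    emptyQueue ())

// Usage: a queue holding at most two integers
let queue = createQueueStandard<int> 2
queue.PostAndReply(fun ch -> Add(1, ch))
queue.PostAndReply(fun ch -> Add(2, ch))
let first = queue.PostAndReply(fun ch -> Get ch)
let second = queue.PostAndReply(fun ch -> Get ch)
```

The usage lines call PostAndReply only to keep the sketch short. Code that must not block a thread while the queue is full or empty would use PostAndAsyncReply from inside an asynchronous workflow instead. The values come back in insertion order, so first is 1 and second is 2.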
Summary
This article demonstrated how to use the match! construct when implementing agents using the MailboxProcessor type in F#. This use of match! does not follow the usual formal notion of a joinad as explained in the recent publications, because the agent { ... } computation does not implement parallel composition (it is not possible to match on multiple mailboxes) and the monadic bind works over two different types (normal asynchronous workflows and the agent's inbox). However, from a practical point of view, this use definitely simplifies programming with agents.
Published: Monday, 20 February 2012, 12:36 PM
Author: Tomas Petricek
Tags: joinads, research, f#, parallel, asynchronous