F# Parallel Extras (II.): Agent-based blocking queue
In the previous article, we briefly introduced the BlockingQueueAgent<T> type and used it to implement the pipeline pattern (from Chapter 7 of Parallel Programming with Microsoft .NET) using asynchronous workflows. There, the type represented intermediate buffers with a limited size.
In this article, we'll look at how the type is implemented. It implements a very useful pattern in agent-based parallel programming, so you can use it in your own work. It is also interesting as a demonstration of the F# Agent<T> type (a recommended alias for the MailboxProcessor<T> type).
The BlockingQueueAgent<T> type is similar to BlockingCollection<T> from .NET 4.0. It has methods for adding and removing elements that block when the operation cannot be done (e.g. adding when the queue is full or removing when the queue is empty). The most important difference is that it can be used asynchronously. This means that when we call its operations from an F# asynchronous workflow (using let! and do!), the operation blocks the calling workflow but does not block any physical thread. We start by looking at the overall structure of the agent and then look at the body of the agent, which implements its behavior (using a state machine)...
Structure of the blocking agent
An agent is simply an object that can receive messages of some type. The best way to represent messages is to use an F# discriminated union type that lists all the possible commands the agent can react to. To make the agent more convenient to use, it is quite common to wrap it in a type declaration, so that messages can be sent just by calling a member.
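As an illustration of this structure, here is a minimal sketch of a different, hypothetical agent. It is not part of BlockingQueueAgent<T>, and the names LogMessage, LogAgent, Record and CountItems are made up for the example. The union lists the commands and the wrapper type exposes them as members. The sketch also hints at why mutable state inside an agent body is safe: the non-thread-safe ResizeArray is touched only by the agent, even when the members are called from many threads at once.

```fsharp
open System.Threading.Tasks

/// Commands understood by the hypothetical logging agent
type LogMessage =
  | Record of int
  | CountItems of AsyncReplyChannel<int>

/// Hypothetical wrapper: callers use members instead of building messages
type LogAgent() =
  let agent = MailboxProcessor<LogMessage>.Start(fun inbox ->
    // ResizeArray is not thread-safe, but only the agent body touches it
    // and the body handles one message at a time
    let items = ResizeArray<int>()
    let rec loop () = async {
      let! msg = inbox.Receive()
      match msg with
      | Record n -> items.Add(n)
      | CountItems reply -> reply.Reply(items.Count)
      return! loop () }
    loop ())

  /// Sends a message without waiting for a reply
  member x.Record(n: int) = agent.Post(Record n)

  /// Blocks the calling thread until the agent replies with the count
  member x.Count = agent.PostAndReply(CountItems)

// Calls arrive from many thread-pool threads at once
let logAgent = LogAgent()
Parallel.For(0, 1000, fun i -> logAgent.Record(i)) |> ignore
printfn "%d" logAgent.Count  // 1000: no additions are lost
```

The Count request is posted only after Parallel.For has returned. It therefore sits behind all 1000 Record messages in the mailbox and sees the complete list.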
Defining the message type
Let's start by defining a type alias Agent<T> and a message type for our agent. As you can see, the agent is surprisingly simple and needs only two messages:
```fsharp
/// A convenience type alias for 'MailboxProcessor<T>' type
type Agent<'T> = MailboxProcessor<'T>

/// Type of messages internally used by 'BlockingQueueAgent<T>'
type internal BlockingAgentMessage<'T> =
  // Value to add, plus a channel used to acknowledge that it was added
  | Add of 'T * AsyncReplyChannel<unit>
  // Channel used to send the removed value back to the caller
  | Get of AsyncReplyChannel<'T>
```
The first message is used to add a new element to the blocking queue. It carries the value to be added and a reply channel. The channel is used to notify the caller that the element was added, so that the caller can continue producing more elements. The second message is used to read elements from the queue. It carries an asynchronous reply channel that is used to send the obtained value to the caller. Now that we have the message type, we can look at the structure of the wrapping type...
Wrapping agent in a type
The BlockingQueueAgent<T> type takes the maximal number of elements that can be stored in the queue as a constructor parameter. It has a single property, Count, which returns the current number of elements in the queue, and two methods. The AsyncAdd method sends the Add message to the agent and asynchronously waits for the reply. The AsyncGet method does the same with the Get message. In addition, both methods take an optional parameter ?timeout that can be used to limit the waiting time.
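The members forward the optional argument using the ?timeout=timeout syntax. AsyncGet can pass the union case Get directly because a union case with a single field can be used as a function. The following sketch shows what a caller sees when the timeout expires. It uses a hypothetical ProbeMessage type and a SilentAgent that never replies, both invented for this example. PostAndAsyncReply then raises a TimeoutException inside the calling workflow.

```fsharp
open System

/// Hypothetical message type: 'Get' carries only a reply channel
type ProbeMessage =
  | Get of AsyncReplyChannel<string>

/// Hypothetical wrapper around an agent that never replies
type SilentAgent() =
  let agent = MailboxProcessor<ProbeMessage>.Start(fun inbox ->
    let rec loop () = async {
      // Receive the message but deliberately never call reply.Reply
      let! _ = inbox.Receive()
      return! loop () }
    loop ())

  /// 'Get' acts as a function AsyncReplyChannel<string> -> ProbeMessage;
  /// '?timeout=timeout' forwards the optional argument unchanged
  /// (when it is omitted, the agent's DefaultTimeout applies)
  member x.AsyncGet(?timeout) =
    agent.PostAndAsyncReply(Get, ?timeout = timeout)

let silent = SilentAgent()
let result =
  async {
    try
      let! value = silent.AsyncGet(timeout = 100)
      return "received " + value
    with :? TimeoutException ->
      return "timed out" }
  |> Async.RunSynchronously

printfn "%s" result  // timed out
```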
Let's now look at the structure. We'll discuss the private state of the agent, as well as the state machine that implements its behavior, later:
```fsharp
open System.Collections.Generic

/// Agent that implements an asynchronous blocking queue
type BlockingQueueAgent<'T>(maxLength) =
  do if maxLength <= 0 then
       invalidArg "maxLength" "Maximal length of the queue should be positive."
  // We keep the number of elements in the queue in a local field to
  // make it immediately available (so that 'Count' property doesn't
  // have to use messages, which would be a bit slower)
  [<VolatileField>]
  let mutable count = 0

  let agent = Agent.Start(fun agent ->
    // Keeps a list of items that are in the queue
    let queue = new Queue<_>()
    // Keeps blocked 'Add' callers together with the values they want to add
    let pending = new Queue<_>()

    // State machine that implements agent's behavior
    let rec emptyQueue() = (...)
    and nonEmptyQueue() = (...)

    // Start with an empty queue
    emptyQueue() )

  /// Returns the number of items in the queue (immediately)
  member x.Count = count

  /// Asynchronously adds item; blocks if queue is full
  member x.AsyncAdd(v:'T, ?timeout) =
    agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout=timeout)

  /// Asynchronously gets item; blocks if queue is empty
  member x.AsyncGet(?timeout) =
    agent.PostAndAsyncReply(Get, ?timeout=timeout)
```
The type contains a single private field that is marked as VolatileField and is used to track the number of elements in the queue. The value can be obtained using the Count member, which can be safely accessed from multiple threads. Next, the type creates an agent that keeps the state of the queue and handles the messages that we declared earlier.
Inside the agent, we use mutable state, because the mutable Queue<T> type from the .NET libraries is faster than a functional implementation. This is also perfectly safe, because the body of the agent never executes multiple times in parallel. We use two queues in the implementation of the agent. The first one keeps the items that are currently in the queue (its type is Queue<'T>, so it contains values of type 'T). The second queue keeps pending calls to the Add method, that is, calls that wanted to add some value but couldn't finish because the queue was full. As we'll see later, when a call is made while the queue is full, we add its value together with its reply channel to this queue, and we process the pending calls when an element is removed.
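Storing a reply channel and calling Reply only later is what keeps a caller blocked. The sketch below isolates that idea in a hypothetical "gate" agent. GateMessage, Wait and Release are illustrative names and are not part of the article's code. Wait requests are parked in a queue, much like the pending queue above, and a Release message replies to all of them.

```fsharp
open System.Collections.Generic

/// Hypothetical messages: 'Wait' parks the caller, 'Release' wakes all parked callers
type GateMessage =
  | Wait of AsyncReplyChannel<unit>
  | Release

let gate = MailboxProcessor<GateMessage>.Start(fun inbox ->
  // Reply channels of blocked callers, similar to the 'pending' queue
  let waiting = Queue<AsyncReplyChannel<unit>>()
  let rec loop () = async {
    let! msg = inbox.Receive()
    match msg with
    | Wait reply ->
        // No reply yet: the caller's workflow stays suspended
        waiting.Enqueue(reply)
    | Release ->
        // Replying now resumes every parked caller
        while waiting.Count > 0 do
          waiting.Dequeue().Reply()
    return! loop () }
  loop ())

// Start a caller that waits for the gate (no thread is blocked while it waits)
let waiter = gate.PostAndAsyncReply(Wait) |> Async.StartAsTask
let completedEarly = waiter.Wait(200)   // false: nobody has replied yet
gate.Post(Release)
let completedLater = waiter.Wait(5000)  // true once the gate has replied
printfn "before release: %b, after release: %b" completedEarly completedLater
```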
The body of the agent contains two functions, both implemented using asynchronous workflows, that implement the behavior of the agent. As the names suggest, the function emptyQueue represents the state in which the queue is empty (and cannot handle Get messages), and the other function (nonEmptyQueue) is used when the queue contains some elements. I omitted the bodies of the functions in the previous snippet to show the overall structure, but we'll look at the implementation next...
Encoding agent's behavior
Let's start with the function that implements an empty queue. As already mentioned, in this state we can only handle the Add message. The implementation therefore uses the Scan member of the mailbox to create a computation that reacts only to some messages. The function passed to the Scan member returns None to ignore a message, or Some with an asynchronous workflow as an argument to handle it:
```fsharp
// If the queue is empty we handle only 'Add' message
let rec emptyQueue() =
  agent.Scan(fun msg ->
    match msg with
    | Add(value, reply) -> Some <| async {
        queue.Enqueue(value)
        count <- queue.Count
        reply.Reply()
        return! nonEmptyQueue() }
    | _ -> None )
```
When we receive the Add message, we return a workflow that adds the value to the queue, updates the number of elements in the queue and sends a reply to the caller (as a notification that the value was added). The queue is then no longer empty, so the agent switches to the other state. Note that any Get messages sent to the agent while it is in the emptyQueue state are automatically buffered by the agent and will be processed when the agent switches state.
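The buffering behavior of Scan can be checked with a small two-state agent. In this hypothetical sketch, DemoMessage and its cases are illustrative names. The first state scans only for Begin, much like emptyQueue scans only for Add. Work messages posted earlier stay in the mailbox and are handled, in their original order, once the agent switches to a state that uses Receive.

```fsharp
/// Hypothetical messages: the first state only accepts 'Begin'
type DemoMessage =
  | Begin of string
  | Work of string
  | GetLog of AsyncReplyChannel<string list>

let demo = MailboxProcessor<DemoMessage>.Start(fun inbox ->
  let processed = ResizeArray<string>()
  // Like 'emptyQueue': react only to 'Begin'; Scan leaves other messages queued
  let rec waiting () =
    inbox.Scan(fun msg ->
      match msg with
      | Begin name -> Some <| async {
          processed.Add("begin " + name)
          return! running () }
      | _ -> None )
  // Like 'nonEmptyQueue': Receive takes messages in arrival order
  and running () = async {
    let! msg = inbox.Receive()
    match msg with
    | Begin name -> processed.Add("begin " + name)
    | Work name -> processed.Add("work " + name)
    | GetLog reply -> reply.Reply(List.ofSeq processed)
    return! running () }
  waiting ())

// 'Work' messages arrive before 'Begin', but are only handled after it
demo.Post(Work "a")
demo.Post(Work "b")
demo.Post(Begin "s")
let order = demo.PostAndReply(GetLog)
printfn "%A" order  // ["begin s"; "work a"; "work b"]
```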
The second function (nonEmptyQueue) can handle both messages. We can therefore implement it as an asynchronous workflow that waits for the next message (using the Receive method of the agent) and then reacts to it. When adding a value, we need to handle the case when the queue is already full. When removing a value, we check whether there are any pending calls that were blocked while attempting to add a value. These blocked calls are stored in the pending queue:
```fsharp
// If the queue is non-empty, we can handle all messages
and nonEmptyQueue() = async {
  let! msg = agent.Receive()
  match msg with
  | Add(value, reply) ->
      // Add immediately if possible, otherwise block caller
      if queue.Count < maxLength then
        queue.Enqueue(value)
        count <- queue.Count
        reply.Reply()
      else
        pending.Enqueue(value, reply)
      return! nonEmptyQueue()
  | Get(reply) ->
      let item = queue.Dequeue()
      // Unblock some blocked callers as we have some space now
      while queue.Count < maxLength && pending.Count > 0 do
        let itm, caller = pending.Dequeue()
        queue.Enqueue(itm)
        caller.Reply()
      count <- queue.Count
      reply.Reply(item)
      // If the queue is empty then switch the state, otherwise loop
      if queue.Count = 0 then return! emptyQueue()
      else return! nonEmptyQueue() }
```
When adding a value to a queue that is not full, we can immediately add the element using queue.Enqueue, update the number of elements and notify the caller that the value was successfully added. When the queue is full, we store the request in the pending queue, which keeps both the value to be added and a reply channel that can later be used to notify the caller.
When removing an element, we can always take one using the queue.Dequeue method, because we're in the state in which the queue is not empty. After removing an element, we check whether there are any pending calls. We use a loop that runs until the queue becomes full or until all pending calls are processed. In the body of the loop, we remove one pending call, add its value to the queue and unblock the caller by replying to its asynchronous reply channel. Finally, we update count and send the value to the original sender of the Get message. Then we continue in one of the two states, depending on whether the queue is now empty or not.
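The unblocking loop in the Get branch can be traced without any agents. The following sketch is a synchronous model that assumes a caller name (a hypothetical string) stands in for each reply channel. getAndUnblock is an illustrative helper, not part of the article's code.

```fsharp
open System.Collections.Generic

/// Synchronous model of the 'Get' branch. Each pending entry pairs a value with
/// a hypothetical caller name that stands in for the reply channel.
let getAndUnblock (maxLength: int) (queue: Queue<int>) (pending: Queue<int * string>) =
  // In the non-empty state, Dequeue cannot fail
  let item = queue.Dequeue()
  let unblocked = ResizeArray<string>()
  // Admit blocked additions while there is free space and callers are waiting
  while queue.Count < maxLength && pending.Count > 0 do
    let value, caller = pending.Dequeue()
    queue.Enqueue(value)
    unblocked.Add(caller)   // the agent would call caller.Reply() here
  item, List.ofSeq unblocked

// A full queue of length 2 with two producers blocked on 'Add'
let queue = Queue<int>([ 1; 2 ])
let pending = Queue<int * string>([ (3, "producer A"); (4, "producer B") ])
let item, unblocked = getAndUnblock 2 queue pending
// item = 1; only "producer A" is unblocked; the queue now holds 2 and 3
printfn "%d %A %A %d" item unblocked (List.ofSeq queue) pending.Count
```

Removing one item frees exactly one slot, so only the first blocked producer is admitted. The second one stays in pending until the next Get.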
Summary
In this article, we looked at the implementation of an F# agent that implements a blocking queue with a maximal number of elements. When getting an element from an empty queue or adding an element to a full queue, the caller is blocked until an element becomes available or an element is removed, respectively. The BlockingQueueAgent<T> type can be quite useful in parallel programming using asynchronous workflows. I demonstrated how to work with the type in another article, which uses it to implement an image processing pipeline. However, the agent can be useful in many parallel patterns other than pipeline processing.
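For reference, here is a sketch that assembles the snippets from this article into a single script and uses the agent as a bounded buffer between one producer and one consumer. The constructor check on maxLength is written as a do binding, which F# requires for a side effect in a class. The buffer size of 3 and the producer and consumer workflows are illustrative assumptions.

```fsharp
open System.Collections.Generic

type Agent<'T> = MailboxProcessor<'T>

type internal BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>

/// Agent that implements an asynchronous blocking queue
type BlockingQueueAgent<'T>(maxLength) =
  do if maxLength <= 0 then
       invalidArg "maxLength" "Maximal length of the queue should be positive."
  // Read by 'Count' without sending a message
  [<VolatileField>]
  let mutable count = 0

  let agent = Agent.Start(fun agent ->
    let queue = new Queue<_>()    // items currently in the queue
    let pending = new Queue<_>()  // blocked 'Add' calls: value and reply channel

    // Empty state: only 'Add' is handled, 'Get' stays in the mailbox
    let rec emptyQueue() =
      agent.Scan(fun msg ->
        match msg with
        | Add(value, reply) -> Some <| async {
            queue.Enqueue(value)
            count <- queue.Count
            reply.Reply()
            return! nonEmptyQueue() }
        | _ -> None )

    // Non-empty state: both messages are handled
    and nonEmptyQueue() = async {
      let! msg = agent.Receive()
      match msg with
      | Add(value, reply) ->
          if queue.Count < maxLength then
            queue.Enqueue(value)
            count <- queue.Count
            reply.Reply()
          else
            pending.Enqueue(value, reply)
          return! nonEmptyQueue()
      | Get(reply) ->
          let item = queue.Dequeue()
          while queue.Count < maxLength && pending.Count > 0 do
            let itm, caller = pending.Dequeue()
            queue.Enqueue(itm)
            caller.Reply()
          count <- queue.Count
          reply.Reply(item)
          if queue.Count = 0 then return! emptyQueue()
          else return! nonEmptyQueue() }

    emptyQueue() )

  member x.Count = count
  member x.AsyncAdd(v:'T, ?timeout) =
    agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout=timeout)
  member x.AsyncGet(?timeout) =
    agent.PostAndAsyncReply(Get, ?timeout=timeout)

// Usage (illustrative): a bounded buffer of 3 between a producer and a consumer
let buffer = BlockingQueueAgent<int>(3)

let producer = async {
  for i in 1 .. 10 do
    // Suspends (without blocking a thread) while 3 items are queued
    do! buffer.AsyncAdd(i) }

let consumer = async {
  let results = ResizeArray<int>()
  for _ in 1 .. 10 do
    // Suspends while the queue is empty
    let! v = buffer.AsyncGet()
    results.Add(v)
  return List.ofSeq results }

Async.Start producer
let received = consumer |> Async.RunSynchronously
printfn "%A" received
```

The single producer waits for each AsyncAdd to finish before sending the next item. At most one Add call is therefore pending at any time, so the consumer receives the numbers 1 to 10 in order.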
Finally, the article is also a good demonstration of message-based parallelism using agents in F#. We've seen a common overall structure of an agent encapsulated in a type. We used two operations for receiving messages (Scan and Receive) and we also used mutually recursive functions to encode a state machine that implements the behavior of an agent.
Source Code
- Download F# Parallel Extras samples (ZIP)
- Get the latest version of samples from F# Community Samples at CodePlex
Published: Wednesday, 27 October 2010, 11:12 AM
Author: Tomas Petricek
Tags: functional, parallel, asynchronous, f#