F# Parallel Extras (II.): Agent-based blocking queue
In the previous article, we briefly introduced the BlockingQueueAgent<T> type and used it to implement the pipeline pattern (from Chapter 7 of Parallel Programming with Microsoft .NET) using asynchronous workflows. The type was used to represent intermediate buffers with a limited size. In this article, we'll take a look at the implementation of the type. The type implements a very useful pattern in agent-based parallel programming, so you can use it in your work, but it is also interesting as a demonstration of the F# Agent<T> type (a recommended alias for the MailboxProcessor<T> type).
The BlockingQueueAgent<T> type is similar to BlockingCollection<T> from .NET 4.0. It has methods for adding and removing elements that block when the operation cannot be completed (e.g. adding when the queue is full or removing when the queue is empty). The most important difference is that it can be used asynchronously. This means that when we call its operations from an F# asynchronous workflow (using let! and do!), the operation will block the calling workflow, but it will not block any physical thread. We start by looking at the overall structure of the agent and then look at the body of the agent, which implements its behavior using a state machine.
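For contrast, here is a minimal sketch of the thread-blocking behavior of BlockingCollection<T> (from System.Collections.Concurrent) with a bounded capacity. The names collection and producer are illustrative only. Every blocked Add call or enumeration step occupies a whole thread, which is exactly what the agent-based queue avoids.

```fsharp
open System.Collections.Concurrent
open System.Threading

// Bounded capacity of two: 'Add' blocks the calling thread while the
// collection is full, and the consuming enumeration blocks the consumer
// thread while the collection is empty.
let collection = new BlockingCollection<int>(2)

let producer =
    Thread(ThreadStart(fun () ->
        for i in 1 .. 5 do
            collection.Add(i)
        // No more items: lets the consumer's enumeration finish
        collection.CompleteAdding()))
producer.Start()

// Consume on the current thread, which is blocked while waiting for items
let received = [ for item in collection.GetConsumingEnumerable() -> item ]
producer.Join()
```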
Structure of the blocking agent
An agent is simply an object that can receive messages of some type. The best way to represent messages is to use an F# discriminated union type that lists all the possible commands the agent can react to. To make the agent more convenient to use, it is quite common to wrap it in a type declaration, so that messages can be sent just by calling a member.
Defining the message type
Let's start by defining a type alias Agent<T> and defining a message type for
  our agent. As you can see, the agent is surprisingly simple and it needs only two messages:
```fsharp
/// A convenience type alias for 'MailboxProcessor<T>' type
type Agent<'T> = MailboxProcessor<'T>

/// Type of messages internally used by 'BlockingQueueAgent<T>'
type internal BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>
```
The first message is used to add a new element to the blocking queue. It carries the value to be added and a reply channel. The channel is used to notify the caller that the element was added (so that the caller can continue producing more elements). The second message is used to read an element from the queue; it carries an asynchronous reply channel that is used to send the obtained value back to the caller. Now that we have the message type, we can look at the structure of the wrapping type...
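The reply channel in the Add message carries no data (AsyncReplyChannel<unit>), so it works purely as an acknowledgement. A minimal sketch of that idea, using a hypothetical LogMessage type and logger agent that are not part of the article: the caller of PostAndReply stays blocked until the agent calls Reply on the channel.

```fsharp
// Hypothetical messages: 'Write' only needs an acknowledgement (like 'Add'),
// while 'ReadAll' sends data back (like 'Get')
type LogMessage =
    | Write of string * AsyncReplyChannel<unit>
    | ReadAll of AsyncReplyChannel<string list>

let logger =
    MailboxProcessor.Start(fun inbox ->
        let rec loop (lines: string list) = async {
            let! msg = inbox.Receive()
            match msg with
            | Write(line, reply) ->
                // Acknowledge: the waiting caller may continue
                reply.Reply()
                return! loop (line :: lines)
            | ReadAll reply ->
                // Send the collected lines back in the order they were written
                reply.Reply(List.rev lines)
                return! loop lines }
        loop [])

// Each call returns only after the agent has replied on the channel
logger.PostAndReply(fun ch -> Write("first", ch))
logger.PostAndReply(fun ch -> Write("second", ch))
let lines = logger.PostAndReply(ReadAll)
```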
Wrapping agent in a type
The BlockingQueueAgent<T> type takes the maximal number of elements that can be stored in the queue as a constructor parameter. It has a single property, Count, which returns the current number of elements in the queue, and two methods. The AsyncAdd method sends the Add message to the agent (and asynchronously waits for the reply) and the AsyncGet method does a similar thing with the Get message. In addition, both methods take an optional parameter ?timeout that can be used to limit the waiting time. Let's now look at the structure; we'll discuss the private state of the agent, as well as the state machine that implements its behavior, later:
```fsharp
open System.Collections.Generic

/// Agent that implements an asynchronous blocking queue
type BlockingQueueAgent<'T>(maxLength) =
  do
    if maxLength <= 0 then
      invalidArg "maxLength" "Maximal length of the queue should be positive."

  // We keep the number of elements in the queue in a local field to
  // make it immediately available (so that 'Count' property doesn't
  // have to use messages - which would be a bit slower)
  [<VolatileField>]
  let mutable count = 0

  let agent = Agent.Start(fun agent ->
    // Keeps a list of items that are in the queue
    let queue = new Queue<_>()
    // Keeps a list of blocked callers and additional values
    let pending = new Queue<_>()

    // State machine that implements agent's behavior
    let rec emptyQueue() = (...)
    and nonEmptyQueue() = (...)

    // Start with an empty queue
    emptyQueue() )

  /// Returns the number of items in the queue (immediately)
  member x.Count = count

  /// Asynchronously adds item; blocks if queue is full
  member x.AsyncAdd(v:'T, ?timeout) =
    agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout=timeout)

  /// Asynchronously gets item; blocks if queue is empty
  member x.AsyncGet(?timeout) =
    agent.PostAndAsyncReply(Get, ?timeout=timeout)
```
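Both AsyncAdd and AsyncGet simply forward ?timeout to PostAndAsyncReply, so a caller sees the standard MailboxProcessor timeout behavior. A minimal sketch of what that looks like, assuming a hypothetical agent silent that receives requests but never replies (standing in for AsyncGet on a queue that stays empty): PostAndAsyncReply raises TimeoutException, while PostAndTryAsyncReply reports the same situation as None.

```fsharp
open System

// Hypothetical agent whose messages are bare reply channels; it receives
// each request but never replies
let silent =
    MailboxProcessor<AsyncReplyChannel<int>>.Start(fun inbox ->
        async {
            while true do
                do! inbox.Receive() |> Async.Ignore })

// PostAndAsyncReply raises TimeoutException once 100 ms pass without a reply
let viaException =
    async {
        try
            let! value = silent.PostAndAsyncReply((fun ch -> ch), timeout = 100)
            return Some value
        with :? TimeoutException ->
            return None }
    |> Async.RunSynchronously

// PostAndTryAsyncReply returns None instead of raising an exception
let viaOption =
    silent.PostAndTryAsyncReply((fun ch -> ch), timeout = 100)
    |> Async.RunSynchronously
```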
The type contains a single private field that is marked as VolatileField and is used to track the number of elements in the queue. The value can be obtained using the Count member, which can be safely accessed from multiple threads. Next, the type creates an agent that keeps the state of the queue and handles the messages that we declared earlier.
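The same "mirror the state in a volatile field" idea can be shown in isolation. The following is a minimal sketch with hypothetical names (CounterMessage, MirroredCounter): the agent owns the state and is the only writer of the field, while the Count property reads the field directly, without a message round-trip. The Sync message exists only so that the usage example can wait until earlier messages are processed.

```fsharp
// Hypothetical messages for a counter agent
type CounterMessage =
    | Increment
    | Sync of AsyncReplyChannel<unit>

/// Hypothetical counter: the agent owns the state and mirrors it in a
/// volatile field so that 'Count' can be read without sending a message
type MirroredCounter() =
    [<VolatileField>]
    let mutable count = 0

    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec loop total = async {
                let! msg = inbox.Receive()
                match msg with
                | Increment ->
                    // Publish the new value for readers on other threads
                    count <- total + 1
                    return! loop (total + 1)
                | Sync reply ->
                    // Every message posted before 'Sync' has been processed
                    reply.Reply()
                    return! loop total }
            loop 0)

    /// Reads the mirrored value immediately
    member x.Count = count
    member x.Increment() = agent.Post(Increment)
    member x.Sync() = agent.PostAndReply(Sync)

let counter = MirroredCounter()
for _ in 1 .. 3 do
    counter.Increment()
counter.Sync()
```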
Inside the agent, we use mutable state, because the mutable Queue<T> type from .NET libraries is faster than a functional implementation. This is also perfectly safe, because the body of the agent never executes multiple times in parallel. We use two queues in the implementation of the agent. The first one keeps the items that are currently in the queue (it contains values of type 'T). The second one keeps a list of pending calls to the AsyncAdd method, that is, calls that wanted to add some value but couldn't finish, because the queue was full. As we'll see later, when an Add message arrives while the queue is full, we store the value together with its reply channel in this second queue, and we process the pending calls when an element is removed.
The body of the agent contains two functions (both implemented using asynchronous workflows) that implement the behavior of the agent. As the names suggest, the function emptyQueue represents the state in which the queue is empty (and cannot handle Get messages), and the other function (nonEmptyQueue) is used when the queue contains some elements. I omitted the bodies of the functions in the previous snippet to show the overall structure, but we'll look at the implementation next...
Encoding agent's behavior
Let's start with the function that implements an empty queue. As already mentioned, in this state we can only handle the Add message, so the implementation uses the Scan member of the mailbox to create a computation that reacts only to some messages. The function passed to the Scan member returns None to ignore a message, or Some carrying an asynchronous workflow to handle it:
```fsharp
// If the queue is empty we handle only 'Add' message
let rec emptyQueue() =
  agent.Scan(fun msg ->
    match msg with
    | Add(value, reply) -> Some <| async {
        queue.Enqueue(value)
        count <- queue.Count
        reply.Reply()
        return! nonEmptyQueue() }
    | _ -> None )
```
When we receive the Add message, we return a workflow that adds the value to the queue, updates the number of elements in the queue and sends a reply to the caller (as a notification that the value was added). The queue is then no longer empty, so the agent switches to the other state. Note that any Get messages sent to the agent while it is in the emptyQueue state are left in the agent's mailbox by Scan and will be processed after the agent switches the state.
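The combination of two states and Scan can be seen in a smaller setting. This is a minimal sketch of a hypothetical one-slot buffer (SlotMessage, slot and the Put/Take cases are illustrative, not part of the article): in each state, Scan accepts only the message the state can handle, and everything else stays in the mailbox until the agent switches state.

```fsharp
// Hypothetical messages for a one-slot buffer
type SlotMessage =
    | Put of int
    | Take of AsyncReplyChannel<int>

let slot =
    MailboxProcessor.Start(fun inbox ->
        // Empty state: handle only 'Put'; Scan leaves 'Take' in the mailbox
        let rec empty () =
            inbox.Scan(function
                | Put v -> Some(full v)
                | Take _ -> None)
        // Full state: handle only 'Take'; further 'Put's keep waiting
        and full v =
            inbox.Scan(function
                | Take reply ->
                    Some(async {
                        reply.Reply(v)
                        return! empty () })
                | Put _ -> None)
        empty ())

// If this request reaches the agent while the slot is empty,
// Scan keeps it in the mailbox until a value is put into the slot
let firstRequest = slot.PostAndAsyncReply(Take) |> Async.StartAsTask
slot.Post(Put 1)
// If the slot is still full, this 'Put' waits in the mailbox
slot.Post(Put 2)
let first = firstRequest.Result
let second = slot.PostAndReply(Take)
```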
The second function (nonEmptyQueue) can handle both messages, so we can implement it as an asynchronous workflow that awaits the next message (using the Receive method of the agent) and then reacts to it. When adding a value, we need to handle the case when the queue is already full. When removing a value, we then check whether there are any pending calls that were blocked while attempting to add a value. These blocked calls are stored in the pending queue:
```fsharp
// If the queue is non-empty, we can handle all messages
and nonEmptyQueue() = async {
  let! msg = agent.Receive()
  match msg with
  | Add(value, reply) ->
      // Add immediately if possible, otherwise block caller
      if queue.Count < maxLength then
        queue.Enqueue(value)
        count <- queue.Count
        reply.Reply()
      else
        pending.Enqueue(value, reply)
      return! nonEmptyQueue()
  | Get(reply) ->
      let item = queue.Dequeue()
      // Unblock some blocked callers as we have some space now
      while queue.Count < maxLength && pending.Count > 0 do
        let itm, caller = pending.Dequeue()
        queue.Enqueue(itm)
        caller.Reply()
      count <- queue.Count
      reply.Reply(item)
      // If the queue is empty then switch the state, otherwise loop
      if queue.Count = 0 then return! emptyQueue()
      else return! nonEmptyQueue() }
```
When adding a value to a queue that is not full, we can immediately add the element using queue.Enqueue, update the number of elements and notify the caller that the value was successfully added. When the queue is full, we store the request in the pending queue, which keeps both the value to be added and a reply channel that can later be used to notify the caller.
When removing an element, we can always take an element using the queue.Dequeue method, because we're in a state in which the queue is not empty. After removing an element, we check whether there are any pending calls. We use a loop that runs until the queue becomes full or until we process all pending calls. In the body of the loop, we remove one pending call, add its value to the queue and unblock the caller by replying through the asynchronous reply channel. Finally, we update count and send the value to the original sender of the Get message. Then we continue in one of the two states, depending on whether the queue is now empty or not.
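The snippets above are fragments of a single type definition. As a hedged sketch, here they are assembled into one self-contained script and driven by a producer and a consumer; the names queueAgent, producer and consumer are illustrative and not part of the article. With a capacity of three, the producer's AsyncAdd calls are regularly parked in the pending queue, yet every item arrives in order.

```fsharp
open System.Collections.Generic

type Agent<'T> = MailboxProcessor<'T>

type internal BlockingAgentMessage<'T> =
    | Add of 'T * AsyncReplyChannel<unit>
    | Get of AsyncReplyChannel<'T>

type BlockingQueueAgent<'T>(maxLength) =
    do
        if maxLength <= 0 then
            invalidArg "maxLength" "Maximal length of the queue should be positive."

    [<VolatileField>]
    let mutable count = 0

    let agent = Agent.Start(fun agent ->
        let queue = Queue<'T>()
        let pending = Queue<'T * AsyncReplyChannel<unit>>()

        // Empty state: only 'Add' is handled; 'Get' waits in the mailbox
        let rec emptyQueue () =
            agent.Scan(fun msg ->
                match msg with
                | Add(value, reply) ->
                    Some(async {
                        queue.Enqueue(value)
                        count <- queue.Count
                        reply.Reply()
                        return! nonEmptyQueue () })
                | _ -> None)
        // Non-empty state: both messages are handled
        and nonEmptyQueue () = async {
            let! msg = agent.Receive()
            match msg with
            | Add(value, reply) ->
                if queue.Count < maxLength then
                    queue.Enqueue(value)
                    count <- queue.Count
                    reply.Reply()
                else
                    // Queue is full: the caller stays blocked until space frees up
                    pending.Enqueue((value, reply))
                return! nonEmptyQueue ()
            | Get reply ->
                let item = queue.Dequeue()
                // Move blocked 'Add' calls into the freed space and unblock them
                while queue.Count < maxLength && pending.Count > 0 do
                    let itm, caller = pending.Dequeue()
                    queue.Enqueue(itm)
                    caller.Reply()
                count <- queue.Count
                reply.Reply(item)
                if queue.Count = 0 then return! emptyQueue ()
                else return! nonEmptyQueue () }

        emptyQueue ())

    member x.Count = count
    member x.AsyncAdd(v: 'T, ?timeout) =
        agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout = timeout)
    member x.AsyncGet(?timeout) =
        agent.PostAndAsyncReply(Get, ?timeout = timeout)

// Hypothetical usage: capacity of 3, ten items
let queueAgent = BlockingQueueAgent<int>(3)

let producer = async {
    for i in 1 .. 10 do
        // Suspends this workflow (not a thread) while the queue is full
        do! queueAgent.AsyncAdd(i) }

let consumer = async {
    let results = ResizeArray<int>()
    for _ in 1 .. 10 do
        // Suspends this workflow while the queue is empty
        let! item = queueAgent.AsyncGet()
        results.Add(item)
    return List.ofSeq results }

Async.Start(producer)
let received = Async.RunSynchronously(consumer)
```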
Summary
In this article, we looked at the implementation of an F# agent that implements a blocking queue with a maximal number of elements. When getting an element from an empty queue or adding an element to a full queue, the caller will be blocked until an element becomes available or an element is removed, respectively. The BlockingQueueAgent<T> type can be quite useful in parallel programming using asynchronous workflows. I demonstrated how to work with the type in another article, which uses it to implement an image processing pipeline. However, the agent can be useful in many parallel patterns other than pipeline processing.
Finally, the article is also a good demonstration of message-based parallelism using agents in 
  F#. We've seen a common overall structure of an agent encapsulated in a type. We used two 
  operations for receiving messages (Scan and Receive) and we also 
  used mutually recursive functions to encode a state machine that implements the behavior of an agent.
Source Code
- Download F# Parallel Extras samples (ZIP)
- Get the latest version of samples from F# Community Samples at CodePlex
      Published: Wednesday, 27 October 2010, 11:12 AM
      Author: Tomas Petricek
        Tags: functional, parallel, asynchronous, f#