F# Parallel Extras (II.): Agent-based blocking queue

In the previous article, we briefly introduced the BlockingQueueAgent<T> type and used it to implement the pipeline pattern (from Chapter 7 of Parallel Programming with Microsoft .NET) using asynchronous workflows. The type was used to represent intermediate buffers with a limited size. In this article we'll take a look at the implementation of the type. It implements a very useful pattern in agent-based parallel programming, so you can use it in your own work, but it is also interesting as a demonstration of the F# Agent<T> type (a recommended alias for the MailboxProcessor<T> type).

The BlockingQueueAgent<T> type is similar to BlockingCollection<T> from .NET 4.0. It has methods for adding and removing elements that block when the operation cannot be done (e.g. adding when the queue is full or removing when the queue is empty). The most important difference is that it can be used asynchronously. This means that when we call its operations from an F# asynchronous workflow (using let! and do!), the operation will block the calling workflow, but it will not block any physical thread. We start by looking at the overall structure of the agent and then look at the body of the agent, which implements its behavior (using a state machine)...

Structure of the blocking agent

An agent is simply an object that can receive messages of some type. The best way to represent messages is to use an F# discriminated union type which lists all the possible commands that the agent can react to. To make the agent more convenient to use, it is quite common to wrap it in a type declaration, so that messages can be sent just by calling a member.
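To make the pattern concrete before we get to the blocking queue, here is a minimal sketch of an agent wrapped in a type. The names CounterMessage and CounterAgent are hypothetical and are not part of the BlockingQueueAgent<T> implementation; the sketch only assumes the standard MailboxProcessor<T> API.

```fsharp
// Hypothetical example: a counter agent wrapped in a type.
// The union lists every command the agent can react to.
type CounterMessage =
  | Increment of int
  | GetTotal of AsyncReplyChannel<int>

type CounterAgent() =
  let agent = MailboxProcessor<CounterMessage>.Start(fun inbox ->
    // The running total is kept as an argument of the recursive loop
    let rec loop total = async {
      let! msg = inbox.Receive()
      match msg with
      | Increment n -> return! loop (total + n)
      | GetTotal reply ->
          reply.Reply(total)
          return! loop total }
    loop 0)

  /// Fire-and-forget: posts a message without waiting for a reply
  member x.Increment(n) = agent.Post(Increment n)

  /// Asynchronously asks the agent for the current total
  member x.AsyncGetTotal() = agent.PostAndAsyncReply(GetTotal)

let counter = CounterAgent()
counter.Increment(1)
counter.Increment(41)
// Messages are processed in the order they were posted
let total = counter.AsyncGetTotal() |> Async.RunSynchronously
printfn "%d" total
```

Callers never see the message type; they only call members, and the members translate the calls into messages.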

Defining the message type

Let's start by defining a type alias Agent<T> and defining a message type for our agent. As you can see, the agent is surprisingly simple and it needs only two messages:

    /// A convenience type alias for 'MailboxProcessor<T>' type
    type Agent<'T> = MailboxProcessor<'T>

    /// Type of messages internally used by 'BlockingQueueAgent<T>'
    type internal BlockingAgentMessage<'T> = 
      | Add of 'T * AsyncReplyChannel<unit> 
      | Get of AsyncReplyChannel<'T>

The first message is used to add a new element to the blocking queue. It carries a value to be added and a reply channel. The channel is used to notify the caller that the element was added (and so the caller can continue producing more elements). The second message is used to read elements from the queue; it uses an asynchronous reply channel to send the obtained value back to the caller. Now that we have the message type, we can look at the structure of the wrapping type...

Wrapping agent in a type

The BlockingQueueAgent<T> type takes the maximal number of elements that can be stored in the queue as a constructor parameter. It has a single property, Count, which returns the current number of elements in the queue, and it has two methods. The AsyncAdd method sends the Add message to the agent (and asynchronously waits for the reply) and the AsyncGet method does a similar thing with the Get message. In addition, both methods take an optional parameter ?timeout that can be used to limit the waiting time. Let's now look at the structure; we'll discuss the private state of the agent as well as the state machine that implements its behavior later:

    open System.Collections.Generic

    /// Agent that implements an asynchronous blocking queue
    type BlockingQueueAgent<'T>(maxLength) =
      do
        if maxLength <= 0 then
          invalidArg "maxLength" "Maximal length of the queue should be positive."

      // We keep the number of elements in the queue in a local field to
      // make it immediately available (so that 'Count' property doesn't
      // have to use messages - which would be a bit slower)
      [<VolatileField>]
      let mutable count = 0

      let agent = Agent.Start(fun agent ->
        // Keeps a list of items that are in the queue
        let queue = new Queue<_>()
        // Keeps a list of blocked callers and additional values
        let pending = new Queue<_>()

        // State machine that implements agent's behavior
        let rec emptyQueue() = (...)
        and nonEmptyQueue() = (...)

        // Start with an empty queue
        emptyQueue() )

      /// Returns the number of items in the queue (immediately)
      member x.Count = count

      /// Asynchronously adds item; blocks if queue is full
      member x.AsyncAdd(v:'T, ?timeout) = 
        agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout=timeout)

      /// Asynchronously gets item; blocks if queue is empty
      member x.AsyncGet(?timeout) = 
        agent.PostAndAsyncReply(Get, ?timeout=timeout)
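The optional ?timeout argument is passed straight through to PostAndAsyncReply. The following sketch shows what a caller can expect when no reply arrives in time. It uses a hypothetical agent (SilentMessage and silent are made-up names) that receives messages but never replies, so that the timeout is guaranteed to elapse.

```fsharp
open System

// Hypothetical message and agent used only to force a timeout
type SilentMessage = Ask of AsyncReplyChannel<int>

let silent = MailboxProcessor<SilentMessage>.Start(fun inbox ->
  let rec loop () = async {
    // Take the message out of the mailbox, but never call Reply
    let! _ = inbox.Receive()
    return! loop () }
  loop ())

// PostAndAsyncReply raises TimeoutException when the time limit
// (in milliseconds) elapses before a reply is sent
let timedOut =
  try
    silent.PostAndAsyncReply(Ask, timeout = 100)
    |> Async.RunSynchronously
    |> ignore
    false
  with :? TimeoutException -> true

// PostAndTryAsyncReply reports the same situation as None instead
let noReply =
  silent.PostAndTryAsyncReply(Ask, timeout = 100)
  |> Async.RunSynchronously

printfn "timed out: %b, reply: %A" timedOut noReply
```

Note that the timeout only limits how long the caller waits. The message has already been posted by then, so the agent will still process it when its turn comes.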

The BlockingQueueAgent<T> type contains a single private field that is marked as VolatileField and is used to track the number of elements in the queue. The value can be obtained using the Count member, which can be safely accessed from multiple threads. Next, the type creates an agent that keeps the state of the queue and handles the messages that we declared earlier.

Inside the agent, we use mutable state, because the mutable Queue<T> type from .NET libraries is faster than a functional implementation. Note that this is also perfectly safe because the body of the agent never executes multiple times in parallel. We use two queues in the implementation of the agent. The first one keeps the items that are currently in the queue (it contains values of type 'T). The second queue keeps a list of pending calls to the AsyncAdd method, that is, calls that wanted to add some value but couldn't finish because the queue was full. As we'll see later, when an Add message arrives and the queue is full, we store the value together with its reply channel in this queue, and we process the pending calls when an element is removed.

The body of the agent contains two functions (both implemented using asynchronous workflows) that implement the behavior of the agent. As the names suggest, the function emptyQueue represents the state in which the queue is empty (and so cannot handle Get messages) and the other function (nonEmptyQueue) is used when the queue contains some elements. I omitted the bodies of the functions in the previous snippet to show the overall structure, but we'll look at the implementation next...

Encoding agent's behavior

Let's start with the function that implements an empty queue. As already mentioned, in this state we can only handle the Add message, so the implementation uses the Scan member of the mailbox to create a computation that reacts only to some messages. The function passed to the Scan member returns None to ignore a message, or Some with an asynchronous workflow as the argument to handle it:

    // If the queue is empty we handle only 'Add' message
    let rec emptyQueue() = 
      agent.Scan(fun msg ->
        match msg with 
        | Add(value, reply) -> Some <| async {
            queue.Enqueue(value)
            count <- queue.Count
            reply.Reply()
            return! nonEmptyQueue() }
        | _ -> None )

When we receive the Add message, we return a workflow that adds the value to the queue, updates the number of elements in the queue and sends a reply to the caller (as a notification that the value was added). Consequently, the queue is no longer empty, so the agent switches to the other state. Note that any Get messages that are sent to the agent while it is in the emptyQueue state will be automatically buffered by the agent and will be processed when the agent switches state.
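The buffering behavior of Scan can be observed in isolation. The following is a minimal sketch with a hypothetical GateMessage type (not part of the blocking queue): the agent starts in a "closed" state where Scan reacts only to Open, and then moves to an "opened" state that handles everything using Receive.

```fsharp
// Hypothetical message type used only for this illustration
type GateMessage =
  | Open
  | Item of int
  | Dump of AsyncReplyChannel<int list>

let gate = MailboxProcessor<GateMessage>.Start(fun inbox ->
  // Opened state: every message is handled, in arrival order
  let rec opened items = async {
    let! msg = inbox.Receive()
    match msg with
    | Item n -> return! opened (n :: items)
    | Dump reply ->
        reply.Reply(List.rev items)
        return! opened items
    | Open -> return! opened items }
  // Closed state: Scan reacts only to Open; returning None
  // leaves the other messages waiting in the mailbox
  let closed () =
    inbox.Scan(function
      | Open -> Some(opened [])
      | _ -> None)
  closed ())

// The items are posted before the gate is opened...
gate.Post(Item 1)
gate.Post(Item 2)
gate.Post(Open)
// ...but they are not lost: the opened state still sees them
let seen = gate.PostAndReply(Dump)
printfn "%A" seen
```

Here seen is the list [1; 2]: both items were skipped by Scan, stayed in the mailbox, and were picked up by Receive once the agent changed state. Get messages sent to an empty blocking queue wait in the same way.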

The second function (nonEmptyQueue) can handle both messages, so we can implement it as an asynchronous workflow that awaits the next message (using the Receive method of the agent) and then reacts to it. When adding a value, we need to handle the case when the queue is already full. When removing a value, we check whether there are any pending calls that were blocked while attempting to add a value. These blocked calls are stored in the pending queue:

    // If the queue is non-empty, we can handle all messages
    and nonEmptyQueue() = async {
      let! msg = agent.Receive()
      match msg with 
      | Add(value, reply) -> 
          // Add immediately if possible, otherwise block caller
          if queue.Count < maxLength then 
            queue.Enqueue(value)
            count <- queue.Count
            reply.Reply()
          else 
            pending.Enqueue(value, reply) 
          return! nonEmptyQueue()
      | Get(reply) -> 
          let item = queue.Dequeue()
          // Unblock some blocked callers as we have some space now
          while queue.Count < maxLength && pending.Count > 0 do
            let itm, caller = pending.Dequeue()
            queue.Enqueue(itm)
            caller.Reply()
          count <- queue.Count
          reply.Reply(item)
          // If the queue is empty then switch the state, otherwise loop
          if queue.Count = 0 then return! emptyQueue()
          else return! nonEmptyQueue() }

When adding a value to a queue that is not full, we can immediately add the element using queue.Enqueue, update the number of elements and notify the caller that the value was successfully added. When the queue is full, we store the request in the pending queue, which keeps both the value to be added and a reply channel that can later be used to notify the caller.

When removing an element, we can always pick an element using the queue.Dequeue method, because we're in a state where the queue is not empty. After removing an element, we check if there are any pending calls. We use a loop that runs until the queue becomes full or until we have processed all pending calls. In the body of the loop, we remove one pending call, add its value to the queue and unblock the caller by replying to the asynchronous reply channel. Finally, we update count and send the value to the original sender of the Get message. Then we continue in one of the two states, depending on whether the queue is now empty or not.
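Putting the pieces together, the following sketch assembles the snippets from this article into one listing and adds a small producer and consumer to exercise it. The type itself follows the code above; the buffer size of 3, the item count of 10 and the names buffer, producer, consumer and received are assumptions made only for this usage example.

```fsharp
open System.Collections.Generic

type Agent<'T> = MailboxProcessor<'T>

type internal BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>

type BlockingQueueAgent<'T>(maxLength) =
  do
    if maxLength <= 0 then
      invalidArg "maxLength" "Maximal length of the queue should be positive."
  [<VolatileField>]
  let mutable count = 0

  let agent = Agent.Start(fun agent ->
    let queue = new Queue<_>()
    let pending = new Queue<_>()

    // Empty state: only 'Add' can be handled
    let rec emptyQueue() =
      agent.Scan(fun msg ->
        match msg with
        | Add(value, reply) -> Some <| async {
            queue.Enqueue(value)
            count <- queue.Count
            reply.Reply()
            return! nonEmptyQueue() }
        | _ -> None )
    // Non-empty state: both messages can be handled
    and nonEmptyQueue() = async {
      let! msg = agent.Receive()
      match msg with
      | Add(value, reply) ->
          if queue.Count < maxLength then
            queue.Enqueue(value)
            count <- queue.Count
            reply.Reply()
          else
            pending.Enqueue((value, reply))
          return! nonEmptyQueue()
      | Get(reply) ->
          let item = queue.Dequeue()
          while queue.Count < maxLength && pending.Count > 0 do
            let itm, caller = pending.Dequeue()
            queue.Enqueue(itm)
            caller.Reply()
          count <- queue.Count
          reply.Reply(item)
          if queue.Count = 0 then return! emptyQueue()
          else return! nonEmptyQueue() }
    emptyQueue() )

  member x.Count = count
  member x.AsyncAdd(v:'T, ?timeout) =
    agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout=timeout)
  member x.AsyncGet(?timeout) =
    agent.PostAndAsyncReply(Get, ?timeout=timeout)

// Usage sketch: the buffer holds at most 3 items (an arbitrary choice)
let buffer = BlockingQueueAgent<int>(3)

// The producer is suspended (without blocking a thread) when the buffer is full
let producer = async {
  for i in 1 .. 10 do
    do! buffer.AsyncAdd(i) }

// The consumer is suspended when the buffer is empty
let consumer = async {
  let results = ResizeArray<int>()
  for i in 1 .. 10 do
    let! v = buffer.AsyncGet()
    results.Add(v)
  return List.ofSeq results }

Async.Start producer
let received = consumer |> Async.RunSynchronously
printfn "%A" received
```

With a single producer, the consumer receives the numbers 1 to 10 in the order in which they were added, even though the producer has to wait several times for free space in the buffer.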

Summary

In this article, we looked at the implementation of an F# agent that implements a blocking queue with a maximal number of elements. When getting an element from an empty queue or adding an element to a full queue, the caller will be blocked until an element becomes available or an element is removed, respectively. The BlockingQueueAgent<T> type can be quite useful in parallel programming using asynchronous workflows. I demonstrated how to work with the type in another article, which uses it to implement an image processing pipeline. However, the agent can be useful in many parallel patterns other than pipeline processing.

Finally, the article is also a good demonstration of message-based parallelism using agents in F#. We've seen a common overall structure of an agent encapsulated in a type. We used two operations for receiving messages (Scan and Receive) and we also used mutually recursive functions to encode a state machine that implements the behavior of an agent.

