TP

TryJoinads (VII.) - Implementing joinads for async workflows

The article Asynchronous workflows and joinads gives numerous examples of programming with asynchronous workflows using the match! construct. Briefly, when matching on multiple asynchronous workflows, they are executed in parallel. When pattern matching consists of multiple clauses, the clause that matches on computations that complete first gets executed. These two behaviours are implemented by the Merge and the Choose operation of joinads. Additionally, asynchronous workflows require the Alias operation, which makes it possible to share the result of a started asynchronous workflow in multiple clauses.

In this article, we look at the definition of the additional AsyncBuilder operations that enable the match! syntax. We do not look at additional examples of using the syntax, because these can be found in a previous article.

Note: This blog post is a re-publication of a tutorial from the TryJoinads.org web page. If you read the article there, you can run the examples interactively and experiment with them: view the article on TryJoinads.

Non-deterministic choice and parallel composition

Parallel composition of asynchronous workflows is implemented in the standard F# library by a combinator Async.Parallel, but a choice operation that returns the result of a first workflow is not available out-of-box. The following snippet shows a simple implementation that uses locking (and is based on an F# Snippet implementing Async.Choose).

Next, the snippet extends standard F# async computation builder with Fail, Choose and Merge:

open System

/// Add 'WhenAny' member to the 'Async' type
type Async =
  /// Creates an asynchronous workflow that non-deterministically returns the 
  /// result of one of the two specified workflows (the one that completes
  /// first). This is similar to Task.WhenAny.
  static member WhenAny([<ParamArray>] works:Async<'T>[]) : Async<'T> = 
    (Implementation omitted)

/// Extend the standard F# 'async' builder with 
/// additional operations to support joinads
type Microsoft.FSharp.Control.AsyncBuilder with

  /// Non-deterministically choose the first computation
  /// that succeeds; fails only when both computations fail
  member x.Choose(a, b) = Async.WhenAny(a, b)

  /// Represents a failed computation. It holds that:
  /// async.Choose(work, async.Fail()) = work
  member x.Fail<'T>() : Async<'T> = async { 
    return failwith "failed!" }

  /// Run the specified two computations in parallel and
  /// return their results as a tuple
  member x.Merge(a:Async<'T1>, b:Async<'T2>) : Async<'T1 * 'T2> = async {
    let map f v = async.Bind(v, f >> async.Return)
    let works = [ map box a; map box b ]
    let! res = Async.Parallel works
    return unbox res.[0], unbox res.[1] }

The operations implemented in the above snippet are fairly straightforward. For simplicity, the Fail operation is implemented using exceptions. This serves the required purpose, because Fail is used to represent failed pattern matching. When combined using Choose, the exception is only propagated if all clauses fail. The Merge operation is implemented using Async.Parallel and uses just a bit of additional wrapping and unwrapping, because Parallel takes an array of computations instead of a tuple.

The snippet itentionally does not add the Alias operation in order to demonstrate the difference in the behaviour. If you opened the namespace FSharp.Extensions.Joinads before, you might want to restart the F# Interactive session (right click in the console and select "Reset Session"). To get the expected behaviour, evaluating the expression async.Alias should say that it is undefined.

Example: The initial attempt

The following example demonstrates the behaviour of match! for asynchronous workflows without the definition of Alias operation. It is similar to the parallel programming example from the introduction. The after function produces a given value after certain time of waiting and it prints a message before returning, so that we can track how many times it was started. The rest of the snippet implements "or" operator for Async<bool> values with shortcircuiting:

/// Returns the specified value 'v' after 'time' milliseconds
let after time v = async {
  do! Async.Sleep(time)
  // Print when returning to show how many 
  // times the function is being executed
  printfn "Returning: %A" v
  return v }

// Run this example and check how many times 'after' gets called
async { 
  match! after 1000 false, after 2000 false with
  | true, ? -> printfn "First true"
  | ?, true -> printfn "Second true"
  | a, b -> printfn "Final: %b" (a || b) }
|> Async.Start

When you run the example, you can see that it prints "Returning: false" four times. This may be slightly unexpected, because match! is used with just two arguments! To understand how this is possible, let's look how the expression above is translated. The following is not complete translation, but it should give you the idea:

let after1000 = after 1000 false
let after2000 = after 2000 false

async.Choose
  (async.Bind(after1000, function ...)
   async.Choose
     (async.Bind(after2000, function ...)
      async.Bind
        (async.Merge(after1000, after2000)
         function ...)

For every clause, the translation identifies all arguments of match! that are matched against a binding pattern (any pattern other than ?). If there are multiple such patterns (i.e. the last clause), they are combined using Merge. The workflows are then passed to Bind, together with a function that performs the pattern matching.

Unlike Task<'T>, the type Async<'T> represents a delayed computation. When composing computations using Marge or Choose, we get a computation that starts both of the arguments (as new instances of the workflow). This means that the computation in the above example starts both after1000 and after2000 two times. This is not a problem for tasks, because a value Task<'T> represents a running computation that can be accessed multiple times. In order to get the same behaviour for asynchronous workflows, we need to explicitly implement aliasing.

Adding computation aliasing

The aliasing can be added by implementing an operation Alias of type Async<'T> -> Async<Async<'T>>. The operation returns a computation that, when executed, starts the workflow specified as argument and returns a new computation that can be used to wait for the result (but without starting a new instance of the argument).

In F#, this is exactly what Async.StartChild does, so adding the operation to AsyncBuilder is simple:

/// Adds aliasing to the F# 'async' builder
type Microsoft.FSharp.Control.AsyncBuilder with
  /// Returns a computation that, when started, runs the workflow 
  /// given as an argument and returns a new computation that 
  /// can be used to wait for the result of the started workflow
  member x.Alias(a) = Async.StartChild(a)

When the computation builder defines the Alias operation, the translation automatically applies the operation to all computations that are passed as arguments to match!. For the previous example, this means that both workflows after 1000 false and after 2000 false will be started just once. A workflow that represents aliased computation waiting for their completion can be used repeatedly. If you run the example again, the message is printed just once for each computation:

// Run this example and check how many times 'after' gets called
async { 
  match! after 1000 false, after 2000 false with
  | true, ? -> printfn "First true"
  | ?, true -> printfn "Second true"
  | a, b -> printfn "Final: %b" (a || b) }
|> Async.Start

When you run the example (after running the previous snippet which defines Alias), the "Returning" message should be printed only twice. Again, I won't show the exact translation, but it looks roughly as follows:

async.Bind(async.Alias(after 1000 false), fun after1000 ->
  async.Bind(async.Alias(after 2000 false), fun after2000 ->
    async.Choose
      (async.Bind(after1000, function ...)
       async.Choose
         (async.Bind(after2000, function ...)
          async.Bind
            (async.Merge(after1000, after2000)
             function ...)))

When executed, the computation applies Alias to all arguments and then binds on the outer monadic value (the Alias operation returns a value of type M<M<'T>>). For asynchronous workflows, this starts the computation in background. The value assigned to after1000 and after2000 has a type Async<'T>. When started, it simply waits until the background computation completes.

The Alias operation deserves attention on its own. In particular, it can be used to abstract away the evaluation strategy of a monadic computation. If it is inserted at appropriate places in monadic code, it can be defined to give call-by-name or call-by-value evaluation strategy. You can find more information in a related paper on the publications page.

Summary

This article demonstrated how to add support for match! to F# asynchronous workflows. This can be used to implement various programming patterns including I/O and user interfaces. The examples can be found in another article.

To support joinads in asynchronous workflows, we need to define Merge, Choose and Fail. This makes it possible to compose computations in both directions - horizontally and vertically. As asynchronous workflows represent delayed computations, we also need to explicitly implement the support for aliasing. This is done by defining Alias member of type M<'T> -> M<M<'T>>. For asynchronous workflows, this operation is already provided by Async.StartChild.

namespace System
Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------
type Async =
  class
    static member WhenAny : works:Async<'T> [] -> Async<'T>
  end

Full name: Blog.Async


 Add 'WhenAny' member to the 'Async' type
static member Async.WhenAny : works:Async<'T> [] -> Async<'T>

Full name: Blog.Async.WhenAny


 Creates an asynchronous workflow that non-deterministically returns the
 result of one of the two specified workflows (the one that completes
 first). This is similar to Task.WhenAny.
type ParamArrayAttribute =
  class
    inherit System.Attribute
    new : unit -> System.ParamArrayAttribute
  end

Full name: System.ParamArrayAttribute
  type: ParamArrayAttribute
  inherits: Attribute
val works : Async<'T> []
  type: Async<'T> []
  inherits: Array
Async.FromContinuations(fun (cont, econt, ccont) ->
      // Results from the two
      let results = Array.map (fun _ -> Choice1Of3()) works
      let handled = ref false
      let lockObj = new obj()
      let synchronized f = lock lockObj f

      // Called when one of the workflows completes
      let complete () =
        let op =
          synchronized (fun () ->
            // If we already handled result (and called continuation)
            // then ignore. Otherwise, if the computation succeeds, then
            // run the continuation and mark state as handled.
            // Only throw if all workflows failed.
            if !handled then ignore
            else
              let succ = Seq.tryPick (function Choice2Of3 v -> Some v | _ -> None) results
              match succ with
              | Some value -> handled := true; (fun () -> cont value)
              | _ ->
                  if Seq.forall (function Choice3Of3 _ -> true | _ -> false) results then
                    let exs = Array.map (function Choice3Of3 ex -> ex | _ -> failwith "!") results
                    (fun () -> econt (AggregateException(exs)))
                  else ignore )
        // Actually run the continuation
        // (this shouldn't be done in the lock)
        op()

      // Run a workflow and write result (or exception to a ref cell)
      let run index workflow = async {
        try
          let! res = workflow
          synchronized (fun () -> results.[index] <- Choice2Of3 res)
        with e ->
          synchronized (fun () -> results.[index] <- Choice3Of3 e)
        complete() }

      // Start all work items - using StartImmediate, because it
      // should be started on the current synchronization context
      works |> Seq.iteri (fun index work ->
        Async.StartImmediate(run index work)) )
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
type AsyncBuilder =
  class
    private new : unit -> AsyncBuilder
    member Bind : computation:Async<'T> * binder:('T -> Async<'U>) -> Async<'U>
    member Combine : computation1:Async<unit> * computation2:Async<'T> -> Async<'T>
    member Delay : generator:(unit -> Async<'T>) -> Async<'T>
    member For : sequence:seq<'T> * body:('T -> Async<unit>) -> Async<unit>
    member Return : value:'T -> Async<'T>
    member ReturnFrom : computation:Async<'T> -> Async<'T>
    member TryFinally : computation:Async<'T> * compensation:(unit -> unit) -> Async<'T>
    member TryWith : computation:Async<'T> * catchHandler:(exn -> Async<'T>) -> Async<'T>
    member Using : resource:'T * binder:('T -> Async<'U>) -> Async<'U> (requires 'T :> IDisposable)
    member While : guard:(unit -> bool) * computation:Async<unit> -> Async<unit>
    member Zero : unit -> Async<unit>
  end

Full name: Microsoft.FSharp.Control.AsyncBuilder
val x : AsyncBuilder
member AsyncBuilder.Choose : a:Async<'a> * b:Async<'a> -> Async<'a>

Full name: Blog.Choose


 Non-deterministically choose the first computation
 that succeeds; fails only when both computations fail
val a : Async<'a>
val b : Async<'a>
static member Async.WhenAny : works:Async<'T> [] -> Async<'T>


 Creates an asynchronous workflow that non-deterministically returns the
 result of one of the two specified workflows (the one that completes
 first). This is similar to Task.WhenAny.
member AsyncBuilder.Fail : unit -> Async<'T>

Full name: Blog.Fail


 Represents a failed computation. It holds that:
 async.Choose(work, async.Fail()) = work
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val failwith : string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
member AsyncBuilder.Merge : a:Async<'T1> * b:Async<'T2> -> Async<'T1 * 'T2>

Full name: Blog.Merge


 Run the specified two computations in parallel and
 return their results as a tuple
val a : Async<'T1>
val b : Async<'T2>
val map : (('a -> 'b) -> Async<'a> -> Async<'b>)
val f : ('a -> 'b)
val v : Async<'a>
member AsyncBuilder.Bind : computation:Async<'T> * binder:('T -> Async<'U>) -> Async<'U>
member AsyncBuilder.Return : value:'T -> Async<'T>
val works : Async<obj> list
  type: Async<obj> list
val box : 'T -> obj

Full name: Microsoft.FSharp.Core.Operators.box
val res : obj []
  type: obj []
  inherits: Array
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
val unbox : obj -> 'T

Full name: Microsoft.FSharp.Core.Operators.unbox
val after : int -> 'a -> Async<'a>

Full name: Blog.after


 Returns the specified value 'v' after 'time' milliseconds
val time : int
  type: int
  inherits: ValueType
val v : 'a
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val printfn : Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val a : bool
  type: bool
  inherits: ValueType
val b : bool
  type: bool
  inherits: ValueType
static member Async.Start : computation:Async<unit> * ?cancellationToken:Threading.CancellationToken -> unit
member AsyncBuilder.Alias : a:Async<'a> -> Async<Async<'a>>

Full name: Blog.Alias


 Returns a computation that, when started, runs the workflow
 given as an argument and returns a new computation that
 can be used to wait for the result of the started workflow
static member Async.StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>

Published: Friday, 23 March 2012, 5:21 PM
Author: Tomas Petricek
Typos: Send me a pull request!
Tags: asynchronous, f#, research, joinads