TryJoinads (VII.) - Implementing joinads for async workflows
The article Asynchronous workflows and joinads gives numerous
examples of programming with asynchronous workflows using the match!
construct.
Briefly, when matching on multiple asynchronous workflows, they are executed in
parallel. When pattern matching consists of multiple clauses, the clause that matches
on computations that complete first gets executed. These two behaviours are
implemented by the Merge
and the Choose
operation of joinads. Additionally,
asynchronous workflows require the Alias
operation, which makes it possible to
share the result of a started asynchronous workflow in multiple clauses.
In this article, we look at the definition of the additional AsyncBuilder
operations that enable the match!
syntax. We do not look at additional examples
of using the syntax, because these can be found in a previous article.
Note: This blog post is a re-publication of a tutorial from the TryJoinads.org web page. If you read the article there, you can run the examples interactively and experiment with them: view the article on TryJoinads.
Non-deterministic choice and parallel composition
Parallel composition of asynchronous workflows is implemented in the standard F#
library by a combinator Async.Parallel
, but a choice operation that returns the
result of a first workflow is not available out-of-box. The following snippet shows
a simple implementation that uses locking (and is based on an F# Snippet
implementing Async.Choose
).
Next, the snippet extends standard F# async
computation builder with Fail
,
Choose
and Merge
:
open System /// Add 'WhenAny' member to the 'Async' type type Async = /// Creates an asynchronous workflow that non-deterministically returns the /// result of one of the two specified workflows (the one that completes /// first). This is similar to Task.WhenAny. static member WhenAny([<ParamArray>] works:Async<'T>[]) : Async<'T> = (Implementation omitted) /// Extend the standard F# 'async' builder with /// additional operations to support joinads type Microsoft.FSharp.Control.AsyncBuilder with /// Non-deterministically choose the first computation /// that succeeds; fails only when both computations fail member x.Choose(a, b) = Async.WhenAny(a, b) /// Represents a failed computation. It holds that: /// async.Choose(work, async.Fail()) = work member x.Fail<'T>() : Async<'T> = async { return failwith "failed!" } /// Run the specified two computations in parallel and /// return their results as a tuple member x.Merge(a:Async<'T1>, b:Async<'T2>) : Async<'T1 * 'T2> = async { let map f v = async.Bind(v, f >> async.Return) let works = [ map box a; map box b ] let! res = Async.Parallel works return unbox res.[0], unbox res.[1] }
The operations implemented in the above snippet are fairly straightforward. For simplicity, the
Fail
operation is implemented using exceptions. This serves the required purpose, because Fail
is used to represent failed pattern matching. When combined using Choose
, the exception is
only propagated if all clauses fail. The Merge
operation is implemented using Async.Parallel
and uses just a bit of additional wrapping and unwrapping, because Parallel
takes an
array of computations instead of a tuple.
The snippet itentionally does not add the Alias
operation in order to demonstrate the difference
in the behaviour. If you opened the namespace FSharp.Extensions.Joinads
before, you might want
to restart the F# Interactive session (right click in the console and select "Reset Session").
To get the expected behaviour, evaluating the expression async.Alias
should say that it is undefined.
Example: The initial attempt
The following example demonstrates the behaviour of match!
for asynchronous workflows without
the definition of Alias
operation. It is similar to the parallel programming example from
the introduction. The after
function produces a given value after certain time
of waiting and it prints a message before returning, so that we can track how many times it was
started. The rest of the snippet implements "or" operator for Async<bool>
values with
shortcircuiting:
/// Returns the specified value 'v' after 'time' milliseconds let after time v = async { do! Async.Sleep(time) // Print when returning to show how many // times the function is being executed printfn "Returning: %A" v return v } // Run this example and check how many times 'after' gets called async { match! after 1000 false, after 2000 false with | true, ? -> printfn "First true" | ?, true -> printfn "Second true" | a, b -> printfn "Final: %b" (a || b) } |> Async.Start
When you run the example, you can see that it prints "Returning: false" four times. This may
be slightly unexpected, because match!
is used with just two arguments! To understand how this
is possible, let's look how the expression above is translated. The following is not complete
translation, but it should give you the idea:
let after1000 = after 1000 false let after2000 = after 2000 false async.Choose (async.Bind(after1000, function ...) async.Choose (async.Bind(after2000, function ...) async.Bind (async.Merge(after1000, after2000) function ...)
For every clause, the translation identifies all arguments of match!
that are matched against
a binding pattern (any pattern other than ?
). If there are multiple such patterns (i.e. the
last clause), they are combined using Merge
. The workflows are then passed to Bind
, together
with a function that performs the pattern matching.
Unlike Task<'T>
, the type Async<'T>
represents a delayed computation. When composing
computations using Marge
or Choose
, we get a computation that starts both of the arguments
(as new instances of the workflow). This means that the computation in the above example starts
both after1000
and after2000
two times. This is not a problem for tasks, because a value
Task<'T>
represents a running computation that can be accessed multiple times. In order to get
the same behaviour for asynchronous workflows, we need to explicitly implement aliasing.
Adding computation aliasing
The aliasing can be added by implementing an operation Alias
of type Async<'T> -> Async<Async<'T>>
.
The operation returns a computation that, when executed, starts the workflow specified as
argument and returns a new computation that can be used to wait for the result (but without
starting a new instance of the argument).
In F#, this is exactly what Async.StartChild
does, so adding the operation to AsyncBuilder
is simple:
/// Adds aliasing to the F# 'async' builder type Microsoft.FSharp.Control.AsyncBuilder with /// Returns a computation that, when started, runs the workflow /// given as an argument and returns a new computation that /// can be used to wait for the result of the started workflow member x.Alias(a) = Async.StartChild(a)
When the computation builder defines the Alias
operation, the translation automatically applies
the operation to all computations that are passed as arguments to match!
. For the previous
example, this means that both workflows after 1000 false
and after 2000 false
will be
started just once. A workflow that represents aliased computation waiting for their completion
can be used repeatedly. If you run the example again, the message is printed just once for
each computation:
// Run this example and check how many times 'after' gets called async { match! after 1000 false, after 2000 false with | true, ? -> printfn "First true" | ?, true -> printfn "Second true" | a, b -> printfn "Final: %b" (a || b) } |> Async.Start
When you run the example (after running the previous snippet which defines Alias
), the
"Returning" message should be printed only twice. Again, I won't show the exact translation,
but it looks roughly as follows:
async.Bind(async.Alias(after 1000 false), fun after1000 -> async.Bind(async.Alias(after 2000 false), fun after2000 -> async.Choose (async.Bind(after1000, function ...) async.Choose (async.Bind(after2000, function ...) async.Bind (async.Merge(after1000, after2000) function ...)))
When executed, the computation applies Alias
to all arguments and then binds on the outer
monadic value (the Alias
operation returns a value of type M<M<'T>>
). For asynchronous
workflows, this starts the computation in background. The value assigned to after1000
and
after2000
has a type Async<'T>
. When started, it simply waits until the background
computation completes.
The Alias
operation deserves attention on its own. In particular, it can be used to abstract
away the evaluation strategy of a monadic computation. If it is inserted at appropriate places
in monadic code, it can be defined to give call-by-name or call-by-value evaluation
strategy. You can find more information in a related paper on the publications page.
Summary
This article demonstrated how to add support for match!
to F# asynchronous workflows. This
can be used to implement various programming patterns including I/O and user interfaces. The
examples can be found in another article.
To support joinads in asynchronous workflows, we need to define Merge
, Choose
and Fail
.
This makes it possible to compose computations in both directions - horizontally and vertically.
As asynchronous workflows represent delayed computations, we also need to explicitly implement
the support for aliasing. This is done by defining Alias
member of type M<'T> -> M<M<'T>>
.
For asynchronous workflows, this operation is already provided by Async.StartChild
.
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
--------------------
type Async =
class
static member WhenAny : works:Async<'T> [] -> Async<'T>
end
Full name: Blog.Async
Add 'WhenAny' member to the 'Async' type
Full name: Blog.Async.WhenAny
Creates an asynchronous workflow that non-deterministically returns the
result of one of the two specified workflows (the one that completes
first). This is similar to Task.WhenAny.
class
inherit System.Attribute
new : unit -> System.ParamArrayAttribute
end
Full name: System.ParamArrayAttribute
type: ParamArrayAttribute
inherits: Attribute
type: Async<'T> []
inherits: Array
// Results from the two
let results = Array.map (fun _ -> Choice1Of3()) works
let handled = ref false
let lockObj = new obj()
let synchronized f = lock lockObj f
// Called when one of the workflows completes
let complete () =
let op =
synchronized (fun () ->
// If we already handled result (and called continuation)
// then ignore. Otherwise, if the computation succeeds, then
// run the continuation and mark state as handled.
// Only throw if all workflows failed.
if !handled then ignore
else
let succ = Seq.tryPick (function Choice2Of3 v -> Some v | _ -> None) results
match succ with
| Some value -> handled := true; (fun () -> cont value)
| _ ->
if Seq.forall (function Choice3Of3 _ -> true | _ -> false) results then
let exs = Array.map (function Choice3Of3 ex -> ex | _ -> failwith "!") results
(fun () -> econt (AggregateException(exs)))
else ignore )
// Actually run the continuation
// (this shouldn't be done in the lock)
op()
// Run a workflow and write result (or exception to a ref cell)
let run index workflow = async {
try
let! res = workflow
synchronized (fun () -> results.[index] <- Choice2Of3 res)
with e ->
synchronized (fun () -> results.[index] <- Choice3Of3 e)
complete() }
// Start all work items - using StartImmediate, because it
// should be started on the current synchronization context
works |> Seq.iteri (fun index work ->
Async.StartImmediate(run index work)) )
class
private new : unit -> AsyncBuilder
member Bind : computation:Async<'T> * binder:('T -> Async<'U>) -> Async<'U>
member Combine : computation1:Async<unit> * computation2:Async<'T> -> Async<'T>
member Delay : generator:(unit -> Async<'T>) -> Async<'T>
member For : sequence:seq<'T> * body:('T -> Async<unit>) -> Async<unit>
member Return : value:'T -> Async<'T>
member ReturnFrom : computation:Async<'T> -> Async<'T>
member TryFinally : computation:Async<'T> * compensation:(unit -> unit) -> Async<'T>
member TryWith : computation:Async<'T> * catchHandler:(exn -> Async<'T>) -> Async<'T>
member Using : resource:'T * binder:('T -> Async<'U>) -> Async<'U> (requires 'T :> IDisposable)
member While : guard:(unit -> bool) * computation:Async<unit> -> Async<unit>
member Zero : unit -> Async<unit>
end
Full name: Microsoft.FSharp.Control.AsyncBuilder
Full name: Blog.Choose
Non-deterministically choose the first computation
that succeeds; fails only when both computations fail
Creates an asynchronous workflow that non-deterministically returns the
result of one of the two specified workflows (the one that completes
first). This is similar to Task.WhenAny.
Full name: Blog.Fail
Represents a failed computation. It holds that:
async.Choose(work, async.Fail()) = work
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Full name: Microsoft.FSharp.Core.Operators.failwith
Full name: Blog.Merge
Run the specified two computations in parallel and
return their results as a tuple
type: Async<obj> list
Full name: Microsoft.FSharp.Core.Operators.box
type: obj []
inherits: Array
Full name: Microsoft.FSharp.Core.Operators.unbox
Full name: Blog.after
Returns the specified value 'v' after 'time' milliseconds
type: int
inherits: ValueType
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
type: bool
inherits: ValueType
type: bool
inherits: ValueType
Full name: Blog.Alias
Returns a computation that, when started, runs the workflow
given as an argument and returns a new computation that
can be used to wait for the result of the started workflow
Published: Friday, 23 March 2012, 5:21 PM
Author: Tomas Petricek
Typos: Send me a pull request!
Tags: asynchronous, f#, research, joinads