TryJoinads (II.): Task-based parallelism
The implementation of joinad operations for the Task<'T> type is quite similar to the implementation of Async<'T>, because the two types have similar properties. They both produce at most one value (or an exception) and they both take some time to complete. Just like for asynchronous workflows, pattern matching on multiple computations using match! gives us a parallel composition (with the two tasks running in parallel) and choice between clauses is non-deterministic, depending on which clause completes first.
Unlike asynchronous workflows, the Task<'T> type does not require any support for aliasing. A value of type Task<'T> represents a running computation that can be accessed from multiple parts of a program. In this sense, the type Async<'T> is more similar to a function unit -> Task<'T> than to the type Task<'T> itself.
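The difference can be observed using only the standard library. The following is a minimal sketch; the names coldWork, hotTask and the two counters are made up for illustration and do not come from the joinads library. Starting the same Async<'T> value twice executes its body twice, whereas a Task<'T> value stands for a single computation that has already been started, so every reader of Result sees the same outcome.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Counters that record how many times each body actually executed
// (illustrative names, not part of the article's code)
let asyncRuns = ref 0
let taskRuns = ref 0

// Async<'T> only describes the work; each start runs the body again
let coldWork = async {
    Interlocked.Increment(asyncRuns) |> ignore
    return 42 }

let a1 = Async.RunSynchronously coldWork
let a2 = Async.RunSynchronously coldWork

// Task<'T> represents a computation that has already been started;
// every reader of Result sees the one shared outcome
let hotTask =
    Task.Factory.StartNew<int>(Func<int>(fun () ->
        Interlocked.Increment(taskRuns) |> ignore
        42))

let t1 = hotTask.Result
let t2 = hotTask.Result

printfn "async body ran %d times, task body ran %d times"
    asyncRuns.Value taskRuns.Value
```

In this sketch the asynchronous workflow body runs twice, while the task body runs only once even though its result is read twice.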
The key difference between tasks and asynchronous workflows is that the latter provide better support for writing non-blocking computations that involve asynchronous long-running operations such as I/O or waiting for a certain event. Tasks are more suitable for high-performance CPU-intensive computations.
Note: This blog post is a re-publication of a tutorial from the TryJoinads.org web page. If you read the article there, you can run the examples interactively and experiment with them: view the article on TryJoinads.
Parallel list processing
The main example in this article is a tree processing function that can be used to test whether all values in the leaves satisfy a given predicate. However, we'll build the example step by step, exploring several other examples along the way.
To generate inputs for testing, we will calculate a list containing several large prime numbers. We will use the functional map and filter combinators, but we first implement a parallel version of map (in practice, this is already available in the F# libraries or in the F# PowerPack, but it is an interesting example of using joinads):
open System.Threading.Tasks
open FSharp.Extensions.Joinads

/// Applies the specified function to all
/// elements of the input list in parallel.
let parallelMap (f:'T -> 'R) input =
  // Recursively process list and spawn tasks
  let rec loop input = future {
    match input with
    | [] -> return []
    | x::xs ->
        // Process current element & the rest of the list
        match! future { return f x }, loop xs with
        | y, ys -> return y::ys }
  // Start the work and wait until it completes
  (loop input).Result
The function parallelMap contains a nested recursive function loop that returns a task, which processes a part of the list in parallel. The declaration of loop uses a computation builder future { ... } (which is defined in the FSharp.Extensions.Joinads namespace).
The loop function uses an ordinary match to handle the end of the list. If the list is non-empty, it uses match! to perform two tasks in parallel:
- First, it runs a computation that evaluates f x
- Second, it recursively calls loop xs to process the rest of the list
When both computations complete, the body of the clause constructs a list to be returned. The return type of loop is Task<list<'R>>, so the body of parallelMap uses the Result property to obtain the processed list.
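For comparison, here is a minimal sketch of a parallel map written against the standard Task API only, without the future builder or match!. The name parallelMapPlain is hypothetical, and this is not how the joinads library implements match!; it only illustrates the same idea of starting the work for every element and then collecting the results through Result.

```fsharp
open System
open System.Threading.Tasks

/// Hypothetical helper for comparison: a parallel map that uses only
/// the standard Task API (no joinads, no 'future' builder).
let parallelMapPlain (f: 'T -> 'R) (input: 'T list) : 'R list =
    // Start one task per element; each task is scheduled immediately
    let tasks =
        input |> List.map (fun x ->
            Task.Factory.StartNew<'R>(Func<'R>(fun () -> f x)))
    // Block on each task in turn and collect the results in order
    tasks |> List.map (fun t -> t.Result)

let squares = parallelMapPlain (fun x -> x * x) [ 1; 2; 3; 4 ]
```

The joinads version expresses the same structure recursively, pairing the task for the current element with the task for the rest of the list in a single match! clause.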
The following snippet defines a function that tests whether a number is prime and compares the performance of the sequential and parallel map functions:
// Create a list containing 1000 big numbers
let nums = [ for i in 0L .. 1000L -> i + 5000000000000L ]

/// Tests whether the specified 64 bit int is a prime
let isPrime num =
  seq { 2L .. int64 (sqrt (float num)) }
  |> Seq.forall (fun div -> num % div <> 0L)

// Turn on the timing and compare the performance
#time
List.map isPrime nums
parallelMap isPrime nums
When you load the code in F# Interactive, you can select and run the #time directive to enable simple performance measuring. F# Interactive then prints
the result of every entered command, together with the time it took to calculate
it. On the author's machine, the time needed to run the sequential version is
about 8 seconds and the time of the parallel version is about 4.5 seconds.
Building a balanced tree
Before we can look at tree processing, we need to define a tree type and write a function for constructing trees. The following snippet shows a standard binary tree declaration together with a function ballancedOfList that creates a balanced tree from a non-empty list:
type Tree<'T> =
  | Leaf of 'T
  | Node of Tree<'T> * Tree<'T>

/// Creates a balanced tree from a non-empty list
/// (odd elements are added to the left and even to the right)
let rec ballancedOfList list =
  match list with
  | [] -> failwith "Cannot create tree of empty list"
  | [n] -> Leaf n
  | _ ->
      // Split the elements into odd and even using their index
      let left, right =
        list
        |> List.mapi (fun i v -> i, v)
        |> List.partition (fun (i, v) -> i%2 = 0)
      // Create balanced trees for both parts
      let left, right = List.map snd left, List.map snd right
      Node(ballancedOfList left, ballancedOfList right)
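To see what shape of tree this splitting strategy produces, the following self-contained sketch repeats the definitions in compact form and applies them to small inputs. The depth helper and the values small and big are illustrative additions, not part of the original sample.

```fsharp
type Tree<'T> =
    | Leaf of 'T
    | Node of Tree<'T> * Tree<'T>

// Same splitting strategy as ballancedOfList in the text
let rec ballancedOfList list =
    match list with
    | [] -> failwith "Cannot create tree of empty list"
    | [n] -> Leaf n
    | _ ->
        let left, right =
            list
            |> List.mapi (fun i v -> i, v)
            |> List.partition (fun (i, _) -> i % 2 = 0)
        Node(ballancedOfList (List.map snd left),
             ballancedOfList (List.map snd right))

/// Illustrative helper (not in the article): depth of the deepest leaf
let rec depth tree =
    match tree with
    | Leaf _ -> 0
    | Node(l, r) -> 1 + max (depth l) (depth r)

// Elements at positions 1 and 3 go left, 2 and 4 go right:
// Node(Node(Leaf 1, Leaf 3), Node(Leaf 2, Leaf 4))
let small = ballancedOfList [ 1; 2; 3; 4 ]

// The list is halved at every level, so 1000 elements give depth 10
let big = ballancedOfList [ 1 .. 1000 ]
let bigDepth = depth big
```

Because each level splits the list into two halves whose lengths differ by at most one, the depth grows logarithmically with the number of elements, which keeps the parallel work evenly distributed between the two branches.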
The function is quite simple and it is only shown to make the sample complete. We use it to construct two trees that we'll later want to process. The first tree is generated by taking all primes from the nums list shown earlier and the other contains several additional non-prime numbers:
// Create a list with large prime numbers
let primes =
  nums
  |> parallelMap (fun v -> isPrime v, v)
  |> List.filter fst
  |> List.map snd

// Create a list with some additional non-primes
let mixed = primes @ [ 2L .. 20L ]

// Create balanced trees from both lists
let primeTree = ballancedOfList primes
let mixedTree = ballancedOfList mixed
The primeTree tree contains only large prime numbers, so checking if it contains only prime numbers will take relatively long. The mixedTree contains several additional numbers, some of which are not primes. This means that running isPrime on all of the values would take longer, but if we can return the result immediately after we find a non-prime, the processing is likely to complete quite quickly.
Parallel tree processing
Let's now implement a forall function that takes a tree and a predicate and tests whether the predicate holds for all leaves. We use the future { ... } computation builder and we use match! to handle the Node case by checking both sub-trees in parallel:
/// Checks whether the specified predicate 'f'
/// holds for all Leaf elements of the tree.
let forall f tree =
  let rec loop tree = future {
    match tree with
    | Leaf v -> return f v
    | Node(left, right) ->
        // Process left and right branch in parallel
        match! loop left, loop right with
        | l, r -> return l && r }
  // Start the recursive processing & wait for the result
  (loop tree).Result

// Test processing on two sample trees
forall isPrime mixedTree
forall isPrime primeTree
If you run the processing on both mixedTree and primeTree, they will take a similarly long time to complete. However, a sequential version of the function would be faster for mixedTree, because it would return false immediately after finding the first non-prime number.
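A sequential version is not shown in the article, but a minimal sketch might look as follows. The name forallSeq, the isEven predicate and the call counter are assumptions made for illustration. The short-circuiting comes from the && operator, which does not evaluate its right operand when the left one is already false.

```fsharp
type Tree<'T> =
    | Leaf of 'T
    | Node of Tree<'T> * Tree<'T>

/// Hypothetical sequential counterpart of 'forall' (illustrative name)
let rec forallSeq f tree =
    match tree with
    | Leaf v -> f v
    // '&&' skips the right branch if the left one is already false
    | Node(left, right) -> forallSeq f left && forallSeq f right

// Count how many leaves the predicate actually inspects
let calls = ref 0
let isEven n =
    calls.Value <- calls.Value + 1
    n % 2 = 0

let tree = Node(Node(Leaf 1, Leaf 2), Node(Leaf 4, Leaf 6))
let result = forallSeq isEven tree
```

Here the very first leaf fails the predicate, so the remaining three leaves are never inspected. The parallel forall above has no such shortcut, because its single clause waits for both branches.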
Adding short-circuiting
Implementing the same functionality using Task<'T> sounds difficult, but using joinads, the problem becomes quite simple. We add two additional clauses that handle the case when one branch completes returning false. Aside from this simple change, we also need to make sure that all remaining tasks, which are not required to complete, get cancelled.
The cancellation is implemented by creating a .NET CancellationTokenSource before starting the recursive processing. In the body of loop we then check if the processing has completed and we throw an exception if it has:
open System.Threading

/// Behaves like previous 'forall' function, but returns
/// immediately when one of the branches returns 'false'
let forall f tree =
  // Create cancellation token for checking
  let cts = new CancellationTokenSource()
  let rec loop tree = future {
    // Stop processing if the function already returned
    if cts.Token.IsCancellationRequested then
      failwith "cancelled"
    match tree with
    | Leaf v -> return f v
    | Node(left, right) ->
        match! loop left, loop right with
        | false, ? -> return false
        | ?, false -> return false
        | l, r -> return l && r }
  // Wait for the result & cancel all pending work
  let res = (loop tree).Result
  cts.Cancel()
  res

// Processing 'mixedTree' is significantly faster,
// because it returns after first non-prime is found!
forall isPrime mixedTree
forall isPrime primeTree
The changes required to implement short-circuiting are quite small. As already mentioned, we added two clauses with patterns false, ? and ?, false. These will match when one of the computations completes and returns false while the other is still running. When that happens, the function loop can return the final result, but the other task may still continue running.
To actually save CPU power, we need to cancel the other task. This is done using the standard .NET mechanism. After the task that processes the entire tree completes, we call cts.Cancel() to trigger the cancellation. All tasks that are started from that point will throw an exception (which is okay, because non-deterministic choice ignores exceptions if the first computation succeeds).
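The cancellation check itself does not depend on joinads. The following minimal sketch uses only standard .NET types; the helper startGuarded is a made-up name that mirrors the check at the top of loop. A task started before cts.Cancel() completes normally, while one started afterwards throws immediately, which a waiting caller observes as an AggregateException.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

let cts = new CancellationTokenSource()

/// Illustrative helper: starts 'work' as a task that first checks the
/// token, mirroring the check at the top of 'loop' in the article
let startGuarded (work: unit -> int) =
    Task.Factory.StartNew<int>(Func<int>(fun () ->
        if cts.Token.IsCancellationRequested then failwith "cancelled"
        work ()))

// Started before cancellation: completes normally
let before = startGuarded (fun () -> 1)
let beforeResult = before.Result

cts.Cancel()

// Started after cancellation: the body throws straight away
let after = startGuarded (fun () -> 2)
let afterFaulted =
    try
        after.Wait()
        false
    with :? AggregateException -> true
```

Note that this style of cancellation is cooperative: a task that has already passed the check keeps running until it finishes, so only work that has not yet started is actually saved.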
As a result, the processing of mixedTree is now significantly faster than the processing of primeTree. On the author's machine, the first one requires about 0.3s, while the second takes 4 seconds to complete. You can easily test the performance for different inputs yourself using the #time directive.
Summary
In principle, the implementation of joinad operations for the Task<'T> type is very similar to the implementation for asynchronous workflows as discussed in the previous article. The main difference is that the underlying type is different: tasks are designed for CPU-intensive computations. Therefore the applications in this article were quite different. We used tasks to write a parallel map operation for lists and then to implement a forall function for trees. The second was particularly interesting, because joinads make it very easy to implement short-circuiting behaviour thanks to the non-deterministic choice between clauses.
Published: Friday, 17 February 2012, 1:10 PM
Author: Tomas Petricek
Typos: Send me a pull request!
Tags: research, f#, parallel, joinads