Explicit speculative parallelism for Haskell's Par monad

Haskell provides quite a few ways of writing parallel programs, but none of them is fully automatic. The programmer has to use annotations or a library to run computations in parallel explicitly. The most recent paper (and library) for writing parallel programs follows the latter approach. You can find more information about the library in the paper A monad for deterministic parallelism [1] by Simon Marlow et al., and the library itself is available on Hackage (as the monad-par package). However, I'll explain all the important bits that I'll use in this article.

The library provides an explicit way of writing parallel programs using the Par monad. It contains constructs for spawning computations and sharing state using blocking variables, but the whole programming model is fully deterministic. I believe that it is sometimes useful to lift the determinism requirement. Some programs are deterministic, but this fact cannot be (easily) verified automatically. For example, say you have two functions fib1 and fib2 that always give the same result, but each of them is more efficient than the other on certain inputs. To calculate a Fibonacci number, the program could speculatively evaluate both functions in parallel and return the result of the one that finishes first.

Unfortunately, this cannot be safely implemented using a fully deterministic library. In this article, I'll show some examples where speculative parallelism can be useful and I'll talk about an extension to the Par monad that I implemented (available on GitHub). The extension allows programmers to write speculative computations, provided that they manually verify that their code is deterministic.

Introduction

In this section, I'll show a basic example that demonstrates how the Par monad works. To keep things simple, I'll use the contrived example of calculating Fibonacci numbers; you can find better examples in the Par monad examples repository. The performance measurements included in this article are only for demonstration (they were taken on an Intel Core 2 Duo CPU). I'll also briefly introduce the unamb operator [3] designed by Conal Elliott. It was created to solve an important problem in Functional Reactive Programming [2], and that problem is closely related to speculative computations.

Deterministic parallelism monad

The library wraps a parallel computation inside a monad. A parallel version of a function that returns Int64 has the return type Par Int64. A value of this type represents a computation that can be started using the runPar function.

Here is a basic example:

import Control.Monad.Par (Par, get, runPar, spawn)
import Data.Int (Int64)

-- Naive calculation of Fibonacci numbers
fib :: Int64 -> Int64
fib x | x < 1 = 1
fib x = fib (x-2) + fib (x-1)

-- Parallel version of the function
pfib :: Int64 -> Par Int64
pfib n | n < 25 = return (fib n)
pfib n = do 
  av <- spawn (pfib (n - 1))
  b  <- pfib (n - 2)  
  a  <- get av
  return (a + b)

The fib function is an ordinary (non-parallel) function for calculating Fibonacci numbers. The pfib function is a parallel version (as you can see from the type signature). When the argument is smaller than 25, it runs the sequential fib to avoid generating too many small work items. Otherwise, the function spawns a computation to calculate pfib (n - 1) in parallel. The type of the spawn function is:

spawn :: NFData a => Par a -> Par (IVar a)

The NFData constraint specifies that it must be possible to fully evaluate the result. The rest of the signature says that the function takes a computation and returns a variable (wrapped in Par) that will eventually contain its result. Inside the Par monad, we can read the value of the variable using the get function, which blocks the computation until a value becomes available. It has the following type:

get :: IVar a -> Par a

Now it should be easy to understand the rest of the pfib body. It spawns one recursive computation (as a background task), then runs the second recursive call on the main thread and then waits until the background operation completes. Finally, it returns the sum of the results.
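The spawn/get pattern can be mimicked with ordinary GHC threads, which may help if you are more familiar with Control.Concurrent. The following is a minimal IO sketch under stated assumptions, not how monad-par is implemented: spawnIO, getIO and pfibIO are hypothetical names, an MVar stands in for an IVar, and results are forced only to weak head normal form (the real spawn uses NFData). Unlike Par, nothing here guarantees determinism. The block defines no main, so load it in GHCi and evaluate, for example, pfibIO 27.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)
import Data.Int (Int64)

-- Same naive function as in the article.
fib :: Int64 -> Int64
fib x | x < 1 = 1
fib x = fib (x - 2) + fib (x - 1)

-- Hypothetical IO analogue of spawn: run the action in a new thread and
-- return a variable that is filled (once) with its result.
spawnIO :: IO a -> IO (MVar a)
spawnIO act = do
  var <- newEmptyMVar
  -- `seq` forces the result to weak head normal form only; the real spawn
  -- evaluates it fully using NFData.
  _ <- forkIO (act >>= \v -> v `seq` putMVar var v)
  return var

-- Hypothetical analogue of get: block until the variable has a value.
-- readMVar leaves the value in place, so repeated reads succeed, as with an IVar.
getIO :: MVar a -> IO a
getIO = readMVar

-- The same spawn / recurse / get structure as pfib.
pfibIO :: Int64 -> IO Int64
pfibIO n | n < 25 = return $! fib n
pfibIO n = do
  av <- spawnIO (pfibIO (n - 1))
  b <- pfibIO (n - 2)
  a <- getIO av
  return (a + b)
```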

The following simple snippet shows how to run the two functions and it shows some approximate performance measurements:

main = do 
  -- Naive sequential fibonacci (1.8 sec)
  print $ fib 32
  -- Parallel version with 25 threshold (1.0 sec)
  print $ runPar $ pfib 32

When we have a value of type Par a, we can evaluate it (to get a value of type a) using the runPar function. When running the parallel function on a dual-core machine, it gives a speedup of slightly less than 2x. For comparison, a version implemented using the par and pseq combinators (a lower-level parallelism mechanism in GHC) completes in approximately 0.95 seconds.
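The par/pseq version is not listed in the article. A plausible sketch with the same threshold could look like this; parFib is a hypothetical name, and par and pseq are imported from GHC.Conc in base (the parallel package's Control.Parallel exports the same combinators). To get any actual speedup, the program has to be compiled with -threaded and run with +RTS -N2 (or -N).

```haskell
import Data.Int (Int64)
import GHC.Conc (par, pseq)

-- Same naive function as in the article.
fib :: Int64 -> Int64
fib x | x < 1 = 1
fib x = fib (x - 2) + fib (x - 1)

-- Hypothetical par/pseq variant of pfib with the same threshold of 25.
parFib :: Int64 -> Int64
parFib n | n < 25 = fib n
parFib n = a `par` (b `pseq` (a + b))
  where
    a = parFib (n - 1) -- sparked: an idle core may evaluate it
    b = parFib (n - 2) -- evaluated by the current thread first
```

Here par only records a spark for a, and pseq makes the current thread evaluate b before a + b demands a, so the two recursive calls can overlap when a second core picks up the spark.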

Finally, it is worth mentioning that the programming model used by the Par monad is very similar to the model used by asynchronous workflows in F#. The spawn function corresponds to Async.StartChild, and runPar does the same thing as Async.RunSynchronously.

Unambiguous choice

Before discussing the support for speculative programming, I'd like to briefly discuss the unambiguous choice operator, which was created by Conal Elliott for his Functional Reactive Programming (FRP) implementation, but is also available as a separate package (unamb) on Hackage.

The operator is a function of type a -> a -> a. It takes two arguments, starts evaluating them concurrently and returns whichever value becomes available first. When using the operator, the programmer needs to make sure that the two arguments are compatible, meaning that if both of them finish evaluating, their values are equal. Formally, the requirement looks like this:

compatible a b = (a = ⊥) | (b = ⊥) | (a = b)

The operator can be used for two purposes. First, it can deal with computations that do not terminate. For example, say you have an expression a && b. When b is False, the result of the expression should logically also be False. However, when a doesn't terminate, the overall expression will never yield a value either, because (&&) evaluates a first. Using unamb, you can write:

import Data.Unamb

c = unamb (a && b) (b && a)

This use clearly satisfies the compatibility requirement: when both arguments terminate, the result is the same in both cases. However, the expression returns False even if a is a non-terminating computation, because the second argument evaluates to False.
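The asymmetry comes from the Prelude definition of (&&), which pattern-matches on its first argument only. The sketch below demonstrates it with base alone (it does not use unamb); undefined stands in for a diverging a, since a real infinite loop would simply hang, and rightIsIgnored and leftIsForced are illustrative names.

```haskell
import Control.Exception (ErrorCall, evaluate, try)

-- (&&) inspects only its first argument:
--   True  && x = x
--   False && _ = False
-- so `undefined` (standing in for a diverging computation) is harmless on
-- the right but is forced when it appears on the left.

rightIsIgnored :: Bool
rightIsIgnored = False && undefined -- evaluates to False

leftIsForced :: IO (Either ErrorCall Bool)
leftIsForced = try (evaluate (undefined && False)) -- Left, with the undefined error
```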

The second possible use of unamb is for simple speculative evaluation. I described a typical scenario earlier. We have two functions and one is faster for some of the inputs, but not for all of them. Using unamb, we can write the following helper that takes two functions and returns the value that is produced first:

tryBoth :: (t -> a) -> (t -> a) -> t -> a
tryBoth f1 f2 n = unamb (f1 n) (f2 n)
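To see the kind of machinery unamb relies on, here is a rough IO approximation of tryBoth built from forkIO, an MVar and killThread. It is a sketch under stated assumptions, not how the unamb package is implemented: tryBothIO is a hypothetical name, results are evaluated only to weak head normal form, and the answer is deterministic only when the two functions are compatible.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, readMVar, tryPutMVar)
import Control.Exception (evaluate)

-- Hypothetical IO approximation of tryBoth; assumes f1 and f2 are compatible.
tryBothIO :: (t -> a) -> (t -> a) -> t -> IO a
tryBothIO f1 f2 n = do
  res <- newEmptyMVar
  let worker f = forkIO $ do
        v <- evaluate (f n)   -- evaluate to weak head normal form
        _ <- tryPutMVar res v -- a second write is silently ignored
        return ()
  t1 <- worker f1
  t2 <- worker f2
  v <- readMVar res           -- wait for whichever thread finishes first
  mapM_ killThread [t1, t2]   -- stop the loser (harmless for a finished thread)
  return v
```

For example, tryBothIO (\k -> sum [1 .. k]) (\k -> k * (k + 1) `div` 2) 100000 returns 5000050000 whichever thread wins. If both functions throw, readMVar never succeeds and the caller typically fails with BlockedIndefinitelyOnMVar; also, a thread stuck in a tight loop that never allocates may not respond to killThread promptly.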

To demonstrate how the function works, I implemented a more efficient function for calculating Fibonacci numbers (ffib), which counts up from zero and keeps adding the last two generated numbers. If you want to play with it, you can find the complete source code at the end of the article. The following example shows the results:

main = do
  -- Naive recursive implementation (~1.7 sec)
  print (fib 32)
  -- Efficient implementation (~0.01 sec)
  print (ffib 32)
  -- Tries evaluating result using both functions (~0.01 sec)
  print (tryBoth fib ffib 32)

In this example, we could just call ffib, because it is always faster. However, that's not always the case: you might have a very fast function that almost always works but sometimes fails, and a backup function that is slow but always works.
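The article does not list ffib. Note that the base case x < 1 makes the naive fib return fib 0 = 1 and fib 1 = 2, so a faster replacement must follow the same convention to be compatible with it. A hypothetical ffib that does so might look like this:

```haskell
import Data.Int (Int64)

-- Naive definition from the article (fib 0 = 1, fib 1 = 2, fib 2 = 3, ...).
fib :: Int64 -> Int64
fib x | x < 1 = 1
fib x = fib (x - 2) + fib (x - 1)

-- Hypothetical linear-time version that agrees with fib on every input,
-- so the two are compatible in the sense required by unamb.
ffib :: Int64 -> Int64
ffib n = go n 1 1
  where
    -- a and b hold fib (i - 1) and fib i; we start at i = 0, where
    -- fib (-1) = fib 0 = 1 by the first equation of fib.
    go k a b
      | k < 1 = b
      | otherwise = let c = a + b in c `seq` go (k - 1) b c
```

With this definition, both fib 32 and ffib 32 evaluate to 5702887.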

The unamb operator preserves the determinism of code only when it is used properly: the programmer needs to make sure that the arguments are compatible. Nevertheless, I believe that this kind of construct is very useful, and I hope that the previous examples demonstrated that. In the rest of the article, I'll describe my extension that allows you to use similar concepts in the Par monad.

Speculative parallelism for Par monad

The extension (available on GitHub) adds support for explicit cancellation of computations. When starting a computation, you can give it a cancellation token. The token can later be used (from another computation) to cancel the background task. The design of the extension is very similar to how cancellation works in F# asynchronous workflows and in the Task Parallel Library (TPL) in .NET.

Parallel unambiguous choice

Support for cancellation is a low-level control structure and it is not expected to be used directly very often. However, you can use it to implement your own functions that provide a higher level of abstraction. For example, it is quite easy to implement the unamb operator for Par a values:

-- Unambiguous choice between two parallel computations
-- Assumes that computations p1 and p2 are compatible
punamb :: NFData a => Par a -> Par a -> Par a
punamb p1 p2 = do
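  res <- newBlocking     -- IVar whose second write blocks instead of failing
  ct1 <- newCancelToken
  ct2 <- newCancelToken
  -- Each branch publishes its result and then cancels the other branch
  forkWith ct1 (do
    a <- p1; put res a; cancel ct2)
  forkWith ct2 (do
    a <- p2; put res a; cancel ct1)
  -- Block until one of the branches has written the result
  get res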

The type of the function specifies that the returned value can be fully evaluated, but it is equally easy to define a version that evaluates the value only to weak head normal form (using put_ instead of put). The body of punamb first creates the objects needed for synchronization. It uses newBlocking to create a variable (of type IVar a) for storing the result.

A variable created using newBlocking behaves slightly differently from a standard variable created using new. When you attempt to write a value to a standard variable for the second time, the write fails with an exception. With a variable created using newBlocking, the second write does not raise an exception; it just blocks indefinitely and never completes. This makes it possible to implement a race between two computations. The next two values, ct1 and ct2, are cancellation tokens that are later used to cancel running computations.

After the initialization, the body starts two background computations using the forkWith function. The function takes a CancelToken as the first argument and a computation of type Par () as the second argument. When called (from the Par monad), it starts the computation in the background and associates the cancellation token with it, which allows other computations to cancel it.

The two computations started using forkWith are quite simple. Each of them evaluates one of punamb's arguments, stores the result in the res variable and then cancels the other computation. In a typical case, one of the computations completes first and cancels the other (which is still running). However, if both complete at nearly the same time, one wins the race and writes its value to res, while the other attempts to write its result and gets blocked. The race doesn't affect determinism, as long as the two computations given as arguments to punamb are compatible.
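The same race can be approximated in IO, which makes the role of the blocking variable visible: an MVar that is never emptied behaves like a newBlocking variable, because a second putMVar just blocks. In this hypothetical punambIO, killThread stands in for cancel; unlike the cooperative cancellation of the Par extension, GHC's asynchronous exceptions can interrupt a thread at any point where it allocates (unless exceptions are masked). The ThreadIds travel through extra MVars because each worker needs to know about the other one, and the actions are assumed to force their own results (for example with return $!).

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)

-- Hypothetical IO analogue of punamb; assumes p1 and p2 are compatible
-- and that each action forces its own result.
punambIO :: IO a -> IO a -> IO a
punambIO p1 p2 = do
  res <- newEmptyMVar    -- never emptied, so a second putMVar blocks forever
  other1 <- newEmptyMVar -- ThreadId of the second worker, used by the first
  other2 <- newEmptyMVar -- ThreadId of the first worker, used by the second
  t1 <- forkIO $ do
    a <- p1
    putMVar res a                  -- only the first writer gets past this line
    readMVar other1 >>= killThread -- plays the role of `cancel ct2`
  t2 <- forkIO $ do
    a <- p2
    putMVar res a
    readMVar other2 >>= killThread -- plays the role of `cancel ct1`
  putMVar other1 t2
  putMVar other2 t1
  readMVar res
```

For example, punambIO (return $! length (replicate 3000000 'x')) (return 3000000) returns 3000000 whichever branch finishes first.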

The following snippet uses punamb to implement an example similar to the earlier demonstration of unamb. The pffib function (not shown in this article) is a faster implementation of Fibonacci numbers embedded in the Par monad:

main = do
  -- Naive recursion using Par monad (~1.1 sec)
  print $ runPar $ pfib 32
  -- Efficient recursion using Par monad (~0.01 sec)
  print $ runPar $ pffib 32
  -- Tries evaluating both & returns the first (~0.04 sec)
  print $ runPar $ punamb (pfib 32) (pffib 32)

As you would expect, punamb behaves similarly to unamb: given two computations, it finishes as soon as the first of them completes. The time is slightly longer than running pffib alone, because the slower computation (pfib, running in parallel) also occupies some of the resources until it is cancelled.

You may be wondering how the cancellation works, that is, at which point a Par computation is cancelled when another computation calls cancel. The following section briefly explains how cancellation is implemented in my modification of the package.

How are computations cancelled?

The cancellation for the Par monad is cooperative, which means that the computations can be cancelled only when they are written in a particular way that permits cancellation. In particular, the cancel function will not be able to cancel a Par computation that is created by writing return foo (where foo is some expression).

A computation can be cancelled at points where it performs a special operation of the Par monad. Most importantly, this includes monadic binding (as well as other special calls such as forking). The pfib function (demonstrated above) contains many such points: it can be cancelled when it tries to spawn a recursive call (in the background), when it recursively calls itself, and when it tries to read the result of the background computation.
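To make the cooperative model concrete, here is a toy monad that checks a cancellation flag at every bind. It only illustrates the idea and is not the code of the modified package, where the checks live in the Par scheduler; Token, Coop, step and countSteps are hypothetical names. Note that pure (and therefore return) performs no check, which mirrors why return foo cannot be cancelled.

```haskell
import Control.Monad (ap, liftM, when)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)

-- Hypothetical cancellation flag (unrelated to the extension's CancelToken).
newtype Token = Token (IORef Bool)

newToken :: IO Token
newToken = Token <$> newIORef False

cancelToken :: Token -> IO ()
cancelToken (Token flag) = writeIORef flag True

-- A computation that can only be stopped at its bind points.
-- Nothing means "cancelled before completion".
newtype Coop a = Coop {runCoop :: Token -> IO (Maybe a)}

instance Functor Coop where
  fmap = liftM

instance Applicative Coop where
  pure x = Coop (\_ -> return (Just x)) -- no check: `return foo` is never cancelled
  (<*>) = ap

instance Monad Coop where
  Coop m >>= k = Coop $ \tok@(Token flag) -> do
    r <- m tok
    case r of
      Nothing -> return Nothing       -- an earlier step was already cancelled
      Just x -> do
        cancelled <- readIORef flag   -- the cancellation point
        if cancelled then return Nothing else runCoop (k x) tok

-- Lift an IO action into a single step of a Coop computation.
step :: IO a -> Coop a
step io = Coop (\_ -> Just <$> io)

-- Example: ten counting steps; the third one cancels the given token.
countSteps :: Token -> IORef Int -> Coop ()
countSteps tok ref = mapM_ tick [1 .. 10 :: Int]
  where
    tick i = step $ do
      modifyIORef ref (+ 1)
      when (i == 3) (cancelToken tok)
```

Running countSteps under the same token it cancels stops after the third step and returns Nothing (the counter ends at 3); running it under a different, never-cancelled token performs all ten steps and returns Just ().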

You can explore the source code of my modification on GitHub; the commit that adds the cancellation support shows the differences from the original version. Implementing non-cooperative cancellation (as, for example, in unamb) would also be possible, but more difficult. However, I believe that the cooperative model works very well for the Par monad, because parallel computations generally consist of a large number of small (atomic) steps.

To finish the blog post, the last section shows a single larger example of implementing tree processing as a speculative computation explicitly using cancellation.

Speculative tree processing

In this section, I present a larger example that uses the cancellation support to implement speculative processing of a tree. We look at how to implement a forall function that checks whether a given predicate holds for all leaves of the tree. As soon as the predicate returns False for some leaf, the overall result is also False, so the function can cancel all parallel tasks and return immediately.

The example uses the following simple tree structure:

data Tree a 
  = Leaf a
  | Node (Tree a) (Tree a)

The following snippet shows a basic parallel version of forall. It uses the same recursive parallel processing pattern that was used earlier in the pfib function:

-- Test whether all elements match the given predicate (in parallel)
forallp :: (a -> Bool) -> Tree a -> Par Bool
forallp p (Leaf num) = return $ p num
forallp p (Node left right) = do
  al' <- spawn $ forallp p left
  ar  <- forallp p right
  al  <- get al'
  return (al && ar)

When processing a leaf, the function calls the predicate and returns the result. When processing a node, it spawns processing of the left sub-tree as a background operation (using spawn, which is implemented using fork) and then processes the right sub-tree. After both operations complete, it returns logical conjunction of the two results.

When implementing the speculative version, we need to spawn two background computations (to process the left and the right sub-tree). When either of them completes with False, we can immediately store the result in a shared variable, which unblocks the main computation (so that it can return the overall result). When the result is True, the computation needs to wait until the other one completes before it can set the final result.

The following snippet shows the speculative version:

-- Test whether all elements match the given predicate
-- (Speculatively - False is returned immediately)
foralls :: (a -> Bool) -> Tree a -> Par Bool
foralls p tree = do
    tok <- newCancelToken
    r <- forall' p tok tree
    cancel tok 
    return r
  where 
    -- Recursively start computations using the same cancellation token
    forall' p tok (Leaf v) = return (p v)
    forall' p tok (Node left right) = do
      leftRes <- new
      rightRes <- new
      finalRes <- newBlocking
      forkWith tok (forall' p tok left >>= 
        completed leftRes rightRes finalRes)
      forkWith tok (forall' p tok right >>= 
        completed rightRes leftRes finalRes)
      get finalRes

    
    -- Called when one of the branches completes. Writes the result immediately
    -- if it is False; otherwise waits for the other computation.
    completed varA varB fin resA = do
      put varA resA
      if not resA then put fin False
      else get varB >>= put fin . (&& resA)

The main body of the function creates a new CancelToken and then calls a helper that does the actual processing. The cancellation token is used when starting any background computation during the recursive processing. As a result, when the helper function returns False (meaning that some computations may still be running), we can cancel all the remaining, now unnecessary computations using this single token.

The function that implements the recursive processing first creates three variables. The first two (leftRes and rightRes) are each written by the corresponding computation (when the left computation completes, it writes its result to leftRes). The last variable is created using newBlocking, because both computations may attempt to write to it. It is set to the final result as soon as that result is known. The function then spawns two tasks to process the sub-trees and waits for the final result.

Both background computations spawned by the function make a recursive call and then pass the result to the completed function. It takes all three variables (the variable of the computation that just completed, the variable of the other computation and the final-result variable) together with the result of the computation. If the result is False, it sets the final result immediately. Otherwise, it waits until the other computation completes (by blocking on the other variable) and then sets the final result. In this case, it may race with the other call to completed, but that is not a problem: both calls produce the same value, and the second one simply blocks when it attempts to write.
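For experimenting without the modified library, the same structure can be approximated with forkIO and MVars. In this hypothetical forallsIO, a shared IORef Bool plays the role of the single cancellation token: a worker checks it before it starts processing a sub-tree, so, much like the cooperative Par extension, work already in progress is not interrupted, but no new work starts once the result is known. tryPutMVar replaces the blocking second write, so a losing write is simply dropped. Threads left waiting on a variable that will never be filled are eventually garbage-collected (GHC raises BlockedIndefinitelyOnMVar in them, which the default forkIO handler discards silently).

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, readMVar, tryPutMVar)
import Control.Exception (evaluate)
import Data.IORef (newIORef, readIORef, writeIORef)

data Tree a = Leaf a | Node (Tree a) (Tree a)

-- Hypothetical IO analogue of foralls (not the article's Par code).
forallsIO :: (a -> Bool) -> Tree a -> IO Bool
forallsIO p tree = do
  stop <- newIORef False -- shared flag playing the role of the CancelToken
  r <- go stop tree
  writeIORef stop True   -- like `cancel tok`: workers not yet started give up
  return r
  where
    go _ (Leaf v) = evaluate (p v)
    go stop (Node left right) = do
      leftRes <- newEmptyMVar
      rightRes <- newEmptyMVar
      finalRes <- newEmptyMVar
      _ <- forkIO (worker stop left >>= mapM_ (completed leftRes rightRes finalRes))
      _ <- forkIO (worker stop right >>= mapM_ (completed rightRes leftRes finalRes))
      readMVar finalRes

    -- Cooperative cancellation point: check the flag before processing a
    -- sub-tree. Nothing means the work was skipped.
    worker stop t = do
      cancelled <- readIORef stop
      if cancelled then return Nothing else Just <$> go stop t

    -- Mirrors `completed` from the article. tryPutMVar drops a second write
    -- instead of blocking like a newBlocking variable would.
    completed varA varB fin resA = do
      _ <- tryPutMVar varA resA
      if not resA
        then () <$ tryPutMVar fin False
        else readMVar varB >>= \resB -> () <$ tryPutMVar fin (resA && resB)
```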

The complete source code contains a full example that you can run to test the function. Assuming we have a tree value named tree2, we can write the following test. It compares the performance of a simple sequential version, the parallel version and the speculative version from the last listing:

main = do
  -- Binary tree contains ~270 large primes and a single
  -- non-prime number. The tree is fully evaluated.

  -- Single-threaded version (see source code link) (~3.7 sec)
  measure "Non-prime num (Seq) " (forall isPrime tree2)
  -- Simple parallel version (~1.9 sec)
  measure "Non-prime num (Par) " (runPar $ forallp isPrime tree2)
  -- Parallel version with shortcircuiting (~0.01 sec)
  measure "Non-prime num (Shr) " (runPar $ foralls isPrime tree2)

The example generates a tree containing large prime numbers and one number that is not a prime. Then it ensures that the tree is fully evaluated and processes it using the different techniques. Parallel processing is roughly two times faster than the sequential version, but the speculative version is faster by orders of magnitude. Of course, this is only the case when the tree contains at least one non-prime number. The performance depends on the data, but in general, speculative parallelism can be used to implement various useful heuristics.
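The article does not show how tree2 is constructed. One possible setup, sketched under the assumption that the leaves are Integers tested by trial division, builds a balanced tree from a list and forces it with Control.DeepSeq (from the deepseq package that ships with GHC) before any measurement. fromList, buildTree and leaves are hypothetical helpers, and this isPrime is not necessarily the one used in the benchmark.

```haskell
import Control.DeepSeq (NFData (..), force)
import Control.Exception (evaluate)

data Tree a = Leaf a | Node (Tree a) (Tree a)

-- Lets `force` evaluate the whole tree, leaves included.
instance NFData a => NFData (Tree a) where
  rnf (Leaf v) = rnf v
  rnf (Node l r) = rnf l `seq` rnf r

-- Hypothetical helper: build a balanced tree from a non-empty list.
fromList :: [a] -> Tree a
fromList [] = error "fromList: empty list"
fromList [x] = Leaf x
fromList xs = Node (fromList l) (fromList r)
  where
    (l, r) = splitAt (length xs `div` 2) xs

-- Deliberately simple trial division, so that each leaf is real work.
isPrime :: Integer -> Bool
isPrime n
  | n < 2 = False
  | otherwise = all (\d -> n `mod` d /= 0) (takeWhile (\d -> d * d <= n) [2 ..])

-- Build the tree and evaluate it fully before any timing starts.
buildTree :: [Integer] -> IO (Tree Integer)
buildTree = evaluate . force . fromList

-- Leaves in left-to-right order (useful for checking the construction).
leaves :: Tree a -> [a]
leaves (Leaf v) = [v]
leaves (Node l r) = leaves l ++ leaves r
```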

Summary

In this article, I demonstrated a couple of extensions that I implemented for the Par monad that has been recently proposed by Simon Marlow, Ryan Newton and Simon Peyton Jones. The extension adds functions for cancellation of computations in the monad.

The support for cancellation should be viewed as a low-level mechanism. In practice, it can be used to implement useful higher-level abstractions such as the unamb operator designed by Conal Elliott. The operator provides a high-level way for writing speculative computations (or working with partially undefined functions). When using the operator, the developer is responsible for ensuring that the arguments are compatible (will give the same result).

In this article, I showed an implementation of unamb for the Par monad and also a larger tree-processing example. The tree-processing example cannot be elegantly written using unamb, because it requires a more expressive abstraction. This abstraction could be, for example, a Haskell version of joinads [4], a language extension that I designed with Don Syme during an internship at Microsoft Research. I'll write more about that in a future blog post...

Downloads & Source code

References
