Parallel Programming in F# (I.): Introducing the samples

Parallel Programming with Microsoft .NET is a guide written by the patterns & practices group at Microsoft. It shows .NET programmers patterns for adding parallelism to their applications using the parallel programming support in .NET 4.0, and covers techniques such as parallel loops, parallel tasks and data aggregation. The book includes numerous samples in C# and Visual Basic that can be easily copied and adapted to match your needs.

As part of contract work for the F# team, I developed an F# version of the samples, which is now available on the book web site. You can get it by downloading F# code samples (ZIP) from the 1.0 release, or you can download the latest version of the source code directly. The F# version of the code is mostly a direct translation of the C# version, but a few places are interesting enough to be worth discussing. In particular, some of the samples use the PSeq module from F# PowerPack, and I also added a version of one example that uses F# asynchronous workflows.

In this article series, I'll look at several interesting examples from the F# version of the source code. We'll look at how to use PLINQ and Tasks (which are available in .NET 4.0) from F# (Part II.), including some advanced topics such as the Map-Reduce algorithm (Part III.). We'll also look at a larger example built using tasks and an alternative implementation using asynchronous workflows (Part IV.).

Structure of F# samples

In this article, we'll briefly look at the general structure of F# versions of the samples. In the C# version, there is usually a source file that contains both implementation of the examples and the Main method. In F#, I split the file into two - one file contains the examples (e.g. "Examples.fs") and another file runs them (e.g. "Program.fs"). This makes it possible to add a script file (e.g. "Script.fsx") that allows you to run the samples easily in F# Interactive.

Code file

In this section, we'll briefly look at one example of the main source file that runs examples as a standalone program. For example, the "Program.fs" file from the "BasicTaskSamples" project in Chapter 3 looks as follows:

open System
open System.Threading.Tasks
open Microsoft.Practices.ParallelGuideSamples.Utilities
open Microsoft.Practices.ParallelGuideSamples.BasicParallelTasks.Examples

let main() =
    Console.WriteLine("Basic Parallel Tasks Samples\n")
    // Run several samples and measure the time taken
    SampleUtilities.TimedAction "2 steps, sequential" (fun () ->
        Chapter3Sample01Sequential() )
    SampleUtilities.TimedAction "2 steps, parallel invoke" (fun () ->
        Chapter3Sample01ParallelInvoke() )
    // (Other samples omitted)

// Run the main testing program in the thread pool
// and block until it completes
do Task.Factory.StartNew(main).Wait()

The sample uses the SampleUtilities module (which is defined in the "Utilities" project) to measure the time of an operation. In this case, the main function simply runs all the examples in the project. Some examples provide more options for configuration: you can adjust various constants in the program code or provide them as command line arguments. However, in F# it is more comfortable to run the sample from F# Interactive and adjust the sample configuration directly in the script file...
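To make the timing helper less of a black box, here is a minimal sketch of how a function with the same shape as TimedAction (string -> (unit -> 'a) -> 'a) could be written. The name timedAction and its body are assumptions made for illustration; the actual implementation in the "Utilities" project may differ.

```fsharp
open System.Diagnostics

/// Hypothetical stand-in for SampleUtilities.TimedAction: runs the function,
/// prints the elapsed time under the given label and returns the result.
let timedAction (label: string) (f: unit -> 'a) : 'a =
    let sw = Stopwatch.StartNew()
    let result = f ()
    sw.Stop()
    printfn "%s: %d ms" label sw.ElapsedMilliseconds
    result

// Usage: the measured work is passed as a lambda, as in "Program.fs"
let sum = timedAction "sum of squares" (fun () ->
    List.sumBy (fun i -> i * i) [ 1 .. 100 ])
```

Because the helper returns whatever the measured function returns, it can wrap both unit-returning samples and computations that produce a value.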

Script files

To demonstrate a typical script file that can be used to run samples in F# Interactive, we'll look at "Script.fsx" from the "BasicParallelLoops" project in Chapter 2. Every script file first references the compiled assemblies it needs and then loads the source files that implement the examples:

> #r @"..\..\Utilities\bin\Release\Utilities.dll"
  #load @"..\..\PSeq.fs"
  #load "ParallelForExample.fs";;

The first #r directive loads the "Utilities" project, which contains various helper functions that are used by most of the samples (e.g. calculations, simulation of CPU and I/O bound tasks). Next, we load the PSeq module from a file that is distributed as part of the release. This is a local version copied from the latest version of F# PowerPack. The release uses a local copy to make it easier to use the samples (you don't need to install PowerPack) and also because the local copy contains several minor improvements; for example the mapReduce function that we'll use in part III. of this series.

In the next step, we open necessary namespaces and create a configuration for running the samples (in this case, configuration is a simple record that is passed to the example). In F# Interactive we can also use built-in support for time measurement, so we turn it on using #time "on":

> open System
  open Microsoft.Practices.ParallelGuideSamples.BasicParallelLoops.Examples;;

> let opts = { LoopBodyComplexity = 10000; NumberOfSteps = 10000
               VerifyResult = false; SimulateInternalError = false };;
val opts : ExampleOptions = { (...) }

> #time "on";;
--> Timing now on
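For readers who have not opened the project, the following sketch shows what such a configuration record could look like. The field names are taken from the script above, but the field types are inferred from the values used there, so treat the definition as an assumption rather than the exact code from the samples.

```fsharp
/// Hypothetical definition of the configuration record; field names come
/// from the script, field types are inferred from the values used there.
type ExampleOptions =
    { LoopBodyComplexity : int
      NumberOfSteps : int
      VerifyResult : bool
      SimulateInternalError : bool }

let opts =
    { LoopBodyComplexity = 10000; NumberOfSteps = 10000
      VerifyResult = false; SimulateInternalError = false }

// Copy-and-update expression: change one setting and keep the rest
let verified = { opts with VerifyResult = true }
```

The copy-and-update syntax is convenient in F# Interactive, because you can derive a modified configuration and re-run a sample without retyping all the fields.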

Now we have everything we need to run the samples. Thanks to the interactive environment, it is easy to run the samples that you're interested in, go back to the configuration, change some properties of the sample and re-run it. The following example compares the sequential version of a sample with a parallel version using the PSeq module (which we'll discuss in part II. of the series).

> let results = Chapter2.Example01 opts;; // Sequential 'for'
Real: 00:00:00.986, CPU: 00:00:00.998, GC gen0: 0, gen1: 0, gen2: 0
val results : float [] =
  [|0.0; 250025000.0; 500050000.0; (...) |]

> let results = Chapter2.Example04 opts;; // Parallel using F# PSeq
Real: 00:00:00.543, CPU: 00:00:00.998, GC gen0: 0, gen1: 0, gen2: 0
val results : float [] =
  [|0.0; 250025000.0; 500050000.0; (...) |]
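If you want to try the same kind of comparison without the sample projects, the sketch below does something similar using only the F# core library. It is not the code behind Example01 or Example04: the work function is a made-up CPU-bound workload, and Array.Parallel.map is used as a stand-in for PSeq, which lives in F# PowerPack.

```fsharp
// A deliberately CPU-bound, side-effect-free function (hypothetical workload)
let work (n: int) : float =
    let mutable acc = 0.0
    for i in 1 .. 10000 do
        acc <- acc + sqrt (float (n + i))
    acc

let inputs = [| 0 .. 9999 |]

// Sequential version
let sequentialResults = inputs |> Array.map work

// Parallel version; Array.Parallel.map keeps the results in input order
let parallelResults = inputs |> Array.Parallel.map work
```

Running the two let bindings separately in F# Interactive with #time "on" prints a timing line for each, which you can compare in the same way as the output above. The actual speedup depends on the number of cores and on how expensive the loop body is.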

Additional Information

Below are links to the book homepage at CodePlex, F# PowerPack project which is used in some of the examples, blogs by authors of Parallel Programming with .NET and other resources that you may find interesting.

Published: Monday, 6 September 2010, 10:00 AM
Author: Tomas Petricek
Tags: functional, parallel, asynchronous, f#