Parallel Programming in F# (III.): Aggregating data

In this part of the Parallel Programming in F# series, we'll explore examples of parallel aggregation from Chapter 4 of Parallel Programming with Microsoft .NET, a guide that introduces common parallel programming patterns on .NET 4.0. The C# version of the sample is discussed in detail in the guide. In this article, we'll look at the F# translation and, in particular, at several functions from the PSeq module. Some of the functionality is currently available only in the "PSeq.fs" file distributed with the samples, but it will eventually appear in F# PowerPack as well.

Aggregation of data in parallel is an interesting problem. As we've seen in the previous article, PLINQ and tasks make it easy to parallelize independent blocks of code that don't share any state. Unfortunately, not all programs are like that. In particular, we often need to aggregate all elements of a sequence. When writing sequential code in F#, you would use the Seq.fold function for this. In this article, we'll look at functions that implement fold in parallel.
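As a reminder of what the sequential version looks like, here is a minimal sketch of aggregation with Seq.fold. The input list and the names total and product are made up for illustration; the folding function takes the accumulated state and the next element and returns the new state:

```fsharp
// Hypothetical input values used only for this illustration
let numbers = [ 1.0; 2.0; 3.0; 4.0 ]

// Add all elements, starting from the initial state 0.0
let total = numbers |> Seq.fold (fun acc x -> acc + x) 0.0

// Multiply all elements, passing the operator directly as the folding function
let product = numbers |> Seq.fold (*) 1.0

printfn "total = %f, product = %f" total product
```

The fold visits the elements one by one, so the state is threaded through the whole sequence. This dependency is exactly what makes the operation harder to parallelize than a plain map.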

For a discussion of other F# examples from Parallel Programming with Microsoft .NET, you can refer to the other parts of this article series.

Aggregation with Parallel LINQ

We start by looking at the aggregation operators provided by Parallel LINQ. As we've seen in part II. of this series, there are two ways of calling PLINQ from F#. We can either call the extension methods (from the ParallelEnumerable type) directly or we can use the F# functions from the PSeq module.

Basic aggregation

Let's start by looking at two functions from the "Examples.fs" file in the "BasicAggregation" project (Chapter 4). The first function takes an array of floats as an argument, normalizes the values and then adds them. This can be done using the built-in query operator Sum or the PSeq.sum function:

let Chapter4Sample02Plinq (data:_[]) =
    data.AsParallel()
        .Select(normalize)
        .Sum()

let Chapter4Sample02PSeq data =
    data |> PSeq.map normalize
         |> PSeq.sum

Parallelizing an aggregation operation is a tricky problem. You can find more information about this topic in two blog posts on the PLINQ team blog [1, 2]. A simple aggregation that does not use a CPU-intensive function is typically combined with some other operation. In the above example, we first normalize all elements and then sum the results. This means that the sum operator can aggregate elements as they are generated by the normalization code.

The next example is similar, except that we'll provide a custom aggregation function. Instead of adding all numbers, we'll multiply them:

let Chapter4Sample03Plinq (data:_[]) =
    data.AsParallel()
        .Select(normalize)
        .Aggregate(1.0, fun y1 y2 -> y1 * y2)

let Chapter4Sample03PSeq data =
    data |> PSeq.map normalize
         |> PSeq.fold (*) 1.0

In the first version, we use PLINQ extension methods directly. This is relatively convenient thanks to the fact that F# automatically converts function values to delegates. The second version uses the PSeq module and so it looks like idiomatic functional list processing code. For this reason, I would personally prefer the second option. Also note that the call to AsParallel is done implicitly whenever we use functions from the PSeq module.
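If you want to try the PLINQ variants without the sample project, the following sketch should run on its own. It calls the PLINQ extension methods directly, because the PSeq module is not part of the core library. The normalize function here is only a hypothetical stand-in (the real one is defined in the sample project), and the input values are chosen so that the floating-point results are exact:

```fsharp
open System.Linq

// Hypothetical stand-in for the sample's normalize function
let normalize (x: float) = x * 2.0

// Hypothetical input; the normalized values are 4, 1, 8, 0.5 and 16
let data = [| 2.0; 0.5; 4.0; 0.25; 8.0 |]

// Built-in aggregation: normalize in parallel and add the results
let parallelSum =
    data.AsParallel()
        .Select(fun x -> normalize x)
        .Sum()

// Custom aggregation: normalize in parallel and multiply the results
let parallelProduct =
    data.AsParallel()
        .Select(fun x -> normalize x)
        .Aggregate(1.0, fun acc x -> acc * x)

// Sequential versions for comparison
let sequentialSum = data |> Seq.map normalize |> Seq.fold (+) 0.0
let sequentialProduct = data |> Seq.map normalize |> Seq.fold (*) 1.0

printfn "sum: %f (sequential %f)" parallelSum sequentialSum
printfn "product: %f (sequential %f)" parallelProduct sequentialProduct
```

Keep in mind that floating-point addition and multiplication are not strictly associative. For general inputs, a parallel aggregation may combine the values in a different order, so it can differ from the sequential result in the last few digits.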

Social network example

Let's now look at a more complicated aggregation example. This is a more realistic example and so it performs a larger number of operations in a sequence. As a result, PLINQ has better control over the execution and can better optimize the parallel execution.

In this example, we'll have a social network with a dictionary of members (named subscribers) that uses the ID of a person as the key. We'll use subscribers.[id] to find a record with information about the member identified by id. A member has friends, which are stored as a list of IDs in the Friends property. The following example implements an algorithm that suggests possible friends for the given member subId:

let candidates =
  subscribers.[subId].Friends
    // Get friends of friends and filter not relevant
    |> PSeq.collect (fun friend -> subscribers.[friend].Friends)
    |> PSeq.filter (fun foaf ->
        // Remove self and existing friends
        foaf <> subId && not (subscribers.[subId].Friends.Contains(foaf)))
    // Aggregate friends to get the most frequent
    |> PSeq.groupBy id
    |> PSeq.map (fun (k, v) -> new IDMultisetItem(k, v.Count()))

The example consists of two phases. In the first phase, we take all friends of friends of the current user using PSeq.collect. This gives us a (possibly) very large list of possible friends. Then we filter the list to remove the current user (because she is also a friend of her friends) and we also remove her current friends, because we don't want to suggest them.

In the second phase, we perform aggregation. As discussed earlier, there is already a lot of processing before the aggregation, so PLINQ can effectively parallelize the code. The aggregation is implemented using the groupBy function. It takes a function that gives us the key to be used for the aggregation. In the above example, we have a list of integers (IDs), so we can simply use the identity function id to return the integer value itself. The grouping operation in the PSeq module returns tuples containing the key and a collection of values in the group. In the final step, we turn this tuple into an object that stores the ID of a possible friend and the number of occurrences. To suggest possible friends, we'll simply take the people who were suggested most frequently.
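To make the two phases concrete, here is a self-contained sketch of the same idea on a tiny, made-up network. It is not the code from the samples: the subscribers dictionary maps an ID directly to a list of friend IDs, the function name suggestFriends is hypothetical, and the PLINQ operators (SelectMany, Where, GroupBy, Select) are called directly instead of going through the PSeq module:

```fsharp
open System
open System.Linq

// Hypothetical sample network: subscriber ID -> IDs of that subscriber's friends
let subscribers =
    dict [ 1, [ 2; 3 ]
           2, [ 1; 3; 4 ]
           3, [ 1; 4; 5 ]
           4, [ 2; 3 ]
           5, [ 3 ] ]

let suggestFriends (subId: int) : (int * int) list =
    let friends = subscribers.[subId]
    // Phase 1: collect friends of friends (the PLINQ counterpart of collect)
    let foafs =
        ParallelEnumerable.SelectMany(
            ParallelEnumerable.AsParallel<int>(friends),
            Func<int, seq<int>>(fun friend -> subscribers.[friend] :> seq<int>))
    // Remove the subscriber and the friends she already has
    let filtered =
        ParallelEnumerable.Where(
            foafs,
            Func<int, bool>(fun foaf ->
                foaf <> subId && not (List.contains foaf friends)))
    // Phase 2: group equal IDs and count the size of each group
    let groups =
        ParallelEnumerable.GroupBy(filtered, Func<int, int>(fun foaf -> foaf))
    let counts =
        ParallelEnumerable.Select(
            groups,
            Func<IGrouping<int, int>, int * int>(fun g -> (g.Key, Seq.length g)))
    // Most frequent candidates first; ties are broken by ID to keep the output stable
    counts
    |> Seq.sortBy (fun (candidate, occurrences) -> (-occurrences, candidate))
    |> Seq.toList

let suggestions = suggestFriends 1
printfn "%A" suggestions
```

For subscriber 1, the friends of friends are 1, 3, 4, 1, 4 and 5. After removing the subscriber herself and her existing friends, we are left with 4, 4 and 5, so the result is [(4, 2); (5, 1)]. The final sort is needed because a parallel GroupBy does not guarantee the order of the groups.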

Aggregation with Map/Reduce

In this section, we'll look at another example that uses the Map/Reduce pattern more directly. The idea of Map/Reduce is that we split the work into two phases. The first phase performs some work that can be easily parallelized and the second phase aggregates the results from the first phase. In some sense, the previous examples were all implementations of this idea, but in this section, we'll look at a more explicit version.

The Map/Reduce pattern [3] has been popularized by Google, which uses a distributed implementation of a variant of this algorithm for most of their data processing. However, the idea is as old as functional programming. The name comes from the names of the functions map and reduce (the latter is called fold in F#).

MapReduce function

In this section, we'll look at the mapReduce function, which is one way to implement the Map/Reduce pattern. The function wraps one of the overloads of the Aggregate operator from PLINQ and makes it more comfortably usable from F#. It is available in the PSeq module distributed with the Parallel Programming with Microsoft .NET samples and will be eventually included in F# PowerPack too. Let's start by looking at the type signature of the function:

val mapReduce :
    initf:    (unit -> 'TState) ->                 // (1)
    processf: ('TState -> 'TInput -> 'TState) ->   // (2)
    mergef:   ('TState -> 'TState -> 'TState) ->   // (3)
    resultf:  ('TState -> 'TResult) ->             // (4)
    input:    pseq<'TInput> ->     // Input data sequence
              'TResult             // Calculated result

The function looks rather complex, but once you understand what is going on, it should become quite clear. It takes four functions as parameters followed by a parallel input sequence and returns a result, which can be of any type. The following diagram demonstrates what the function does:

The boxes in the upper row represent the input sequence. The sequence is partitioned into several groups (dynamically, depending on the number of CPUs), which are shown in different colors. Next, the function processes all elements in each group. Each group is processed on a separate thread and has a local state that can be mutable (because it will only be accessed from a single thread). The local state is generated using the initf (1) function passed as the first parameter and the processing is done using the processf (2) function (it takes the current state and an element and returns the new state, but it can also mutate the current state and return it).

In the next phase, the function aggregates all local states into a single state. This is done as the individual threads complete, so it can be partly parallelized. The aggregation uses the mergef function, which takes two local states and returns a single merged state. When all states are aggregated, the processing uses resultf to build the final result from the state.
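To see how the four functions fit together, here is one possible way to write such a wrapper. This is only a sketch under the assumption that the function forwards to the overload of ParallelEnumerable.Aggregate that takes a seed factory; the actual implementation in "PSeq.fs" may differ. The usage example (calculating an average with a sum and a count as the local state) is made up for illustration:

```fsharp
open System
open System.Linq

/// Sketch of a mapReduce wrapper over the Aggregate overload that takes
/// a seed factory, an update function, a combine function and a result selector
let mapReduce (initf: unit -> 'State)
              (processf: 'State -> 'Input -> 'State)
              (mergef: 'State -> 'State -> 'State)
              (resultf: 'State -> 'Result)
              (input: ParallelQuery<'Input>) : 'Result =
    ParallelEnumerable.Aggregate<'Input, 'State, 'Result>(
        input,
        Func<'State>(fun () -> initf ()),                                     // (1)
        Func<'State, 'Input, 'State>(fun state item -> processf state item),  // (2)
        Func<'State, 'State, 'State>(fun s1 s2 -> mergef s1 s2),              // (3)
        Func<'State, 'Result>(fun state -> resultf state))                    // (4)

// Usage: the local state of each partition is a (sum, count) pair
let average =
    ParallelEnumerable.Range(1, 100)
    |> mapReduce
        (fun () -> (0, 0))
        (fun (sum, count) x -> (sum + x, count + 1))
        (fun (sum1, count1) (sum2, count2) -> (sum1 + sum2, count1 + count2))
        (fun (sum, count) -> float sum / float count)

printfn "average = %f" average
```

Note that initf is a function rather than a value. It is called once for every partition, so each partition starts with its own fresh state. This matters as soon as the state is mutable.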

MapReduce example

The following example is taken from the "AggregateSimulation" project in Chapter 4. The example generates several values (for example by running a simulation or by looking at colors in a picture) and groups them into a histogram. The histogram is the local state and is represented as a mutable array (so that values can be efficiently grouped). In the second phase, the histograms are combined into a single one that is then returned as the result:

/// Runs simulation for several randomly generated inputs and 
/// stores the result in a bucket of a histogram (in parallel)
let doParallelAggregationPSeq count mean stdDev =
    // Run exactly 'count' trials
    [ 1 .. count ] 
    |> PSeq.ofList
    |> PSeq.mapReduce
        // Initialize local state for each of the partitions
        (fun () -> makeEmptyHistogram(), new Random(makeRandomSeed()))

        // Run the simulation, modifying the local state
        (fun (histogram:int[], rnd:Random) i ->
            // Perform a simulation trial for the sample value
            let sample = rnd.NextDouble()
            let simulationResult = doSimulation sample mean stdDev

            // Put the result into the histogram of the local accumulator
            let idx = int (Math.Floor(simulationResult / float bucketSize))
            histogram.[idx] <- histogram.[idx] + 1
            (histogram, rnd))

        // Aggregate states for all partitions
        (fun (histogram1, _) (histogram2, _) ->
            combineHistograms histogram1 histogram2, null)

        // Function that extracts result from the aggregated state
        fst

The 'TState type is actually a tuple containing a histogram and a random number generator. The type Random is not thread-safe, so we need a separate instance for every thread. The function representing the first phase runs a simulation and adds the result to the current histogram. The aggregate function uses combineHistograms to create a new histogram from two local histograms and the last function (fst) simply returns the histogram (first element of a tuple) as the final result.
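The listing relies on a few helpers that are defined elsewhere in the sample project and are not shown in this article. The following sketch shows one way they could look. The bodies are assumptions made for illustration (as are the constant tableSize and the value of bucketSize), not the code from the samples:

```fsharp
open System

// Assumed constants; the values used by the real sample may differ
let tableSize = 20   // hypothetical number of buckets in a histogram
let bucketSize = 5   // width of a single bucket

/// Creates a histogram with all buckets set to zero
let makeEmptyHistogram () : int[] = Array.zeroCreate tableSize

/// Builds a new histogram by adding the corresponding buckets of two histograms
let combineHistograms (histogram1: int[]) (histogram2: int[]) : int[] =
    Array.map2 (+) histogram1 histogram2

/// Generates seeds for the per-partition Random instances. The shared
/// generator is protected by a lock, because Random is not thread-safe
/// and the function may be called from several threads at once.
let makeRandomSeed : unit -> int =
    let seedGenerator = Random()
    fun () -> lock seedGenerator (fun () -> seedGenerator.Next())
```

The combining function returns a new array instead of mutating one of its arguments. This costs an allocation per merge, but it means that the merge step never has to worry about which partition owns which histogram.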

Summary

In this article, we looked at F# versions of some of the samples from Chapter 4 of Parallel Programming with Microsoft .NET. The examples show how to parallelize code that needs to aggregate some data. In order to make the parallelization efficient, it is important to run the aggregation together with some other processing of the data. This approach is called the Map/Reduce pattern. We've seen two ways of implementing the pattern. We can use PLINQ operators such as PSeq.groupBy and PSeq.fold in combination with other PLINQ processing or we can use the PSeq.mapReduce function, which allows us to specify both of the phases at once.

Downloads and References 
