Parallel Programming in F# (III.): Aggregating data
In this part of the Parallel Programming in F# series, we'll explore examples of parallel aggregation from Chapter 4 of Parallel Programming with Microsoft .NET, a guide that introduces common parallel programming patterns on .NET 4.0. The C# version of the sample is discussed in detail in the guide. In this article, we'll look at the F# translation and, in particular, at several functions from the PSeq module. Some of the functionality is currently available only in the "PSeq.fs" file distributed with the samples, but it will eventually appear in the F# PowerPack as well.
Aggregating data in parallel is an interesting problem. As we've seen in the previous article, PLINQ and tasks make it easy to parallelize independent blocks of code that don't share any state. Unfortunately, not all programs are like that. In particular, we often need to aggregate all elements of a sequence. When writing sequential code in F#, you would use the Seq.fold function. In this article, we'll look at functions that implement fold in parallel.
For discussion about other F# examples from Parallel Programming with Microsoft .NET, you can refer to other parts of this article series. So far, the series covers the following topics:
- Parallel Programming in F# (I.): Introducing the samples
- Parallel Programming in F# (II.): Using PLINQ and Tasks
- Parallel Programming in F# (III.): Aggregating data
- Parallel Programming in F# (IV.): Financial dashboard example
Aggregation with Parallel LINQ
We start by looking at the aggregation operators provided by Parallel LINQ. As we've seen in part II of this series, there are two ways of calling PLINQ from F#. We can either call the extension methods (from the ParallelEnumerable type) directly, or we can use the F# functions from the PSeq module.
Basic aggregation
Let's start by looking at two functions from the "Examples.fs" file in the "BasicAggregation" project (Chapter 4). The first function takes an array of floats as an argument, normalizes the values and then adds them. This can be done using the built-in query operator Sum or the PSeq.sum function:
    let Chapter4Sample02Plinq (data:_[]) =
        data.AsParallel()
            .Select(normalize)
            .Sum()

    let Chapter4Sample02PSeq data =
        data |> PSeq.map normalize
             |> PSeq.sum
Parallelizing an aggregation operation is a tricky problem. You can find more information about this topic in two posts on the PLINQ team blog [1, 2]. Simple aggregations that do not use a CPU-intensive function are typically combined with some other operation. In the above example, we first normalize all elements and then sum the results. This means that the sum operator can aggregate elements as the normalization code generates them.
The next example is similar, except that we provide a custom aggregation function. Instead of adding all numbers, we'll multiply them:
    let Chapter4Sample03Plinq (data:_[]) =
        data.AsParallel()
            .Select(normalize)
            .Aggregate(1.0, fun y1 y2 -> y1 * y2)

    let Chapter4Sample03PSeq data =
        data |> PSeq.map normalize
             |> PSeq.fold (*) 1.0
In the first version, we use PLINQ extension methods directly. This is relatively convenient thanks to the fact that F# automatically converts function values to delegates. The second version uses the PSeq module, so it looks like idiomatic functional list processing code. For this reason, I would personally prefer the second option. Also note that the call to AsParallel is done implicitly whenever we use functions from the PSeq module.
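To make the delegate conversion mentioned above concrete, here is a minimal sketch that calls the PLINQ Aggregate method once with an F# lambda and once with an explicitly constructed Func delegate. The input array and the names viaLambda, multiply and viaDelegate are made up for this illustration. The values are small whole numbers so that the floating-point product is exact regardless of evaluation order.

```fsharp
open System
open System.Linq

// Hypothetical input: whole numbers keep the float product exact
let data = [| 1.0 .. 10.0 |]

// The lambda is converted to Func<float, float, float> at the call site
let viaLambda =
    data.AsParallel().Aggregate(1.0, fun acc x -> acc * x)

// The same call with the delegate constructed explicitly
let multiply = Func<float, float, float>(fun acc x -> acc * x)
let viaDelegate =
    data.AsParallel().Aggregate(1.0, multiply)

printfn "%b" (viaLambda = viaDelegate)
```

Both calls use the same overload; the first form is simply shorter, which is why calling PLINQ methods directly from F# stays reasonably pleasant.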
Social network example
Let's now look at a more complicated aggregation example. This is a more realistic example and so it performs a larger number of operations in a sequence. As a result, PLINQ has better control over the execution and can better optimize the parallel execution.
In this example, we'll have a social network with a dictionary of members (named subscribers) that has the ID of a person as the key. We'll use subscribers.[id] to find a record with information about the member identified by id. A member has friends, which are stored as a list of IDs in the Friends property. The following example implements an algorithm that suggests possible friends for the given member subId:
    let candidates =
        subscribers.[subId].Friends
        // Get friends of friends and filter not relevant
        |> PSeq.collect (fun friend -> subscribers.[friend].Friends)
        |> PSeq.filter (fun foaf ->
            // Remove self and existing friends
            foaf <> subId && not (subscribers.[subId].Friends.Contains(foaf)))
        // Aggregate friends to get the most frequent
        |> PSeq.groupBy id
        |> PSeq.map (fun (k, v) -> new IDMultisetItem(k, v.Count()))
The example consists of two phases. In the first phase, we take all friends of friends of the current user using PSeq.collect. This gives us a (possibly) very large list of possible friends. Then we filter the list to remove the current user (because she is also a friend of her friends) and we also remove her current friends, because we don't want to suggest them.
In the second phase, we perform aggregation. As discussed earlier, there is already a lot of processing before the aggregation, so PLINQ can effectively parallelize the code. The aggregation is implemented using the groupBy function. It takes a function that gives us the key to be used for the aggregation. In the above example, we have a list of integers (IDs), so we can simply use the identity function id to return the integer value itself. The grouping operation in the PSeq module returns tuples containing the key and a collection of values in the group. In the final step, we turn this tuple into an object that stores the ID of a possible friend and the number of occurrences. To suggest possible friends, we'll simply take the people who were suggested most frequently.
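The ranking step described above can be sketched on a tiny, made-up network. This version is deliberately sequential and uses only the standard Seq functions, so it runs without the PSeq module; the dictionary friendsOf and the function suggest are hypothetical names for this illustration. The pipeline has the same shape as the sample, with Seq.countBy standing in for the groupBy and Count combination.

```fsharp
// Hypothetical miniature network: member ID -> IDs of that member's friends
let friendsOf =
    dict [ 1, [ 2; 3 ]
           2, [ 1; 4; 5 ]
           3, [ 1; 4 ]
           4, [ 2; 3 ]
           5, [ 2 ] ]

/// Returns (candidate ID, number of shared friends), most frequent first
let suggest (subId: int) =
    let existing = Set.ofList (friendsOf.[subId])
    friendsOf.[subId]
    // Friends of friends
    |> Seq.collect (fun friend -> friendsOf.[friend])
    // Remove self and existing friends
    |> Seq.filter (fun foaf -> foaf <> subId && not (existing.Contains foaf))
    // Count how many times each candidate occurs
    |> Seq.countBy id
    // Most frequent first; ties broken by ID
    |> Seq.sortBy (fun (personId, count) -> -count, personId)
    |> Seq.toList

printfn "%A" (suggest 1)
```

For member 1, candidate 4 is reachable through both friends (2 and 3), while candidate 5 is reachable only through friend 2, so 4 ranks first.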
Aggregation with Map/Reduce
In this section, we'll look at another example that uses the Map/Reduce pattern more directly. The idea of Map/Reduce is that we split the work into two phases. The first phase performs some work that can be easily parallelized and the second phase aggregates the results from the first phase. In some sense, the previous examples were all implementations of this idea, but in this section, we'll look at a more explicit version.
The Map/Reduce pattern [3] has been popularized by Google, which uses a distributed implementation of a variant of this algorithm for most of their data processing. However, the idea is as old as functional programming. The name comes from the names of the functions map and reduce (which is called fold in F#).
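As a point of reference, the two phases can be written sequentially with the standard Seq.map and Seq.fold functions. This is only a small illustration with made-up input (the list words), not code from the samples:

```fsharp
// Hypothetical input data
let words = [ "parallel"; "aggregation"; "in"; "fsharp" ]

// Map phase: independent work on each element
let lengths = words |> Seq.map String.length

// Reduce phase: combine the intermediate results into one value
let total = lengths |> Seq.fold (+) 0

printfn "%d" total
```

The map phase has no dependencies between elements, which is what makes it easy to parallelize. The reduce phase is the part that needs more care, and it is the subject of the rest of this section.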
MapReduce function
In this section, we'll look at the mapReduce function, which is one way to implement the Map/Reduce pattern. The function wraps one of the overloads of the Aggregate operator from PLINQ and makes it more comfortable to use from F#. It is available in the PSeq module distributed with the Parallel Programming with Microsoft .NET samples and will eventually be included in the F# PowerPack too. Let's start by looking at the type signature of the function:
    val mapReduce :
        initf: (unit -> 'TState) ->                  // (1)
        processf: ('TState -> 'TInput -> 'TState) -> // (2)
        mergef: ('TState -> 'TState -> 'TState) ->   // (3)
        resultf: ('TState -> 'TResult) ->            // (4)
        input: pseq<'TInput> ->                      // Input data sequence
        'TResult                                     // Calculated result
The function looks rather complex, but once you understand what is going on, it should become quite clear. It takes four functions as parameters followed by a parallel input sequence and returns a result, which can be of any type. The following diagram demonstrates what the function does:
The boxes in the upper row represent the input sequence. The sequence is partitioned into several groups (dynamically, depending on the number of CPUs), which are shown in different colors. Next, the function processes all elements in each group. Each group is processed on a separate thread and has a local state that can be mutable (because it will only be accessed on that single thread). The local state is generated using the initf (1) function passed as the first parameter and the processing is done using the processf (2) function (it takes the current state and an element and returns the new state, but it can also mutate the current state and return it).
In the next phase, the function aggregates all local states into a single state. This is done as the individual threads complete, so it can be partly parallelized. The aggregation uses the mergef function, which takes two local states and returns a single merged state. When all states are aggregated, the processing uses resultf to build the final result from the state.
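One way such a wrapper could be written is sketched below. This is an assumption about the general shape, not the actual code from "PSeq.fs": it takes a plain seq rather than a pseq and forwards the four functions to the PLINQ Aggregate overload that accepts a seed factory, an update function, a combine function and a result selector.

```fsharp
open System
open System.Linq

/// Sketch of a mapReduce-style wrapper over the PLINQ Aggregate operator.
/// Illustration only; not the implementation from PSeq.fs.
let mapReduce (initf: unit -> 'TState)
              (processf: 'TState -> 'TInput -> 'TState)
              (mergef: 'TState -> 'TState -> 'TState)
              (resultf: 'TState -> 'TResult)
              (input: seq<'TInput>) : 'TResult =
    input.AsParallel().Aggregate(
        // (1) create a fresh local state for each partition
        Func<'TState>(fun () -> initf ()),
        // (2) fold one element into the local state of a partition
        Func<'TState, 'TInput, 'TState>(fun state item -> processf state item),
        // (3) merge the local states of two partitions
        Func<'TState, 'TState, 'TState>(fun s1 s2 -> mergef s1 s2),
        // (4) turn the final merged state into the result
        Func<'TState, 'TResult>(fun state -> resultf state))

// Usage: mean of the numbers 1.0 .. 100.0, with (sum, count) as local state
let mean =
    seq { 1.0 .. 100.0 }
    |> mapReduce
        (fun () -> (0.0, 0))
        (fun (sum, n) (x: float) -> (sum + x, n + 1))
        (fun (s1, n1) (s2, n2) -> (s1 + s2, n1 + n2))
        (fun (sum, n) -> sum / float n)

printfn "%f" mean
```

The usage example keeps an immutable tuple as the state. The result does not depend on how the input is partitioned, because adding the partial sums and counts is associative and all intermediate sums here are whole numbers.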
MapReduce example
The following example is taken from the "AggregateSimulation" project in Chapter 4. The example generates several values (for example by running a simulation or by looking at colors in a picture) and groups them into a histogram. The histogram is the local state and is represented as a mutable array (so that values can be efficiently grouped). In the second phase, the histograms are combined into a single one that is then returned as the result:
    /// Runs simulation for several randomly generated inputs and
    /// stores the result in a bucket of a histogram (in parallel)
    let doParallelAggregationPSeq count mean stdDev =
        [ 0 .. count ]
        |> PSeq.ofList
        |> PSeq.mapReduce
            // Initialize local state for each of the partitions
            (fun () -> makeEmptyHistogram(), new Random(makeRandomSeed()))

            // Run the simulation, modifying the local state
            (fun (histogram:int[], rnd:Random) i ->
                // Perform a simulation trial for the sample value
                let sample = rnd.NextDouble()
                let simulationResult = doSimulation sample mean stdDev

                // Put the result into the histogram of the local accumulator
                let idx = int (Math.Floor(simulationResult / float bucketSize))
                histogram.[idx] <- histogram.[idx] + 1
                (histogram, rnd))

            // Aggregate states for all partitions
            (fun (histogram1, _) (histogram2, _) ->
                combineHistograms histogram1 histogram2, null)

            // Function that extracts result from the aggregated state
            fst
The 'TState type is actually a tuple containing a histogram and a random number generator. The type Random is not thread-safe, so we need a separate instance for every thread. The function representing the first phase runs a simulation and adds the result to the current histogram. The aggregate function uses combineHistograms to create a new histogram from two local histograms and the last function (fst) simply returns the histogram (the first element of the tuple) as the final result.
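The helper functions are not shown in the listing above. A minimal sketch of what makeEmptyHistogram and combineHistograms might look like follows; the bucket count tableSize is a hypothetical value chosen for this illustration, and the actual sample may implement the helpers differently.

```fsharp
// Hypothetical number of buckets; the sample defines its own constant
let tableSize = 8

/// Creates a histogram with every bucket count set to zero
let makeEmptyHistogram () : int[] =
    Array.zeroCreate tableSize

/// Returns a new histogram whose buckets are the pairwise sums of
/// the two inputs; the input arrays are left unchanged
let combineHistograms (histogram1: int[]) (histogram2: int[]) : int[] =
    Array.map2 (+) histogram1 histogram2

let merged = combineHistograms [| 1; 0; 2 |] [| 4; 5; 0 |]
printfn "%A" merged
```

Returning a fresh array from the merge step keeps it free of side effects on the local states. Mutating and returning one of the inputs would also work here, since each local state is merged only once.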
Summary
In this article, we looked at F# versions of some of the samples from Chapter 4 of Parallel Programming with Microsoft .NET. The examples show how to parallelize code that needs to aggregate some data. In order to make the parallelization efficient, it is important to run the aggregation together with some other processing of the data. This approach is called the Map/Reduce pattern. We've seen two ways of implementing the pattern. We can use PLINQ operators such as PSeq.groupBy and PSeq.fold in combination with other PLINQ processing, or we can use the PSeq.mapReduce function, which allows us to specify both of the phases at once.
Downloads and References
- Parallel Programming with Microsoft .NET - Book homepage at CodePlex
- F# Code Samples - Parallel Programming with Microsoft .NET
- [1] Parallel Aggregation in PLINQ - Parallel Programming with .NET blog
- [2] More Powerful Aggregations in PLINQ - Parallel Programming with .NET blog
- [3] MapReduce - Wikipedia.org
Published: Monday, 6 September 2010, 10:20 AM
Author: Tomas Petricek
Tags: functional, parallel, f#