TP

F# Parallel Extras (I.): Image pipeline using agents

In a recent blog post series, I wrote about parallel programming samples that accompany the Parallel Programming with Microsoft .NET book by patterns & practices group at Microsoft. The F# translation of the samples that I wrote about mostly followed the style used in the book, so it used patterns that are typical for C#. However, some of the samples can be written in F# in a more interesting way...

In this article, we'll take a look at agent-based implementation of the Image pipeline example (from chapter 7). A pipeline is a useful pattern if you need to process large number of inputs in parallel and the processing consists of multiple phases or steps. In the original implementation, the pipeline was implemented using BlockingCollection<T> and Task<T> types from .NET 4.0.

In this article, I'll show a version that uses F# agents and asynchronous workflows. We'll use a BlockingQueueAgent<T> type, which is discussed in another article. It represents a queue with limited capacity that asynchronously blocks the process that is adding values if there is no space in the buffer and blocks the process that reads values when there are no values. This type can be elegantly used to implement the pipeline pattern. In this article, we'll demonstrate it by writing a four-phase pipeline that processes images. As you'll see, the agent-based version of the code is very much easier to write and has similar performance as the original version.

Image processing pipeline


Diagram is from the Parallel Programming with Microsoft
.NET
book. Click on the image for a larger version.

We start by looking at code that implements the image processing using the BlockingQueueAgent<T> type (as we'll see, it has a very simple public interface, so using it is quite easy) and then we'll add some user interface. The image processing works in four phases that are demonstrated in the diagram on the right. The diagram is taken from Chapter 7 of the Parallel Programming with Microsoft .NET book, which discusses the pipeline pattern in more details and presents a C# implementation.

The first phase reads images from the disk and stores them into a temporary buffer. The second phase takes images from the buffer, resizes them and puts them into another buffer. The third phase is similar but it adds noise to the image. Finally, the fourth phase takes images from the last buffer and displays them in the user interface.

The intermediate buffers have only limited capacity. When a buffer is full, it will block the caller until an item is removed. Similarly, when it is empty, it will block the process that reads images until an item is added. A pipeline introduces parallelism, because all phases can run in parallel. The intermediate buffers provide a good way of controlling the process, because some phases may be faster - in that case, we want to block it after it generates enough inputs for the next phase.

We start by creating the three intermediate buffers that are used to transfer inputs between phases. They are represented using BlockingQueueAgent<T>, which we'll discuss later. If you place a mouse cursor over the type name in the snippet below, you should see its type signature in a tool tip. The type has members AsyncAdd and AsyncGet, which are both asynchronous (and possibly blocking).

1: let loadedImages = new BlockingQueueAgent<_>(queueLength)
2: let scaledImages = new BlockingQueueAgent<_>(queueLength)    
3: let filteredImages = new BlockingQueueAgent<_>(queueLength)F# Web Snippets

Now that we have the intermediate buffers, we can implement phases of the pipe line. Let's first look at loading and one of the two (similar) processing phases.

Loading and scaling images

We'll implement phases as asynchronous workflows that will read and write items from and to buffers. The loading workflow iterates over all elements in the fileNames sequence (which is an infinite sequence that repeatedly reads all images in the application folder). It uses loadImage function to read the image and then adds it to the first buffer. The processing workflow contains an infinite loop that reads image from one buffer, processes it and adds it to another buffer. Note that asynchronous workflows implicitly support cancellation, so when we want to stop the process, we'll simply set a cancellation token and the infinite loop will end (but we'll talk about that later):

 1: // Phase 1: Load images from disk and put them into a queue
 2: let loadImages = async {
 3:   let clockOffset = Environment.TickCount
 4:   let rec numbers n = seq { yield n; yield! numbers (n + 1) }
 5:   for count, img in fileNames |> Seq.zip (numbers 0) do
 6:     let info = loadImage img sourceDir count clockOffset
 7:     do! loadedImages.AsyncAdd(info) }
 8: 
 9: // Phase 2: Scale to a thumbnail size and add frame
10: let scalePipelinedImages = async {
11:   while true do 
12:     let! info = loadedImages.AsyncGet()
13:     scaleImage info
14:     do! scaledImages.AsyncAdd(info) }F# Web Snippets

The loadImage workflow is slightly more complicated, because it also adds number to every produced image (in the complete application, this is used for verifying that images were processed in the correct order), but both of the functions are relatively straightforward. As already mentioned, the AsyncGet and AsyncAdd methods are asynchronous (possibly block the workflow) and so we need to call them using let! and do! constructs.

We'll skip the second processing workflow (named filterPipelinedImages), because it is exactly the same as scalePipelinedImages. The only difference is that it uses different buffers and another work function (which does the actual processing of the bitmap). As a result, we can move to the last two bits of the implementation...

Displaying images and starting

The last part of the pipeline is an asynchronous workflow displayPipelinedImages that takes images from the last buffer and displays them in the user interface using displayImage function provided by the caller. The function takes care of dispatching the call to the main GUI thread, so we don't have to call it in any special way. Before displaying the image, we also store the current number of items in all three queues, so that the information can be displayed in the user interface. Finally, the snippet (which is a body of a function) starts the pipeline by starting all four asynchronous workflows:

 1: // Phase 4: Display images as they become available
 2: let displayPipelinedImages = async {
 3:   while true do
 4:     let! info = filteredImages.AsyncGet()
 5:     info.QueueCount1 <- loadedImages.Count
 6:     info.QueueCount2 <- scaledImages.Count
 7:     info.QueueCount3 <- filteredImages.Count
 8:     displayImage info }
 9: 
10: // Start workflows that implement pipeline phases
11: Async.Start(loadImages, cts.Token)
12: Async.Start(scalePipelinedImages, cts.Token)
13: Async.Start(filterPipelinedImages, cts.Token)
14: try Async.RunSynchronously(displayPipelinedImages, cancellationToken = cts.Token)
15: with :? OperationCanceledException -> () F# Web Snippets

Note that we pass a cancellation token (cts.Token) from a CancellationTokenSource instance cts to all the computations. When we later want to stop the computation, we'll do that by calling the Cancel method of the cts object. We don't have to do anything else to support cancellation - F# asynchronous workflows automatically check for cancellation flag when performing any asynchronous operation (using let! or do!).

The snippet shows a body of a function that should block until the pipeline processing completes. To do that, we run the last computation using RunSynchronously, which will block until the workflow completes or is cancelled. In the latter case, the method throws OperationCancelledException, which we need to handle.

The sample above is much shorter than the original implementation in the book (the F# version of that is also available in the source code you can download below). This is mainly because of the automatic support for cancellation provided by asynchronous workflows. We don't have to explicitly check whether the cancellation token has been set in each iteration of processing loops. In addition, the original sample also handles exceptions in the processing loops, which is ignored here, but would be easy to add using the try...with construct.

Adding user interface

The original version of the sample uses standard imperative approach to implement the user interface. It keeps some mutable state and implements numerous event handlers that react to actions by updating the mutable state. This was done to keep the code simple, because more advanced approaches (e.g. using the MVVM pattern) often lead to more code. In F#, we'll use asynchronous workflows running on the GUI thread to implement the user interaction. This is a very powerful technique that I described in Chapter 16 of Real-World functional Programming book (there is also a chapter excerpt with some of the information from the book).

Before we start writing asynchronous workflows, we'll use Observable.<...> combinators to create one event value. The application contains a list of RadioButton objects that can be used to select a method used to run the computation (a mode). The original application supports sequential mode and two pipelined modes using BlockingCollection and we add a mode that uses message passing. The following snippet creates an aggregate event that is triggered whenever a radio button is selected:

1: /// Create event that fires when the user selects different mode
2: let checkChanged = 
3:   [ for btn, _ in buttons -> btn.CheckedChanged :> IObservable<_> ]
4:   |> List.reduce Observable.mergeF# Web Snippets

The snippet first extracts RadioButton objects from the list and returns their CheckedChanged event value upcast to IObservable<T> type, so that they can be aggregated using Observable.merge combinator. Now that we have the aggregated event, we can look at a more interesting problem...

The user interface will have two distinct states. The application starts in the waiting state, in which it waits until the user clicks on the "Start" button and it listens to the checkChanged event to update the current computation method. When "Start" is pushed, the application switches to the processing state. In this state, it starts the image processing pipeline (in the background) and waits until the user hits the "Cancel" button. Then it waits until the pipeline completes and switches the state back to waiting. In F#, we can encode this state machine using two asynchronous workflows. We'll later start the workflows on the GUI thread, so we can safely access all WinForms elements from the code:

 1: /// Wait until the user selects 'Start' and handle radiobutton selection
 2: let rec waiting mode = async {
 3:   let! evt = Async.AwaitObservable(checkChanged, form.buttonStart.Click)
 4:   match evt with
 5:   | Choice1Of2(_) ->
 6:       (selecting new mode omitted)
 7:       return! waiting mode
 8:   | Choice2Of2(_) ->
 9:       return! processing mode }
10: 
11: /// Starts the pipeline and waits until it is canceled
12: and processing imageMode = async {
13:   (initialization omitted)
14:   // Start the pipeline and create event that is triggered when it completes
15:   let pipelineCompleted = new Event<_>()
16:   let task = async { 
17:       Pipeline.imagePipelineMainLoop updateFn cts.Token imageMode errorFn  
18:       pipelineCompleted.Trigger() }
19:   Async.Start(task, cts.Token)
20:   
21:   // Wait until the user selects 'Stop' and trigger cancellation
22:   let! _ = form.buttonStop.Click |> Async.AwaitObservable
23:   cts.Cancel() 
24:   // Wait until the pipeline actually completes and switch to waiting
25:   let! _ = pipelineCompleted.Publish |> Async.AwaitObservable
26:   return! waiting imageMode }F# Web Snippets

In the waiting state, we keep the current computation mode as the argument. We use the overloaded Async.AwaitObservable method (you can find it in downloads below) to wait for the first of two events - either the checkChanged event or the Click event of the buttonStart button. The operation returns a value of the generic union type Choice<T1, T2> specifying which of the events occurred. In the first case, we get the new computation mode and continue in the waiting state. In the second case, we switch to the processing state and give it the computation mode as an argument.

The processing workflow doesn't do any looping. It simply waits for a sequence of events. After some initialization, it creates an event pipelineCompleted that will be triggered when the pipeline processing (running in the background) finishes. Then it creates a workflow to run the pipeline and trigger the pipelineCompleted event at the end. The workflow is started using Async.Start and is passed a cancellation token (that is used later). After starting the workflow, we wait until the user clicks on the "Cancel" button using the AwaitObservable primitive. When that happens, we cancel the cancellation token and wait for the event we created (to block until the pipeline actually ends) and then we return back to the waiting mode.

Finally, when the application starts, we need to start the user interface handling in the waiting state:

waiting ImageMode.MessagePassing |> Async.StartImmediateF# Web Snippets

We use the StartImmediate primitive, which runs the workflow on the GUI thread until the first yield point (e.g. let! or do!) and then returns back to the GUI thread using synchronization context. This guarantees that all code in the workflow will run on the GUI thread and so we can safely access user interface elements.

Summary

In this article, we looked at an alternative F# implementation of the pipeline example from Parallel Programming with Microsoft .NET. The implementation uses agents and in particular, the BlockingQueueAgent<T> type (discussed in another article) instead of Task<T> and BlockingCollection<T> types from .NET 4.0.

I tried running various implementations of the pipeline on my dual-core laptop and the performance of the message-passing implementation using workflows is almost as efficient as the original C# version. For a reference, the sequential version needs roughly 182ms to process a single image. A parallel version using BlockingCollection<T> and using tasks processes an image in 103ms. A version presented in this article based on F# agents needs 109ms, which is only slightly more. Another C# version from the book, which uses more sophisticated load balancing needs about 110ms, because the F# version of the sample performs one phase faster than C#, so the load balancing is used in a place where it is not needed. If you try to run the sample on a machine with more cores, I'll be interested to hear your statistics!

The implementation of the image processing is done by creating four asynchronous workflows that communicate via intermediate buffers (the BlockingQueueAgent<T> type). This has the benefit that we don't have to explicitly handle cancellation, because asynchronous workflows automatically check for cancellation at every yield point (e.g. let! and do!). All we have to do to support cancellation is to pass a cancellation token to the Start or RunSynchronously methods when starting the workflow.

Finally, the example also demonstrates how to use asynchronous workflows running on the GUI thread to implement a reactive user interface. When we start a workflow using Async.StartImmediate, the workflow will run only on the thread where it was created. In our case, it runs on the GUI thread, so we can safely access all WinForms elements. Another interesting aspect is the use of Async.AwaitObservable to wait for an event inside asynchronous workflow. This allows us to write state machines in a natural way (as a mutually recursive functions), but without actually blocking any threads.

Source Code

val checkChanged : IObservable<EventArgs>

Create event that fires when the user selects different mode
val btn : RadioButton

  type: RadioButton
  implements: IDropTarget
  implements: ComponentModel.ISynchronizeInvoke
  implements: IWin32Window
  implements: Layout.IArrangedElement
  implements: IBindableComponent
  implements: ComponentModel.IComponent
  implements: IDisposable
  inherits: ButtonBase
  inherits: Control
  inherits: ComponentModel.Component
  inherits: MarshalByRefObject
val buttons : (RadioButton * ImageMode) list

  type: (RadioButton * ImageMode) list
  implements: Collections.IStructuralEquatable
  implements: IComparable<List<RadioButton * ImageMode>>
  implements: IComparable
  implements: Collections.IStructuralComparable
  implements: Collections.Generic.IEnumerable<RadioButton * ImageMode>
  implements: Collections.IEnumerable
event RadioButton.CheckedChanged: IEvent<EventHandler,EventArgs>
Multiple items
type IObservable<'T> =
  interface
    member Subscribe : System.IObserver<'T> -> System.IDisposable
  end

Full name: System.IObservable<_>

--------------------

IObservable
Multiple items
module List

from FSharp.Control

--------------------

module List

from Microsoft.FSharp.Collections

--------------------

type List<'T> =
  | ( [] )
  | ( :: ) of 'T * 'T list
  with
    interface Collections.IEnumerable
    interface Collections.Generic.IEnumerable<'T>
    member Head : 'T
    member IsEmpty : bool
    member Item : index:int -> 'T with get
    member Length : int
    member Tail : 'T list
    static member Cons : head:'T * tail:'T list -> 'T list
    static member Empty : 'T list
  end

Full name: Microsoft.FSharp.Collections.List<_>

  type: List<'T>
  implements: Collections.IStructuralEquatable
  implements: IComparable<List<'T>>
  implements: IComparable
  implements: Collections.IStructuralComparable
  implements: Collections.Generic.IEnumerable<'T>
  implements: Collections.IEnumerable
val reduce : ('a -> 'a -> 'a) -> 'a list -> 'a

Full name: Microsoft.FSharp.Collections.List.reduce
Multiple items
module Observable

from FSharp.Control

--------------------

module Observable

from Microsoft.FSharp.Control
val merge : IObservable<'T> -> IObservable<'T> -> IObservable<'T>

Full name: Microsoft.FSharp.Control.Observable.merge
val waiting : (ImageMode -> Async<'a>)

Wait until the user selects 'Start' and handle radiobutton selection
val mode : ImageMode

  type: ImageMode
  inherits: Enum
  inherits: ValueType
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val evt : Choice<EventArgs,EventArgs>

  type: Choice<EventArgs,EventArgs>
  implements: Collections.IStructuralEquatable
  implements: IComparable<Choice<EventArgs,EventArgs>>
  implements: IComparable
  implements: Collections.IStructuralComparable
Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------

type Async
with
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
  static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:Task<'T> -> Async<'T>
  static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
  static member Ignore : computation:Async<'T> -> Async<unit>
  static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
  static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
  static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
  static member Sleep : millisecondsDueTime:int -> Async<unit>
  static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
  static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
  static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
  static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
  static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
  static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
  static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
  static member SwitchToNewThread : unit -> Async<unit>
  static member SwitchToThreadPool : unit -> Async<unit>
  static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
  static member CancellationToken : Async<CancellationToken>
  static member DefaultCancellationToken : CancellationToken
end

Full name: Microsoft.FSharp.Control.Async
Multiple overloads
static member Async.AwaitObservable : ev1:IObservable<'a> -> Async<'a>
static member Async.AwaitObservable : ev1:IObservable<'a> * ev2:IObservable<'b> -> Async<Choice<'a,'b>>
static member Async.AwaitObservable : ev1:IObservable<'a> * ev2:IObservable<'b> * ev3:IObservable<'c> -> Async<Choice<'a,'b,'c>>
val form : Gui.MainForm

  type: Gui.MainForm
  implements: IDropTarget
  implements: ComponentModel.ISynchronizeInvoke
  implements: IWin32Window
  implements: IBindableComponent
  implements: Layout.IArrangedElement
  implements: ComponentModel.IComponent
  implements: IDisposable
  implements: IContainerControl
  inherits: Form
  inherits: ContainerControl
  inherits: ScrollableControl
  inherits: Control
  inherits: ComponentModel.Component
  inherits: MarshalByRefObject
field Gui.MainForm.buttonStart
event Control.Click: IEvent<EventHandler,EventArgs>
union case Choice.Choice1Of2: 'T1 -> Choice<'T1,'T2>
let mode = buttons |> List.pick (fun (btn, mode) ->
  if btn.Checked then Some(mode) else None)
union case Choice.Choice2Of2: 'T2 -> Choice<'T1,'T2>
val processing : (ImageMode -> Async<'a>)

Starts the pipeline and waits until it is canceled
val imageMode : ImageMode

  type: ImageMode
  inherits: Enum
  inherits: ValueType
// Initialization & reset mutable values
updateEnabledStatus Mode.Running imageMode
let cts = new CancellationTokenSource()
sw.Restart()
imagesSoFar <- 0
for i in 0 .. totalTime.Length - 1 do totalTime.[i] <- 0
val pipelineCompleted : Event<unit>
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------

type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  class
    new : unit -> Event<'Delegate,'Args>
    member Trigger : sender:obj * args:'Args -> unit
    member Publish : IEvent<'Delegate,'Args>
  end

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------

type Event<'T> =
  class
    new : unit -> Event<'T>
    member Trigger : arg:'T -> unit
    member Publish : IEvent<'T>
  end

Full name: Microsoft.FSharp.Control.Event<_>
val task : Async<unit>
module Pipeline

from Microsoft.Practices.ParallelGuideSamples.ImagePipeline
val imagePipelineMainLoop : (ImageInfo -> unit) -> CancellationToken -> ImageMode -> (exn -> unit) -> unit

Full name: Microsoft.Practices.ParallelGuideSamples.ImagePipeline.Pipeline.imagePipelineMainLoop

<summary>
   Runs the image pipeline example. The program goes through the jpg images located in the SourceDir
   directory and performs a series of steps: it resizes each image and adds a black border and then applies
   a Gaussian noise filter operation to give the image a grainy effect. Finally, the program invokes
   a user-provided delegate to the image (for example, to display the image on the user interface).
 
   Images are processed in sequential order. That is, the display delegate will be
   invoked in exactly the same order as the images appear in the file system.
 </summary>

 <param name="displayFn">
   A function that is invoked for each image at the end of the pipeline,
   for example, to display the image in the user interface.
 </param>
 <param name="algorithmChoice">
   The method of calculation. 0=sequential, 1=pipeline, 2=load balanced pipeline
 </param>
 <param name="errorFn">
   A function that will be invoked if this method or any of its parallel
   subtasks observe an exception during their execution.
 </param>
 <param name="token">A token that can signal an external cancellation request.</param>

val updateFn : (ImageInfo -> unit)

Call 'setBitmap' to update info on the GUI thread
val cts : CancellationTokenSource

  type: CancellationTokenSource
  implements: IDisposable
property CancellationTokenSource.Token: CancellationToken
val errorFn : (Exception -> unit)

Display error message when something goes wrong
member Event.Trigger : arg:'T -> unit
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
field Gui.MainForm.buttonStop
Multiple overloads
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
property Event.Publish: IEvent<unit>
type ImageMode =
  | Sequential = 0
  | Pipelined = 1
  | LoadBalanced = 2
  | MessagePassing = 3

Full name: Microsoft.Practices.ParallelGuideSamples.ImagePipeline.Pipeline.ImageMode

  type: ImageMode
  inherits: Enum
  inherits: ValueType
ImageMode.MessagePassing: ImageMode = 3
static member Async.StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
val loadedImages : BlockingQueueAgent<Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo>
type BlockingQueueAgent<'T> =
  class
    new : maxLength:int -> BlockingQueueAgent<'T>
    member AsyncAdd : v:'T * ?timeout:int -> Async<unit>
    member AsyncGet : ?timeout:int -> Async<'T>
    member Count : int
  end

Full name: Image-pipeline-code.BlockingQueueAgent<_>

Agent that implements an asynchronous blocking queue
val queueLength : int

  type: int
  implements: IComparable
  implements: IFormattable
  implements: IConvertible
  implements: IComparable<int>
  implements: IEquatable<int>
  inherits: ValueType
val scaledImages : BlockingQueueAgent<Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo>
val filteredImages : BlockingQueueAgent<Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo>
val loadImages : Async<unit>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val clockOffset : int

  type: int
  implements: IComparable
  implements: IFormattable
  implements: IConvertible
  implements: IComparable<int>
  implements: IEquatable<int>
  inherits: ValueType
type Environment =
  class
    static member CommandLine : string
    static member CurrentDirectory : string with get, set
    static member Exit : int -> unit
    static member ExitCode : int with get, set
    static member ExpandEnvironmentVariables : string -> string
    static member FailFast : string -> unit
    static member FailFast : string * System.Exception -> unit
    static member GetCommandLineArgs : unit -> string []
    static member GetEnvironmentVariable : string -> string
    static member GetEnvironmentVariable : string * System.EnvironmentVariableTarget -> string
    static member GetEnvironmentVariables : unit -> System.Collections.IDictionary
    static member GetEnvironmentVariables : System.EnvironmentVariableTarget -> System.Collections.IDictionary
    static member GetFolderPath : SpecialFolder -> string
    static member GetFolderPath : SpecialFolder * SpecialFolderOption -> string
    static member GetLogicalDrives : unit -> string []
    static member HasShutdownStarted : bool
    static member Is64BitOperatingSystem : bool
    static member Is64BitProcess : bool
    static member MachineName : string
    static member NewLine : string
    static member OSVersion : System.OperatingSystem
    static member ProcessorCount : int
    static member SetEnvironmentVariable : string * string -> unit
    static member SetEnvironmentVariable : string * string * System.EnvironmentVariableTarget -> unit
    static member StackTrace : string
    static member SystemDirectory : string
    static member SystemPageSize : int
    static member TickCount : int
    static member UserDomainName : string
    static member UserInteractive : bool
    static member UserName : string
    static member Version : System.Version
    static member WorkingSet : int64
    type SpecialFolderOption =
      | None = 0
      | Create = 32768
      | DoNotVerify = 16384
    type SpecialFolder =
      | ApplicationData = 26
      | CommonApplicationData = 35
      | LocalApplicationData = 28
      | Cookies = 33
      | Desktop = 0
      | Favorites = 6
      | History = 34
      | InternetCache = 32
      | Programs = 2
      | MyComputer = 17
      | MyMusic = 13
      | MyPictures = 39
      | Recent = 8
      | SendTo = 9
      | StartMenu = 11
      | Startup = 7
      | System = 37
      | Templates = 21
      | DesktopDirectory = 16
      | Personal = 5
      | MyDocuments = 5
      | ProgramFiles = 38
      | CommonProgramFiles = 43
      | AdminTools = 48
      | CDBurning = 59
      | CommonAdminTools = 47
      | CommonDocuments = 46
      | CommonMusic = 53
      | CommonOemLinks = 58
      | CommonPictures = 54
      | CommonStartMenu = 22
      | CommonPrograms = 23
      | CommonStartup = 24
      | CommonDesktopDirectory = 25
      | CommonTemplates = 45
      | CommonVideos = 55
      | Fonts = 20
      | MyVideos = 14
      | NetworkShortcuts = 19
      | PrinterShortcuts = 27
      | UserProfile = 40
      | CommonProgramFilesX86 = 44
      | ProgramFilesX86 = 42
      | Resources = 56
      | LocalizedResources = 57
      | SystemX86 = 41
      | Windows = 36
  end

Full name: System.Environment
property Environment.TickCount: int
val numbers : (int -> seq<int>)
val n : int

  type: int
  implements: IComparable
  implements: IFormattable
  implements: IConvertible
  implements: IComparable<int>
  implements: IEquatable<int>
  inherits: ValueType
Multiple items
val seq : seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------

type seq<'T> = IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>

  type: seq<'T>
  inherits: Collections.IEnumerable
val count : int

  type: int
  implements: IComparable
  implements: IFormattable
  implements: IConvertible
  implements: IComparable<int>
  implements: IEquatable<int>
  inherits: ValueType
val img : string

  type: string
  implements: IComparable
  implements: ICloneable
  implements: IConvertible
  implements: IComparable<string>
  implements: seq<char>
  implements: Collections.IEnumerable
  implements: IEquatable<string>
val fileNames : seq<string>

  type: seq<string>
  inherits: Collections.IEnumerable
Multiple items
module Seq

from Microsoft.Practices.ParallelGuideSamples.ImagePipeline.PipelineStandard

--------------------

module Seq

from Microsoft.FSharp.Collections
val zip : seq<'T1> -> seq<'T2> -> seq<'T1 * 'T2>

Full name: Microsoft.FSharp.Collections.Seq.zip
val info : Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo

  type: Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo
  implements: IDisposable
val loadImage : string -> string -> int -> int -> Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo

Full name: Microsoft.Practices.ParallelGuideSamples.ImagePipeline.PipelineStandard.loadImage
val sourceDir : string

  type: string
  implements: IComparable
  implements: ICloneable
  implements: IConvertible
  implements: IComparable<string>
  implements: seq<char>
  implements: Collections.IEnumerable
  implements: IEquatable<string>
member BlockingQueueAgent.AsyncAdd : v:'T * ?timeout:int -> Async<unit>

Asynchronously adds item; blocks if queue is full
val scalePipelinedImages : Async<unit>
member BlockingQueueAgent.AsyncGet : ?timeout:int -> Async<'T>

Asynchronously gets item; blocks if queue is empty
val scaleImage : Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo -> unit

Full name: Microsoft.Practices.ParallelGuideSamples.ImagePipeline.PipelineStandard.scaleImage
val displayPipelinedImages : Async<unit>
property BlockingQueueAgent.Count: int

Returns the number of items in the queue (immediately)
val displayImage : (Microsoft.Practices.ParallelGuideSamples.ImagePipeline.ImageInfo -> unit)
Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------

type Async
with
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
  static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:Tasks.Task<'T> -> Async<'T>
  static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
  static member Ignore : computation:Async<'T> -> Async<unit>
  static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
  static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
  static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
  static member Sleep : millisecondsDueTime:int -> Async<unit>
  static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
  static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:Tasks.TaskCreationOptions * ?cancellationToken:CancellationToken -> Tasks.Task<'T>
  static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
  static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:Tasks.TaskCreationOptions -> Async<Tasks.Task<'T>>
  static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
  static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
  static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
  static member SwitchToNewThread : unit -> Async<unit>
  static member SwitchToThreadPool : unit -> Async<unit>
  static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
  static member CancellationToken : Async<CancellationToken>
  static member DefaultCancellationToken : CancellationToken
end

Full name: Microsoft.FSharp.Control.Async
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
val cts : CancellationTokenSource

  type: CancellationTokenSource
  implements: IDisposable
property CancellationTokenSource.Token: CancellationToken
val filterPipelinedImages : Async<unit>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
type OperationCanceledException =
  class
    inherit System.SystemException
    new : unit -> System.OperationCanceledException
    new : string -> System.OperationCanceledException
    new : string * System.Exception -> System.OperationCanceledException
    new : System.Threading.CancellationToken -> System.OperationCanceledException
    new : string * System.Threading.CancellationToken -> System.OperationCanceledException
    new : string * System.Exception * System.Threading.CancellationToken -> System.OperationCanceledException
    member CancellationToken : System.Threading.CancellationToken with get, set
  end

Full name: System.OperationCanceledException

  type: OperationCanceledException
  implements: Runtime.Serialization.ISerializable
  implements: Runtime.InteropServices._Exception
  inherits: SystemException
  inherits: exn

Published: Wednesday, 27 October 2010, 11:11 AM
Author: Tomas Petricek
Typos: Send me a pull request!
Tags: functional, parallel, asynchronous, f#