TryJoinads (IV.) - Concurrency using join calculus
Join calculus provides a declarative way of expressing asynchronous synchronization
patterns. It has been used as a basis for programming languages (JoCaml and COmega), but also
as a basis for libraries (embedded in C# and Scala). Using joinads, it is possible to
embed join calculus in F# with a nice syntax using the match! construct. Formally,
join calculus does not form a monad, but it can be viewed as a version of a joinad,
as described in the first paper on joinads.
The programming model is based on channels and join patterns. A channel can be viewed as a thread-safe mailbox into which we can put values without blocking the caller. In some sense, this is quite similar to F# agents. A join pattern is then a rule saying that a certain combination of values in channels should trigger a specific reaction (and remove values from the channels). The ability to match on multiple channels distinguishes join calculus from F# agents.
Note: This blog post is a re-publication of a tutorial from the TryJoinads.org web page. If you read the article there, you can run the examples interactively and experiment with them: view the article on TryJoinads.
Thread-safe buffer
As a first example, we implement a thread-safe (unbounded) blocking buffer. The buffer
uses two channels. The put channel is used for adding values to the buffer and
has the type Channel<string>; the get channel is used for retrieving values from the
buffer. When the buffer is empty, a call to get cannot complete, so the operation is
exposed as an asynchronous workflow. Channels that are used to obtain some result from a
join pattern are represented using the ReplyChannel<'T> type, where 'T is the type of the
returned value.
After constructing the channels, we construct a join pattern using the join { ... }
computation builder by pattern matching on channels using match!:
    open FSharp.Extensions.Joinads

    // Construct channels implementing the buffer
    let put = Channel<string>()
    let get = ReplyChannel<string>()

    // Encode a join pattern over the channels
    join {
      match! get, put with
      | repl, v -> repl.Reply("Echo: " + v) }
The pattern matching consists of a single clause that matches on both of the channels. When
both values become available, the join pattern is triggered. In response, it calls the
Reply method of the value received from the get channel. This sends a string value back
to the caller of the get channel. Unlike, for example, async { ... }, the join { ... }
computation does not return any value - it constructs a join pattern that operates over
the existing channels. We look at a more complex sample soon, but let's first
demonstrate how the channels are used:
    // Store some values in 'put' channel
    put.Call("Hello world.")
    put.Call("Hi there!")

    // Obtain value via the 'get' channel
    get.AsyncCall() |> Async.RunSynchronously
The above example shows how channels behave - values stored in channels are cached and when
there is a matching pair (after calling both put and get), the join pattern fires.
Note that calling AsyncCall using RunSynchronously blocks F# Interactive if there
is no matching value in the put channel.
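One way to avoid blocking the interactive session is to issue the read from a background workflow and supply the value afterwards. The following is a minimal sketch rather than part of the original tutorial; it redefines the same two channels so that it is self-contained and uses only the operations shown above (Call, AsyncCall, Reply, Async.Start).

```fsharp
open FSharp.Extensions.Joinads

// Fresh channels, defined exactly as in the buffer example
let put = Channel<string>()
let get = ReplyChannel<string>()

join {
  match! get, put with
  | repl, v -> repl.Reply("Echo: " + v) }

// Start the reader in the background; it waits until a value is available
async {
  let! reply = get.AsyncCall()
  printfn "got: %s" reply }
|> Async.Start

// The session is not blocked, so the value can be supplied later
put.Call("Late value")
```

Because the request made through get is itself cached in the channel, the order in which the reader and the writer run does not matter; the pattern fires once both sides are present.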
Buffer with two input channels
To make the example slightly more interesting, we now change it to use two input channels.
In the extended version, we can store two types of values in the buffer. The putString
channel makes it possible to store strings and the putInt channel can be used to store
integers. Values can be retrieved only as strings (numbers get converted to strings) using
a single getString channel.
This makes the join pattern more interesting, because it must now handle two cases. One
pattern handles the case when there is a string in putString and a request to read a value
in getString, and another pattern handles the case when there are values in putInt and
getString:
    let putString = Channel<string>()
    let putInt = Channel<int>()
    let getString = ReplyChannel<string>()

    join {
      match! getString, putString, putInt with
      | repl, v, ? -> repl.Reply("Echo " + v)
      | repl, ?, v -> repl.Reply("Echo " + (string v)) }
The first clause of the match! construct handles the case when there is a value in
getString and putString. The third channel is matched against the ? pattern, which
means that a value from that channel is not required. When the pattern matches, the
values from the getString and putString channels are removed and they are processed
(by sending a string back to the caller of the getString channel). The second pattern is
similar, but it matches on getString and putInt.
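For contrast, a clause that names a value for every channel fires only when all of the channels hold a value. The sketch below is an illustration under that reading and is not taken from the original tutorial; the channel names labels, numbers and getPair are hypothetical.

```fsharp
open FSharp.Extensions.Joinads

// Hypothetical channels used only for this illustration
let labels = Channel<string>()
let numbers = Channel<int>()
let getPair = ReplyChannel<string>()

// No '?' pattern is used, so a reply needs a value from BOTH input channels
join {
  match! getPair, labels, numbers with
  | repl, label, n -> repl.Reply(label + " = " + (string n)) }

labels.Call("answer")
numbers.Call(42)

// Completes because both 'labels' and 'numbers' already hold a value
let pair = getPair.AsyncCall() |> Async.RunSynchronously
```

With only one of the two input channels filled, the call through getPair would keep waiting, which is the difference from the two clauses using ? above.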
The following example shows how to call the buffer implemented above:
    // Put values to 'putString' and 'putInt' channels
    async {
      for i in 1 .. 5 do
        putString.Call("Hello!")
        putInt.Call(i)
        do! Async.Sleep(100) }
    |> Async.Start

    // Call 'getString' asynchronously to read replies
    async {
      while true do
        let! repl = getString.AsyncCall()
        printfn "got: %s" repl }
    |> Async.Start
The code creates and starts two asynchronous workflows - one that puts 5 values to
each of the put channels and another that keeps reading values from the getString
channel. When you run the code, you should see that the two join patterns are
triggered in an interleaved order, as the values are added to the put channels.
Dining Philosophers
Nick Palladinos recently used the join computation builder to implement the famous
dining philosophers problem, which demonstrates competition for resources. The solution
is available on F# snippets.
The problem is defined as follows: There are five philosophers sitting around a table with a fork between each pair of neighbours. In order to eat, a philosopher needs to take hold of two forks (one on the left and one on the right). When holding both forks, the philosopher eats for some time, then puts both forks back on the table and thinks for some time.
Using joinads, the problem can be solved by creating a join pattern for every philosopher. The pattern matches when the philosopher acquires forks on both sides (represented as channels).
- See the join calculus based dining philosophers on F# snippets.
- Load join calculus sample on TryJoinads.
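The idea can be sketched as follows. This is a minimal illustration written for this article and not the solution published on F# snippets; the names forks, hungry and names are hypothetical, and each fork is modelled as a Channel<unit> that holds a value while the fork lies on the table.

```fsharp
open FSharp.Extensions.Joinads

let n = 5
let names = [| "Plato"; "Socrates"; "Voltaire"; "Descartes"; "Kant" |]

// A value in a fork channel means that the fork is on the table
let forks = Array.init n (fun _ -> Channel<unit>())
// A philosopher announces hunger by sending a value to the 'hungry' channel
let hungry = Array.init n (fun _ -> Channel<unit>())

for i in 0 .. n - 1 do
  let me = hungry.[i]
  let left = forks.[i]
  let right = forks.[(i + 1) % n]
  // Fires only when the philosopher is hungry and both forks are available,
  // so both forks are taken from the table in a single step
  join {
    match! me, left, right with
    | request, leftFork, rightFork ->
        printfn "%s is eating" names.[i]
        // Put both forks back on the table
        left.Call(leftFork)
        right.Call(rightFork)
        printfn "%s is thinking" names.[i] }

// Lay the forks on the table, then make every philosopher hungry once
for fork in forks do fork.Call(())
for h in hungry do h.Call(())
```

Since a philosopher never holds just one fork, the situation in which everyone holds a left fork and waits for the right one cannot arise in this sketch.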
Summary
In this article, we looked at two basic examples that used join calculus, embedded
in F# using the match! construct. The second example showed a sample consisting
of multiple join patterns. This is when join calculus becomes quite powerful, because
we can declaratively specify what combination of values should trigger a certain
reaction.
Join calculus provides a powerful abstraction for implementing concurrency primitives - it can be used to write various buffers, reader-writer locks, count-down events etc. Moreover, join calculus was designed as an abstraction for distributed programming, so it may find use in other areas as well.
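As an illustration of such a primitive, the buffer pattern from the first example can be reused as a simple permit-based lock. This is a sketch under the assumption that the channels behave as described above; the names acquire, release and worker are hypothetical.

```fsharp
open FSharp.Extensions.Joinads

// 'release' holds one value per free permit;
// 'acquire' replies once a permit has been taken
let release = Channel<unit>()
let acquire = ReplyChannel<unit>()

join {
  match! acquire, release with
  | repl, permit -> repl.Reply(permit) }

// Start with a single permit, which makes this behave like a mutex
release.Call(())

let worker name = async {
  do! acquire.AsyncCall()
  printfn "%s entered the critical section" name
  do! Async.Sleep(100)
  printfn "%s is leaving" name
  // Hand the permit back so that another worker can proceed
  release.Call(()) }

worker "A" |> Async.Start
worker "B" |> Async.RunSynchronously
```

Calling release more than once at the start would turn the same pattern into a counting semaphore.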
Published: Wednesday, 22 February 2012, 5:38 PM
Author: Tomas Petricek
Tags: f#, research, joinads, asynchronous, parallel