Reactive Programming (III.) - Useful Reactive LINQ Operators

In the previous article, I introduced Reactive LINQ. I explained the different point of view that we can use when working with .NET events. The idea is that .NET events can be viewed as streams of values. The value is information about the event (such as position of a mouse click or a mouse movement). These streams can be processed using LINQ queries - we can for example filter all values that are not interesting for us using where LINQ clause. For example if we want to handle clicks only in some specified area.

In the previous article, I talked about basic LINQ query operators such as select and where and some useful methods that Reactive LINQ provides (for example for merging event streams). Today, we'll take a look at two more advanced kinds of operations that we can use for working with event streams. In particular, we'll talk about aggregation operators (that you certainly know from LINQ) and about switching. Switching is a concept from functional reactive programming and it allows us to change dynamically how the application behaves. However, I'll explain this in a more detail soon.

In this article, I'm going to use mostly C# (and some Visual Basic). The functionality that I'm describing in this part isn't part of the standard F# Event module that I discussed in the first part. I implemented most of them in F# too, but I'm not going to write the samples in both of the versions in this part. If you've seen the first two articles, you'll be definitely able to use the F# versions as well, because they follow exactly the same ideas as the C#/LINQ versions. I'm going to talk about a larger demo application in the last section and I'll show an F# version as well, so you'll see some F# examples in the next part. This part serves more as a reference of the available operators, so you may read only some parts of it, then jump to the last one (to see an exciting example!) and then come back here.

Aggregating event streams

When working with data using LINQ, we can use various aggregation operators. These operators compute a single value from a collection of object. You can for example use Sum to add prices of all products, Max to find the largest price or Aggregate, which is performs a custom aggregation specified by a lambda function.

As we'll see, these operations are quite useful for Reactive LINQ as well. However, they'll have to behave a bit differently than for collections. When you have, for example, a collection of products, you can easily calculate the total price. The collection has finite number of elements, so we can iterate over all of them. On the other hand, event streams aren't really finite or at least, you can't say when you reached the end. If you're processing for example mouse clicks, you can't tell whether the user will continue clicking or not. The stream is finite in some sense - when the application is closed, the stream ends, but doing something (other than cleanup) when the application is closed doesn't look very useful!

Instead of returning a value at the end of the stream, our aggregation operators will yield the present value every time an event occurs and then continue with the aggregation. This means that aggregation operators will also return IEvent<T> and the event will be triggered every time the aggregated value is updated. We can use this for example for counting number of clicks on a button.

Counting clicks

I believe that an example will make everything clear. In the following snippet, we'll attach an event to a button and we'll use Sum to count the number of clicks on the button:

btnClick.AttachEvent("Click")
.Select(ea => 1).Sum().Listen(n =>
lblOut.Text = String.Format("Current count is: {0}", n));


The usual call to AttachEvent is followed by a call to Select. The result of this call will be an event stream that will contain a number 1 every time the button is clicked. To get the total number of clicks on the button, we'll need to sum all the ones coming from the stream. The easiest way to do this is to use the Sum operator. The result of this operation is an event that is triggered every time the user clicks on the button. Values that will appear in this event stream will be the sum calculated from the values so far, which means that it will contain numbers 1, 2, 3, ... and that's the number of clicks that we want to display!

The fact that the result of aggregation in Reactive LINQ isn't just a number, but an event stream means that we can do multiple aggregations in a sequence. In the previous example, we've seen how to generate a sequence containing ascending sequence of integers. If we'll multiply all of them, we'll get the well known factorial function (the sequence of factorials will be 1, 1*2, 1*2*3, 1*2*3*4, ...). Calculating total value was easy, because we had Sum operator, but multiplication isn't built-in, so we'll have to use the generic Aggregate operation:

btnClick.AttachEvent("Click")
// Generates sequence of integers: 1, 2, 3, ...
.Select(ea => 1).Sum()
// Generate sequence of factorials: 1, 1*2, 1*2*3, ...
.Aggregate(1, (fact, current) => fact * current)
.Listen(n =>
lblOut.Text = String.Format("Factorial is: {0}", n));


The part ended with the call to Sum is same as in the previous example and it gives us a sequence of integers. When aggregating using the Aggregate operator, we'll need some initial value and we'll write a function that takes the current value from the stream and the previous result (starting with the initial) and produces a new result. In our example, the initial value is 1 (we don't want to multiply everything by zero!) and the function takes the current factorial (fact) and multiplies it by the next value from the stream (current). Finally, we add a handler to show the results using Listen.

Using Visual Basic Query syntax

The only way to write aggregation in C# is to use extension methods explicitly. Interestingly, in Visual Basic, there is query syntax for that as well using the Aggregate clause. We'll take a look how to rewrite the first example (showing the number of clicks) in Visual Basic using just a single query. First, we'll need a simple utility function for displaying the result, because Visual Basic has only limited support for lambda functions:

' Utility function that allows us to set label text in the query easily
Function SetText(ByVal lbl As Label, ByVal str As String) As Integer
lbl.Text = str
Return 0
End Function


Now we can take a look at the query. This time we'll add reaction to the MouseDown event and we'll only count clicks that were made by the left button. In C#, we'd have to write this using a nested query, but in VB it can be written like this:

' Count the number of clicks made by the left button
Dim eCount = _
Aggregate ea In btnClick.AttachEvent(Of MouseEventArgs)("MouseDown") _
Where ea.Button = MouseButtons.Left _
Into Sum(1)

' Display the number using utility function 'SetText'
eCount.Listen(Function(n) SetText(lblOutput, _
String.Format("Count: {0}", n)))


Visual Basic query that does aggregation starts with the Aggregate keyword and then specifies what values will be aggregated. In our case, it is the value coming from the MouseDown event. This can be followed by various other LINQ clauses. We use just a single Where to filter only left button clicks. Finally, there is an Into clause that specifies what will be the aggregation function and what will be used as an input. The number 1 is the input - we ignore the value that comes from the event stream and just add one every time. You could for example sum X-coordinates of the clicks, in which case you'd write Sum(ea.X).

The call to the Listen method is similar as in C#. Lambda function in Visual Basic is written using Function(..) ... syntax. One of the limitations is that it can't contain assignment in the body, so that's why we implemented the SetText function first. Thanks to this, we can write a lambda function that contains method call, which is valid.

All the examples that we've seen so far were quite interesting, but a little limited. The most important limitation is that once they started executing, the behavior didn't change. All of the examples were always doing the same thing.

Changing behavior dynamically

In this section we're going to look at another set of operators that I'll call switching operators. These operators can be used for writing programs that change their behavior during the execution and start reacting differently. For example, we'll extend our 'click counting' sample and add a checkbox that will specify whether we want to add 1 or subtract 1. Another example is that we could write a program that would start behaving differently after an event occurs ten times. These examples should give you an idea what I mean by dynamically. When you write the program, you don't know when the change in the behavior will happen, because it is controlled by the program execution. The operators that we'll look at now are all very powerful, because most of the programs change parts of their behavior during the execution.

Choosing between behaviors

The first example that we'll implement is the 'click counting' sample with 'decrement' checkbox. We'll add a checkbox to the form and we'll want to subtract 1 from the total every time the user clicks on a button while the checkbox is checked and add 1 in case the checkbox is not checked. Let's start by a thing that you already know. We'll use aggregation to create a stream that will yield a value when user changes the checkbox state. The values in the stream will be 1 (when checkbox is checked) or -1. We also don't want to wait until the user changes the state, so we'll add one additional initial event that will be triggered shortly after the program starts using Reactive.After:

// Triggered after 1 second and then on every check-box click
var eZigZag =
Reactive.Merge
(checkDecrement.AttachEvent("Click"),
Reactive.After(1000, EventArgs.Empty))
// Create an event stream containing: 1, -1, 1, -1, ...
.Aggregate(-1, (current, _) => current * -1);


The input for the Aggregate operator will be EventArgs values from the events. This isn't needed and in cases like that, I'll often just use underscore as a variable name (in F#, this means ignore the value; in C#, it is a valid variable name, but it suggests that we don't care about it). The initial value is -1 and every time an event occurs, we just multiply it by -1 to get a zig-zag stream containing 1, -1, 1, -1 etc.

Now comes the interesting part. We know how to count the number of clicks on a button and we have another event stream (eZigZag) that gives us a value every time we want to switch between addition and subtraction when processing button clicks. We can write this using SwitchRepeatedly, which calls our function every time the value in the zig-zag stream changes. The function that we provide is the most important thing. Instead of returning some value, it will generate an event stream. The overall event stream will then switch between the event streams generated by our function. Let's take a look at the example and I'll explain everything in a more detail shortly:

// Dynamically switch between 1, 1, 1... and -1, -1, -1...
eZigZag.SwitchRepeatedly(n =>
from ea in btnClick.AttachEvent("Click") select n)

// Sum all nubers coming from the 'switch'
.Sum().Listen(n =>
lblOut.Text = String.Format("Current count is: {0}", n));


The input for the SwitchRepeatedly operator is the event that will be triggered when the application starts (with 1 as the argument) and then every time the 'decrement' checkbox is clicked. This means that the lambda function will be executed for the first time when the application starts. The code inside the lambda function creates another event stream. In our example, it attaches event handler for the Click event of the button and yields n every time the event occurs. This variable is an argument for the lambda function and it will have value 1 at the beginning. Later (when eZigZag is triggered), it will alternate between -1 and 1.

The overall type of the lambda function is Func<int,IEvent<int>>. This means that it gets int as an argument and returns an event of integers. The result of SwitchRepeatedly is an event that does the same thing as the event returned by the lambda function (in our example, yields 1 every time button is clicked). However, when the eZigZag is triggered, it calls the function to get a new event stream and then starts to behave just like this new event stream. In our example, the event returned by the second call will be yielding -1 values. The Repeatedly in the name of the operator suggests that the behavior is changed dynamically every time an event in the source stream occurs is triggered.

Also note that the type of eZigZag and the type of the event returned by the lambda function can be different. If the lambda function returned for example event yielding strings, the result will be also an event yielding strings. Values from the source event are used only to generate the current event. The next switching operator will be a bit simpler, but (as we'll see shortly) very useful as well!

Changing behavior on the first event

In this section we'll take a look at SwitchFirst operator. It allows us to write a event streams that yield a value when the event occurs for the first time and then start behaving differently. For example, if you wanted to display a game score, you'd start with 0 when the user clicks on 'Start' and then switch to the actual algorithm that computes the score.

Our example will be simpler. When the user clicks on a button, we'll display a message that the button was clicked for the first time. Any subsequent click will display a message showing the number of clicks:

btnClick.AttachEvent("Click").Select(_ => "Clicked for the first time!")
.SwitchFirst(s =>
// Return the new behavior: Sum clicks & return message
btnClick.AttachEvent("Click").Select(_ => 1).Sum()
.Select(n => string.Format("Clicked {0} times!", n)))
.Listen(s => lblOut.Text = s);


The event that we use as a source for SwitchFirst yields a message when a button is clicked. The SwitchFirst operator waits for this event and sends this event further to its listeners. This means that the Listen handler will receive the message as well. After triggering the event, SwitchFirst call the function used as an argument to get a new event stream and changes its behavior to do the same thing as this event stream. In our example, the function returns an event stream that counts the number of clicks. After the first click (and after showing the first message), the program will start counting the clicks and it will display the new count every time the button is clicked.

This operator can be used for changing the behavior once, but it is actually far more interesting. It can be used for changing the behavior in almost any way if we use it together with recursion. We'll look at an example in the next section.

Changing behavior on every event

Recursion is a very useful technique and it is used very often in functional programming. In imperative languages, it is used less frequently, so if you're not used to it, you may find this section a bit tricky. As we've seen, we can use SwitchFirst to change the behavior after the first event occurs. This is done by specifying a function that will be called to get a new event stream.

However, the newly returned event stream can also contain SwitchFirst, so it can change the behavior again. Even more interestingly, we can generate the original stream and the new one using the same function, so every event stream returned by the function will always wait for the first event and then switch to another behavior generated using the same function (possibly with some other arguments).

This sounds a bit abstract, so let me show you a practical example. We'll a similar construct in the next article when developing the game. We want to implement the following behavior: When you click on the button a message "Click" will appear. After 5 seconds, it will be replaced with "Timeout!" unless you clicked on the button again during that time. The way we'll do this is that we'll wait for an event - either click or a timeout. After an event occurs we'll show the right message and then change the behavior to start from the beginning again. This change here is very important, because it resets the timer, so the "Timeout!" message will not appear sooner than after next 5 seconds:

// 'eClickOrTimeout' is a function that returns an evnet
Func<IEvent<string>> eClickOrTimeout = null;
eClickOrTimeout = () =>
// Triggered when user clicks on the button or after 5 seconds
Reactive.Merge
(btnClick.AttachEvent("Click").Select(_ => "Click..."),
Reactive.After(5000, "Timeout!"))
.SwitchFirst(e => eClickOrTimeout());

// Create an event stream and add listener
eClickOrTimeout()
.Listen(s => lblOut.Text = s);


We first declare a value called eClickOrTimeout. This is not an event as in previous examples, but a function that creates an event when we call it. This is because we need to generate a new event every time when the SwitchFirst operator calls our function to make sure that the event generated using Reactive.After will be triggered after 5 seconds (if it wasn't a function, the event would be triggered only once). Because we need to use the eClickOrTimeout inside its own declaration we first initialize the value to null and then assign it the actual value.

When constructing the event, we attach a handler for the button click and merge it with an event generated using Reactive.After. This means that the first event will be either a mouse click or a timeout event after 5 seconds. This is followed by a SwitchFirst call. When the first event occurs, SwitchFirst will throw the original event stream away (which means that no further clicks or timeouts will be handled) and changes its behavior to do the same thing as the new stream. The new stream is generated using the given function. In our example, it recursively uses eClickOrTimeout, so it will again wait either for a click or for a timeout.

To add an event handler to this recursive stream, we run the lambda function to start the event processing and add an event handler using Listen operator. As I mentioned earlier, writing recursion explicitly is often quite tricky, so Reactive LINQ provides one more switching operator that can be used for implementing this example in an easier way.

Using recursive switch operator

The operator is named SwitchRecursive and it is the last one from the family of switching operators that Reactive LINQ currently implements. It allows us to write the previous example without using recursion explicitly (internally, it will do almost the same thing). The only difference is that we have to explicitly specify the first event that will trigger the whole processing. In the previous example, the first event was the same as all the subsequent events. In the new version, the program will first wait 1 second and then start the processing similarly as in the last example:

Reactive.After(1000, "")
.SwitchRecursive(s =>
Reactive.Merge
(btnClick.AttachEvent("Click").Select(_ => "Click..."),
Reactive.After(5000, "Timeout!"))
.Listen(s => lblOut.Text = s);


The SwitchRecursive operator waits for the first event from the input stream and then calls the function to generate the new event. So far, it does the same thing as SwitchFirst. However, the difference is that when a first event occurs in the returned event stream (in our example either a click or a timeout) it calls the function again to obtain a new stream again. When calling the function, it uses the value returned by the last event as an argument, so we can represent any state we need during the event processing.

So far, we've looked at three slightly different switching operators that should be sufficient for most of the event processing that you may need. In the last section, we're going to look how they can be written using LINQ queries. This is not completely straightforward as for example when using where, but it still simplifies the code in many situations.

Switches and LINQ queries

From all the query operators that LINQ provides the one that corresponds closely to our switching operators is SelectMany (side note for Haskell programmers: this is bind, but not all switching operators above have exactly the right type). When writing a query using the simplified syntax, SelectMany is used when you have multiple from clauses in the query.

The following query implements the first example that we implemented earlier using SwitchRepeatedly. It counts number of clicks on a specified button and allows changing between incrementing and decrementing using a checkbox. We first declare an event that is triggered when the application starts and then when the user clicks on the 'decrement' checkbox. This is just to make the code more readable - it would be perfectly possible to write the whole thing as a single query. Then we write a query with multiple from clauses that internally uses the SwitchRepeatedly operator. Let's take a look at the example and I'll explain how exactly the operator is used later:

// When application starts and then when 'decrement' checkbox is clicked
var eDecrementClick = Reactive.Merge
(Reactive.After(1000, EventArgs.Empty),
checkDecrement.AttachEvent("Click"));

// Create stream with 1, -1, 1.. and combine it using 'SwitchRepeatedly'
(from n in eDecrementClick.Aggregate(1, (current, _) => current * -1)
from m in btnClick.AttachEvent("Click").UseSwitchRepeatedly()
select -1 * n).Sum().Listen(n =>
lblOut.Text = String.Format("Current count is: {0}", n));


The first from clause specifies the input stream for the SwitchRepeatedly operator. The second one specifies how to obtain an event stream once we have a value for the n variable (or when we get a new value as a result of the first event being triggered). An interesting thing to note is that we use an extension method UseSwitchRepeatedly at the end of the second from clause. This alone doesn't do anything, but it specifies what kind of switch do we want to use (similarly, you could write UseSwitchFirst or UseSwitchRecursive). If you forget to specify this, you'll get a compilation error, because the C# compiler will not know which of the possible SelectMany implementations should it use. There is one implementation for every switch and this UseXyz method specifies this by returning a special type of event. In our example, it will return IEventSwitchRepeatedly<T> instead of IEvent<T>, but this is very much just a technical detail that you don't have to worry about.

I believe that in some cases, the query syntax is more readable. In particular, it is interesting when you need to use other operations that are available in a query such as select or where or for example Aggregate in Visual Basic (actually, you could even use nested switching operators). Anyway, the following example should give you an idea how the query is translated to the ordinary method calls:

(from n in eDecrementClick.Aggregate(1, (current, _) => current * -1)
from m in btnClick.AttachEvent("Click").UseSwitchRepeatedly()
select -1 * n)
.Sum().Listen(n =>
lblOut.Text = String.Format("Current count is: {0}", n));


This code will be translated to something like this:

(...).SwitchRepeatedly(n => ...).Select(... => ...).(...)

Actually, the query is compiled to a code that calls one of the SelectMany operators, but this is what we get if we look at the implementation of this operator. As I already mentioned, the use of SwitchRepeatedly switching operator is controlled by the UseXyz method. The example uses select clause, which is automatically translated to a call to Select method that is added after the call to SwitchRepeatedly. If you included where clause, it would be added after the call to switch as well and these are the cases where query syntax is a lot more concise.

Summary

In this article, we've looked at a couple of operators that you can use in Reactive LINQ. In particular, we discussed various ways of doing aggregation (such as Sum) and I also explained how this is different to the usual notion of aggregation in LINQ that works with finite data sets. Next, we discussed three slightly different switching operators. These are very important when implementing a more sophisticated code and in the next article we're going to look at a larger example that will use them (together with everything else that I just introduced).

I already mentioned in the previous article, that the current implementation is just a prototype that demonstrates the idea and shows what can be done with LINQ. My goal is to work on this project and turn it into a more stable and easier to use library. Currently, you can run into a problem when using switching operators in some ways, because the resources used by events may not be released correctly. Also, I'm not yet sure which of the switching operators are really the most useful, so if you'll play with the code, definitely send me your feedback!