In this article, written by Leonardo Borges, the author of Clojure Reactive Programming, we will:

  • Explore Rx’s main abstraction: Observables
  • Learn about the duality between iterators and Observables
  • Create and manipulate Observable sequences

The Observer pattern revisited

Let’s take a look at an example:

(def numbers (atom []))
 
(defn adder [key ref old-state new-state]
  (print "Current sum is " (reduce + new-state)))
 
(add-watch numbers :adder adder)

In the preceding example, our Observable subject is the atom held by the var numbers. The Observer is the adder watch. When the Observable changes, it pushes its changes to the Observer synchronously.
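
To see the push in action, the following minimal sketch swaps values into an atom and records what a watch receives. The names observed-numbers, sums-seen, and the :recorder key are made up for this illustration so that they don't clash with the example above.

```clojure
;; Hypothetical names, chosen so this sketch doesn't clash with `numbers` above.
(def observed-numbers (atom []))
(def sums-seen (atom []))

;; The watch plays the Observer: it is called with every new state of the atom.
(add-watch observed-numbers :recorder
           (fn [_key _ref _old-state new-state]
             (swap! sums-seen conj (reduce + new-state))))

;; Each swap! pushes the new state to the watch before returning.
(swap! observed-numbers conj 1)
(swap! observed-numbers conj 2)

@sums-seen
;; => [1 3]

;; Removing the watch stops the notifications.
(remove-watch observed-numbers :recorder)
```

Because the watch runs on the thread that called swap!, the recorded sums are already in place by the time each swap! returns.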

Now, contrast this to working with sequences:

(->> [1 2 3 4 5 6]
     (map inc)
     (filter even?)
     (reduce +))

This time around, the vector is the subject being observed and the functions processing it can be thought of as the Observers. However, this works in a pull-based model. The vector doesn’t push any elements down the sequence. Instead, map and friends ask the sequence for more elements. This is a synchronous operation.

Rx makes sequences—and more—behave like Observables so that you can still map, filter, and compose them just as you would compose functions over normal sequences.

Observer – an Iterator’s dual

Clojure’s sequence operators such as map, filter, reduce, and so on support Java Iterables. As the name implies, an Iterable is an object that can be iterated over. At a low level, this is supported by retrieving an Iterator reference from such an object. Java’s Iterator interface looks like the following:

public interface Iterator<E> {
   boolean hasNext();
   E next();
   void remove();
}

When passed in an object that implements this interface, Clojure’s sequence operators pull data from it by using the next method, while using the hasNext method to know when to stop.
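
The pull described above can be sketched by driving an Iterator by hand. The helper pull-all is a hypothetical name used only for this illustration; Clojure's own sequence functions do the equivalent work internally.

```clojure
;; pull-all is a hypothetical helper, not part of Clojure's core library.
(defn pull-all
  "Drains an Iterable by repeatedly asking its Iterator for the next element."
  [^Iterable coll]
  (let [it (.iterator coll)]
    (loop [acc []]
      (if (.hasNext it)                 ; the consumer asks whether there is more
        (recur (conj acc (.next it)))   ; and pulls the next element itself
        acc))))

(pull-all [1 2 3])
;; => [1 2 3]
```

Note that the collection never does anything on its own: every element is produced only because the consumer asked for it.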

The remove method is expected to remove the last element returned by the iterator from the underlying collection. This in-place mutation is clearly unsafe in a multithreaded environment. Whenever Clojure implements this interface for the purposes of interoperability, the remove method simply throws UnsupportedOperationException.
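
As a small check of this behavior, the sketch below compares an Iterator obtained from a Clojure vector with one obtained from a mutable java.util.ArrayList. The helper remove-supported? is a hypothetical name introduced for this illustration.

```clojure
;; remove-supported? is a hypothetical helper used only for this sketch.
(defn remove-supported?
  "Returns false when the collection's Iterator rejects remove."
  [^Iterable coll]
  (let [it (.iterator coll)]
    (.next it)                          ; remove acts on the last element returned
    (try
      (.remove it)
      true
      (catch UnsupportedOperationException _
        false))))

(remove-supported? [1 2 3])
;; => false

(remove-supported? (java.util.ArrayList. [1 2 3]))
;; => true
```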

An Observable, on the other hand, has Observers subscribed to it. Observers have the following interface:

public interface Observer<T> {
   void onCompleted();
   void onError(Throwable e);
   void onNext(T t);
}

As we can see, an Observer implementing this interface will have its onNext method called with the next value available from whatever Observable it’s subscribed to. This makes it a push-based notification model.
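
The direction of control can be illustrated without any Rx machinery. In the sketch below, push-all and the handler keys :on-next, :on-error, and :on-completed are hypothetical names that merely mirror the Observer interface; this is not how RxJava is implemented.

```clojure
;; push-all and the handler map keys are hypothetical, used only to
;; illustrate the direction of control; this is not the RxJava API.
(defn push-all
  "Pushes every element of coll to on-next, then signals completion.
  Any exception raised along the way is passed to on-error instead."
  [coll {:keys [on-next on-error on-completed]}]
  (try
    (doseq [x coll]
      (on-next x))
    (on-completed)
    (catch Throwable e
      (on-error e))))

(def pushed-events (atom []))

(push-all [1 2 3]
          {:on-next      (fn [v] (swap! pushed-events conj [:next v]))
           :on-error     (fn [e] (swap! pushed-events conj [:error (.getMessage e)]))
           :on-completed (fn []  (swap! pushed-events conj [:completed]))})

@pushed-events
;; => [[:next 1] [:next 2] [:next 3] [:completed]]
```

Here the producer decides when each value is delivered, and the consumer only reacts.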

This duality becomes clearer if we look at both the interfaces side by side:

Iterator<E> {                       Observer<T> {
   boolean hasNext();                 void onCompleted();
   E next();                          void onError(Throwable e);
   void remove();                     void onNext(T t);
}                                       }

Observables provide the ability to have producers push items asynchronously to consumers. A few examples will help solidify our understanding.

Creating Observables

This article is all about Reactive Extensions, so let’s go ahead and create a project called rx-playground that we will be using in our exploratory tour. We will use RxClojure (see https://github.com/ReactiveX/RxClojure), a library that provides Clojure bindings for RxJava (see https://github.com/ReactiveX/RxJava):

$ lein new rx-playground

Open the project file and add a dependency on RxJava’s Clojure bindings:

(defproject rx-playground "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [io.reactivex/rxclojure "1.0.0"]])

Now, fire up a REPL in the project’s root directory so that we can start creating some Observables:

$ lein repl

The first thing we need to do is import RxClojure, so let’s get this out of the way by typing the following in the REPL:

(require '[rx.lang.clojure.core :as rx])
(import '(rx Observable))

The simplest way to create a new Observable is by calling the return function:

(def obs (rx/return 10))

Now, we can subscribe to it:

(rx/subscribe obs
              (fn [value]
                (prn (str "Got value: " value))))

This will print the string “Got value: 10” to the REPL.

The subscribe function of an Observable allows us to register handlers for three main things that happen throughout its life cycle: new values, errors, or a notification that the Observable is done emitting values. This corresponds to the onNext, onError, and onCompleted methods of the Observer interface, respectively.

In the preceding example, we are simply subscribing to onNext, which is why we get notified about the Observable’s only value, 10.
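
To observe the other two life cycle events as well, subscribe can be given an error handler and a completion handler after the onNext handler. The following is a minimal sketch assuming the rx-playground project set up above; lifecycle-events is a hypothetical name used to record what happens.

```clojure
(require '[rx.lang.clojure.core :as rx])

;; lifecycle-events is a hypothetical name used to record what happens.
(def lifecycle-events (atom []))

(rx/subscribe (rx/return 10)
              ;; called for every value (onNext)
              (fn [value] (swap! lifecycle-events conj [:next value]))
              ;; called at most once, if the Observable fails (onError)
              (fn [error] (swap! lifecycle-events conj [:error error]))
              ;; called once the Observable is done (onCompleted)
              (fn [] (swap! lifecycle-events conj [:completed])))

@lifecycle-events
;; => [[:next 10] [:completed]]
```

Since return emits its value synchronously, both events have been recorded by the time subscribe returns.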

A single-value Observable isn’t terribly interesting though. Let’s create and interact with one that emits multiple values:

(-> (rx/seq->o [1 2 3 4 5 6 7 8 9 10])
    (rx/subscribe prn))

This will print the numbers from 1 to 10, inclusive, to the REPL. seq->o is a way to create Observables from Clojure sequences. It just so happens that the preceding snippet can be rewritten using Rx’s own range operator. Like Clojure’s core range function, its end value is exclusive, so we pass 11:

(-> (rx/range 1 11)
    (rx/subscribe prn))

Of course, this doesn’t yet present any advantages to working with raw values or sequences in Clojure.

But what if we need an Observable that emits an undefined number of integers at a given interval? This becomes challenging to represent as a sequence in Clojure, but Rx makes it trivial:

(import '(java.util.concurrent TimeUnit))
(rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS)
              prn)

RxClojure doesn’t yet provide bindings to all of RxJava’s API. The interval method is one such example. We’re required to use interoperability and call the method directly on the Observable class from RxJava.

Observable/interval takes as arguments a number and a time unit. In this case, we are telling it to emit an integer, starting from zero, every 100 milliseconds. If we type this in a REPL-connected editor, however, two things will happen:

  • We will not see any output (depending on your REPL; this is true for Emacs)
  • We will have a rogue thread emitting numbers indefinitely

Both issues arise from the fact that Observable/interval is the first factory method we have used that doesn’t emit values synchronously. Instead, it returns an Observable that defers the work to a separate thread.

The first issue is simple enough to fix. Functions such as prn will print to whatever the dynamic var *out* is bound to. When working in certain REPL environments such as Emacs’, this is bound to the REPL stream, which is why we can generally see everything we print.

However, since Rx is deferring the work to a separate thread, *out* isn’t bound to the REPL stream anymore so we don’t see the output. In order to fix this, we need to capture the current value of *out* and bind it in our subscription. This will be incredibly useful as we experiment with Rx in the REPL. As such, let’s create a helper function for it:

(def repl-out *out*)
(defn prn-to-repl [& args]
  (binding [*out* repl-out]
    (apply prn args)))

The first thing we do is create a var repl-out that contains the current REPL stream. Next, we create a function prn-to-repl that works just like prn, except it uses the binding macro to create a new binding for *out* that is valid within that scope.
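
The effect of rebinding *out* can be checked without Rx by printing from a plain thread. In the sketch below, sink and prn-to-sink are hypothetical stand-ins for repl-out and prn-to-repl, writing to a StringWriter so that the result can be inspected.

```clojure
(require '[clojure.string :as string])

;; sink and prn-to-sink are hypothetical stand-ins for repl-out and
;; prn-to-repl, so that the output can be inspected as a string.
(def sink (java.io.StringWriter.))

(defn prn-to-sink [& args]
  (binding [*out* sink]
    (apply prn args)))

;; A plain Thread does not inherit the caller's dynamic bindings,
;; so the helper has to establish the binding itself.
(doto (Thread. ^Runnable (fn [] (prn-to-sink :from-another-thread)))
  (.start)
  (.join))

(string/trim (str sink))
;; => ":from-another-thread"
```

The value still reaches the captured stream because the binding is created inside the function, on whichever thread happens to call it.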

This still leaves us with the rogue thread problem. Now is the appropriate time to mention that the subscribe method from an Observable returns a subscription object. By holding onto a reference to it, we can call its unsubscribe method to indicate that we are no longer interested in the values produced by that Observable.

Putting it all together, our interval example can be rewritten like the following:

(def subscription (rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS)
                               prn-to-repl))
 
(Thread/sleep 1000)
 
(rx/unsubscribe subscription)

We create a new interval Observable and immediately subscribe to it, just as we did before. This time, however, we assign the resulting subscription to a var named subscription. Note that it now uses our helper function prn-to-repl, so we will start seeing values being printed to the REPL straight away.

Next, we sleep the current—the REPL—thread for a second. This is enough time for the Observable to produce numbers from 0 to 9. That’s roughly when the REPL thread wakes up and unsubscribes from that Observable, causing it to stop emitting values.
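
One way to convince ourselves that unsubscribing really stops the flow is to count the values received before and after. This is a sketch under the assumption that the rx-playground project is on the classpath; received, interval-subscription, and count-after-unsubscribe are hypothetical names, and the exact number of values seen depends on timing.

```clojure
(require '[rx.lang.clojure.core :as rx])
(import '(rx Observable Subscription)
        '(java.util.concurrent TimeUnit))

;; received and interval-subscription are hypothetical names for this sketch.
(def received (atom []))

(def interval-subscription
  (rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS)
                (fn [n] (swap! received conj n))))

(Thread/sleep 350)                       ; let a few values arrive
(rx/unsubscribe interval-subscription)

(def count-after-unsubscribe (count @received))
(Thread/sleep 300)                       ; wait well past the next few ticks

;; No further values should have arrived since we unsubscribed.
(= count-after-unsubscribe (count @received))

;; The subscription itself reports that it has been cancelled.
(.isUnsubscribed ^Subscription interval-subscription)
```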

Custom Observables

Rx provides many more factory methods to create Observables (see https://github.com/ReactiveX/RxJava/wiki/Creating-Observables), but it is beyond the scope of this article to cover them all.

Nevertheless, sometimes, none of the built-in factories is what you want. For such cases, Rx provides the create method. We can use it to create a custom Observable from scratch.

As an example, we’ll create our own version of the return Observable we used earlier in this article (return is RxClojure’s name for RxJava’s just):

(defn just-obs [v]
  (rx/observable*
   (fn [observer]
     (rx/on-next observer v)
     (rx/on-completed observer))))

(rx/subscribe (just-obs 20) prn)

First, we create a function, just-obs, which implements our Observable by calling the observable* function.

When creating an Observable this way, the function passed to observable* will get called with an Observer as soon as one subscribes to us. When this happens, we are free to do whatever computation, and even I/O, we need in order to produce values and push them to the Observer.

We should remember to call the Observer’s onCompleted method whenever we’re done producing values. The preceding snippet will print 20 to the REPL.
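
The same approach extends to more than one value. The following sketch, assuming the rx-playground project from earlier, builds a custom Observable that pushes every element of a collection before completing. The names seq-obs and seen are hypothetical; in practice the built-in seq->o already covers this case.

```clojure
(require '[rx.lang.clojure.core :as rx])

;; seq-obs is a hypothetical helper, shown only to illustrate observable*.
(defn seq-obs
  "Creates an Observable that emits each element of coll, then completes."
  [coll]
  (rx/observable*
   (fn [observer]
     (doseq [x coll]
       (rx/on-next observer x))
     (rx/on-completed observer))))

(def seen (atom []))

(rx/subscribe (seq-obs [1 2 3])
              (fn [v] (swap! seen conj v)))

@seen
;; => [1 2 3]
```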

While creating custom Observables is fairly straightforward, we should make sure we exhaust the built-in factory functions first, only then resorting to creating our own.

Manipulating Observables

Now that we know how to create Observables, we should look at what kinds of interesting things we can do with them. In this section, we will see what it means to treat Observables as sequences.

We’ll start with something simple. Let’s print the sum of the first five even integers from an Observable of all non-negative integers:

(rx/subscribe (->> (Observable/interval 1 TimeUnit/MICROSECONDS)
                   (rx/filter even?)
                   (rx/take 5)
                   (rx/reduce +))
              prn-to-repl)

This is starting to look awfully familiar to us. We create an interval that will emit all non-negative integers, starting at zero, every 1 microsecond. Then, we keep only the even numbers in this Observable. Obviously, this is too big a list to handle, so we simply take the first five elements from it: 0, 2, 4, 6, and 8. Finally, we reduce them using +. The result is 20.
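
For comparison, here is the same pipeline written against an ordinary lazy sequence, using only Clojure's core functions. The var name sequence-sum is made up for this sketch.

```clojure
;; The same pipeline over an ordinary lazy sequence.
(def sequence-sum
  (->> (range)          ; 0, 1, 2, ... pulled lazily
       (filter even?)   ; 0, 2, 4, ...
       (take 5)         ; 0 2 4 6 8
       (reduce +)))

sequence-sum
;; => 20
```

The shape of the code is identical; the only difference is that here the consumer pulls the values, whereas the interval Observable pushes them.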

To drive home the point that programming with Observables really is just like operating on sequences, we will look at one more example where we will combine two different Observable sequences. One contains the names of musicians I’m a fan of and the other the names of their respective bands:

(defn musicians []
  (rx/seq->o ["James Hetfield" "Dave Mustaine" "Kerry King"]))

(defn bands []
  (rx/seq->o ["Metallica" "Megadeth" "Slayer"]))

We would like to print to the REPL a string of the format Musician name – from: band name. An added requirement is that the band names should be printed in uppercase for impact.

We’ll start by creating another Observable that contains the uppercased band names:

(defn uppercased-obs []
  (rx/map (fn [s] (.toUpperCase s)) (bands)))

While not strictly necessary, this makes a reusable piece of code that can be handy in several places of the program, thus avoiding duplication. Subscribers interested in the original band names can keep subscribing to the bands Observable.

With the two Observables in hand, we can proceed to combine them:

(-> (rx/map vector
            (musicians)
            (uppercased-obs))
    (rx/subscribe (fn [[musician band]]
                    (prn-to-repl (str musician " - from: " band)))))

Once more, this example should feel familiar. The solution we were after was a way to zip the two Observables together. RxClojure provides zip behavior through map, much like Clojure’s core map function does. We call it with three arguments: the two Observables to zip and a function that will be called with both elements, one from each Observable, and should return an appropriate representation. In this case, we simply turn them into a vector.
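
The zipping behavior of Clojure's core map that this mirrors can be sketched with plain vectors standing in for the two Observables. The names musician-names, band-names, and credits are hypothetical and used only here.

```clojure
(require '[clojure.string :as string])

;; Plain vectors standing in for the two Observables.
(def musician-names ["James Hetfield" "Dave Mustaine" "Kerry King"])
(def band-names     ["Metallica" "Megadeth" "Slayer"])

;; With more than one collection, map walks them in lockstep,
;; passing one element from each to the function.
(def credits
  (map (fn [musician band]
         (str musician " - from: " band))
       musician-names
       (map string/upper-case band-names)))

(vec credits)
;; => ["James Hetfield - from: METALLICA"
;;     "Dave Mustaine - from: MEGADETH"
;;     "Kerry King - from: SLAYER"]
```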

Next, in our subscriber, we simply destructure the vector in order to access the musician and band names. This prints the final result to the REPL:

"James Hetfield - from: METALLICA"
"Dave Mustaine - from: MEGADETH"
"Kerry King - from: SLAYER"

Summary

In this article, we took a deep dive into RxJava, a port of Microsoft’s Reactive Extensions from .NET. We learned about its main abstraction, the Observable, and how it relates to iterables. We also learned how to create, manipulate, and combine Observables in several ways. The examples shown here were contrived to keep things simple.
