
In the first post of this series we saw how similar RxJS Observables are to Promises in use, and we touched on how powerful they are. In this post we’ll experience the power Observables bring to the table.

Now, as if our little app wasn’t inspirational enough, we want to overload it with inspiration. Instead of 1, we want to show 10 Chuck Norris inspirations one by one, with a delay of 2 seconds each. Let’s implement that with Promises first:

Promise

JsBin: http://jsbin.com/wupoya/1/edit?js,output
import superagent from 'superagent';

let apiUrl,
   inspireHTML,
   addPromiscuousInspiration,
   add10Inspirations;

apiUrl = `http://api.icndb.com/jokes/random?limitTo=[nerdy,explicit]`;

inspireHTML = (parentId, inspiration) => {
   let parentNode,
       inspirationalNode;

   parentNode = document.getElementById(parentId);
   inspirationalNode = document.createElement('p');
   inspirationalNode.innerHTML = inspiration;

   parentNode.insertBefore(inspirationalNode, parentNode.firstChild);
};

addPromiscuousInspiration = () => {
   let promiscuousInspiration;

   promiscuousInspiration = new Promise((resolve, reject) => {
       superagent
           .get(apiUrl)
           .end((err, res) => {
               if (err) {
                   return reject(err);
               }

               let data,
                   inspiration;

               data = JSON.parse(res.text);
               inspiration = data.value.joke;

               console.log('Inspiration has arrived!');
               return resolve(inspiration);
           });
   });

   promiscuousInspiration.then((inspiration) => {
       let parentId;

       parentId = `inspiration`;
       inspireHTML(parentId, inspiration);
   }, (err) => {
       console.error('Error while getting inspired: ', err);
   });
};

add10Inspirations = () => {
   let maxTries,
       tries,
       interval;

   maxTries = 10;
   tries = 1;

   interval = setInterval(() => {
       addPromiscuousInspiration();

       if (tries < maxTries) {
           tries++;
       } else {
           clearInterval(interval);
       }
   }, 2000);
};

add10Inspirations();

Note: From now on we are injecting inspirations into HTML (as you’d have guessed from the code). So keep the ES6/Babel and Output panels open in JsBin.

Well there, we solved the problem. The code is convoluted, but it is just plain JavaScript we already know, so I am not going to explain it step-by-step.

Let’s try to implement the same thing using an Observable.

Observable

JsBin: http://jsbin.com/wupoya/3/edit?js,output
import superagent from 'superagent';
import {Observable} from 'rx';

let apiUrl,
   inspireHTML,
   reactiveInspirations;

apiUrl = `http://api.icndb.com/jokes/random?limitTo=[nerdy,explicit]`;

inspireHTML = (parentId, inspiration) => {
   let parentNode,
       inspirationalNode;

   parentNode = document.getElementById(parentId);
   inspirationalNode = document.createElement('p');
   inspirationalNode.innerHTML = inspiration;

   parentNode.insertBefore(inspirationalNode, parentNode.firstChild);
};


reactiveInspirations = Observable.create((observer) => {
   let interval,
       maxTries,
       tries;

   maxTries = 10;
   tries = 0;

   interval = setInterval(() => {
       superagent
           .get(apiUrl)
           .end((err, res) => {
               if (err) {
                   return observer.onError(err);
               }

               let data,
                   inspiration;

               data = JSON.parse(res.text);
               inspiration = data.value.joke;

               observer.onNext(inspiration);
           });

       if (tries < maxTries) {
           tries++;
       } else {
           observer.onCompleted();
       }
   }, 2000);

   return () => {
       console.log('Releasing the Kraken!');
       clearInterval(interval);
   };
});

reactiveInspirations.subscribe({
   onNext: (inspiration) => {
       let parentId;

       parentId = `inspiration`;
       inspireHTML(parentId, inspiration);
   },
   onError: (error) => {
       console.error('Error while getting inspired', error);
   },
   onCompleted: () => {
       console.log('Done getting inspired!');
   },
});

Easter Egg: There is a tiny little easter egg in this code. Try to find it if you want some candy! Hint: an Observable can stop right when asked to.

There are better ways of doing this (and we’ll use them later), but to compare apples to apples I took the same approach as with Promises: an interval that makes a call to the server and gets us an inspiration. You may have noticed some major differences. The consumer of our Observable (the subscriber) has not changed at all; it still assumes the same things it did earlier. And that is a big thing. The only change we made is in the creation of our Observable. Earlier we emitted a single value and called it completed, but now we set an interval and emit a value from an Ajax request every 2 seconds.

Take another look at how we have written the Observable creator. Notice how we are clearing the interval we set up. We put the code responsible for clearing the interval in the function we return from the Observable.create callback (the dispose method), because its responsibility is to release resources. Then, when we have made 10 requests, we simply call onCompleted and all the resources that need to be cleared (the interval in this case) are released. If you still can’t see the power this declarative way of disposing resources brings, let’s consider another case. Assume you are the consumer (subscriber) of this Observable, and now you want only five inspirations. But you can’t change the implementation of the Observable. How would you go about it? We could count the inspirations and ignore the rest after we’ve received five, but that means we waste five requests.

We want to make only five requests to the server, and then we want to stop the interval and make no more requests. We can actually do that without making a single change to how the Observable is created. When we call reactiveInspirations.subscribe, it returns a Disposable object. We can call reactiveInspirations.subscribe(…).dispose() at any time to stop the Observable right there and release all its resources. It turns out there are many such use cases, and they come up more often than not when you’re dealing with streams/collections of asynchronous operations. RxJS provides a very nice API to deal with a lot of them. Instead of manually counting the number of items emitted by our Observable and then disposing it, we can use the Observable.prototype.take operator. Observable.prototype.take takes a number as input and calls dispose on the Observable after it has emitted that many values. Here we go:

JsBin: http://jsbin.com/wupoya/4/edit?js,output
reactiveInspirations
   .take(5)
   .subscribe({
   onNext: (inspiration) => {
       let parentId;

       parentId = `inspiration`;
       inspireHTML(parentId, inspiration);
   },
   onError: (error) => {
       console.error('Error while getting inspired', error);
   },
   onCompleted: () => {
       console.log('Done getting inspired!');
   },
});

If you watch the console, you will see Releasing the Kraken! logged after five inspirations, and no more requests will be made to the server. take is one of the many operators available on Observable that we can use to declaratively manipulate asynchronous collections. Doing the same thing in the present implementation with Promises would involve making changes all over the place.
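
To make the manual alternative concrete, here is a minimal sketch of a subscriber that counts values itself and calls dispose() after the fifth one. It deliberately does not use RxJS: createObservable, tick, listeners, and log are hypothetical stand-ins (createObservable loosely mimics the contract of Observable.create as used above, and tick replaces setInterval so the example runs synchronously). Treat it as an illustration of the idea, not of the library's internals.

```javascript
// Hypothetical stand-in for Observable.create (NOT the RxJS implementation).
// subscribeFn receives an observer and may return a teardown function.
const createObservable = (subscribeFn) => ({
  subscribe(observer) {
    let stopped = false;
    let teardown = null;

    // Runs the teardown at most once; returns true only on the first call.
    const stop = () => {
      if (stopped) return false;
      stopped = true;
      if (teardown) teardown();
      return true;
    };

    const result = subscribeFn({
      onNext: (value) => { if (!stopped) observer.onNext(value); },
      onCompleted: () => { if (stop()) observer.onCompleted(); },
    });

    teardown = typeof result === 'function' ? result : () => {};
    if (stopped) teardown(); // the source completed synchronously

    return { dispose: stop };
  },
});

const log = [];
const listeners = new Set();

// Hypothetical replacement for setInterval: each call is one "interval tick".
const tick = (value) => [...listeners].forEach((listener) => listener(value));

const inspirations = createObservable((observer) => {
  listeners.add(observer.onNext);
  return () => {
    listeners.delete(observer.onNext);
    log.push('Releasing the Kraken!');
  };
});

// The consumer counts on its own and disposes after the fifth value.
let received = 0;
const subscription = inspirations.subscribe({
  onNext: (value) => {
    log.push(`inspiration ${value}`);
    received += 1;
    if (received === 5) subscription.dispose();
  },
  onCompleted: () => log.push('Done getting inspired!'),
});

for (let i = 1; i <= 10; i += 1) tick(i); // ticks 6 to 10 find no listener
```

The counter and the dispose() call are exactly the bookkeeping that take(5) hides, which is why the operator version keeps the subscriber untouched.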

Some would argue that we could create a list of promises and use a library like Q to work over it sequentially, or sequentially create a list of promises with Q and then sequentially operate on it (or something better), but that’s not the point. The point is that our use case is to handle a list of asynchronous operations, and Promises are not designed for it. At the end of the day both Promise and Observable are just abstractions for the same operation, and the one which makes the job easier wins. Easy here is not just “easy to implement”; easy includes:

  • easy to think about
  • easy to implement
  • easy to read
  • easy to maintain
  • easy to extend
  • easy to reuse

I don’t know about you but the expected behavior of a Promise for me is that it will execute when I want it to, not when I create it. That is what I expected of it when I first tried one. I mean they advertised it as a “unit of asynchronous operation” not as a “unit of asynchronous operation which is already done, here’s the dump it took on you”.
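
A small sketch of the eagerness described here: the function passed to new Promise runs as soon as the promise is constructed, whether or not anyone has called then on it. The log array and the names eager and lazyInspiration are made up for this illustration.

```javascript
const log = [];

// The executor runs synchronously, during construction.
const eager = new Promise((resolve) => {
  log.push('eager executor ran');
  resolve('inspiration');
});

// Wrapping the construction in a function defers the work until it is called.
const lazyInspiration = () => new Promise((resolve) => {
  log.push('lazy executor ran');
  resolve('inspiration');
});

log.push('nothing subscribed yet');
// At this point log is ['eager executor ran', 'nothing subscribed yet'].

lazyInspiration(); // only now does the second executor run
// log is now ['eager executor ran', 'nothing subscribed yet', 'lazy executor ran'].
```

The cleaned-up code that follows relies on the same trick: getInspiration is a function that creates a new Promise on each call.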

I won’t go into explaining how each implementation fares on each of the above points, since that’s your job. In my opinion Observables encourage good architecture with maintainable/modular code. From this point on we’ll abandon the Promise-based implementation and build a little more on the Observable implementation. For starters, let’s clean it up and do it the right way.

import superagent from 'superagent';
import {Observable} from 'rx';

let apiUrl,
   inspireHTML,
   getInspiration;

apiUrl = `http://api.icndb.com/jokes/random?limitTo=[nerdy,explicit]`;

inspireHTML = (parentId, inspiration) => {
   let parentNode,
       inspirationalNode;

   parentNode = document.getElementById(parentId);
   inspirationalNode = document.createElement('p');
   inspirationalNode.innerHTML = inspiration;

   parentNode.insertBefore(inspirationalNode, parentNode.firstChild);
};


getInspiration = () => new Promise((resolve, reject) => {
   superagent
       .get(apiUrl)
       .end((err, res) => {
           if (err) {
               return reject(err);
           }

           let data,
               inspiration;

           data = JSON.parse(res.text);
           inspiration = data.value.joke;

           resolve(inspiration);
       });
});

Observable
   .interval(2000)
   .take(10)
   .flatMap(getInspiration)
   .subscribe({
       onNext: (inspiration) => {
           let parentId;

           parentId = `inspiration`;
           inspireHTML(parentId, inspiration);
       },
       onError: (error) => {
           console.error('Error while getting inspired', error);
       },
       onCompleted: () => {
           console.log('Done getting inspired!');
       },
   });

Promises are not bad. They are designed to represent single asynchronous operations and they do their job fine. Another great thing about RxJS is interoperability. Observables play well with almost all (a)synchronous forms, including regular arrays, promises, generators, events, and even ES7 async functions. Promises in particular are smooth to work with in RxJS, because when we return a promise in an Observable chain, it gets automatically converted to an Observable.

Let’s dissect that code from the bottom up again. At the bottom of that code is an Observable chain. Yup, a chain! It’s a chain of small independent operations, each doing a single task in our overall operation. Let’s start from the top of this chain.

  • At the top of the chain sits an Observable.interval(2000). The Observable.create that we used earlier is one of many ways to create an Observable. Many times (I’d say mostly) we use special-purpose operators to create Observables from promises, callbacks, events, and intervals. Observable.interval(N) creates an Observable which emits a value every N milliseconds.
  • It just constantly keeps emitting the values indefinitely. To ask it to stop after emitting 10 values, we use Observable.prototype.take(N). take will accept N values, and then ask its parent to stop. Take, like most other operators, creates an Observable, so we can chain it further.
  • Next in the chain we have a flatMap. If you have never worked with Array-extras like map/filter/reduce, flatMap might be hard to understand at first.

Array.prototype.map takes a function, applies it to each item in the array, and returns a new array built from the function’s return values. There is another method not included in the default native array extras (but provided by utility libraries like underscore/lodash/ramda) called flatten. flatten takes an array, and if that array contains further arrays, it flattens them all and returns a single-dimensional array. It converts arrays like [1, 2, 3, [4, 5]] to [1, 2, 3, 4, 5].

flatMap is a combination of map and flatten. It takes a function, invokes it on every item in the collection, and then flattens the resulting collection.
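
As a rough illustration on plain arrays, here is a sketch of flatten and flatMap written by hand. The helper names flatten and flatMap are defined here for the example and flatten only one level of nesting (the library versions mentioned above may flatten deeper). Newer JavaScript runtimes also ship Array.prototype.flat and Array.prototype.flatMap natively.

```javascript
// Flattens one level of nesting: [1, [2, 3]] becomes [1, 2, 3].
const flatten = (items) => items.reduce((flat, item) => flat.concat(item), []);

// flatMap = map over the items, then flatten the array of results.
const flatMap = (items, fn) => flatten(items.map(fn));

const flattened = flatten([1, 2, 3, [4, 5]]);
// flattened is [1, 2, 3, 4, 5]

const mapped = [1, 2, 3].map((n) => [n, n * 10]);
// mapped is [[1, 10], [2, 20], [3, 30]]: one inner array per input item

const flatMapped = flatMap([1, 2, 3], (n) => [n, n * 10]);
// flatMapped is [1, 10, 2, 20, 3, 30]: the inner arrays are merged
```

The difference between mapped and flatMapped is the same difference we are about to see between map and flatMap on Observables.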

RxJS tries to mimic the Array-extras API, which makes working with asynchronous collections so much more natural and intuitive. So you have map, filter, reduce and friends available on Observables. flatMap is also great. It flattens the Observable within the Observable to a value and passes it down the chain (i.e. it creates another Observable that emits the flattened values, but let’s not get technical).

To understand it well, let’s take another look at our example: .flatMap(getInspiration). getInspiration returns a Promise. As I said earlier, Promises inside the Observable chain get automatically converted to Observables. So we can safely say that getInspiration returns an Observable. If we used a simple map instead of flatMap, it would just return an Observable for each value it gets from Observable.interval, so it would give us an Observable every 2 seconds. But flatMap goes a step further and flattens this Observable: it resolves the Observable and gives us its value. What value does the Promise/Observable returned by getInspiration resolve to? It resolves to the inspiration. So that’s what we get in the next part of the chain. Wow! Such a complex task done with such a simple API.

  • Next in the chain is our subscriber, i.e. the end of the chain. Here we receive the end result we wanted and do some work on it (append it to our page).

I hope it wasn’t too much for you to grasp. If it was, perhaps you should go ahead and take a tour of this awesome rxjs tutorial.

I am sure you are as impressed by now as I am. The ability to compose asynchronous operations like this brings great power and agility to our code. Plus it is so much fun to code this way. Let’s add some more features for our inspirational app. Why not remove the 10 inspiration limit and instead add a button to stop getting inspired when we feel like we’re done?

JsBin: http://jsbin.com/jenule/1/edit?js,output
import superagent from 'superagent';
import {Observable} from 'rx';

let apiUrl,
   inspireHTML,
   getInspiration,
   stopButtonId,
   stopGettingInspired;

apiUrl = `http://api.icndb.com/jokes/random?limitTo=[nerdy,explicit]`;

inspireHTML = (parentId, inspiration) => {
   let parentNode,
       inspirationalNode;

   parentNode = document.getElementById(parentId);
   inspirationalNode = document.createElement('p');
   inspirationalNode.innerHTML = inspiration;

   parentNode.insertBefore(inspirationalNode, parentNode.firstChild);
};

getInspiration = () => new Promise((resolve, reject) => {
   superagent
       .get(apiUrl)
       .end((err, res) => {
           if (err) {
               return reject(err);
           }

           let data,
               inspiration;

           data = JSON.parse(res.text);
           inspiration = data.value.joke;

           resolve(inspiration);
       });
});

stopButtonId = 'stop';

stopGettingInspired = Observable
   .fromEvent(document.getElementById(stopButtonId), 'click')
   .do((event) => event.preventDefault());

Observable
   .interval(2000)
   .takeUntil(stopGettingInspired)
   .flatMap(getInspiration)
   .subscribe({
       onNext: (inspiration) => {
           let parentId;

           parentId = `inspiration`;
           inspireHTML(parentId, inspiration);
       },
       onError: (error) => {
           console.error('Error while getting inspired', error);
       },
       onCompleted: () => {
           console.log('Done getting inspired!');
       },
   });

Let’s quickly go over what we’ve added:

  • Stop button: We’ve added a stop button in the HTML with the id stop.
  • In our JavaScript, we created a new Observable for click events on this button. Observable.fromEvent does that for you. I won’t go into much detail here (I am already way out of my allotted space), but this is really powerful (plus I promised you I’d show you events in action). This allows us to think of click events on our button as any other asynchronous collection, which we can combine with other Observables. That’s what we do.
  • We want to stop our Observable.interval Observable on clicking the stop button. take is one of many operators for restricting an Observable. Another one is takeUntil. takeUntil works much like take, but instead of taking a number, it takes another Observable. When that Observable emits a value, takeUntil stops its parent Observable. So in our case, when our stopGettingInspired Observable emits a value (it emits the click event when we click the button), our interval is stopped. That’s some great extensibility!
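
To show the idea behind takeUntil without a browser or RxJS, here is a minimal sketch in which values are pushed by hand. createSource and this takeUntil function are hypothetical helpers written for the example, not RxJS APIs: interval stands in for Observable.interval(2000) and stopClicks for the button's click events.

```javascript
// Hypothetical helper (not an RxJS API): a source we can push values into by hand.
const createSource = () => {
  const listeners = new Set();
  return {
    emit: (value) => [...listeners].forEach((listener) => listener(value)),
    subscribe: (listener) => {
      listeners.add(listener);
      return { dispose: () => listeners.delete(listener) };
    },
  };
};

// Sketch of the takeUntil idea: forward values from `source`
// until `stopper` emits once, then release both subscriptions.
const takeUntil = (source, stopper) => ({
  subscribe: (observer) => {
    let sourceSubscription = null;
    let stopSubscription = null;
    const dispose = () => {
      sourceSubscription.dispose();
      stopSubscription.dispose();
    };

    sourceSubscription = source.subscribe((value) => observer.onNext(value));
    stopSubscription = stopper.subscribe(() => {
      dispose();
      observer.onCompleted();
    });

    return { dispose };
  },
});

const interval = createSource(); // stands in for Observable.interval(2000)
const stopClicks = createSource(); // stands in for the stop button's clicks
const log = [];

takeUntil(interval, stopClicks).subscribe({
  onNext: (value) => log.push(`inspiration ${value}`),
  onCompleted: () => log.push('Done getting inspired!'),
});

interval.emit(0);
interval.emit(1);
stopClicks.emit('click'); // the "button" is pressed
interval.emit(2); // ignored: the interval subscription was disposed
```

Notice that the subscriber has no idea a button exists; the stopping rule lives entirely in the chain.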

That’s all the fun we are going to have for today. If you felt like it was too much, I’d recommend you read this tutorial again.

Or if you felt it was too simple, I wrote a small (but usable) RSS reader as a tutorial here.

About the author

Charanjit Singh is a freelance developer based out of Punjab, India. He can be found on GitHub @channikhabra.
