
Reactive programming is a paradigm whose main focus is working with asynchronous data flows. Reactive Extensions let you work with asynchronous data streams. Reactive Extensions is language agnostic: it has implementations for several languages and platforms (such as RxJava, RxSwift, and so on). This makes learning Reactive Extensions (and functional reactive programming) really useful, as you can use it to improve your code on different platforms.

One of these implementations, Reactive Extensions for JavaScript (RxJS), is a reactive streams library for JS that can be used both in the browser and on the server side using Node.js. RxJS is a library for reactive programming using Observables. Observables provide support for passing messages between publishers and subscribers in your application. Observables offer significant benefits over other techniques for event handling, asynchronous programming, and handling multiple values.

In this article, we will learn about the different types of observables in the context of RxJS and a few different ways of creating them.

This article is an excerpt from the book, Mastering Reactive JavaScript, written by Erich de Souza Oliveira.
RxJS lets you have even more control over the source of your data. We will learn about the different flavors of Observables and how we can better control their life cycle.

Installing RxJS

RxJS is divided into modules. This way, you can create your own bundle with only the modules you’re interested in. However, we will always use the official bundle with all the contents from RxJS; by doing so, we’ll not have to worry about whether a certain module exists in our bundle or not. So, let’s follow the steps described here to install RxJS.

To install it on your server, just run the following command inside a node project:

    npm i rx@4.1.0 --save

To add it to an HTML page, load the RxJS bundle with a script tag inside your HTML.

For those using other package managers, you can install RxJS from Bower or NuGet.

If you’re running inside a node program, you need to have the RxJS library in each JavaScript file that you want to use. To do this, add the following line to the beginning of your JavaScript file:

var Rx = require('rx');

The preceding line will be omitted in all examples, as we expect you to have added it before testing the sample code.

Creating an observable

Here we will see a list of methods to create an observable from common event sources. This is not an exhaustive list, but it contains the most important ones.

You can see all the available methods on the RxJS GitHub page (https://github.com/Reactive-Extensions/RxJS).

Creating an observable from iterable objects

We can create an observable from iterable objects using the from() method. An iterable in JavaScript can be an array (or an array-like object) or one of the other iterables added in ES6 (such as Set and Map). The from() method has the following signature:

Rx.Observable.from(iterable,[mapFunction],[context],[scheduler]);

Usually, you will pass only the first argument. The other arguments are optional; you can see them all here:

  • iterable: This is the iterable object to be converted into an observable (can be an array, set, map, and so on)
  • mapFunction: This is a function to be called for every element in the array to map it to a different value
  • context: This is the object to be used as this when mapFunction is called
  • scheduler: This is used to iterate the input sequence

Don’t worry if you don’t know what a scheduler is. It changes how our observables behave, but we will discuss it later in this chapter. For now, focus only on the other arguments of this function.

Now let’s see some examples of how we can create observables from iterables.

To create an observable from an array, you can use the following code:

Rx.Observable 
    .from([0,1,2]) 
    .subscribe((a)=>console.log(a));

This code prints the following output:

    0
    1
    2

Now let’s introduce a minor change in our code by adding the mapFunction argument. Instead of propagating the elements of the array unchanged, we will use mapFunction to propagate the double of each element:

Rx.Observable 
    .from([0,1,2], (a) => a*2) 
    .subscribe((a)=>console.log(a));

This prints the following output:

    0
    2
    4
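The mapFunction and context arguments have the same shape as the mapFn and thisArg arguments of the standard Array.from() function. As a rough illustration in plain JavaScript (no RxJS involved), the sketch below shows how a context object becomes this inside a map function. The multiplier object is a hypothetical name made up for this example, and the sketch assumes the context argument of from() is used the same way thisArg is here.

```javascript
// Plain JavaScript sketch: Array.from takes (iterable, mapFn, thisArg),
// the same shape as the first three arguments described above.
// `multiplier` is a hypothetical context object used only for illustration.
var multiplier = { factor: 2 };

// A regular function is used so that `this` is bound to the context;
// an arrow function would ignore the supplied context.
var doubled = Array.from([0, 1, 2], function (a) {
    return a * this.factor;
}, multiplier);

console.log(doubled); // [ 0, 2, 4 ]
```

In most cases an arrow function that closes over the values it needs is simpler, which is why the context argument is rarely passed.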

We can also use this method to create an observable from an arguments object. To do this, we need to run from() in a function. This way, we can access the arguments object of the function. We can implement it with the following code:

var observableFromArgumentsFactory = function(){ 
    return Rx.Observable.from(arguments); 
}; 
observableFromArgumentsFactory(0,1,2) 
    .subscribe((a)=>console.log(a));

If we run this code, we will see the following output:

    0
    1
    2

One last usage of this method is to create an observable from either a Set or a Map. These data structures were added in ES6. We can implement it for a set as follows:

var set = new Set([0,1,2]); 
Rx.Observable 
    .from(set) 
    .subscribe((a)=>console.log(a));

This code prints the following output:

    0
    1
    2

We can also use a map as an argument for the from() method, as follows:

var map = new Map([['key0',0],['key1',1],['key2',2]]); 
Rx.Observable 
    .from(map) 
    .subscribe((a)=>console.log(a));

This prints all the key-value tuples on this map:

    [ 'key0', 0 ]
    [ 'key1', 1 ]
    [ 'key2', 2 ]

All observables created with this method are cold observables. As discussed before, this means each observer receives the same sequence from the start, regardless of when it subscribes. To test this behavior, create an observable and add an observer to it; then add another observer after a second:

var observable = Rx.Observable.from([0,1,2]);
observable.subscribe((a)=>console.log('first subscriber receives => '+a));

setTimeout(()=>{ 
    observable.subscribe((a)=>console.log('second subscriber receives => '+a)); 
},1000);

If you run this code, you will see the following output in your console, showing both the subscribers receiving the same data as expected:

    first subscriber receives => 0
    first subscriber receives => 1
    first subscriber receives => 2
    second subscriber receives => 0
    second subscriber receives => 1
    second subscriber receives => 2
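To build some intuition for why both subscribers see the full sequence, here is a toy model of a cold observable in plain JavaScript. It is only a sketch and not how RxJS is implemented; coldFrom is a hypothetical helper invented for this example. The key point is that the producer runs inside subscribe(), so each subscription starts its own pass over the source.

```javascript
// Toy model of a cold observable (not the RxJS implementation):
// the producer runs inside subscribe(), so every observer triggers
// its own independent pass over the source.
function coldFrom(iterable) {
    return {
        subscribe: function (onNext) {
            for (var value of iterable) {
                onNext(value);
            }
        }
    };
}

var received = [];
var cold = coldFrom([0, 1, 2]);
cold.subscribe((a) => received.push('first => ' + a));
cold.subscribe((a) => received.push('second => ' + a));

// Both subscribers received 0, 1, 2, one full pass after the other.
console.log(received);
```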

Creating an observable from a sequence factory

Now that we have discussed how to create an observable from a sequence, let’s see how we can create an observable from a sequence factory. RxJS has a built-in method called generate() that lets you create an observable from an iteration (such as a for() loop). This method has the following signature:

Rx.Observable.generate(initialState, conditionFunction, iterationFunction, resultFactory, [scheduler]);

In this method, the only optional parameter is the last one. A brief description of all the parameters is as follows:

  • initialState: This can be any object; it is the first state used in the iteration
  • conditionFunction: This is a function that receives the current state; the iteration continues while it returns true and stops when it returns false
  • iterationFunction: This is a function that computes the next state from the current one
  • resultFactory: This is a function whose return value is passed to the sequence
  • scheduler: This is an optional scheduler

Before checking out an example code for this method, let’s see some code that implements one of the most basic constructs in a program: a for() loop. This is used to generate an array from an initial value to a final value. We can produce this array with the following code:

var resultArray=[]; 
for(var i=0;i<3;i++){ 
    resultArray.push(i); 
} 
console.log(resultArray);

This code prints the following output:

    [0,1,2]

When you create a for() loop, you basically give it the following: an initial state (the first expression), the condition to keep iterating (the second expression), how to move to the next value (the third expression), and what to do with each value (the loop body). Its usage is very similar to the generate() method. Let’s do the same thing, but using the generate() method and creating an observable instead of an array:

Rx.Observable.generate( 
    0, 
    (i) => i<3, 
    (i) => i+1, 
    (i) => i 
).subscribe((i) => console.log(i));

This code will print the following output:

    0
    1
    2
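The correspondence between a for() loop and the generate() parameters can be sketched in plain JavaScript. The helper below, generateSketch, is a hypothetical name invented for this example. It is just a loop with the same parameter order, not the RxJS implementation, and it takes a callback in place of a subscriber.

```javascript
// Hypothetical helper mirroring the parameter order of generate().
// It is a plain synchronous loop, not the RxJS implementation.
function generateSketch(initialState, condition, iterate, resultSelector, onNext) {
    for (var state = initialState; condition(state); state = iterate(state)) {
        onNext(resultSelector(state));
    }
}

var squares = [];
generateSketch(
    1,                      // initial state
    (i) => i <= 4,          // keep iterating while this returns true
    (i) => i + 1,           // compute the next state
    (i) => i * i,           // map the state to the emitted value
    (v) => squares.push(v)  // stands in for the subscriber
);

console.log(squares); // [ 1, 4, 9, 16 ]
```

Here the result function differs from the identity function used earlier, which shows that the emitted value does not have to be the iteration state itself.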

Creating an observable using range()

Another common source of data for observables is a range. With the range() method, we can easily create an observable for a sequence of values in a range. The range() method has the following signature:

Rx.Observable.range(first, count, [scheduler]);

The last parameter in the following list is the only optional parameter in this method:

  • first: This is the initial integer value in the sequence
  • count: This is the number of sequential integers to be iterated from the beginning of the sequence
  • scheduler: This is used to generate the values

We can create an observable using a range with the following code:

Rx.Observable 
    .range(0, 4) 
    .subscribe((i)=>console.log(i));

This prints the following output:

    0
    1
    2
    3
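Because the example above starts at zero, it is easy to mistake the second argument for an end value. Based on the parameter descriptions, it is a count. The plain JavaScript sketch below uses rangeSketch, a hypothetical helper invented for this example (not part of RxJS), to show the values that a first and count pair describes.

```javascript
// Hypothetical helper: returns the integers described by (first, count),
// that is, `count` sequential integers beginning at `first`.
function rangeSketch(first, count) {
    var values = [];
    for (var i = 0; i < count; i++) {
        values.push(first + i);
    }
    return values;
}

console.log(rangeSketch(0, 4)); // [ 0, 1, 2, 3 ]
console.log(rangeSketch(5, 3)); // [ 5, 6, 7 ]
```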

Creating an observable using a period of time

In the previous chapter, we discussed how to create timed sequences in bacon.js. In RxJS, we have two different methods to implement observables emitting values with a given interval. The first method is interval(). This method emits an infinite sequence of integers, starting from zero, every x milliseconds; it has the following signature:

Rx.Observable.interval(interval, [scheduler]);

The interval parameter is mandatory, and the second argument is optional:

  • interval: This is an integer to be used as the interval, in milliseconds, between the values of this sequence
  • scheduler: This is used to generate the values

Run the following code:

Rx.Observable 
    .interval(1000) 
    .subscribe((i)=> console.log(i));

You will see an output as follows; you will have to stop your program (hitting Ctrl+C) or it will keep sending events:

    0
    1
    2

The interval() method sends the first value of the sequence after the given interval has elapsed and keeps sending values after each subsequent interval.

RxJS also has a method called timer(). This method lets you specify a due time to start the sequence or even generate an observable of only one value emitted after the due time has elapsed. It has the following signature:

Rx.Observable.timer(dueTime, [interval], [scheduler]);

Here are the parameters:

  • dueTime: This can be a date object or an integer. If it is a date object, it is the absolute time at which to start the sequence; if it is an integer, it specifies the number of milliseconds to wait before sending the first element of the sequence.
  • interval: This is an integer denoting the time between the elements. If it is not specified, it generates only one event.
  • scheduler: This is used to produce the values.

We can create an observable from the timer() method with the following code:

Rx.Observable 
    .timer(1000,500) 
    .subscribe((i)=> console.log(i));

You will see an output that will be similar to the following; you will have to stop your program or it will keep sending events:

    0
    1
    2

We can also use this method to generate only one value and finish the sequence. We can do this by omitting the interval parameter, as shown in the following code:

Rx.Observable 
    .timer(1000) 
    .subscribe((i)=> console.log(i));

If you run this code, it will only print 0 in your console and finish.
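To compare the timing of these variants, the plain JavaScript sketch below lists the nominal emission times implied by the parameter descriptions above. The emissionTimes helper is hypothetical and invented for this example; it handles only an integer dueTime, ignores real scheduler jitter, and does not use RxJS or any timers.

```javascript
// Hypothetical helper: lists when the first `count` values would nominally
// be emitted for a dueTime (ms) and an optional interval (ms).
function emissionTimes(dueTime, interval, count) {
    // Without an interval, only a single value is produced.
    if (interval === undefined) {
        return [{ value: 0, at: dueTime }];
    }
    var schedule = [];
    for (var i = 0; i < count; i++) {
        schedule.push({ value: i, at: dueTime + i * interval });
    }
    return schedule;
}

// Models timer(1000, 500): values 0, 1, 2 at 1000 ms, 1500 ms, and 2000 ms.
var repeating = emissionTimes(1000, 500, 3);

// Models timer(1000): a single value 0 at 1000 ms.
var single = emissionTimes(1000);

// Models interval(1000), whose first value arrives after one full interval.
var everySecond = emissionTimes(1000, 1000, 3);

console.log(repeating, single, everySecond);
```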

We learned about various RxJS Observables and a few different ways of creating them. Read the book, Mastering Reactive JavaScript, to create powerful applications using RxJS without compromising performance.
