7 min read

Reactive programming requires us to change the way that we think about events in an application. Reactive programming requires us to think about events as a stream of values. For example, a mouse click event can be represented as a stream of data. Every click event generates a new value in the data stream. In reactive programming, we can use the stream of data to query and manipulate the values in the stream.

Observables are streams of data, and this explains why it is easy to imagine that we can represent an event such as an onClick event using an observable. However, the use cases for observables are much more diverse than that. In this article, we are going to explore how to create an observable given different types.

This article is taken from the book Hands-On Functional Programming with TypeScript by Remo H. Jansen. In this book, you will discover the power of functional programming, lazy evaluation, monads, concurrency, and immutability to create succinct and expressive implementations.

Creating observables from a value

We can create an observable given a value using the of function. In the old versions of RxJS, the function of was a static method of the Observable class, which was available as Observable.of. This should remind us to use the of method of the Applicative type in category theory because observables take some inspiration from category theory. However, in RxJS 6.0, the of method is available as a standalone factory function:


import { of } from "rxjs";
const observable = of(1);

const subscription = observable.subscribe(
(value) => console.log(value),
(error: any) => console.log(error),
() => console.log("Done!")
);

subscription.unsubscribe();

The preceding code snippet declares an observable with one unique value using the of function. The code snippet also showcases how we can subscribe to an observable using the subscribe method. The subscribe method takes three function arguments:

  • Item handler: Invoked once for each item in the sequence.
  • Error handler: Invoked if there is an error in the sequence. This argument is optional.
  • Done handler: Invoked when there are no more items in the sequence. This argument is optional.

The following diagram is known as a marble diagram and is used to represent observables in a visual manner. The arrow represents the time and the circles are values. In this case, we have only one value:

As we can see, the circle also has a small vertical line in the middle. This line is used to represent the last element in an observable. In this case, the item handler in the subscription will only be invoked once.

Creating observables from arrays

We can create an observable given an existing array using the from function:

import { from } from "rxjs";
const observable = from([10, 20, 30]);

const subscription = observable.subscribe(
(value) => console.log(value),
(error: any) => console.log(error),
() => console.log("Done!")
);

subscription.unsubscribe();

The preceding code snippet declares an observable with three values using the from function. The code snippet also showcases how we can subscribe once more.

The following marble diagram represents the preceding example in a visual manner. The generated observable has three values (10, 20, and 30) and 30 is the last element in the observable:

We can alternatively use the interval function to generate an array with a given number of elements:

import { interval } from "rxjs";
const observable = interval(10);

const subscription = observable.subscribe(
(value) => console.log(value),
(error: any) => console.log(error),
() => console.log("Done!")
);

subscription.unsubscribe();

The preceding code snippet declares an observable with ten values using the interval function. The code snippet also showcases how we can subscribe once more. In this case, the item handler in the subscription will be invoked ten times.

The following marble diagram represents the preceding example in a visual manner. The generating observable has ten values, and 9 is the last item contained by it:

 In this case, the item handler in the subscription will be invoked ten times.

Creating observables from events

It is also possible to create an observable using an event as the source of the items in the stream. We can do this using the fromEvent function:

import { fromEvent } from "rxjs";
const observable = fromEvent(document, "click");

const subscription = observable.subscribe(
(value) => console.log(value)
);

subscription.unsubscribe();

In this case, the item handler in the subscription will be invoked as many times as the click event takes place.

Please note that the preceding example can only be executed in a web browser. To execute the preceding code in a web browser, you will need to use a module bundler, such as Webpack.

Creating observables from callbacks

It is also possible to create an observable that will iterate the arguments of a callback using the bindCallback function:

import { bindCallback } from "rxjs";
import fetch from "node-fetch";
function getJSON(url: string, cb: (response: unknown|null) => void) {
fetch(url)
.then(response => response.json())
.then(json => cb(json))
.catch(_ => cb(null));
}

const uri = "https://jsonplaceholder.typicode.com/todos/1";
const observableFactory = bindCallback(getJSON);
const observable = observableFactory(uri);

const subscription = observable.subscribe(
(value) => console.log(value)
);

subscription.unsubscribe();

The preceding example uses the node-fetch module because the fetch function is not available in Node.js. You can install the node-fetch module using the following npm command:

npm install node-fetch @types/node-fetch

The getJSON function takes a URL and a callback as its arguments. When we pass it to the bindCallback function, a new function is returned. The new function takes a URL as its only argument and returns an observable instead of taking a callback.

In Node.js, callbacks follow a well-defined pattern. The Node.js callbacks take two arguments, error and result, and don’t throw exceptions. We must use the error argument to check whether something went wrong instead of a try/catch statement. RxJS also defines a function named bindNodeCallback that allows us to work with the callbacks:

import { bindNodeCallback } from "rxjs";
import * as fs from "fs";
const observableFactory = bindNodeCallback(fs.readFile);
const observable = observableFactory("./roadNames.txt");

const subscription = observable.subscribe(
(value) => console.log(value.toString())
);

subscription.unsubscribe();

The helpers, bindCallback and bindNodeCallback, have very similar behavior, but the second has been specially designed to work with Node.js callbacks.

Creating observables from promises

Another potential source of items for an observable sequence is a Promise. RxJS also allows us to handle this use case with the from function. We must pass a Promise instance to the from function. In the following example, we use the fetch function to send an HTTP request. The fetch function returns a promise that is passed to the from function:

import { bindCallback } from "rxjs";
import fetch from "node-fetch";
const uri = "https://jsonplaceholder.typicode.com/todos/1";
const observable = from(fetch(uri)).pipe(map(x => x.json()));

const subscription = observable.subscribe(
(value) => console.log(value.toString())
);

subscription.unsubscribe();

The generated observable will contain the result of the promise as its only item.

Cold and hot observables

The official RxJS documentation explores the differences between cold and hot observables as follows:

“Cold observables start running upon subscription, that is, the observable sequence only starts pushing values to the observers when Subscribe is called. Values are also not shared among subscribers. This is different from hot observables, such as mouse move events or stock tickers, which are already producing values even before a subscription is active. When an observer subscribes to a hot observable sequence, it will get all values in the stream that are emitted after it subscribes. The hot observable sequence is shared among all subscribers, and each subscriber is pushed to the next value in the sequence.”

It is important to understand these differences if we want to have control over the execution flow of our components. The key point to remember is that cold observables are lazily evaluated.

In this article, we learned what observables are and how we can create them and work with them. To know more about working with observables, and other aspects of functional programming, read our book Hands-On Functional Programming with TypeScript.

Read Next

What makes functional programming a viable choice for artificial intelligence projects?

Why functional programming in Python matters: Interview with best selling author, Steven Lott

Introducing Coconut for making functional programming in Python simpler

Content Marketing Editor at Packt Hub. I blog about new and upcoming tech trends ranging from Data science, Web development, Programming, Cloud & Networking, IoT, Security and Game development.