
In this section, we are going to learn how to use some functions known as operators, which allow us to manipulate observables in RxJS in different ways.

This post is extracted from the book Hands-On Functional Programming with TypeScript by Remo H. Jansen. In this book, you will delve into the principles, patterns, and best practices of functional and reactive programming.

Pipe

In RxJS, observables have a method named pipe, which is very similar to the pipe operator in functional programming. When we pipe two functions, we generate a new function that passes the return value of the first function as the argument to the second function in the pipe.
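To make the functional idea concrete, here is a minimal sketch in plain TypeScript. The pipeTwo helper and the two sample functions are hypothetical names chosen for illustration; they are not part of RxJS.

```typescript
// Hypothetical helper (not part of RxJS): pipes two functions into one.
function pipeTwo<A, B, C>(first: (a: A) => B, second: (b: B) => C): (a: A) => C {
    return (a: A) => second(first(a));
}

const double = (x: number): number => x * 2;
const toMessage = (x: number): string => `The result is ${x}`;

// The return value of double becomes the argument of toMessage
const doubleThenMessage = pipeTwo(double, toMessage);

console.log(doubleThenMessage(21)); // The result is 42
```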

The idea is very similar in reactive programming. When we pipe an observable through an operator, we generate a new observable. The new observable passes each of the items in the original observable to an operator that transforms them into the items in the new sequence.


We are not going to include a dedicated RxJS example here, because we are going to use the pipe method multiple times during the remaining part of this post.

Max

The max operator function can be used to find the biggest value in an observable. We must apply the max operator using the pipe method:

import { from } from "rxjs";
import { max } from "rxjs/operators";

const observable = from([2, 30, 22, 5, 60, 1]);

// pipe returns a new observable; the original one is left unchanged
const result = observable.pipe(max());

const subscription = result.subscribe(
    (value) => console.log(value) // 60
);

subscription.unsubscribe();

The following marble diagram showcases the initial sequence and the result sequence after applying the max operator:

The result sequence contains only one value (the biggest value in the original sequence).

Every

The every operator function can be used to test whether all the values in an observable adhere to a given requirement:

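import { from } from "rxjs";
import { every } from "rxjs/operators";

const observable = from([1, 2, 3, 4, 5]);

// Emits true only if every value satisfies the predicate
const result = observable.pipe(every(x => x < 10));

const subscription = result.subscribe(
    (value) => console.log(value) // true
);

subscription.unsubscribe();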

The preceding code snippet uses the every operator to test that all the values in an observable are lower than ten. The following marble diagram showcases the initial sequence and the result sequence after applying the every operator:

The result sequence contains only one value (true or false).

Find

The find operator function can be used to find the first value in an observable that adheres to a given constraint:

import { from } from "rxjs";
import { find } from "rxjs/operators";

const observable = from([2, 30, 22, 5, 60, 1]);

// Subscribe to the piped observable, not to the original one
const result = observable.pipe(find(x => x > 10));

const subscription = result.subscribe(
    (value) => console.log(value) // 30
);

subscription.unsubscribe();

The preceding code snippet uses the find operator to find the first value in an observable greater than ten. The following marble diagram showcases the initial sequence and the result sequence after applying the find operator:

The result sequence contains only one value (the first value in the stream that matches the given constraint).

Filter

The filter operator function can be used to find the values in an observable that adhere to a given constraint:

import { from } from "rxjs";
import { filter } from "rxjs/operators";

const observable = from([2, 30, 22, 5, 60, 1]);

// Subscribe to the piped observable, not to the original one
const result = observable.pipe(filter(x => x > 10));

const subscription = result.subscribe(
    (value) => console.log(value) // 30, 22, 60
);

subscription.unsubscribe();

The preceding code snippet uses the filter operator to find the values in an observable greater than ten. The following marble diagram showcases the initial sequence and the result sequence after applying the filter operator:

The result sequence contains only some values (the values in the stream that match the given constraint).
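One way to keep find and filter apart is to compare them with the array methods of the same names, which behave analogously on a plain array. The following sketch uses only the standard Array methods, not RxJS, with the same data as the examples above; the variable names are chosen for illustration.

```typescript
const sampleValues: number[] = [2, 30, 22, 5, 60, 1];

// Like the find operator: stops at the first match
const firstMatch = sampleValues.find(x => x > 10);

// Like the filter operator: keeps every match
const allMatches = sampleValues.filter(x => x > 10);

console.log(firstMatch); // 30
console.log(allMatches); // [ 30, 22, 60 ]
```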

Map

The map operator function can be used to transform the values in an observable into derived values:

import { from } from "rxjs";
import { map } from "rxjs/operators";

const observable = from([1, 2, 3]);

// Subscribe to the piped observable, not to the original one
const result = observable.pipe(map(x => 10 * x));

const subscription = result.subscribe(
    (value) => console.log(value) // 10, 20, 30
);

subscription.unsubscribe();

The preceding code snippet uses the map operator to transform the values in an observable into new values (the original value multiplied by ten). The following marble diagram showcases the initial sequence and the result sequence after applying the map operator:

The result sequence contains a new mapped value for each value in the original sequence.

Reduce

The reduce operator function can be used to transform all the values in an observable into one single value:

import { from } from "rxjs";
import { reduce } from "rxjs/operators";

const observable = from([1, 2, 3, 3, 4, 5]);

// Subscribe to the piped observable, not to the original one
const result = observable.pipe(reduce((x, y) => x + y));

const subscription = result.subscribe(
    (value) => console.log(value) // 18
);

subscription.unsubscribe();

The preceding code snippet uses the reduce operator to transform the values in an observable into a new single value (the total of all the values). The function that transforms multiple values into one single value is known as an accumulator. The following marble diagram showcases the initial sequence and the result sequence after applying the reduce operator:

The result sequence contains only one value (the result of the accumulator).
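To see what the accumulator receives on each call, the following sketch traces the same values with the standard Array.prototype.reduce, used here as a stand-in for the RxJS operator. Like the RxJS reduce operator when no seed is given, it uses the first value as the initial accumulated value. The variable names are chosen for illustration.

```typescript
const reduceInput: number[] = [1, 2, 3, 3, 4, 5];
const accumulatorCalls: string[] = [];

// The accumulator receives the running total and the next value
const total = reduceInput.reduce((acc, value) => {
    accumulatorCalls.push(`${acc} + ${value}`);
    return acc + value;
});

console.log(accumulatorCalls); // [ '1 + 2', '3 + 3', '6 + 3', '9 + 4', '13 + 5' ]
console.log(total);            // 18
```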

Throttle

The throttle operator function can be used to limit how often values are added to an observable:

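import { fromEvent, interval } from "rxjs";
import { throttle, mapTo, scan } from "rxjs/operators";

const observable = fromEvent(document, "click").pipe(
    mapTo(1),                        // map every click event to the value 1
    throttle(() => interval(100)),   // ignore clicks for 100 ms after each emitted one
    scan((acc, one) => acc + one, 0) // running count of the clicks that got through
);

const subscription = observable.subscribe(
    (value) => console.log(value)
);

// Call this only once you want to stop listening for clicks
subscription.unsubscribe();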

The preceding code snippet creates an observable for click events. Every click adds an item to the sequence. The example uses the pipe method and the mapTo function to map all the click events to the numeric value 1. We then use the throttle operator to reduce the number of values that are added to the sequence. If two or more click events take place within the time window declared by the interval (100 milliseconds here), only the first value is added to the sequence. Finally, the scan operator keeps a running count of the values that pass through.

The following marble diagram showcases the initial sequence and the result sequence after applying the throttle operator:

The result sequence only contains some values because the values that take place too close in time are ignored.
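The effect of throttling can be modeled without a browser. The following sketch is a simplified, library-free model rather than the RxJS implementation: it assumes a fixed silent window that starts with each accepted click. The throttleTimes helper and the click times are hypothetical.

```typescript
// Hypothetical helper: returns the click times that survive a leading-edge throttle.
function throttleTimes(clickTimes: number[], windowMs: number): number[] {
    const accepted: number[] = [];
    let silentUntil = -Infinity;
    for (const time of clickTimes) {
        if (time >= silentUntil) {
            accepted.push(time);           // first click after the silent window
            silentUntil = time + windowMs; // start a new silent window
        }
        // clicks inside the silent window are ignored
    }
    return accepted;
}

const sampleClickTimes = [0, 40, 90, 150, 180, 400]; // milliseconds
const acceptedClicks = throttleTimes(sampleClickTimes, 100);

console.log(acceptedClicks); // [ 0, 150, 400 ]
```

With these sample times, only three of the six clicks get through, so a running count like the one kept by scan in the snippet above would reach 3.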

Merge

The merge operator function can be used to combine the values of two observables into a single observable:

import { from } from "rxjs";
import { merge } from "rxjs/operators";
const observableA = from([20, 40, 60, 80, 100]);
const observableB = from([1, 1]);

const observableC = observableA.pipe(merge(observableB));

const subscription = observableC.subscribe(
    (value) => console.log(value)
);

subscription.unsubscribe();

The preceding code snippet uses the merge operator to combine the values of two observables into a new observable. The values are ordered chronologically. Note that in RxJS 7 and later, the merge function exported from rxjs/operators is deprecated in favor of mergeWith. The following marble diagram showcases the initial sequences and the result sequence after applying the merge operator:

The result sequence contains the values of both observables ordered in the same sequence as they took place in time.
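The chronological ordering can be sketched with a small library-free model. The TimedValue type, the mergeTimelines helper, and the emission times below are hypothetical; they only illustrate how values from two sources interleave by the time they occur.

```typescript
interface TimedValue {
    time: number;  // hypothetical emission time in milliseconds
    value: number;
}

// Hypothetical helper: interleaves two timelines by emission time.
function mergeTimelines(a: TimedValue[], b: TimedValue[]): number[] {
    return [...a, ...b]
        .sort((x, y) => x.time - y.time)
        .map(item => item.value);
}

const timelineA: TimedValue[] = [
    { time: 0, value: 20 }, { time: 100, value: 40 }, { time: 200, value: 60 },
    { time: 300, value: 80 }, { time: 400, value: 100 }
];
const timelineB: TimedValue[] = [
    { time: 150, value: 1 }, { time: 350, value: 1 }
];

const mergedValues = mergeTimelines(timelineA, timelineB);
console.log(mergedValues); // [ 20, 40, 1, 60, 80, 1, 100 ]
```

Keep in mind that from emits the values of an array synchronously on subscription, so the RxJS snippet above logs all the values of observableA (20, 40, 60, 80, 100) followed by the values of observableB (1, 1). Interleaving like the one modeled here only appears when the sources emit over time.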

Zip

The zip operator function can be used to merge the values of two observables into value pairs:

import { from } from "rxjs";
import { zip } from "rxjs/operators";
const observableA = from([1, 2, 3, 3, 4, 5]);
const observableB = from(["A", "B", "C", "D"]);

const observableC = observableA.pipe(zip(observableB));

const subscription = observableC.subscribe(
    (value) => console.log(value)
);

subscription.unsubscribe();

The preceding code snippet uses the zip operator to combine the values of two observables into a new observable. Each value in the new observable is a pair that contains one value from the first observable and one value from the second observable, matched by their index in the sequence. Note that in RxJS 7 and later, the zip function exported from rxjs/operators is deprecated in favor of zipWith. The following marble diagram showcases the initial sequences and the result sequence after applying the zip operator:

The result sequence contains the values of both observables merged into single value pairs.
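Pairing by index also means that the result can be no longer than the shorter input. The following library-free sketch models this with plain arrays; the zipArrays helper is hypothetical and is not the RxJS implementation.

```typescript
// Hypothetical helper: pairs values by index and stops at the shorter input.
function zipArrays<A, B>(a: A[], b: B[]): Array<[A, B]> {
    const length = Math.min(a.length, b.length);
    return a.slice(0, length).map((value, i): [A, B] => [value, b[i] as B]);
}

const zippedPairs = zipArrays([1, 2, 3, 3, 4, 5], ["A", "B", "C", "D"]);

console.log(zippedPairs); // [ [ 1, 'A' ], [ 2, 'B' ], [ 3, 'C' ], [ 3, 'D' ] ]
```

The last two numbers (4 and 5) have no partner in the second input, so they never appear in a pair. The RxJS zip operator behaves the same way with the two observables in the snippet above.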

In this post, we learned about different operators, which allow us to manipulate observables in RxJS in different ways. To further understand the pros, cons, and core principles of functional programming in TypeScript, read our book Hands-On Functional Programming with TypeScript.
