RxJS (Reactive Extensions Library for JavaScript) is a really popular and helpful library. Today I will share with you a list of the top 10 most common RxJS operators.
Before we start with the operators, we have to know what is RxJS and reactive programming, and why should we use them.
Reactive programming
Reactive programming is programming with async data streams where we observe these streams and ‘react’ when a value is emitted. In this approach, we are programming using, and relying on on, events instead of the order of lines in the code.
Think about this as a news portal where you subscribe to their news feed ‘in our case observable’ which streams their news (‘Events/Messages..’) to us and we ‘react’ when we see a new post.
We are here subscribing to news portals that are interesting to us and reading the news we like. – You can think this way.
RxJS
“RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.”, the official page says.
I would say that RxJS is a powerful toolbox that gives us (developers) an arsenal of functions to create, transform and manipulate the data streams in an easy way.
At the moment of writing this post, the latest version of RxJS is 7, so I will stick to it and adopt the code to follow the latest changes.
Creation
Operators for creating allow the creation of an observable from almost anything and turn everything into a stream.
From
From operator creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.
Example:
// RxJS v7+
import { from } from 'rxjs';
//create observable from array
const arr$ = from([1, 2, 3, 4, 5]);
const subscribe = arr$.subscribe(console.log);
/*console log output:
1
2
3
4
5
*/
n the example above we can see that the array has been logged as a sequence of items, or as a stream of items.
Also, we could provide a string and get all characters of the string as a sequence as well. For example, we could add the following:
const arr$ = from('12345');
This would also print a sequence of 5 items but as string values.
Of
Of operator converts input arguments to an observable sequence, similarly as in the from but this one prints whole elements, not the data inside as a squence. Take a look at the following example.
Example:
// RxJS v7+
import { of } from 'rxjs';
//creating a sequence of provided input elements
const elements$ = of(
1, 2, 3, 4, 5,
['6','7','8'],
{new: 'new'},
'test'
);
/*
Output:
1
2
3
4
5
[ '6', '7', '8' ]
{ new: 'new' }
'test'
*/
const subscribe = elements$.subscribe(console.log);
As we can see in the example above all elements have been logged as a sequence, but the values are still inside, not chunked.- In the other words it does not do any flattening and emits each argument in whole as a separate next notification.
Combination
If you need to join multiple observables combination operators are the tool you are looking for.
ConcatWith
If you are wondering why I haven’t mentioned here Concat operator instead that’s because it is deprecated and will be removed in version 8 of RxJS, so the ConcatWith operator is the successor.
ConcatWith emits all of the values from the source observable, then, once it completes, subscribes to each observable source provided, one at a time, emitting all of their values, and not subscribing to the next one until it completes.
Example:
// RxJS v7+
import { of } from 'rxjs';
import { concatWith } from 'rxjs/operators';
const first$ = of(1,2,3);
const second$ = of(4,5,6);
const third$ = of(7,8,9);
const subscribe = first$
.pipe(concatWith(second$, third$))
.subscribe(console.log);
/*
Output:
1
2
3
4
5
6
7
8
9
*/
As we can see in the example, concatWith is a bit different from Concat, as here we are adding the first observable, piping it, and adding other observables with which we are concatenating.
Here is the comparison:
concat(a$, b$, c$)
// is the same as
a$.pipe(concatWith(b$, c$))
Merge
The merge operator creates an output Observable that concurrently emits all values from every given input Observable. So this is similar to concatWith, but merge is not waiting for the first observable to complete with emitting events, but instead, it combines all with their times as a stream. It would be more clear in the example below.
Example:
// RxJS v7+
import { merge } from 'rxjs/operators';
import { interval } from 'rxjs';
//emit every second
const interval1$ = interval(1000);
//emit every 3 seconds
const interval2$ = interval(3000);
// merged intervals
const mergedIntervals$ = interval1$.pipe(merge(interval2$));
const subscribe = mergedIntervals$.subscribe(v => console.log(v));
/*
Output:
0
1
2
0 -> Fires every 3 seconds
3
4
5
1 -> Fires every 3 seconds
6
7
8
2 -> Fires every 3 seconds
9...
*/
Transformation
Transformation operators transform values as they pass through the operator chain.
Map
Map operator applies projection with each value from the input source.
Example:
// RxJS v7+
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const xTen$ = of(1,2,3,4,5).pipe(
map((v) => v*10)
).subscribe(console.log)
In the example, we used the map operator to multiply each value from the stream by 10.
Filtering
Filtering operators provide options to filter data from the stream and return what passes given conditions.
Filter
The filter operator emits values that pass a provided condition.
Example:
// RxJS v7+
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//filter even numbers
const even = stream$.pipe(filter(num => num % 2 === 0)).subscribe(console.log);
/*
Output:
2
4
6
8
10
*/
TakeUntil
TakeUntil operator emits values until provided observable emits.
Example:
// RxJS v7+
import { timer, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
// Emits value after 5 seconds
const stop$ = timer(5000);
// Emits value every second
const interval$ = interval(1000);
const subscription = interval$
.pipe(
takeUntil(stop$))
.subscribe(console.log);
//
DistinctUntilChanged
DistinctUntilChanged will emit values in a sequence if the current value is different than the previous one. For example, if we have a stream of 1,2,2,3 the result will be 1,2,3.
Example:
// RxJS v7+
import { of } from ‘rxjs’;
import { distinctUntilChanged } from ‘rxjs/operators’
// Strem from numbers
const someData$ = of(1, 2, 3, 3, 3, 4, 4, 5);
const subscription = someData$.pipe(distinctUntilChanged()).subscribe(console.log);
/*Output:
1
2
3
4
5
*/
As we could see in the example above, It doesn’t matter how many of the same values we have in a row, only one would be printed.
Utility operators
Powerful utilities for our observables, from logging, side effects, handling notifications…
Tap
The tap operator performs side effects, and it is used when we want to affect the outside state with a notification without altering the notification. The most common use of tap is actually for debugging purposes.
// RxJS v7+
import { of } from ‘rxjs’;
import { tap, map } from ‘rxjs/operators’;
const subscription = of(Math.random()).pipe(
tap(console.log),
map(n => n > 0.5 ? 'big' : 'small')
).subscribe(console.log);
The example above shows us how the tap operator does a side effect => and logs a random value so we could see it before the ternary operator.
Multicasting operators
Multicasting operators make an observable hot or multicast.
ShareReplay
The shareReplay allows us to have previously emitted values available to new subscribers that are late.
You would want to use the shareReplay operator when you have side effects or taxing computations that you do not wish to be executed amongst multiple subscribers.
A successfully completed source will stay cached in the shareReplayed observable forever, but an errored source can be retried.
Example:
In this example I am using Angular (just for demonstrating purposes), to show you how shareReplay can save on a number of requests to a server.
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';
import { ApiService } from './http/api.service';
// Here I have an interface/model for my post observables
interface Posts {
userId: number;
id: number;
title: string;
body: string;
}
@Component({
selector: 'app-public',
templateUrl: './public.component.html',
styleUrls: ['./public.component.scss'],
})
export class PublicComponent implements OnInit {
// Defined observables of the post type
posts$: Observable<Posts[]>;
first10$: Observable<Posts[]>;
last10$: Observable<Posts[]>;
constructor(private api: ApiService) {}
ngOnInit(): void {
// Getting posts from an API and storing the data in the posts$ observable
this.posts$ = this.api.getPosts().pipe(
map((val: Posts[]) => val),
// Using shareReplay to share the data from API request to other subscribers, and to avoid multiple API requests
shareReplay()
);
// Filtering first 10 posts by ID
this.first10$ = this.posts$.pipe(
map((post) => post.filter((post) => post.id < 10))
);
// Filtering last 10 posts by ID
this.last10$ = this.posts$.pipe(
map((post) => post.filter((post) => post.id > 90))
);
/* Usually this logic is going to call the API 3 times for every observable. But, as we used shareReplay operator,
It will share the data from the 'posts$' observable to 'first10$' and te 'last10$' observables */
}
}
<h2>First 10 posts</h2>
<ul>
<li *ngFor="let post of first10$ | async">{{post.title}}</li>
</ul>
<h2>Last 10 posts</h2>
<ul>
<li *ngFor="let post of last10$ | async">{{post.title}}</li>
</ul>
<h2>All posts</h2>
<ul>
<li *ngFor="let post of posts$ | async">{{post.title}}</li>
</ul>
As you could notice, I am using the async pipe from Angular, which automatically subscribes and unsubscribes from observables.
Here is a screenshot showing number of requests before and after shareReplay():
Conclusion
As you could see in the examples above, RxJS is a powerful library that brings a lot of benefits to us as developers. There are way more operators which you can find on the official page. I picked up some of the most used operators, there is a more common, but the list would be a bit longer. The examples are here just to show you some of the functionalities and could be expanded or shortened as well.