Chapter 1. The Power of the Reactive Paradigm

  • Observables: These represent data streams and notify observers of any change.

  • Observers: These are the consumers of the data streams emitted by observables.
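
To make the two roles concrete, here is a minimal, dependency-free sketch of the observable/observer contract. It is a simplified stand-in, not the RxJS implementation: SimpleObservable, SimpleObserver, and greetings$ are hypothetical names introduced only for illustration.

```typescript
// A deliberately tiny model of the observable/observer contract.
// NOT the RxJS implementation; all names here are hypothetical.
interface SimpleObserver<T> {
    next: (value: T) => void;
    complete?: () => void;
}

class SimpleObservable<T> {
    private producer: (observer: SimpleObserver<T>) => void;

    constructor(producer: (observer: SimpleObserver<T>) => void) {
        this.producer = producer;
    }

    // The producer function runs once for every subscriber.
    subscribe(observer: SimpleObserver<T>): void {
        this.producer(observer);
    }
}

// The observable side: a stream that emits two values and then completes.
const greetings$ = new SimpleObservable<string>(observer => {
    observer.next('Hello');
    observer.next('World');
    observer.complete?.();
});

// The observer side: a consumer that records every notification it receives.
const received: string[] = [];
greetings$.subscribe({
    next: value => received.push(value),
    complete: () => received.push('done'),
});

console.log(received); // [ 'Hello', 'World', 'done' ]
```

Nothing happens until subscribe() is called. The real RxJS Observable is lazy in the same way, and adds error handling, teardown logic, and operators on top.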

Using RxJS in Angular and its advantages

The HTTP client module

HttpClient.get() method
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { environment } from '@env/environment';

const BASE_PATH = environment.basePath;

@Injectable()
export class RecipesService {
	constructor(private http: HttpClient) { }
	getRecipes(): Observable<Recipe[]> {
		return this.http.get<Recipe[]>(`${BASE_PATH}/recipes/search/all`); // (1)
	}
}
(1) We return an Observable, which can later be subscribed to.
  • Observables are cancellable, so you can cancel the HTTP request whenever you want by calling the unsubscribe method.

  • Also, you can retry HTTP requests when an error occurs or an exception is thrown.

  • Observables do not mutate the server’s response, whereas chaining then() calls on a promise can.
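
The first two points can be sketched without Angular or RxJS. The model below is an assumption-laden illustration, not library code: RequestStream, retry, slowRequest, and flakyRequest are hypothetical names. In a real application you would call unsubscribe() on the subscription returned by an HttpClient call and use the RxJS retry operator.

```typescript
// Simplified, dependency-free model of a cancellable and retryable request.
// All names are hypothetical; this is not RxJS or HttpClient code.
type Teardown = () => void;

interface RequestObserver<T> {
    next: (value: T) => void;
    error: (err: unknown) => void;
}

class RequestStream<T> {
    private start: (observer: RequestObserver<T>) => Teardown;

    constructor(start: (observer: RequestObserver<T>) => Teardown) {
        this.start = start;
    }

    // Subscribing starts the work; the returned handle cancels it.
    subscribe(observer: RequestObserver<T>): { unsubscribe: Teardown } {
        return { unsubscribe: this.start(observer) };
    }
}

// Resubscribes to `source` up to `count` more times after an error.
function retry<T>(source: RequestStream<T>, count: number): RequestStream<T> {
    return new RequestStream<T>(observer => {
        let remaining = count;
        let latestAttempt = 0;
        let cancelCurrent: Teardown = () => { };
        const attempt = (): void => {
            const id = ++latestAttempt;
            const subscription = source.subscribe({
                next: value => observer.next(value),
                error: err => {
                    if (remaining > 0) {
                        remaining--;
                        attempt();
                    } else {
                        observer.error(err);
                    }
                },
            });
            // Keep only the teardown of the most recent attempt.
            if (id === latestAttempt) {
                cancelCurrent = subscription.unsubscribe;
            }
        };
        attempt();
        return () => cancelCurrent();
    });
}

// Cancellation: a pending "request" whose timer is cleared on unsubscribe.
let cancelled = false;
const slowRequest = new RequestStream<string>(observer => {
    const timer = setTimeout(() => observer.next('response'), 10_000);
    return () => {
        clearTimeout(timer);
        cancelled = true;
    };
});
slowRequest.subscribe({ next: () => { }, error: () => { } }).unsubscribe();

// Retry: a "request" that fails twice and succeeds on the third attempt.
let attempts = 0;
const results: string[] = [];
const flakyRequest = new RequestStream<string>(observer => {
    attempts++;
    if (attempts < 3) {
        observer.error(new Error('network error'));
    } else {
        observer.next('recipes');
    }
    return () => { };
});
retry(flakyRequest, 2).subscribe({
    next: value => results.push(value),
    error: err => results.push(`failed: ${String(err)}`),
});

console.log(cancelled, attempts, results); // true 3 [ 'recipes' ]
```

Because subscribing is what starts the work, retrying is simply subscribing again. A promise represents a computation that is already running, so it cannot be restarted or cancelled in this way.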

The router module

RouterEvent
import { Injectable } from '@angular/core';
import { Router, RouterEvent } from '@angular/router';
import { filter } from 'rxjs/operators';

@Injectable()
export class CustomRouteService {
    constructor(public router: Router) {
        this.router.events.pipe(
            filter(event => event instanceof RouterEvent)
        ).subscribe((event: RouterEvent) => {
            console.log(`The current event is : ${event.id} | ${event.url}`);
        });
    }
}
NavigationStart
import { Injectable } from '@angular/core';
import { NavigationStart, Router } from '@angular/router';
import { filter } from 'rxjs/operators';

@Injectable()
export class CustomRouteService {
    constructor(public router: Router) {
        this.router.events.pipe(
            filter(event => event instanceof NavigationStart)
        ).subscribe((event: NavigationStart) => {
            console.log(`The current event is : ${event.id} | ${event.url}`);
        });
    }
}
The activated route
ActivatedRoute
class ActivatedRoute {
    snapshot: ActivatedRouteSnapshot
    url: Observable<UrlSegment[]>
    params: Observable<Params>
    queryParams: Observable<Params>
    fragment: Observable<string | null>
    data: Observable<Data>
    outlet: string
    component: Type<any> | string | null
    routeConfig: Route | null
    root: ActivatedRoute
    parent: ActivatedRoute | null
    firstChild: ActivatedRoute | null
    children: ActivatedRoute[]
    pathFromRoot: ActivatedRoute[]
    paramMap: Observable<ParamMap>
    queryParamMap: Observable<ParamMap>
    toString(): string
}
RecipesComponent
import { Component, OnInit } from '@angular/core';
import { ActivatedRoute } from '@angular/router';

@Component({
    selector: 'app-recipes',
    templateUrl: './recipes.component.html'
})
export class RecipesComponent implements OnInit {
    criteria: any;
    constructor(private activatedRoute: ActivatedRoute) { 
    }
    ngOnInit() {
        this.activatedRoute.url
            .subscribe(url => console.log('The URL changed to: ' + url));
        this.activatedRoute.queryParams.subscribe(params => {
            // Params is an index signature, so bracket access also compiles under strict settings.
            this.processCriteria(params['criteria']);
        });
    }
    processCriteria(criteria: any) {
        this.criteria = criteria;
    }
}

Reactive forms

FormControl
class FormControl extends AbstractControl {
    //other properties here
    valueChanges: Observable<any>
    statusChanges: Observable<any>
}
MyComponent
import { Component, OnInit } from '@angular/core';
import { FormGroup } from '@angular/forms';

@Component({
    selector: 'app-recipes',
    templateUrl: './recipes.component.html'
})
export class MyComponent implements OnInit {
    form!: FormGroup;
    ngOnInit() {
        const ratingControl = this.form.get('rating');
        ratingControl?.valueChanges.subscribe(
            (value) => {
                console.log(value);
            }
        );
    }
}

The event emitter

EventEmitter
class EventEmitter<T> extends Subject<T> {
    constructor(isAsync?: boolean): EventEmitter<T>
    emit(value?: T): void
    subscribe(
        next?: (value: T) => void, 
        error?: (error: any) => void, 
        complete?: () => void): Subscription
}
RecipesComponent
import { Component, EventEmitter, Output } from '@angular/core';

@Component({
    selector: 'app-recipes',
    templateUrl: './recipes.component.html'
})
export class RecipesComponent {
    constructor() { }
    @Output() updateRating = new EventEmitter<string>();
    updateRecipe(value: string) {
        this.updateRating.emit(value);
    }
}

The async pipe

<div *ngIf="data$ | async"></div>

Chapter 2. RxJS 7 – The Major Features

Understanding the toPromise() deprecation

Last value
import { Observable } from 'rxjs';

const source$ = new Observable<string>(observer => {
    observer.next('Hello');
    observer.next('World');
    observer.complete();
});

hello();

async function hello() {
    const value = await source$.toPromise();
    console.log(value);
}
//console output
//World
Undefined value
import { Observable } from 'rxjs';

const source$ = new Observable<string>(observer => {
    observer.complete();
});

hello();

async function hello() {
    const value = await source$.toPromise();
    console.log(value);
}
//console output
//undefined

The firstValueFrom() method

import { firstValueFrom, Observable } from 'rxjs';

const source$ = new Observable<string>(observer => {
    observer.next('Hello');
    observer.next('World');
    observer.complete();
});

hello();

async function hello() {
    const value = await firstValueFrom(source$);
    console.log(value);
}
//console output
//Hello

The lastValueFrom() method

import { lastValueFrom, Observable } from 'rxjs';

const source$ = new Observable<string>(observer => {
    observer.next('Hello');
    observer.next('World');
    observer.complete();
});

hello();

async function hello() {
    const value = await lastValueFrom(source$);
    console.log(value);
}
//console output
//World

Empty errors

No elements in sequence
import { lastValueFrom, Observable } from 'rxjs';

const source$ = new Observable<string>(observer => {
    observer.complete();
});

hello();

async function hello() {
    const value = await lastValueFrom(source$);
    console.log(value);
}
//console output (the promise rejects, so console.log is never reached)
//EmptyError: no elements in sequence
defaultValue
import { lastValueFrom, Observable } from 'rxjs';

const source$ = new Observable<string>(observer => {
    observer.complete();
});

hello();

async function hello() {
    const value = await lastValueFrom(source$, {
        defaultValue: 'DEFAULT'
    });
    console.log(value);
}
//console output
//DEFAULT

Chapter 4. Fetching Data as Streams

Chapter 5. Error Handling

See also:

Exploring error handling patterns and strategies