RxJS in Angular

RxJS (Reactive Extensions for JavaScript) is integral to Angular: the framework uses it to manage asynchronous data streams and to keep application state reactive. Angular's own APIs, including HttpClient, the Router, and reactive forms, expose observables, and building on them keeps asynchronous code readable and maintainable. RxJS itself is framework-independent, so the same patterns carry over to other JavaScript frameworks. This article covers the benefits of RxJS in Angular and its practical application.

HTTP

  • HttpClient methods such as http.get return an observable data stream.
  • The returned observable emits the response and completes once the API call finishes.
  • The stream can be extended by chaining RxJS operators with pipe.
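
The three points above can be tried without a backend. The sketch below is only an illustration: `fakeGet` is a hypothetical stand-in for `HttpClient.get` (the real call needs a running Angular application), and the `Employee` shape and sample data are assumptions.

```typescript
import { Observable, map, of } from 'rxjs';

// Hypothetical employee shape, used only for illustration.
interface Employee {
  id: number;
  name: string;
}

// Stand-in for HttpClient.get<Employee[]>(url): emits one response, then completes.
function fakeGet(): Observable<Employee[]> {
  return of([
    { id: 1, name: 'Asha' },
    { id: 2, name: 'Ben' },
  ]);
}

const events: string[] = [];

fakeGet()
  .pipe(
    // Extend the stream: keep only the names from the response.
    map((employees) => employees.map((employee) => employee.name)),
  )
  .subscribe({
    next: (names) => events.push(`next: ${names.join(', ')}`),
    complete: () => events.push('complete'),
  });

console.log(events); // [ 'next: Asha, Ben', 'complete' ]
```

Because the stream completes on its own after the single response, the subscriber sees exactly one `next` followed by `complete`.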

Usage

employee.component.ts

The employee component is a simple component that displays a list of employees. It accepts the employees collection as an @Input property and uses the *ngFor directive to render one template instance per employee.

We've also created a simple EmployeeService with a getAll method. It makes an HTTP GET call (http.get) and returns an Observable of type Employee[].

app.component.ts

On init, the app component makes the API call (http.get, through the service). We subscribe to the observable, and as soon as the result arrives we assign it to the component's employees variable.
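
A minimal sketch of this setup might look as follows. It is not the article's original listing: the `Employee` fields and the `/api/employees` URL are placeholders, and to keep the block short the list is rendered directly in the app component instead of being passed to a separate employee component.

```typescript
import { NgFor } from '@angular/common';
import { HttpClient } from '@angular/common/http';
import { Component, Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

// Hypothetical employee shape.
export interface Employee {
  id: number;
  name: string;
}

@Injectable({ providedIn: 'root' })
export class EmployeeService {
  constructor(private readonly http: HttpClient) {}

  // '/api/employees' is a placeholder URL, not taken from the article.
  getAll(): Observable<Employee[]> {
    return this.http.get<Employee[]>('/api/employees');
  }
}

@Component({
  selector: 'app-root',
  standalone: true,
  imports: [NgFor],
  template: `
    <ul>
      <li *ngFor="let employee of employees">{{ employee.name }}</li>
    </ul>
  `,
})
export class AppComponent implements OnInit {
  employees: Employee[] = [];

  constructor(private readonly employeeService: EmployeeService) {}

  ngOnInit(): void {
    // Assign the response to the bound property once it arrives.
    this.employeeService.getAll().subscribe((employees) => {
      this.employees = employees;
    });
  }
}
```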

Usage of Async Pipe

  • The async pipe extracts the emitted value from an observable in the template.
  • It automatically subscribes to the observable and updates the binding whenever a new value arrives.
  • It unsubscribes from the observable when the component is destroyed.
  • There is no need to keep track of subscriptions in the component; without the async pipe, we would have to unsubscribe from each of them manually in ngOnDestroy.

employee.component.ts

The employee component simply displays the employees collection it receives.

app.component.ts

In the app component template, [employees]="employees$ | async" passes the employees input through the async pipe. The value reaches the employee component automatically once the data is received from the server.
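
As a rough sketch of the async pipe in a template, the component below keeps only the stream and lets the pipe handle subscribing and unsubscribing. To stay self-contained it renders the list inline rather than through an app-employee component, and `getAllEmployees` is a hypothetical stand-in for `EmployeeService.getAll()`.

```typescript
import { AsyncPipe, NgFor } from '@angular/common';
import { Component } from '@angular/core';
import { Observable, of } from 'rxjs';

// Hypothetical employee shape.
interface Employee {
  id: number;
  name: string;
}

// Stand-in for EmployeeService.getAll(); a real service would call HttpClient.get.
function getAllEmployees(): Observable<Employee[]> {
  return of([
    { id: 1, name: 'Asha' },
    { id: 2, name: 'Ben' },
  ]);
}

@Component({
  selector: 'app-root',
  standalone: true,
  imports: [AsyncPipe, NgFor],
  template: `
    <ul>
      <li *ngFor="let employee of employees$ | async">{{ employee.name }}</li>
    </ul>
  `,
})
export class AppComponent {
  // No subscribe call and no ngOnDestroy: the async pipe owns the subscription.
  readonly employees$: Observable<Employee[]> = getAllEmployees();
}
```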

Router

  • The Router provides multiple streams (observables) to listen to.

For example:

| Listen to | Provider to inject | Emitted type | Code |
| --- | --- | --- | --- |
| Router events | router: Router | Observable<Event> (Event from @angular/router) | this.router.events.subscribe((event) => { }) |
| Route params | route: ActivatedRoute | Observable<Params> | this.route.params |
| Route query params | route: ActivatedRoute | Observable<Params> | this.route.queryParams |
| Route-level data | route: ActivatedRoute | Observable<Data> | this.route.data |
| Route-level fragment | route: ActivatedRoute | Observable<string> | this.route.fragment |
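
As one hedged example of consuming such a stream, the component below reads a query parameter from `route.queryParams`. The component name, its selector, and the `q` parameter are all made up for illustration.

```typescript
import { Component, OnDestroy, OnInit } from '@angular/core';
import { ActivatedRoute } from '@angular/router';
import { Subject, distinctUntilChanged, map, takeUntil } from 'rxjs';

@Component({
  selector: 'app-search',
  standalone: true,
  template: `<p>Searching for: {{ query }}</p>`,
})
export class SearchComponent implements OnInit, OnDestroy {
  query = '';
  private readonly destroy$ = new Subject<void>();

  constructor(private readonly route: ActivatedRoute) {}

  ngOnInit(): void {
    this.route.queryParams
      .pipe(
        // 'q' is a hypothetical query parameter, e.g. /search?q=rxjs
        map((params) => String(params['q'] ?? '')),
        // Ignore emissions where the value did not actually change.
        distinctUntilChanged(),
        // Stop listening once the component is destroyed.
        takeUntil(this.destroy$),
      )
      .subscribe((query) => {
        this.query = query;
      });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
```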

Reactive Forms

  • Angular reactive forms are designed to handle form input in a reactive way.
  • Every value change is emitted on the FormControl's valueChanges stream (an observable).
  • You can subscribe to one stream, or combine several, to react to form changes.

app.component.ts

We've used an input box bound to a formControl (a reactive form control). Inside ngOnInit we subscribe to control.valueChanges, which is itself an observable stream that emits every new value of the control.
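
A possible shape for such a component is sketched below. The `name` control, the `greeting` property, and the trimming step are assumptions made for the example, not the article's original code.

```typescript
import { Component, OnDestroy, OnInit } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-root',
  standalone: true,
  imports: [ReactiveFormsModule],
  template: `
    <input [formControl]="name" placeholder="Name" />
    <p>Hello {{ greeting }}</p>
  `,
})
export class AppComponent implements OnInit, OnDestroy {
  // nonNullable keeps the control's value typed as string.
  readonly name = new FormControl('', { nonNullable: true });
  greeting = '';
  private subscription?: Subscription;

  ngOnInit(): void {
    // valueChanges emits every time the control's value changes.
    this.subscription = this.name.valueChanges.subscribe((value) => {
      this.greeting = value.trim();
    });
  }

  ngOnDestroy(): void {
    this.subscription?.unsubscribe();
  }
}
```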

What are Subjects?

A Subject is a special kind of Observable that plays both roles: it acts as an Observable, and it acts as an observer.

Observable and observer sit at opposite ends of a stream.

An observer is the consumer: a callback (or an object with next, error, and complete callbacks) that is invoked whenever the stream it listens to emits a value. It is usually registered by calling the subscribe method.

An Observable is the producer: it pushes values to its subscribers by calling each observer's next callback with the data. A Subject exposes this next method publicly, so a value passed to it is delivered to all of its observers (subscriptions).

The Subject also keeps a reference to all its observers, so whenever a value is pushed into it, it invokes each of them. In other words, a single execution is shared across all observers, which is why a Subject is also called a multicast observable.

How do Subjects Work

A Subject mainly has three methods:

  • next - pushes a value onto the stream.
  • subscribe - registers an observer to listen for values.
  • complete - completes the stream. After complete is called, the Subject releases all its observer references.
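
To make the observer registry concrete, here is a toy `MiniSubject` written from scratch. It is a deliberately simplified stand-in, not RxJS's actual implementation: the real Subject also handles errors, notifies observers of completion, and returns Subscription objects.

```typescript
type Listener<T> = (value: T) => void;

// A toy multicast stream: one list of observers, three methods.
class MiniSubject<T> {
  private observers: Listener<T>[] = [];
  private closed = false;

  // Register an observer; the returned function removes it again.
  subscribe(observer: Listener<T>): () => void {
    if (this.closed) {
      return () => {};
    }
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Deliver the value to every registered observer.
  next(value: T): void {
    if (this.closed) {
      return;
    }
    // Iterate over a copy so an observer may unsubscribe during delivery.
    for (const observer of this.observers.slice()) {
      observer(value);
    }
  }

  // Close the stream and drop all observer references.
  complete(): void {
    this.closed = true;
    this.observers = [];
  }
}

const received: string[] = [];
const mini = new MiniSubject<string>();

mini.subscribe((value) => received.push(`A:${value}`));
mini.subscribe((value) => received.push(`B:${value}`));

mini.next('hello'); // delivered to both A and B
mini.complete();
mini.next('ignored'); // nothing happens after complete

console.log(received); // [ 'A:hello', 'B:hello' ]
```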

Creating a Subject in Angular

To create a Subject, first import Subject from rxjs.

Then instantiate a Subject, specifying the type of value it emits.

Next, we can subscribe to the Subject and put values onto the stream using the .next method.

Consider this sequence of calls. First, we emit the value test1. At that point there is no subscription on the subject, so nothing is printed in the console. Then subscribe is called for the first time. When the next value, test2, is emitted, sub 1, test2 is printed in the console. Afterwards, a second subscription is made. Emitting one more value then calls both subscriptions, which shows that the Subject is multicast.
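
The sequence described above can be reconstructed roughly like this. The third value (`test3`) and the `sub 2` label are assumptions, and the output is collected in an array so the final result is easy to inspect.

```typescript
import { Subject } from 'rxjs';

const log: string[] = [];
const subject = new Subject<string>();

// No subscribers yet, so this value is simply lost.
subject.next('test1');

// First subscription.
subject.subscribe((value) => log.push(`sub 1, ${value}`));
subject.next('test2'); // only sub 1 receives it

// Second subscription on the same Subject.
subject.subscribe((value) => log.push(`sub 2, ${value}`));
subject.next('test3'); // both subscriptions receive it

console.log(log);
// [ 'sub 1, test2', 'sub 1, test3', 'sub 2, test3' ]
```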

Subject is an Observable

A Subject is like an Observable but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners. You can subscribe to a Subject.

A Subject has a few variants:

  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Pick one of these based on what a late subscriber should receive.
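
The difference between the variants shows up in what a subscriber receives. The following short comparison is only a sketch with made-up values:

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject } from 'rxjs';

const output: string[] = [];

// BehaviorSubject: requires an initial value and hands the latest value
// to every new subscriber immediately.
const behavior = new BehaviorSubject<number>(0);
behavior.next(1);
behavior.subscribe((value) => output.push(`behavior: ${value}`)); // receives 1

// ReplaySubject: replays up to the last N values (here N = 2) to new subscribers.
const replay = new ReplaySubject<number>(2);
replay.next(1);
replay.next(2);
replay.next(3);
replay.subscribe((value) => output.push(`replay: ${value}`)); // receives 2, then 3

// AsyncSubject: emits only its last value, and only once it completes.
const last = new AsyncSubject<number>();
last.subscribe((value) => output.push(`async: ${value}`));
last.next(1);
last.next(2);
last.complete(); // the subscriber now receives 2

console.log(output);
// [ 'behavior: 1', 'replay: 2', 'replay: 3', 'async: 2' ]
```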

Usage of Subjects in Angular Services

A Subject can be used to store state. Here, we store a navbar state value of type NavbarState.

navbar.service.ts

NavbarService has:

  • navbarState - holds the Subject instance of type NavbarState. It is declared private to keep the Subject encapsulated, so only the service can push values.
  • navbarStateObservable - exposes the Subject as a read-only data stream (observable).
  • setNavbarState - a setter that pushes a new value to the Subject.

app.component.ts

AppComponent listens to navbarService's navbarStateObservable and reacts to any change in the navbar state. onDestroy$ is used to unsubscribe from the subscription when the component is destroyed.
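
One way the service and its consumer could look is sketched below. The article does not show the fields of `NavbarState`, so the `collapsed` flag is an assumption. A `BehaviorSubject` is also assumed here so that late subscribers receive the current state; the original may use a plain `Subject`.

```typescript
import { Component, Injectable, OnDestroy, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, Subject, takeUntil } from 'rxjs';

// Hypothetical state shape.
export interface NavbarState {
  collapsed: boolean;
}

@Injectable({ providedIn: 'root' })
export class NavbarService {
  // Private: only the service can push new values.
  private readonly navbarState = new BehaviorSubject<NavbarState>({ collapsed: false });

  // Read-only view of the state for consumers.
  readonly navbarStateObservable: Observable<NavbarState> = this.navbarState.asObservable();

  setNavbarState(state: NavbarState): void {
    this.navbarState.next(state);
  }
}

@Component({
  selector: 'app-root',
  standalone: true,
  template: `<nav [class.collapsed]="collapsed">Menu</nav>`,
})
export class AppComponent implements OnInit, OnDestroy {
  collapsed = false;
  private readonly onDestroy$ = new Subject<void>();

  constructor(private readonly navbarService: NavbarService) {}

  ngOnInit(): void {
    this.navbarService.navbarStateObservable
      // Complete the subscription when onDestroy$ emits.
      .pipe(takeUntil(this.onDestroy$))
      .subscribe((state) => {
        this.collapsed = state.collapsed;
      });
  }

  ngOnDestroy(): void {
    this.onDestroy$.next();
    this.onDestroy$.complete();
  }
}
```

Exposing only `asObservable()` means components can read the state but cannot call `next` on it themselves.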

Transmitting Data Between Components

RxJS also helps with communication between components. A child component can send data back to its parent using an EventEmitter object, which extends Subject.

file-extractor.component.ts

The file-extractor component extracts data from a file. It accepts the file path through its file Input property. Clicking the Extract button triggers the extraction process, and once it finishes, the component emits a value on the extracted EventEmitter (emit is similar to the next method of a Subject).

app.component.ts

From the app component, we pass filePath to the file-extractor component and listen to the extracted event with onExtracted($event). When the file-extractor component emits a value on the extracted EventEmitter, Angular calls the app component's onExtracted method with the emitted value ($event, here true).
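
The relationship between EventEmitter and Subject can be checked outside a template. This is only a sketch of the mechanism: in the real component the emitter would be declared with `@Output()`, and Angular, not hand-written code, would wire the parent's handler to it.

```typescript
import { EventEmitter } from '@angular/core';
import { Subject } from 'rxjs';

// In the file-extractor component this would be a class field:
//   @Output() extracted = new EventEmitter<boolean>();
const extracted = new EventEmitter<boolean>();

const handled: boolean[] = [];

// Roughly what (extracted)="onExtracted($event)" sets up in the parent:
// a subscription that forwards each emitted value to the handler.
const subscription = extracted.subscribe((done: boolean) => handled.push(done));

// Called by the child once extraction has finished.
extracted.emit(true);

console.log(extracted instanceof Subject); // true
console.log(handled); // [ true ]

subscription.unsubscribe();
```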

Angular Change Detection OnPush Strategy

  • Angular change detection has two strategies:
    • Default
    • OnPush
  • With the Default ChangeDetectionStrategy, Angular may run change detection for a component many times, even when nothing the component displays has changed.
  • On a page with a very large number of bindings (more than 5k, say), this can become noticeable in the UI (you may drop frames, depending on the use case).
  • This problem can be addressed by switching to the OnPush strategy, which is widely used for performance optimization.
  • OnPush is configured at the component level by setting the changeDetection metadata option to ChangeDetectionStrategy.OnPush.

Change detection is the mechanism that updates view bindings when something changes in the model.

Let's go step by step through how OnPush change detection works.

Create App Component

Create AppComponent. Its HTML contains the my-cmp component, and ngModel provides two-way binding for the name variable.

app.component.ts

Create My Component

My component is very simple: its HTML binds name and seconds. ngOnInit creates an interval observable, and the subscription to it increments the seconds value on every emission.

my.component.ts

Convert My Component to use OnPush Strategy

After converting the component to OnPush, the seconds value in the view stops updating, although the console shows that the updated value is still being printed.

Surprisingly, if we change the value of the textbox containing the text 'Pankaj', the seconds value updates. Oh, wait! What is going on there 🤔? Isn't that weird? Don't worry, we will see what happened.

With the OnPush change detection strategy, a component runs change detection only when:

  • an Input property of the component changes.
  • an Output event emits a value and the consumer component listens to it.
  • a DOM event that the component listens to fires.
  • change detection is triggered explicitly with the detectChanges or markForCheck method.

Looking at these rules, the first one explains the behaviour: typing in the textbox changed the name Input, so change detection ran and the latest seconds value was rendered as well. You can observe value updates for the other reasons in the same way.

Let's verify another of the four rules above by running change detection manually with the detectChanges method of ChangeDetectorRef.

Now the view updates as soon as the detectChanges method is called.
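
A hedged sketch of this step follows. It omits the name input to stay short, and the `tick` helper is a name introduced here only so the per-emission work is easy to see.

```typescript
import {
  ChangeDetectionStrategy,
  ChangeDetectorRef,
  Component,
  OnDestroy,
  OnInit,
} from '@angular/core';
import { Subscription, interval } from 'rxjs';

@Component({
  selector: 'my-cmp',
  standalone: true,
  // Opt this component into the OnPush strategy.
  changeDetection: ChangeDetectionStrategy.OnPush,
  template: `<p>Seconds: {{ seconds }}</p>`,
})
export class MyComponent implements OnInit, OnDestroy {
  seconds = 0;
  private subscription?: Subscription;

  constructor(private readonly cdr: ChangeDetectorRef) {}

  ngOnInit(): void {
    this.subscription = interval(1000).subscribe(() => this.tick());
  }

  ngOnDestroy(): void {
    this.subscription?.unsubscribe();
  }

  // Runs once per interval emission.
  private tick(): void {
    this.seconds++;
    // Without this call the OnPush view would keep showing the old value.
    this.cdr.detectChanges();
  }
}
```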

Code Example to Use RxJS in Component and Service with OnPush Strategy

We will extend the above example here. Rather than calling detectChanges explicitly, can we get the same result implicitly? Yes, with the async pipe.

We saw in an earlier section that the async pipe subscribes to the observable automatically and extracts its value in the template. But there is a catch. Subscribing alone is exactly what we did in the previous examples with interval(1000).subscribe, and that did not update the view binding. The difference is that the async pipe not only subscribes to the observable but also calls markForCheck on the component's ChangeDetectorRef each time a value arrives. That makes the async pipe the natural choice for displaying values from an observable in the UI.

Now assign interval(1000) directly to the seconds variable and replace the seconds binding with the {{ seconds | async }} expression. The async pipe takes care of the rest, as explained above.
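
Under those instructions, the component might shrink to something like the sketch below (again without the name input, which is left out for brevity):

```typescript
import { AsyncPipe } from '@angular/common';
import { ChangeDetectionStrategy, Component } from '@angular/core';
import { Observable, interval } from 'rxjs';

@Component({
  selector: 'my-cmp',
  standalone: true,
  imports: [AsyncPipe],
  changeDetection: ChangeDetectionStrategy.OnPush,
  // The async pipe subscribes, marks the view for check on each value,
  // and unsubscribes when the component is destroyed.
  template: `<p>Seconds: {{ seconds | async }}</p>`,
})
export class MyComponent {
  // Holds the stream itself, not the latest number.
  readonly seconds: Observable<number> = interval(1000);
}
```

There is no ChangeDetectorRef, no subscribe call, and no ngOnDestroy left in the class.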

Next, we look at an example that uses an Observable received from a service inside a component with the OnPush strategy.

Assume the employee component is a simple component that displays a list of employees. It accepts the employees collection as an @Input property.

employee.service.ts

employee.component.ts

app.component.ts

Here we receive an Observable from the employeeService.getAll() method and store it in the employees$ variable. When we pass it to the app-employee component, we use the async pipe, so we never create a subscription by hand.

Conclusion

  • We have learned how RxJS is used in Angular.
  • We have seen how to work with Subjects.
  • We have seen how the HTTP and Router APIs use RxJS.
  • We have seen how the async pipe can display data in the UI, even inside a component with the OnPush change detection strategy.