1 of 41

Mastering Reactive Programming with RxJS

December 7, 2019

Saif Eddine JERBI

@JerbiSaif

2 of 41

Saif Eddine Jerbi

Frontend Tech Lead @Vermeg

  • Angular - NodeJS
  • Help developers scale fast

3 of 41

Reactive Programming is a declarative programming paradigm concerned with data streams and the propagation of change.

4 of 41

ReactiveX.io

5 of 41

RxJS

  • Observables
  • Subjects
  • Static Function & Operators

6 of 41

RxJS

  • Observables
  • Subjects
  • Static Function & Operators
  • Scheduler

7 of 41

RxJS

  • Observables
  • Subjects
  • Static Function & Operators
  • Scheduler

8 of 41

Observables

  • Sequence of events
  • Zero to n values
  • Pushed over time
  • Cancellable
  • Lazy

9 of 41

�source$.subscribe(

value => doStuff(value)

);

Subscription

10 of 41

�source$.subscribe(

value => doStuff(value),

err => handleError(err)

);

Subscription

11 of 41

�source$.subscribe(

value => doStuff(value),

err => handleError(err),

() => console.log(‘complete’)

);

Subscription

12 of 41

�source$.subscribe(

value => doStuff(value)

);

Subscription

13 of 41

const subscription = source$.subscribe(

value => doStuff(value)

);

Cancellable

14 of 41

const subscription = source$.subscribe(

value => doStuff(value)

);

// later you can cancel it

subscription.unsubscribe();

Cancellable

15 of 41

Developers subscribes to Observables events/data, so what if you wanna push some data to Subscribers

16 of 41

Subscriber 1

Subjects

  • Subject

Subject

Subscribe

1

1

Subscribe

Subscriber 2

2

2

2

17 of 41

Subscriber 1

Subjects

  • Behavior Subject

Behavior

Subject

Subscribe

1

1

Subscribe

Subscriber 2

2

2

2

2

3

3

3

18 of 41

Subscriber 1

Subjects

  • Replay Subject

Replay

Subject

Subscribe

1

1

Subscribe

Subscriber 2

2

2

1

1

2

2

3

3

3

19 of 41

Subscriber 1

Subjects

  • Async Subject

Async

Subject

Subscribe

1

2

3

3

Completed

20 of 41

const mySource$ = new Subject<any>();

21 of 41

const mySource$ = new Subject<any>();

mySource$.asObservable().subscribe(

value => doStuff(value)

);

22 of 41

const mySource$ = new Subject<any>();

mySource$.asObservable().subscribe(

value => doStuff(value)

);

// Push data

mySource$.next(‘foo’);

mySource$.next(‘bar’);

mySource$.complete();

23 of 41

Observables are a sequence of events/values emitted over time

24 of 41

Observables are a collections of events/values emitted over time

25 of 41

Collections => Operators

  • Filtering
  • Transforming
  • Accumulating
  • Joining
  • & more...

26 of 41

Arrays

Observables

const data = [1, 2 , 3, 4];

data.filter(v => v % 2)

.map(v => v + 1)

.reduce((res, v) => res + v, 0);

// 6

data$.pipe(

filter(v => v % 2),

map(v => v + 1),

reduce((res, v) => res + v, 0)

);

data$.subscribe(res=> console.log(res));

// 6

27 of 41

Arrays

Observables

const data = [1, 2 , 3, 4];

data.filter(v => v % 2)

.map(v => v + 1)

.reduce((res, v) => res + v, 0);

// 6

const data$ = from([1, 2 , 3, 4]);

data$.pipe(

filter(v => v % 2),

map(v => v + 1),

reduce((res, v) => res + v, 0)

);

data$.subscribe(res=> console.log(res));

// 6

28 of 41

RxJS provide more than 100 operators to handle Sync/Async process

29 of 41

Let’s move to real life examples

30 of 41

switchMap

// When navigation in a PaginationBar

const data$ = pageChanges$.pipe(

switchMap(page => fetchDataFromAPI(page))

);

31 of 41

debounceTime

// Wait for the user to type a query

const suggestions$ = inputText$.pipe(

debounceTime(300),

switchMap(query => fetchPossibleValuesFromAPI(query))

);

32 of 41

filter

// Wait for the user to type a query that contains 3+ chars

const suggestions$ = inputText$.pipe(

debounceTime(300),

filter(query => query.length > 2),

switchMap(query => fetchPossibleValuesFromAPI(query))

);

33 of 41

distinctUntilChanged

// Wait for the user to type a new query that contains 3+ chars

// different than the last executed

const suggestions$ = inputText$.pipe(

debounceTime(300),

filter(query => query.length > 2),

distinctUntilChanged(),

switchMap(query => fetchPossibleValuesFromAPI(query))

);

34 of 41

exhaustMap

// User could click on save button twice, so we should ignore the

// second click

const saveAction$ = saveClick$.pipe(

exhaustMap(data => postToAPI(data))

);

35 of 41

mergeMap

// User have a list of items to validate displayed into a grid

// => Running the same action validate but with different data

const validateDataAction$ = validateData$.pipe(

mergeMap(data => postToAPI(data))

);

36 of 41

fromEvent

// Fork node js process and listen to messages emitted

const forkedProcess = fork(‘child.js’);

const message$ = fromEvent(forkedProcess, ‘message’);

const success$ = message$.pipe(

filter(msg => msg === ‘success’)

);

const failure$ = message$.pipe(

filter(msg => msg === ‘failure’)

);

37 of 41

zip

// Return status of forked process

const prc1$ = fromEvent(forkedProcess1, ‘exit’);

const prc2$ = fromEvent(forkedProcess2, ‘exit’);

const prc3$ = fromEvent(forkedProcess3, ‘exit’);

const fullProcess$ = zip(prc1$, prc2$, prc3$).pipe(

map(([prc1, prc2, prc3]) => ‘completed’)

);

38 of 41

bindNodeCallback

// Transform node callbackfunction to Observable

import * as fs from 'fs';

const readFileAsObservable = bindNodeCallback(fs.readFile);

const result$ = readFileAsObservable('./roadNames.txt', 'utf8');

39 of 41

100+ Operators

Transformation

Filtering

Joining

Buffer

groupBy

Pairewaise

Scan

...

take/takeWhile/takeUntil

skip/skipWhile/skipUntil

Last, first

Throttle, audit

...

combineLatest

Concat

forkJoin

Race

...

40 of 41

41 of 41

Thank You!

Saif Eddine JERBI

@JerbiSaif