Mastering Reactive Programming with RxJS
December 7, 2019
Saif Eddine JERBI
@JerbiSaif
Saif Eddine Jerbi
Frontend Tech Lead @Vermeg
Reactive Programming is a declarative programming paradigm concerned with data streams and the propagation of change.
ReactiveX.io
RxJS
RxJS
RxJS
Observables
�source$.subscribe(
value => doStuff(value)
);
Subscription
�source$.subscribe(
value => doStuff(value),
err => handleError(err)
);
Subscription
�source$.subscribe(
value => doStuff(value),
err => handleError(err),
() => console.log(‘complete’)
);
Subscription
�source$.subscribe(
value => doStuff(value)
);
Subscription
�const subscription = source$.subscribe(
value => doStuff(value)
);
Cancellable
�const subscription = source$.subscribe(
value => doStuff(value)
);
// later you can cancel it
subscription.unsubscribe();
Cancellable
Developers subscribes to Observables events/data, so what if you wanna push some data to Subscribers
Subscriber 1
Subjects
Subject
Subscribe
1
1
Subscribe
Subscriber 2
2
2
2
Subscriber 1
Subjects
Behavior
Subject
Subscribe
1
1
Subscribe
Subscriber 2
2
2
2
2
3
3
3
Subscriber 1
Subjects
Replay
Subject
Subscribe
1
1
Subscribe
Subscriber 2
2
2
1
1
2
2
3
3
3
Subscriber 1
Subjects
Async
Subject
Subscribe
1
2
3
3
Completed
const mySource$ = new Subject<any>();�
const mySource$ = new Subject<any>();�
mySource$.asObservable().subscribe(
value => doStuff(value)
);
const mySource$ = new Subject<any>();�
mySource$.asObservable().subscribe(
value => doStuff(value)
);
// Push data
mySource$.next(‘foo’);
mySource$.next(‘bar’);
mySource$.complete();
Observables are a sequence of events/values emitted over time
Observables are a collections of events/values emitted over time
Collections => Operators
Arrays
Observables
const data = [1, 2 , 3, 4];�
data.filter(v => v % 2)
.map(v => v + 1)
.reduce((res, v) => res + v, 0);
// 6
data$.pipe(
filter(v => v % 2),
map(v => v + 1),
reduce((res, v) => res + v, 0)
);
data$.subscribe(res=> console.log(res));
// 6
Arrays
Observables
const data = [1, 2 , 3, 4];�
data.filter(v => v % 2)
.map(v => v + 1)
.reduce((res, v) => res + v, 0);
// 6
const data$ = from([1, 2 , 3, 4]);
data$.pipe(
filter(v => v % 2),
map(v => v + 1),
reduce((res, v) => res + v, 0)
);
data$.subscribe(res=> console.log(res));
// 6
RxJS provide more than 100 operators to handle Sync/Async process
Let’s move to real life examples
switchMap
// When navigation in a PaginationBar
const data$ = pageChanges$.pipe(
switchMap(page => fetchDataFromAPI(page))
);
debounceTime
// Wait for the user to type a query
const suggestions$ = inputText$.pipe(
debounceTime(300),
switchMap(query => fetchPossibleValuesFromAPI(query))
);
filter
// Wait for the user to type a query that contains 3+ chars
const suggestions$ = inputText$.pipe(
debounceTime(300),
filter(query => query.length > 2),
switchMap(query => fetchPossibleValuesFromAPI(query))
);
distinctUntilChanged
// Wait for the user to type a new query that contains 3+ chars
// different than the last executed
const suggestions$ = inputText$.pipe(
debounceTime(300),
filter(query => query.length > 2),
distinctUntilChanged(),
switchMap(query => fetchPossibleValuesFromAPI(query))
);
exhaustMap
// User could click on save button twice, so we should ignore the
// second click
const saveAction$ = saveClick$.pipe(
exhaustMap(data => postToAPI(data))
);
mergeMap
// User have a list of items to validate displayed into a grid
// => Running the same action validate but with different data
const validateDataAction$ = validateData$.pipe(
mergeMap(data => postToAPI(data))
);
fromEvent
// Fork node js process and listen to messages emitted
const forkedProcess = fork(‘child.js’);
const message$ = fromEvent(forkedProcess, ‘message’);
const success$ = message$.pipe(
filter(msg => msg === ‘success’)
);
const failure$ = message$.pipe(
filter(msg => msg === ‘failure’)
);
zip
// Return status of forked process
const prc1$ = fromEvent(forkedProcess1, ‘exit’);
const prc2$ = fromEvent(forkedProcess2, ‘exit’);
const prc3$ = fromEvent(forkedProcess3, ‘exit’);
const fullProcess$ = zip(prc1$, prc2$, prc3$).pipe(
map(([prc1, prc2, prc3]) => ‘completed’)
);
bindNodeCallback
// Transform node callbackfunction to Observable
import * as fs from 'fs';
const readFileAsObservable = bindNodeCallback(fs.readFile);
const result$ = readFileAsObservable('./roadNames.txt', 'utf8');
100+ Operators
Transformation
Filtering
Joining
Buffer
groupBy
Pairewaise
Scan
...
take/takeWhile/takeUntil
skip/skipWhile/skipUntil
Last, first
Throttle, audit
...
combineLatest
Concat
forkJoin
Race
...
Thank You!
Saif Eddine JERBI
@JerbiSaif