1 of 45

4 RxJS Pipelines for the Real World

Deborah Kurata

Developer | Author | MVP | GDE

@deborahkurata

2 of 45

Deborah Kurata

Developer

Pluralsight Author

Angular Getting Started

Angular Reactive Forms

Angular Routing

RxJS in Angular: Reactive Development

Angular NgRx: Getting Started

C# OOP & Best Practices

Microsoft Most Valuable Professional (MVP)

Google Developer Expert (GDE)


3 of 45

RxJS Pipelines

Retrieve Related Data Pipeline

Lookup Reference Property Pipeline

Grouping Pipeline

Autocomplete Pipeline


4 of 45

Tip:

What do you have?

What do you want?

When do you want it?


5 of 45

Retrieve Related Data Pipeline


6 of 45

Retrieve Related Data Pipeline

What do we have?

What do we want?

When do we want it?

9 of 45

Tip:

To respond to an action, use a Subject or BehaviorSubject


10 of 45

Subject / BehaviorSubject

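// Option 1: a Subject has no initial value
private userSubject = new Subject<string>();

enteredUser$ = this.userSubject.asObservable();

// Option 2: a BehaviorSubject starts with an initial value (here, an empty string)
private userSubject = new BehaviorSubject<string>('');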

enteredUser$ = this.userSubject.asObservable();
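The difference between the two declarations shows up at subscription time. The following is a minimal sketch, assuming a hypothetical `UserEntry` class and a hypothetical `userEntered` method (neither appears on the slides), and holding both subjects side by side under different names so they can be compared:

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

// Hypothetical holder class; the slide only shows the field declarations.
class UserEntry {
  private userSubject = new Subject<string>();
  enteredUser$ = this.userSubject.asObservable();

  private userBehaviorSubject = new BehaviorSubject<string>('');
  enteredUserWithDefault$ = this.userBehaviorSubject.asObservable();

  // Assumed method name: called when the user enters a name.
  userEntered(userName: string): void {
    this.userSubject.next(userName);
    this.userBehaviorSubject.next(userName);
  }
}

const entry = new UserEntry();
const fromSubject: string[] = [];
const fromBehaviorSubject: string[] = [];

// Subscribe first, then emit one action.
entry.enteredUser$.subscribe(name => fromSubject.push(name));
entry.enteredUserWithDefault$.subscribe(name => fromBehaviorSubject.push(name));

entry.userEntered('Bret');

console.log(fromSubject);         // ['Bret']
console.log(fromBehaviorSubject); // ['', 'Bret']
```

The BehaviorSubject subscriber receives the initial empty string immediately, which may be useful when a downstream pipeline needs a starting value before the user does anything.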


11 of 45

Retrieve Related Data Pipeline

What about exception handling?

What if the userName isn't found?


postsForUser$ = this.enteredUser$.pipe(
  // Uses regEx: ^ and $ anchor the match to the whole userName
  switchMap(userName =>
    this.http.get<User[]>(`${this.usersUrl}?userName=^${userName}$`)),
  // The users query returns an array, so take the first user
  switchMap(users =>
    this.http.get<Post[]>(`${this.postsUrl}?userId=^${users[0].id}$`))
);


12 of 45

Tip:

Refactor code into manageable and reusable pieces


13 of 45

Retrieve Related Data Pipeline

getUserId(userName: string): Observable<number> {
  return this.http.get<User[]>(`${this.usersUrl}?userName=^${userName}$`).pipe(
    catchError(this.handleError),
    // Return 0 when no user matches the entered userName
    map(users => (users.length === 0) ? 0 : users[0].id)
  );
}

private getPostsForUser(userId: number): Observable<Post[]> {
  return this.http.get<Post[]>(`${this.postsUrl}?userId=^${userId}$`).pipe(
    catchError(this.handleError)
  );
}


14 of 45

Retrieve Related Data Pipeline

postsForUser$ = this.enteredUser$.pipe(
  switchMap(userName => this.userService.getUserId(userName)),
  switchMap(userId => this.getPostsForUser(userId))
);

getUserId(userName: string): Observable<number> {
  return this.http.get<User[]>(`${this.usersUrl}?userName=^${userName}$`).pipe(
    catchError(this.handleError),
    map(users => (users.length === 0) ? 0 : users[0].id)
  );
}

private getPostsForUser(userId: number): Observable<Post[]> {
  return this.http.get<Post[]>(`${this.postsUrl}?userId=^${userId}$`).pipe(
    catchError(this.handleError)
  );
}
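To see how the refactored pipeline behaves for both a known and an unknown user, here is a small sketch that swaps the HTTP calls for in-memory lookups. The sample users and posts are made up for illustration, and the free functions stand in for the service methods on the slide:

```typescript
import { Observable, Subject, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

interface User { id: number; userName: string; }
interface Post { id: number; userId: number; title: string; }

// Made-up sample data standing in for the users and posts endpoints.
const users: User[] = [
  { id: 1, userName: 'Bret' },
  { id: 2, userName: 'Antonette' }
];
const posts: Post[] = [
  { id: 10, userId: 1, title: 'First' },
  { id: 11, userId: 2, title: 'Second' },
  { id: 12, userId: 1, title: 'Third' }
];

// Mirrors getUserId from the slide: 0 signals that no user was found.
function getUserId(userName: string): Observable<number> {
  const found = users.filter(u => u.userName === userName);
  return of(found.length === 0 ? 0 : found[0].id);
}

function getPostsForUser(userId: number): Observable<Post[]> {
  return of(posts.filter(p => p.userId === userId));
}

const userSubject = new Subject<string>();
const postsForUser$ = userSubject.pipe(
  switchMap(userName => getUserId(userName)),
  switchMap(userId => getPostsForUser(userId))
);

// Collect the post ids emitted for each entered user name.
const results: number[][] = [];
postsForUser$.subscribe(found => results.push(found.map(p => p.id)));

userSubject.next('Bret');   // known user
userSubject.next('Nobody'); // unknown user: id 0 matches no posts

console.log(results); // [[10, 12], []]
```

Because the unknown user maps to id 0 instead of reading `users[0].id` on an empty array, the pipeline keeps running and simply emits an empty list of posts.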


15 of 45

Retrieve Related Data Pipeline


16 of 45

Lookup Reference Property Pipeline


17 of 45

Lookup Reference Property Pipeline

What do we have?

What do we want?

When do we want it?

20 of 45

Tip:

To work with multiple streams, use a combination operator


21 of 45

Lookup Reference Property Pipeline

What about exception handling?

How do we reuse the retrieved categories?


postsWithCategory$ = combineLatest([
  this.http.get<Post[]>(this.postsUrl),
  this.http.get<PostCategory[]>(this.postCategoriesUrl)
]);


22 of 45

Lookup Reference Property Pipeline

allPosts$ = this.http.get<Post[]>(this.postsUrl).pipe(
  catchError(this.handleError)
);

allCategories$ = this.http.get<PostCategory[]>(this.catUrl).pipe(
  catchError(this.handleError),
  // Cache the latest emission so later subscribers reuse the retrieved categories
  shareReplay(1)
);

postsWithCategory$ = combineLatest([
  this.allPosts$,
  this.categoryService.allCategories$
]) ...;
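The effect of `shareReplay(1)` on reuse can be checked with a counter. This is a sketch under stated assumptions: `defer` with a counter stands in for the HTTP request, and the category data is made up.

```typescript
import { Observable, defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

interface PostCategory { id: number; name: string; }

// Counts how often the simulated request actually runs.
let categoryRequests = 0;

// defer() runs its factory once per subscription, like a cold HTTP request.
const categoriesRequest$: Observable<PostCategory[]> = defer(() => {
  categoryRequests++;
  return of([{ id: 1, name: 'Tech' }, { id: 2, name: 'Travel' }]);
});

// Share one execution and replay its last emission to later subscribers.
const allCategories$ = categoriesRequest$.pipe(shareReplay(1));

const received: string[][] = [];
allCategories$.subscribe(cats => received.push(cats.map(c => c.name)));
allCategories$.subscribe(cats => received.push(cats.map(c => c.name)));

console.log(categoryRequests); // 1
console.log(received);         // [['Tech', 'Travel'], ['Tech', 'Travel']]
```

Both subscribers receive the categories, but the simulated request runs only once. Without `shareReplay(1)`, each subscription would trigger its own request.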


23 of 45

Lookup Reference Property Pipeline

postsWithCategory$ = combineLatest([
  this.allPosts$,
  this.categoryService.allCategories$
]).pipe(
  // Array destructuring: unpack the emitted [posts, categories] pair
  // Array map: build a new post for each element of the posts array
  map(([posts, cats]) => posts.map(post => ({
    ...post,
    category: cats.find(c => post.categoryId === c.id)?.name
  // Strong typing: cast each mapped object to Post
  }) as Post))
);

// Observable<Post[]>


24 of 45

Lookup Reference Property Pipeline

What if the id isn't found?

What if there are multiple lookups?

How do we reuse this logic?


postsWithCategory$ = combineLatest([
  this.allPosts$,
  this.categoryService.allCategories$
]).pipe(
  map(([posts, cats]) => posts.map(post => ({
    ...post,
    category: cats.find(c => post.categoryId === c.id)?.name
  }) as Post))
);

// Observable<Post[]>


25 of 45

Lookup Reference Property Pipeline

postsWithCategory$ = combineLatest([
  this.allPosts$,
  this.categoryService.allCategories$
]).pipe(
  map(([posts, cat]) => this.mapCategories(posts, cat)),
);

mapCategories(posts: Post[], cat: PostCategory[]): Post[] {
  return posts.map(post => ({
    ...post,
    category: cat.find(c => post.categoryId === c.id)?.name
  }) as Post);
}
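One possible way to address the "id isn't found" question is a fallback label. The sketch below is a variant of `mapCategories`, not the version from the talk: it indexes the categories in a `Map` once and substitutes a fallback name when a `categoryId` has no match. The `'Unknown'` default and the `fallback` parameter are assumptions for illustration.

```typescript
interface Post { id: number; title: string; categoryId: number; category?: string; }
interface PostCategory { id: number; name: string; }

// Variant of mapCategories: build an id-to-name index, then look up each post.
// The fallback parameter and its 'Unknown' default are assumptions.
function mapCategories(posts: Post[], cats: PostCategory[], fallback = 'Unknown'): Post[] {
  const nameById = new Map<number, string>();
  for (const c of cats) {
    nameById.set(c.id, c.name);
  }
  return posts.map(post => ({
    ...post,
    category: nameById.get(post.categoryId) ?? fallback
  }));
}

// Made-up sample data: the second post refers to a category that does not exist.
const samplePosts: Post[] = [
  { id: 1, title: 'Intro to RxJS', categoryId: 1 },
  { id: 2, title: 'Orphaned post', categoryId: 99 }
];
const sampleCategories: PostCategory[] = [{ id: 1, name: 'Tech' }];

const mapped = mapCategories(samplePosts, sampleCategories);
console.log(mapped.map(p => p.category)); // ['Tech', 'Unknown']
```

Since the function is plain array code with no observables, it can be reused from any pipeline and tested without subscribing to anything.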


26 of 45

Lookup Reference Property Pipeline


27 of 45

Grouping Pipeline


28 of 45

Grouping Pipeline

What do we have?

What do we want?

When do we want it?

30 of 45

Grouping Pipeline

postsGroupedByCategory$ = this.postsWithCategory$.pipe(
  // Emits each array element
  concatAll(),
  // First argument: key selector; second argument: element selector
  groupBy(post => post.categoryId, post => post),
  mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
  toArray()
);


31 of 45

Grouping Pipeline

postsGroupedByCategory$ = this.postsWithCategory$.pipe(
  concatAll(),
  groupBy(post => post.categoryId, post => post),
  // For each group, emits one tuple with the id and post[]
  mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
  // Emits one array with all tuples
  toArray()
);
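To make the shape of the grouped result concrete, here is a runnable sketch of the same operator chain over made-up sample posts. It omits the identity element selector from the slide, since `groupBy` passes each value through unchanged by default.

```typescript
import { of, zip } from 'rxjs';
import { concatAll, groupBy, mergeMap, toArray } from 'rxjs/operators';

interface Post { id: number; categoryId: number; title: string; }

// Made-up sample data standing in for postsWithCategory$.
const samplePosts: Post[] = [
  { id: 1, categoryId: 1, title: 'A' },
  { id: 2, categoryId: 2, title: 'B' },
  { id: 3, categoryId: 1, title: 'C' }
];

const postsGroupedByCategory$ = of(samplePosts).pipe(
  concatAll(),                                 // emit each post individually
  groupBy(post => post.categoryId),            // one inner observable per categoryId
  mergeMap(group => zip(of(group.key), group.pipe(toArray()))), // [key, posts] per group
  toArray()                                    // collect all tuples into one array
);

// Reduce each tuple to [categoryId, post ids] to keep the output short.
let grouped: [number, number[]][] = [];
postsGroupedByCategory$.subscribe(tuples => {
  grouped = tuples.map(([key, posts]) => [key, posts.map(p => p.id)]);
});

console.log(JSON.stringify(grouped)); // [[1,[1,3]],[2,[2]]]
```

Note that both `toArray()` calls emit only when their source completes, so this pattern fits sources that complete, such as a single HTTP response.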


32 of 45

Grouping Pipeline

34 of 45

Autocomplete Pipeline


35 of 45

Autocomplete Pipeline

What do we have?

What do we want?

When do we want it?

37 of 45

Autocomplete Pipeline

What do we want? | When do we want it?

Type: Filter the list

Click: Open the list

Focus: Open the list

Click x: Clear and open the list

Action -> Subject/BehaviorSubject


38 of 45

Autocomplete Pipeline

focus$ = new Subject<string>();
click$ = new Subject<string>();
clear$ = new Subject<string>();

<input type="text"
       [(ngModel)]="selectedCategory"
       (selectItem)="categorySelected($event.item)"
       [ngbTypeahead]="search"
       ...
       (focus)="focus$.next($any($event).target.value)"
       (click)="click$.next($any($event).target.value)" />

<button type="button"
        (click)="onClear()">
  <i class="fa fa-times"></i>
</button>


39 of 45

Autocomplete Pipeline

focus$ = new Subject<string>();
click$ = new Subject<string>();
clear$ = new Subject<string>();

search = (text$: Observable<string>) => {
  const debouncedText$ = text$.pipe(debounceTime(200), distinctUntilChanged());
  const clicksClosed$ = this.click$.pipe(
    filter(() => !this.instance.isPopupOpen()));
  const operations$ = merge(debouncedText$, clicksClosed$, this.focus$, this.clear$);

  return combineLatest([
    operations$,
    this.categories$]).pipe(
    map(([txt, cat]) =>
      // An empty search string returns the full list of categories
      txt === '' ? cat : cat.filter(c => new RegExp(`^${txt}`, 'i').test(c.name)))
  );
}


40 of 45

Autocomplete Pipeline

focus$ = new Subject<string>();
click$ = new Subject<string>();
clear$ = new Subject<string>();

search = (text$: Observable<string>) => {
  const debouncedText$ = text$.pipe(debounceTime(200), distinctUntilChanged());
  // Only react to clicks while the popup is closed
  const clickToOpen$ = this.click$.pipe(filter(() => !this.instance.isPopupOpen()));
  const operations$ = merge(debouncedText$, clickToOpen$, this.focus$, this.clear$);

  return combineLatest([
    operations$,
    this.categories$]).pipe(
    map(([txt, cat]) =>
      txt === '' ? cat : cat.filter(c => new RegExp(`^${txt}`, 'i').test(c.name)))
  );
}


41 of 45

Autocomplete Pipeline

focus$ = new Subject<string>();
click$ = new Subject<string>();
clear$ = new Subject<string>();

search = (text$: Observable<string>) => {
  const debouncedText$ = text$.pipe(debounceTime(200), distinctUntilChanged());
  const clickToOpen$ = this.click$.pipe(filter(() => !this.instance.isPopupOpen()));
  // Merge every action that should refresh the list into one stream
  const operations$ = merge(debouncedText$, clickToOpen$, this.focus$, this.clear$);

  return combineLatest([
    operations$,
    this.categories$]).pipe(
    map(([txt, cat]) =>
      txt === '' ? cat : cat.filter(c => new RegExp(`^${txt}`, 'i').test(c.name)))
  );
}
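The merge-then-combine structure of `search` can be exercised without a UI. The following sketch is a simplified version under stated assumptions: it leaves out `debounceTime`, `distinctUntilChanged`, and the popup check so that it runs synchronously, uses a plain `Subject` named `text$` in place of the typeahead's input stream, and uses made-up categories.

```typescript
import { Subject, combineLatest, merge, of } from 'rxjs';
import { map } from 'rxjs/operators';

interface PostCategory { id: number; name: string; }

// Made-up sample data standing in for categories$.
const sampleCategories: PostCategory[] = [
  { id: 1, name: 'Tech' },
  { id: 2, name: 'Travel' },
  { id: 3, name: 'Food' }
];
const categories$ = of(sampleCategories);

// One subject per user action; text$ stands in for the typed input stream.
const text$ = new Subject<string>();
const focus$ = new Subject<string>();
const click$ = new Subject<string>();
const clear$ = new Subject<string>();

// Any action emits the current search text into a single stream.
const operations$ = merge(text$, click$, focus$, clear$);

const suggestions$ = combineLatest([operations$, categories$]).pipe(
  map(([txt, cat]) =>
    txt === '' ? cat : cat.filter(c => new RegExp(`^${txt}`, 'i').test(c.name)))
);

const shown: string[][] = [];
suggestions$.subscribe(list => shown.push(list.map(c => c.name)));

focus$.next('');   // focus with empty text: full list
text$.next('t');   // typing filters the list
text$.next('tr');
clear$.next('');   // clearing shows the full list again

console.log(JSON.stringify(shown));
// [["Tech","Travel","Food"],["Tech","Travel"],["Travel"],["Tech","Travel","Food"]]
```

Each action re-evaluates the filter against the latest categories, which is why the four different UI events can share one mapping step.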

43 of 45

Autocomplete Pipeline

focus$ = new Subject<string>();
click$ = new Subject<string>();
clear$ = new Subject<string>();

search = (text$: Observable<string>) => {
  const debouncedText$ = text$.pipe(debounceTime(200), distinctUntilChanged());
  const clickToOpen$ = this.click$.pipe(filter(() => !this.instance.isPopupOpen()));
  const operations$ = merge(debouncedText$, clickToOpen$, this.focus$, this.clear$);

  return combineLatest([
    operations$,
    this.categories$]).pipe(
    map(([txt, cat]) =>
      // Case insensitive check: the 'i' flag ignores case when matching the start of the name
      txt === '' ? cat : cat.filter(c => new RegExp(`^${txt}`, 'i').test(c.name)))
  );
}
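Building a regular expression directly from typed text means characters such as `+` or `(` are interpreted as regex syntax. As a possible hardening step that is not part of the talk, the sketch below escapes the text first. The helper names `escapeRegExp` and `filterCategories` and the sample categories are assumptions for illustration.

```typescript
interface PostCategory { id: number; name: string; }

// Escape characters that have special meaning in a regular expression.
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Case-insensitive "starts with" filter; empty text returns the full list.
function filterCategories(txt: string, cats: PostCategory[]): PostCategory[] {
  if (txt === '') {
    return cats;
  }
  const startsWith = new RegExp(`^${escapeRegExp(txt)}`, 'i');
  return cats.filter(c => startsWith.test(c.name));
}

// Made-up sample data with a name that contains regex metacharacters.
const langCategories: PostCategory[] = [
  { id: 1, name: 'C++' },
  { id: 2, name: 'CSS' },
  { id: 3, name: 'Angular' }
];

console.log(filterCategories('c', langCategories).map(c => c.name));   // ['C++', 'CSS']
console.log(filterCategories('c++', langCategories).map(c => c.name)); // ['C++']
console.log(filterCategories('', langCategories).length);              // 3
```

Without the escaping step, typing `c++` would produce an invalid pattern and `new RegExp` would throw.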


44 of 45

Autocomplete Pipeline


45 of 45

4 RxJS Pipelines

@deborahkurata

https://github.com/DeborahK/Angular-Posts
