1 of 44

Handling problems: Coroutines & ReactiveX

Milos Marinkovic

Android, OpenSource @ Blacklane

2 of 44

A recap of where we are

2

3 of 44

Why did we start using ReactiveX?

Do you remember?

  • Issues with AsyncTask
  • Threads are heavy
  • Abusing background services
  • Issues with canceling tasks
  • Callback hell

3

4 of 44

What does ReactiveX code look like?

repository.getSlowTask()

.subscribeOn(Schedulers.computation())

.observeOn(AndroidSchedulers.mainThread())

.subscribe({ result ->

// successful call, use the result

}, { error ->

// oops, there was an error

})

A long-running task executed using ReactiveX:

4

5 of 44

What did we get from ReactiveX?

  • Type safety
  • Deferred task start
  • One success, one error callback

5

fun getSlowTask() : Single<Long> {

return Single.fromCallable { ... }

}

@NonNull @CheckReturnValue

public final Disposable subscribe(

final Consumer<? super T> onSuccess,

final Consumer<? super Throwable> onError

) { ... }

6 of 44

What did we get from ReactiveX?

  • Easy worker thread switch
  • Easy reaction thread switch
  • Stream continuation with a new observable

6

@NonNull @CheckReturnValue

public final Single<T> subscribeOn(

final Scheduler scheduler

) { ... }

@NonNull @CheckReturnValue

public final Single<T> observeOn(

final Scheduler scheduler

) { ... }

7 of 44

What did we get from ReactiveX?

  • Thread-safe task cancelation
  • Check if task is alive or not

7

@Override public void dispose() {

if (runner == Thread.currentThread() &&

w instanceof NewThreadWorker) {

((NewThreadWorker)w).shutdown();

} else {

w.dispose();

}

}

@Override public boolean isDisposed() {

return w.isDisposed();

}

8 of 44

So... why did we (suddenly) stop liking ReactiveX?�

  • Large 3rd-party library
  • Steep learning curve
  • Debugging became difficult in long chains
  • Reactive programming abuse

  • A new, built-in, async concept was available

8

9 of 44

Enter: Kotlin Coroutines

9

10 of 44

What does Coroutine code look like?

launch {

try {

val result = withContext(IO) {

repository.runSlowTask()

}

// success, use the result

} catch (t: Throwable) {

// oops, an error happened

}

}

A long-running task executed using Coroutines:

10

11 of 44

Some benefits we get from using Coroutines

* An extension package is needed to fully utilize Coroutines

** Depends on what you’re coming from, so debatable

  • Eliminate callbacks
  • Simplified “futures” syntax
  • Async code looks the same as sync code
  • Small software package
  • Cleaner stack trace
  • Built into Kotlin *
  • Easier modeling of job hierarchy **
  • Recap: Scope vs. Context, hierarchy

11

12 of 44

Job Hierarchy with Coroutines ...

… and how errors work here

12

13 of 44

On to the main point

Working with (real) long-running tasks

14 of 44

Active work vs.

Simulated work

14

  • Looping keeps the CPU busy
  • Execution cannot be stopped
  • Cooperative jobs vs. uncooperative jobs
  • Coroutine delay

vs. Thread sleep

vs. rx.Timer subscribe

15 of 44

The main screen

and what it does

15

16 of 44

Running tasks

object SlowTask {

const val TASK_DELAY = 2000L

fun work() {

val start = currentTimeMillis()

while (currentTimeMillis() - start < TASK_DELAY) {

// loop actively

}

}

suspend fun simulate() = delay(TASK_DELAY)

}

16

17 of 44

The common use-case

(samples from the app)

17

18 of 44

Base Class

class BaseFragment : Fragment(), CoroutineScope {

private val parentJob = SupervisorJob()

private val defaultHandler =

CoroutineExceptionHandler { _, e -> showError(e) }

override val coroutineContext: CoroutineContext =

parentJob + Dispatchers.Main + defaultHandler

override fun onStop() {

super.onStop()

parentJob.cancelChildren()

}

abstract fun showError(error: Throwable)

}

18

19 of 44

Case 1:

Blocking Run

runBlocking {

showProgress()

try {

val result = task.execute()

showSuccess(result)

} catch (e: Throwable) {

showError(e)

// rethrow to see if our default

// exception handler catches it

throw e

}

}

19

20 of 44

Case 2:

Using launch

launch {

showProgress()

try {

val result = task.execute()

showSuccess(result)

} catch (e: Throwable) {

showError(e)

// rethrow to see if our default

// exception handler catches it

throw e

}

}

20

21 of 44

Case 3:

Using async

launch { // await() is a ‘suspend’ function

showProgress()

try {

val result = async { task.execute() }

showSuccess(result.await())

} catch (e: Throwable) {

showError(e)

// rethrow to see if our default

// exception handler catches it

throw e

}

}

21

22 of 44

Case 4:

Using withContext

launch {

showProgress()

try {

val result = withContext(IO) {

task.execute()

}

showSuccess(result)

} catch (e: Throwable) {

showError(e)

// rethrow to see if our default

// exception handler catches it

throw e

}

}

22

23 of 44

Case 5:

Using

coroutineScope

// coroutineScope is a 'suspend' function

launch {

showProgress()

try {

val result = coroutineScope {

task.execute()

}

showSuccess(result)

} catch (e: Throwable) {

showError(e)

// rethrow to see if our default

// exception handler catches it

throw e

}

}

23

24 of 44

Testing error handling with Coroutines

24

25 of 44

Coroutines that throw on completion

After actively looping

25

Blocking

Using�launch

Using�async

With�Context

In a new�Scope

Default

Crash

Default�Handler

Default�Handler

Default�Handler

Default�Handler

Reuse Job

Crash

Default�Handler

Default�Handler

Default�Handler

Default�Handler

Additional Handler

Crash

New�Handler

New�Handler

New�Handler

New�Handler

Thread-blocking

Interesting (blocks & crashes)

26 of 44

  • Uses a thread-local event loop
  • Starts in a new context
  • Joins for children after starting
  • Uses await
  • Continues in the current thread
  • Waits for the chain to complete

App crashing with ReactiveX and Coroutines

val c = runBlocking {

workWith(repo.initial())

}

val c = Single.just(repo.initial())

.map { workWith(it) }

.blockingGet()

26

27 of 44

Why was launch erroring there?

Also, why was it blocking?

fun CoroutineScope.launch(

context: CoroutineContext = EmptyCoroutineContext,

start: CoroutineStart = CoroutineStart.DEFAULT,

block: suspend CoroutineScope.() -> Unit

): Job {

val newContext = newCoroutineContext(context)

val coroutine = if (start.isLazy)

LazyStandaloneCoroutine(newContext, block) else

StandaloneCoroutine(newContext, active = true)

coroutine.start(start, coroutine, block)

return coroutine

}

  • Merges current context with the new one
    • Current context runs on the Main dispatcher
  • Errors are delivered to the resolved handler

27

28 of 44

What about async?

fun <T> CoroutineScope.async(

context: CoroutineContext = EmptyCoroutineContext,

start: CoroutineStart = CoroutineStart.DEFAULT,

block: suspend CoroutineScope.() -> T

): Deferred<T> {

val newContext = newCoroutineContext(context)

val coroutine = if (start.isLazy)

LazyDeferredCoroutine(newContext, block) else

DeferredCoroutine<T>(newContext, active = true)

coroutine.start(start, coroutine, block)

return coroutine

}

  • Behaves by default like a launched coroutine
  • Call await() to get a value (or error) from Deferred
    • This is similar to join()

28

29 of 44

Coroutines that throw on completion

After simulating work using ‘delay

29

Blocking

Using�launch

Using�async

With�Context

In a new�Scope

Default

Crash

Default�Handler

Default�Handler

Default�Handler

Default�Handler

Reuse Job

Crash

Default�Handler

Default�Handler

Default�Handler

Default�Handler

Additional Handler

Crash

New�Handler

New�Handler

New�Handler

New�Handler

Thread-blocking

Interesting (non-blocking)

30 of 44

Using delay is not blocking the thread

In fact, it’s not doing anything!

suspend fun delay(timeMillis: Long) {

if (timeMillis <= 0) return // don't delay

return suspendCancellableCoroutine sc@ {

// ‘it’ is a continuation

it.context.delay.scheduleResumeAfterDelay(

timeMillis, it

)

}

}

fun schedule(now: Long, delayedTask: DelayedTask) {

when (scheduleImpl(now, delayedTask)) {

SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()

SCHEDULE_COMPLETED -> reschedule(now, delayedTask)

SCHEDULE_DISPOSED -> {} // nothing - task disposed

else -> error("unexpected result")

}

}

30

31 of 44

Canceling tasks before they finish�(and more about cancelation errors)

31

32 of 44

Cancellation is a very specific error for Coroutines

  • Thread.sleep() throws InterruptedException
  • Coroutine delay() throws CancellationException

32

33 of 44

ReactiveX stream cancelation

33

  • Nothing to return after disposing
  • Disposables unsubscribe upstream

  • Canceling a sub-stream from outside?

34 of 44

ReactiveX hierarchy and errors

34

But what you react to is…�(merged stream)

35 of 44

Canceling Coroutines that throw

While actively looping

35

Blocking

Using�launch

Using�async

With�Context

In a new�Scope

Default

Crash

Def. Handler�(delayed cancel)

Default�Handler

Default�Handler

Def. Handler�(delayed cancel)

Reuse Job

Crash

Def. Handler�(delayed cancel)

Default�Handler

Default�Handler

Def. Handler�(delayed cancel)

Additional Handler

Crash

New Handler�(delayed cancel)

New�Handler

New�Handler

New Handler�(delayed cancel)

Thread-blocking

Interesting (async schedules result delivery)

36 of 44

A few more things...

val CoroutineScope.isActive: Boolean

get() = coroutineContext[Job]?.isActive ?: true

  • suspend modifier ≠ a new Scope
  • Check before returning/throwing

Compared to ReactiveX:

  • Awareness of being in a coroutine (suspend)

36

37 of 44

What about coroutineScope?

How are decomposed work errors handled?

suspend fun runStuff() = coroutineScope {

// you get a Deferred here

val data = async(IO) {

repository.getData()

}

// you get a value here

val sent = withContext(EventDispatcher) {

// true/false for sending success

collectOtherEvents()

sendRunningEventFor(data.await())

}

display("Message sent [$sent]")

}

  • collectOtherEvents() might block
  • withContext() will be suspended: await() call
  • display() will be suspended: $sent usage site

37

38 of 44

Canceling Coroutines that throw

While simulating work using ‘delay

38

Blocking

Using�launch

Using�async

With�Context

In a new�Scope

Default

Crash

Canceled

Canceled

Canceled

Canceled

Reuse Job

Crash

Canceled

Canceled

Canceled

Canceled

Additional Handler

Crash

Canceled

Canceled

Canceled

Canceled

Thread-blocking

Interesting (both waiting for children)

39 of 44

More about connected scopes

39

40 of 44

What happens if there’s an error…

suspend fun runStuff() = coroutineScope {

// you get a Deferred here

val data = async(IO) {

repository.getData()

}

// you get a value here

val sent = withContext(EventDispatcher) {

collectOtherEvents()

sendRunningEventFor(data.await())

}

display("Message sent [$sent]")

}

  • Repository call fails

40

41 of 44

Key takeaways

41

42 of 44

About ReactiveX

  • Single stream reactions
  • Cancelation does not deliver errors
  • You don’t have to throw it away

  • Completable ≈ launch
  • Single async
  • Single + Scheduler withContext
  • Zip of Singles coroutineScope

42

43 of 44

About Coroutines

TestYourAssumptions!!!!!!!!!!111oneeleven

43

44 of 44

Thanks for bearing with me! Questions?

Milos Marinkovic - Handling problems: Coroutines & ReactiveX

Samples - https://github.com/milosmns/talk-rx-coroutines-errors

44

Get in touch

Social handle: @ milosmns