Handling problems: Coroutines & ReactiveX
Milos Marinkovic
Android, OpenSource @ Blacklane
A recap of where we are
Why did we start using ReactiveX?
Do you remember?
What does ReactiveX code look like?
A long-running task executed using ReactiveX:
repository.getSlowTask()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ result ->
        // successful call, use the result
    }, { error ->
        // oops, there was an error
    })
What did we get from ReactiveX?
fun getSlowTask(): Single<Long> {
    return Single.fromCallable { ... }
}
@NonNull @CheckReturnValue
public final Disposable subscribe(
    final Consumer<? super T> onSuccess,
    final Consumer<? super Throwable> onError
) { ... }
What did we get from ReactiveX?
@NonNull @CheckReturnValue
public final Single<T> subscribeOn(
    final Scheduler scheduler
) { ... }
@NonNull @CheckReturnValue
public final Single<T> observeOn(
    final Scheduler scheduler
) { ... }
What did we get from ReactiveX?
@Override public void dispose() {
    if (runner == Thread.currentThread() &&
            w instanceof NewThreadWorker) {
        ((NewThreadWorker)w).shutdown();
    } else {
        w.dispose();
    }
}
@Override public boolean isDisposed() {
    return w.isDisposed();
}
So... why did we (suddenly) stop liking ReactiveX?
Enter: Kotlin Coroutines
What does Coroutine code look like?
A long-running task executed using Coroutines:
launch {
    try {
        val result = withContext(IO) {
            repository.runSlowTask()
        }
        // success, use the result
    } catch (t: Throwable) {
        // oops, an error happened
    }
}
Some benefits we get from using Coroutines
* An extension package is needed to fully utilize Coroutines
** Depends on what you’re coming from, so debatable
Job Hierarchy with Coroutines ...
… and how errors work here
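How a failure travels through the job hierarchy depends on the parent job. The following is a minimal sketch, not the talk's sample code: `siblingSurvives` is a hypothetical helper, and it assumes kotlinx-coroutines-core is on the classpath. It launches a failing child and a slow sibling under the same parent and reports whether the sibling got to finish.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs two children under the given parent job and
// reports whether the second child survived the first child's failure.
fun siblingSurvives(parent: CompletableJob): Boolean = runBlocking {
    var survived = false
    // Swallow the failure so the demo does not print a stack trace.
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)

    val failing = scope.launch { throw IllegalStateException("boom") }
    val sibling = scope.launch {
        delay(100)
        survived = true
    }
    joinAll(failing, sibling)
    parent.cancel() // clean up the scope
    survived
}

fun main() {
    // A regular Job is canceled by a failing child, taking the sibling with it.
    println("Job: sibling survives = ${siblingSurvives(Job())}")
    // A SupervisorJob ignores child failures, so the sibling completes.
    println("SupervisorJob: sibling survives = ${siblingSurvives(SupervisorJob())}")
}
```

With a regular `Job` the sibling is expected to be canceled; with a `SupervisorJob` it should complete normally.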
On to the main point
Working with (real) long-running tasks
Active work vs. Simulated work
vs. Thread sleep
vs. rx.Timer subscribe
The main screen
and what it does
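Running tasks
import java.lang.System.currentTimeMillis
import kotlinx.coroutines.delay

object SlowTask {
    const val TASK_DELAY = 2000L

    // Active work: keeps the thread busy until the delay has elapsed
    fun work() {
        val start = currentTimeMillis()
        while (currentTimeMillis() - start < TASK_DELAY) {
            // loop actively
        }
    }

    // Simulated work: suspends without occupying the thread
    suspend fun simulate() = delay(TASK_DELAY)
}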
The common use-case
(samples from the app)
Base Class
// must be abstract because showError() is abstract
abstract class BaseFragment : Fragment(), CoroutineScope {
    private val parentJob = SupervisorJob()
    private val defaultHandler =
        CoroutineExceptionHandler { _, e -> showError(e) }
    override val coroutineContext: CoroutineContext =
        parentJob + Dispatchers.Main + defaultHandler
    override fun onStop() {
        super.onStop()
        parentJob.cancelChildren()
    }
    abstract fun showError(error: Throwable)
}
Case 1: Blocking Run
runBlocking {
    showProgress()
    try {
        val result = task.execute()
        showSuccess(result)
    } catch (e: Throwable) {
        showError(e)
        // rethrow to see if our default
        // exception handler catches it
        throw e
    }
}
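In Case 1 the rethrown error does not reach a CoroutineExceptionHandler: `runBlocking` hands the failure back to its caller. A minimal sketch of that behavior, assuming kotlinx-coroutines-core on the classpath; `failingTask` and `blockingRethrows` are hypothetical names standing in for `task.execute()` and the fragment code.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for task.execute() that always fails.
fun failingTask(): Long = throw IllegalArgumentException("boom")

// Returns (thrown to caller, handler invoked).
fun blockingRethrows(): Pair<Boolean, Boolean> {
    var handlerInvoked = false
    val handler = CoroutineExceptionHandler { _, _ -> handlerInvoked = true }
    val thrownToCaller = try {
        // Even with a handler in the context, the exception is rethrown here.
        runBlocking(handler) { failingTask() }
        false
    } catch (e: IllegalArgumentException) {
        true
    }
    return thrownToCaller to handlerInvoked
}

fun main() {
    println(blockingRethrows()) // (true, false)
}
```

If nothing catches the exception around `runBlocking`, it propagates up the calling thread, which on Android's main thread means a crash.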
Case 2: Using launch
launch {
    showProgress()
    try {
        val result = task.execute()
        showSuccess(result)
    } catch (e: Throwable) {
        showError(e)
        // rethrow to see if our default
        // exception handler catches it
        throw e
    }
}
Case 3: Using async
launch { // await() is a ‘suspend’ function
    showProgress()
    try {
        val result = async { task.execute() }
        showSuccess(result.await())
    } catch (e: Throwable) {
        showError(e)
        // rethrow to see if our default
        // exception handler catches it
        throw e
    }
}
Case 4: Using withContext
launch {
    showProgress()
    try {
        val result = withContext(IO) {
            task.execute()
        }
        showSuccess(result)
    } catch (e: Throwable) {
        showError(e)
        // rethrow to see if our default
        // exception handler catches it
        throw e
    }
}
Case 5: Using coroutineScope
// coroutineScope is a 'suspend' function
launch {
    showProgress()
    try {
        val result = coroutineScope {
            task.execute()
        }
        showSuccess(result)
    } catch (e: Throwable) {
        showError(e)
        // rethrow to see if our default
        // exception handler catches it
        throw e
    }
}
Testing error handling with Coroutines
Coroutines that throw on completion
After actively looping
| | Blocking | Using launch | Using async | With Context | In a new Scope |
| Default | Crash | Default Handler | Default Handler | Default Handler | Default Handler |
| Reuse Job | Crash | Default Handler | Default Handler | Default Handler | Default Handler |
| Additional Handler | Crash | New Handler | New Handler | New Handler | New Handler |
Thread-blocking
Interesting (blocks & crashes)
App crashing with ReactiveX and Coroutines
// Coroutines
val c = runBlocking {
    workWith(repo.initial())
}
// ReactiveX
val c = Single.just(repo.initial())
    .map { workWith(it) }
    .blockingGet()
Why was launch erroring there?
Also, why was it blocking?
fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
What about async?
fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
Coroutines that throw on completion
After simulating work using ‘delay’
| | Blocking | Using launch | Using async | With Context | In a new Scope |
| Default | Crash | Default Handler | Default Handler | Default Handler | Default Handler |
| Reuse Job | Crash | Default Handler | Default Handler | Default Handler | Default Handler |
| Additional Handler | Crash | New Handler | New Handler | New Handler | New Handler |
Thread-blocking
Interesting (non-blocking)
Using delay is not blocking the thread
In fact, it’s not doing anything!
suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ {
        // ‘it’ is a continuation
        it.context.delay.scheduleResumeAfterDelay(
            timeMillis, it
        )
    }
}
fun schedule(now: Long, delayedTask: DelayedTask) {
    when (scheduleImpl(now, delayedTask)) {
        SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
        SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
        SCHEDULE_DISPOSED -> {} // nothing - task disposed
        else -> error("unexpected result")
    }
}
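The difference between suspending and blocking shows up in the order of events on a single thread. This is a minimal sketch under stated assumptions: `eventOrder` is a hypothetical helper, not part of the sample app, and kotlinx-coroutines-core is assumed to be on the classpath. It runs two coroutines on the `runBlocking` thread, once pausing with `delay` and once with `Thread.sleep`.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs two coroutines on the single runBlocking thread
// and records the order in which they start and finish.
fun eventOrder(pause: suspend (Long) -> Unit): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch { events += "A start"; pause(100); events += "A end" }
        launch { events += "B start"; pause(50); events += "B end" }
    }
    return events
}

fun main() {
    // Suspending: A releases the thread while waiting, so B runs in between.
    println(eventOrder { delay(it) })        // [A start, B start, B end, A end]
    // Blocking: A holds the thread, so B cannot start until A is done.
    println(eventOrder { Thread.sleep(it) }) // [A start, A end, B start, B end]
}
```

With `delay`, the shorter pause in B finishes first even though A started earlier; with `Thread.sleep`, the two coroutines run strictly one after the other.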
Canceling tasks before they finish (and more about cancellation errors)
Cancellation is a very specific error for Coroutines
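A minimal sketch of what makes cancellation special, assuming kotlinx-coroutines-core on the classpath; `cancelAndObserve` is a hypothetical helper. Canceling a suspended coroutine resumes it with a CancellationException, which a `catch (t: Throwable)` block will see, but which is not reported to the CoroutineExceptionHandler.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: cancels a suspended coroutine and returns
// (body caught a CancellationException, exception handler was invoked).
fun cancelAndObserve(): Pair<Boolean, Boolean> = runBlocking {
    var caughtCancellation = false
    var handlerInvoked = false
    val handler = CoroutineExceptionHandler { _, _ -> handlerInvoked = true }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)

    val started = CompletableDeferred<Unit>()
    val job = scope.launch {
        try {
            started.complete(Unit)
            delay(10_000) // suspends; resumes with an exception once canceled
        } catch (t: Throwable) {
            caughtCancellation = t is CancellationException
            throw t // rethrow so the coroutine still completes as canceled
        }
    }
    started.await()      // make sure the body is running before canceling
    job.cancelAndJoin()
    caughtCancellation to handlerInvoked
}

fun main() {
    println(cancelAndObserve()) // (true, false)
}
```

This is why a broad `catch (e: Throwable)` around suspending calls can end up showing an error for what was only a cancellation.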
ReactiveX stream cancellation
ReactiveX hierarchy and errors
But what you react to is… (merged stream)
Canceling Coroutines that throw
While actively looping
| | Blocking | Using launch | Using async | With Context | In a new Scope |
| Default | Crash | Def. Handler (delayed cancel) | Default Handler | Default Handler | Def. Handler (delayed cancel) |
| Reuse Job | Crash | Def. Handler (delayed cancel) | Default Handler | Default Handler | Def. Handler (delayed cancel) |
| Additional Handler | Crash | New Handler (delayed cancel) | New Handler | New Handler | New Handler (delayed cancel) |
Thread-blocking
Interesting (async schedules result delivery)
A few more things...
val CoroutineScope.isActive: Boolean
    get() = coroutineContext[Job]?.isActive ?: true
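An actively looping task never suspends, so it only notices cancellation if it checks for it. A minimal sketch of a cooperative loop built on `isActive`, assuming kotlinx-coroutines-core on the classpath; `cancelBusyLoop` and the 5-second deadline are illustrative choices, not taken from the sample app.

```kotlin
import kotlinx.coroutines.*

// Hypothetical cancellable variant of the active loop: it checks isActive
// on every iteration instead of spinning until the deadline.
fun cancelBusyLoop(): Boolean = runBlocking {
    var stoppedEarly = false
    val started = CompletableDeferred<Unit>()
    val job = launch(Dispatchers.Default) {
        started.complete(Unit)
        val deadline = System.currentTimeMillis() + 5_000
        while (isActive && System.currentTimeMillis() < deadline) {
            // loop actively
        }
        // No suspension point here, so this line still runs after cancel.
        stoppedEarly = System.currentTimeMillis() < deadline
    }
    started.await()     // wait until the loop is running
    job.cancelAndJoin() // returns once the loop has noticed the cancellation
    stoppedEarly
}

fun main() {
    println("Loop stopped before its deadline: ${cancelBusyLoop()}")
}
```

Without the `isActive` check, `cancelAndJoin()` would have to wait for the full loop to finish.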
Compared to ReactiveX:
What about coroutineScope?
How are decomposed work errors handled?
suspend fun runStuff() = coroutineScope {
    // you get a Deferred here
    val data = async(IO) {
        repository.getData()
    }
    // you get a value here
    val sent = withContext(EventDispatcher) {
        // true/false for sending success
        collectOtherEvents()
        sendRunningEventFor(data.await())
    }
    display("Message sent [$sent]")
}
Canceling Coroutines that throw
While simulating work using ‘delay’
| | Blocking | Using launch | Using async | With Context | In a new Scope |
| Default | Crash | Canceled | Canceled | Canceled | Canceled |
| Reuse Job | Crash | Canceled | Canceled | Canceled | Canceled |
| Additional Handler | Crash | Canceled | Canceled | Canceled | Canceled |
Thread-blocking
Interesting (both waiting for children)
More about connected scopes
What happens if there’s an error…
suspend fun runStuff() = coroutineScope {
    // you get a Deferred here
    val data = async(IO) {
        repository.getData()
    }
    // you get a value here
    val sent = withContext(EventDispatcher) {
        collectOtherEvents()
        sendRunningEventFor(data.await())
    }
    display("Message sent [$sent]")
}
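A minimal sketch of the failure path, assuming kotlinx-coroutines-core on the classpath. `failingRunStuff` and `runDemo` are hypothetical stand-ins: the `async` block plays the role of `repository.getData()` and fails while the `withContext` block is still busy.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*

// Hypothetical stand-in for runStuff(): the async child fails while the
// withContext block is still waiting.
suspend fun failingRunStuff(log: MutableList<String>): Unit = coroutineScope {
    val data = async<String> {
        delay(50)
        throw IOException("getData failed")
    }
    try {
        withContext(Dispatchers.Default) {
            delay(10_000) // still "collecting events" when the child fails
            log += "sent ${data.await()}"
        }
    } catch (e: CancellationException) {
        // The failing child canceled the scope, which canceled this block.
        log += "sibling canceled"
        throw e
    }
}

fun runDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        failingRunStuff(log)
    } catch (e: IOException) {
        // coroutineScope rethrows the child's original failure to its caller.
        log += "caller caught: ${e.message}"
    }
    log
}

fun main() {
    println(runDemo()) // [sibling canceled, caller caught: getData failed]
}
```

The failing child cancels the whole scope, the remaining work is canceled with it, and the caller of `coroutineScope` sees the original exception rather than the cancellation.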
Key takeaways
About ReactiveX
About Coroutines
Test Your Assumptions !!!!!!!!!!111oneeleven
Thanks for bearing with me! Questions?
Milos Marinkovic - Handling problems: Coroutines & ReactiveX
Samples - https://github.com/milosmns/talk-rx-coroutines-errors
Get in touch
Social handle: @milosmns