Parallel & Concurrent Programming
Kotlin
@kotlin
| Developed by JetBrains
Definition
According to Wikipedia:
Motivation
Parallelism vs concurrency
Concurrency: processes vs threads
Single-threaded process
Multi-threaded process
Preemptive vs cooperative scheduling
Preemptive: OS interrupts tasks
Cooperative: task yields control
Parallel and concurrent programming in the JVM
Parallel programming in the JVM
2 Java packages
Kotlin package
Throwback: Single abstract method interfaces
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
A single abstract method (SAM) interface declares exactly one abstract method, so it can be instantiated with a lambda.
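class RunnableWrapper(val runnable: Runnable)

// Explicit anonymous object that implements Runnable
val myWrapperObject =
    RunnableWrapper(
        object : Runnable {
            override fun run() {
                println("I run")
            }
        }
    )

// SAM conversion: the lambda becomes the body of Runnable.run()
val myWrapperLambda = RunnableWrapper { println("yo") }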
Ways to create threads
You can inherit from the Thread class, which also implements Runnable.
class MyThread : Thread() {
    override fun run() {
        println("${currentThread()} is running")
    }
}

fun main() {
    val myThread = MyThread()
    myThread.start()
}
run vs start
Never call Thread.run() directly!
run executes the body on the calling thread, while start creates a new thread and executes run there.
fun main() {
    val myThread1 = MyThread()
    myThread1.start() // OK: run() executes on a new thread
    val myThread2 = MyThread()
    myThread2.run() // Wrong: run() executes on the current thread, which is blocked until it returns
}
Ways to create threads
You can implement the Runnable interface and pass it to a thread. You can pass the same Runnable to several threads.
fun main() {
    val myRunnable = Runnable { println("Sorry, gotta run!") }
    val thread1 = Thread(myRunnable)
    thread1.start()
    val thread2 = Thread(myRunnable)
    thread2.start()
}
Ways to create threads
Kotlin has an even simpler way to create threads, but under the hood the same old thread is created and started.
import kotlin.concurrent.thread

fun main() {
    val kotlinThread = thread {
        println("I start instantly, but you can pass an option to start me later")
    }
}
This is the preferred way to create threads.
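The "start me later" option mentioned in the snippet above can be sketched as follows. This is a minimal illustration, assuming the start and name parameters of kotlin.concurrent.thread; the helper createLazyThread and the thread name "lazy-worker" are hypothetical names chosen for this example.

```kotlin
import java.util.Collections
import kotlin.concurrent.thread

// Hypothetical helper: builds a thread that is configured but not started.
fun createLazyThread(log: MutableList<String>): Thread =
    thread(start = false, name = "lazy-worker") {
        log.add("${Thread.currentThread().name} is running")
    }

fun main() {
    val log = Collections.synchronizedList(mutableListOf<String>())
    val lazyThread = createLazyThread(log)
    println(lazyThread.isAlive) // false: the thread has not been started yet
    lazyThread.start()          // now the body runs on the new thread
    lazyThread.join()           // wait for it to finish
    println(log)                // [lazy-worker is running]
}
```

With start = false the function only constructs the Thread object, so the caller decides when to call start.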
Thread properties
Some properties of a thread, such as its daemon status, cannot be changed after it is started.
Main properties of a thread:
State of a thread
state | isAlive |
NEW | false |
RUNNABLE | true |
BLOCKED | true |
WAITING | true |
TIMED_WAITING | true |
TERMINATED | false |
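The first and last rows of the table can be observed directly. The sketch below only checks the two states that are deterministic to observe (before start and after join); the function name observeLifecycle is an assumption made for this example.

```kotlin
import kotlin.concurrent.thread

// Records (state, isAlive) before the thread is started and after it has terminated.
fun observeLifecycle(): List<Pair<Thread.State, Boolean>> {
    val observed = mutableListOf<Pair<Thread.State, Boolean>>()
    val worker = thread(start = false) { /* nothing to do */ }
    observed += worker.state to worker.isAlive // NEW, false
    worker.start()
    worker.join()                              // returns once the worker has terminated
    observed += worker.state to worker.isAlive // TERMINATED, false
    return observed
}

fun main() {
    observeLifecycle().forEach { (state, alive) -> println("$state | $alive") }
}
```

The intermediate states depend on scheduling, so they are deliberately not asserted here.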
State of a thread
[Figure: thread state diagram. States: New, Runnable, Running, Waiting, Blocked, Terminated. Transition labels: start, sched, yield, wait, sleep, …, notify, timeout, …, terminate.]
Ways to manipulate a thread's state
sleep, join, yield, interrupt
Classic worker
class ClassicWorker : Runnable {
    override fun run() {
        try {
            while (!Thread.interrupted()) {
                // do stuff
            }
        } catch (e: InterruptedException) {} // absolutely legal empty catch block
    }
}
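A worker of this shape is stopped from the outside with interrupt and join. The following is a sketch under assumptions: CountingWorker is a hypothetical variant of the worker above in which "do stuff" is a counter increment plus a short sleep, and the 50 ms and 5 s timings are arbitrary.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Hypothetical variant of the classic worker: counts iterations and sleeps briefly.
class CountingWorker : Runnable {
    val iterations = AtomicLong()

    override fun run() {
        try {
            while (!Thread.interrupted()) {
                iterations.incrementAndGet()
                Thread.sleep(1) // throws InterruptedException if interrupted while sleeping
            }
        } catch (e: InterruptedException) {
            // interrupted during sleep: nothing to clean up, just finish
        }
    }
}

// Starts the worker, lets it run briefly, then asks it to stop.
// Returns true if the worker thread terminated after the interrupt.
fun runAndInterrupt(): Boolean {
    val workerThread = Thread(CountingWorker())
    workerThread.start()
    Thread.sleep(50)
    workerThread.interrupt()  // sets the interrupt flag or wakes the sleeping worker
    workerThread.join(5_000)  // wait up to 5 seconds for termination
    return !workerThread.isAlive
}

fun main() {
    println(runAndInterrupt())
}
```

The worker exits either through the loop condition (flag seen by Thread.interrupted()) or through the exception thrown by the blocking call, which is why both paths are handled.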
Parallelism and shared memory: Examples of problematic interleaving
Parallel threads have access to the same shared memory.
This often leads to problems that cannot arise in a single-threaded environment.
class Counter {
    private var c = 0

    fun increment() {
        c++
    }

    fun decrement() {
        c--
    }

    fun value(): Int {
        return c
    }
}
Both operations on c are single, simple statements.
However, even simple statements can be translated into multiple steps by the virtual machine, and those steps can be interleaved.
Suppose both Thread#1 and Thread#2 invoke increment at the same time. If the initial value of c is 0, their interleaved actions might follow this sequence:
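One such interleaving is: both threads read c = 0, both compute 1, and both store 1, so one increment is lost. The sketch below tries to provoke this; it is an illustration only, the helper name hammer and the iteration counts are assumptions, and the printed value varies from run to run.

```kotlin
import kotlin.concurrent.thread

// Unsynchronized counter, reduced to the methods needed here.
class Counter {
    private var c = 0
    fun increment() { c++ } // read, add, write: three steps that can interleave
    fun value(): Int = c
}

// Runs `threads` threads that each call increment() `perThread` times.
fun hammer(threads: Int, perThread: Int): Int {
    val counter = Counter()
    val workers = List(threads) {
        thread { repeat(perThread) { counter.increment() } }
    }
    workers.forEach { it.join() }
    return counter.value()
}

fun main() {
    // 200000 would be printed if no update were lost; the actual value is often smaller.
    println(hammer(threads = 2, perThread = 100_000))
}
```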
Synchronization mechanisms
Locks
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

class LockedCounter {
    private var c = 0
    private val lock = ReentrantLock()

    fun increment() {
        lock.withLock { c++ }
    }
    // same for other methods
    …
}
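For comparison, here is roughly what withLock does when written by hand with lock and unlock in a try/finally block. This is a sketch; the class name ExplicitLockedCounter and the helper countWithLock are hypothetical names for this example.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread

class ExplicitLockedCounter {
    private var c = 0
    private val lock = ReentrantLock()

    fun increment() {
        lock.lock()
        try {
            c++
        } finally {
            lock.unlock() // always release, even if the body throws
        }
    }

    fun value(): Int {
        lock.lock()
        try {
            return c
        } finally {
            lock.unlock()
        }
    }
}

// Increments one shared counter from several threads and returns the final value.
fun countWithLock(threads: Int, perThread: Int): Int {
    val counter = ExplicitLockedCounter()
    List(threads) { thread { repeat(perThread) { counter.increment() } } }
        .forEach { it.join() }
    return counter.value()
}

fun main() {
    println(countWithLock(threads = 4, perThread = 10_000)) // 40000
}
```

Because every access to c happens while holding the lock, no increment is lost and the result is exact.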
The Lock interface
Conditions
class PositiveLockedCounter {
    private var c = 0
    private val lock = ReentrantLock()
    private val condition = lock.newCondition()

    fun increment() {
        lock.withLock {
            c++
            condition.signal()
        }
    }

    fun decrement() {
        lock.withLock {
            while (c == 0) {
                condition.await()
            }
            c--
        }
    }

    fun value(): Int {
        return lock.withLock { c }
    }
}
A condition allows a thread holding a lock to wait until another thread signals it about a certain event. Internally, the await method releases the associated lock when called and reacquires it before returning.
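The waiting behavior can be exercised with two threads: one calls decrement on an empty counter and blocks, the other calls increment and wakes it up. The sketch repeats the class from above so that it is self-contained; the helper decrementThenIncrement and the 100 ms pause are assumptions for illustration.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

class PositiveLockedCounter {
    private var c = 0
    private val lock = ReentrantLock()
    private val condition = lock.newCondition()

    fun increment() {
        lock.withLock {
            c++
            condition.signal() // wake up one thread waiting in decrement()
        }
    }

    fun decrement() {
        lock.withLock {
            while (c == 0) {
                condition.await() // releases the lock while waiting
            }
            c--
        }
    }

    fun value(): Int = lock.withLock { c }
}

fun decrementThenIncrement(): Int {
    val counter = PositiveLockedCounter()
    val consumer = thread { counter.decrement() } // blocks while the counter is 0
    Thread.sleep(100) // gives the consumer time to start waiting; not needed for correctness
    counter.increment()
    consumer.join()
    return counter.value()
}

fun main() {
    println(decrementThenIncrement()) // 0
}
```

The while loop around await matters: a woken thread must re-check the condition, because the state may have changed again before it reacquires the lock.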
The ReentrantLock class
The synchronized statement
class Counter {
    private var c = 0

    fun increment() {
        synchronized(this) { c++ }
    }
    …
}
In the JVM, every object has an intrinsic lock associated with it (aka a monitor).
Synchronized method
Java
public class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }
    …
}
Kotlin
class SynchronizedCounter {
    private var c = 0

    @Synchronized
    fun increment() {
        c++
    }
    …
}
The ReadWriteLock class
ReadWriteLock allows multiple readers to access a resource concurrently but only lets a single writer modify it.
The ReadWriteLock Class
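import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write

class PositiveLockedCounter {
    private var c = 0
    private val rwLock = ReentrantReadWriteLock()

    fun increment() {
        rwLock.write { c++ }
    }

    fun decrement() {
        rwLock.write { c-- }
    }

    fun value(): Int {
        return rwLock.read { c }
    }
}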
Concurrent blocking collections
java.util.concurrent is a Java package that implements both blocking and non-blocking concurrent collections, such as:
Concurrent non-blocking collections
java.util.concurrent is a Java package that implements both blocking and non-blocking concurrent collections, such as:
Synchronization primitives
java.util.concurrent also implements concurrent data structures and synchronization primitives.
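As an illustration of both kinds of utilities, the sketch below combines a concurrent collection with a synchronization primitive. ConcurrentHashMap and CountDownLatch are my choice of examples from java.util.concurrent, not necessarily the ones listed on the original slides; the function countWords is hypothetical.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Counts words from several threads into one shared map.
fun countWords(chunks: List<List<String>>): Map<String, Int> {
    val counts = ConcurrentHashMap<String, Int>()
    val done = CountDownLatch(chunks.size)
    for (chunk in chunks) {
        thread {
            for (word in chunk) {
                // merge performs the read-modify-write for a key atomically
                counts.merge(word, 1) { old, inc -> old + inc }
            }
            done.countDown() // signal that this worker has finished
        }
    }
    done.await() // blocks until every worker has counted down
    return counts
}

fun main() {
    val result = countWords(listOf(listOf("a", "b", "a"), listOf("b", "c")))
    println(result.toSortedMap()) // {a=2, b=2, c=1}
}
```

The map handles concurrent updates without an explicit lock, and the latch replaces a manual join on each thread.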
Java Memory Model: Weak behaviors
There are no guarantees when it comes to ordering!
import kotlin.concurrent.thread

class OrderingTest {
    var x = 0
    var y = 0

    fun test() {
        thread {
            x = 1
            y = 1
        }
        thread {
            val a = y
            val b = x
            println("$a, $b")
        }
    }
}
Possible outputs:
Java Memory Model: Weak behaviors
There are no guarantees when it comes to progress!
class ProgressTest {
    var flag = false

    fun test() {
        thread {
            while (!flag) {}
            println("I am free!")
        }
        thread { flag = true }
    }
}
Possible outputs:
Java Memory Model: Weak behaviors
There are no guarantees when it comes to progress!
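class ProgressTest {
    var flag = false

    fun test() {
        thread {
            // The compiler may hoist the read of the non-volatile flag out of the loop,
            // which effectively turns while (!flag) into this:
            while (true) {}
            println("I am free!")
        }
        thread { flag = true }
    }
}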
Possible outputs:
JMM: Data-Race-Freedom Guarantee
But what does JMM guarantee?
Well-synchronized programs have simple interleaving semantics.
Well-synchronized = Data-race-free
Simple interleaving semantics = Sequentially consistent semantics
Data-race-free programs have sequentially consistent semantics
JMM: Volatile fields
Volatile fields can be used to restore sequential consistency.
class OrderingTest {
    @Volatile var x = 0
    @Volatile var y = 0

    fun test() {
        thread {
            x = 1
            y = 1
        }
        thread {
            val a = y
            val b = x
            println("$a, $b")
        }
    }
}

class ProgressTest {
    @Volatile var flag = false

    fun test() {
        thread {
            while (!flag) {}
            println("I am free!")
        }
        thread { flag = true }
    }
}
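The volatile flag from ProgressTest is the basis of a common stop-request pattern. Below is a minimal sketch; the class StoppableWorker, its method names, and the timeout are hypothetical and chosen only for this example.

```kotlin
import kotlin.concurrent.thread

class StoppableWorker {
    @Volatile private var stopRequested = false

    private val worker = thread(start = false) {
        while (!stopRequested) {
            // busy work; the volatile read is repeated on every iteration
        }
    }

    fun start() = worker.start()

    // Requests a stop and returns true if the worker terminated within the timeout.
    fun stop(timeoutMillis: Long): Boolean {
        stopRequested = true // volatile write, eventually observed by the loop
        worker.join(timeoutMillis)
        return !worker.isAlive
    }
}

fun main() {
    val worker = StoppableWorker()
    worker.start()
    Thread.sleep(50)
    println(worker.stop(timeoutMillis = 5_000))
}
```

Without @Volatile on stopRequested, the loop would have the same progress problem as the non-volatile ProgressTest.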
JMM: Volatile fields
Volatile variables can be used for synchronization.
class OrderingTest {
    var x = 0
    @Volatile var y = 0

    fun test() {
        thread {
            x = 1
            y = 1
        }
        thread {
            val a = y
            val b = x
            println("$a, $b")
        }
    }
}
How do we know there is enough synchronization?
JMM: Happens-before relation
class OrderingTest {
    var x = 0
    @Volatile var y = 0

    fun test() {
        thread {
            x = 1
            y = 1
        }
        thread {
            val a = y
            val b = x
            println("$a, $b")
        }
    }
}
[Figure: execution graph of OrderingTest. Events: Wx0, Wy0, Wx1, Wy1, Ry1, Rx1; V marks the volatile accesses. Edges: po (program-order), rf (reads-from).]
JMM: Happens-before relation
[Figure: execution graph of OrderingTest. Events: Wx0, Wy0, Wx1, Wy1, Ry1, Rx1; V marks the volatile accesses. Edges: po (program-order), rf (reads-from), sw (synchronizes-with, e.g. reads-from on a volatile field).]
JMM: Happens-before relation
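[Figure: execution graph of OrderingTest. Events: Wx0, Wy0, Wx1, Wy1, Ry1, Rx1; V marks the volatile accesses. Edges: po (program-order), rf (reads-from), sw (synchronizes-with, e.g. reads-from on a volatile field), hb (happens-before).]
happens-before = (po ∪ sw)+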
JMM: Happens-before relation
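[Figure: execution graph of OrderingTest in which the second thread reads x = 0. Events: Wx0, Wy0, Wx1, Wy1, Ry1, Rx0; V marks the volatile accesses. Edges: po (program-order), rf (reads-from), sw (synchronizes-with, e.g. reads-from on a volatile field), hb (happens-before).]
happens-before = (po ∪ sw)+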
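The happens-before chain from the OrderingTest example (plain write, volatile write, volatile read, plain read) is what makes publishing data through a volatile flag safe. The following is a sketch of that pattern; the class Mailbox and the function exchange are hypothetical names for this illustration.

```kotlin
import kotlin.concurrent.thread

class Mailbox {
    private var payload = 0             // plain field, like x
    @Volatile private var ready = false // volatile field, like y

    fun publish(value: Int) {
        payload = value // (1) plain write
        ready = true    // (2) volatile write, po-after (1)
    }

    fun awaitPayload(): Int {
        while (!ready) {
            // spin until the volatile read (3) reads-from (2), which gives an sw edge
        }
        return payload  // (4) po-after (3), so (1) happens-before (4)
    }
}

fun exchange(value: Int): Int {
    val mailbox = Mailbox()
    var received = -1
    val reader = thread { received = mailbox.awaitPayload() }
    mailbox.publish(value)
    reader.join()
    return received
}

fun main() {
    println(exchange(42)) // 42
}
```

Since (1) happens-before (4), the reader cannot observe the initial value of payload once it has seen ready == true.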
JMM: Synchronizing actions
JMM: DRF-SC again
Two events form a data race if:
Data-race-free programs have sequentially consistent semantics
A program is data-race-free if, for every possible execution of this program, no two events form a data race.
JMM: Atomics
But what about atomic operators on shared variables?
import java.util.concurrent.atomic.AtomicInteger

class Counter {
    private val c = AtomicInteger()

    fun increment() {
        c.incrementAndGet()
    }

    fun decrement() {
        c.decrementAndGet()
    }

    fun value(): Int {
        return c.get()
    }
}
JMM: Atomics
Atomic classes from the java.util.concurrent.atomic package:
And their array counterparts:
JMM: Atomics
Methods of atomic classes:
In these cases, XXX is an access mode: Acquire, Release, Opaque, Plain
You can learn more about Java Access Modes here:
https://gee.cs.oswego.edu/dl/html/j9mm.html
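Methods such as compareAndSet are typically used in a retry loop: read the current value, compute the new one, and try to install it, repeating if another thread got there first. The sketch below uses AtomicInteger.get and compareAndSet; the functions updateMax and concurrentMax are hypothetical helpers for this example.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Atomically raises `target` to `candidate` if `candidate` is larger (CAS retry loop).
fun updateMax(target: AtomicInteger, candidate: Int) {
    while (true) {
        val current = target.get()
        if (candidate <= current) return                    // nothing to do
        if (target.compareAndSet(current, candidate)) return // installed our value
        // another thread changed the value in between: re-read and retry
    }
}

// Computes the maximum of `values` with one thread per element.
fun concurrentMax(values: List<Int>): Int {
    val max = AtomicInteger(Int.MIN_VALUE)
    values.map { v -> thread { updateMax(max, v) } }.forEach { it.join() }
    return max.get()
}

fun main() {
    println(concurrentMax(listOf(3, 17, 5, 11))) // 17
}
```

The loop never blocks: a failed compareAndSet just means another thread made progress, so the operation is retried with the fresh value.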
JMM: Atomics Problem
class Node<T>(val value: T) {
    val next = AtomicReference<Node<T>>()
}
JMM: Atomic field updaters
Use AtomicXXXFieldUpdater classes to directly modify volatile fields:
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater

class Counter {
    @Volatile private var c = 0

    companion object {
        private val updater = AtomicIntegerFieldUpdater.newUpdater(Counter::class.java, "c")
    }

    fun increment() {
        updater.incrementAndGet(this)
    }

    fun decrement() {
        updater.decrementAndGet(this)
    }

    fun value(): Int {
        return updater.get(this)
    }
}
Starting from JDK 9, there is also the VarHandle class, which serves a similar purpose.
Kotlin: AtomicFU
The AtomicFU library is a recommended way to use atomic operations in Kotlin: https://github.com/Kotlin/kotlinx-atomicfu
import kotlinx.atomicfu.atomic

class Counter {
    private val c = atomic(0)

    fun increment() {
        c += 1
    }

    fun decrement() {
        c -= 1
    }

    fun value(): Int {
        return c.value
    }
}
Thanks!