Parallel and Concurrent Programming
CSCI 334: Principles of Programming Languages
Williams College�Spring 2023
Memory
Program
Concurrent Programming With Threads
5
A.html
B.html
C.html
...
<cow,5>�<moo,3>
<wombat,15>�<cow,23>
<the,3>
<wombat,1>�<purple,3>
<the,11>
cow:[5,23]�
moo:[3]
the:[11,3]
wombat:[1,15]
purple:[3]
cow:28
moo:3
the:35
wombat:16
purple:3
Count
Count
Count
Sort
Sum List
Sum List
Sum List
Sum List
Sum List
Count Words
<word,count>
word:[counts]
Concurrent Programming With Threads
Thread 1
Thread 2
data
Amazon.com
network
Thread 3
Thread 4
Bank�Server
Concurrent Programming with Threads
Thread 1
Thread 2
data
Amazon.com
network
Thread 3
Thread 4
Parallelism and Concurrency
Benefits
Speed
Availability
Distribution
Code structure
Challenges
Hard to write
Not always possible
Specifics
Basic Question for Us
How can programming languages make concurrent and distributed programming easier?
Shared-Memory Deterministic Parallelism
pc
pc
pc
Deterministic Parallelism
do i=1, n
z(i) = x(i) + y(i)
enddo
z(1) = 1
do i=2, n
z(i) = z(i-1)*2
enddo
z(1) = 1
do i=2, n
z(i) = z(1)*power(2,i-1)
enddo
Occam cobegin/end
cobegin
x = x + 1 || y = y + 1
end
MATLAB parfor Loop
clear A
for i = 1:8
A(i) = i;
end
clear A
parfor i = 1:8
A(i) = i;
end
clear A
parfor i = 1:8
A(i) = A(i-1)+1;
end
?
Yep, Java Too...
IntStream range = IntStream.rangeClosed(0, 9);
range.forEach(i -> {
a[i] = i;
});
Fork-Join Parallelism
ans1
ans2
ans3
ans4
Thread States (more to come...)
New
Runnable
Running
Done
run() ends
start
scheduled
descheduled
Self control
JVM Scheduler
External control
Blocked
join
join
complete
Why is it bad to have many more Threads than processors?
SumArray Speedup vs. Num. Threads
Bank�Server
Non-Deterministic Concurrency
Shared Memory �Concurrency Control
(Java, C, C++, ...)�
mutual exclusion
monitors
signals
transactions
Communication Abstractions
(Go, Scala, Erlang)�
message passing
Actors
Race Condition Demo
Concurrency and Race Conditions
Thread 1
t1 = bal
bal = t1 + 10
Thread 2
t2 = bal
bal = t2 - 10
t1 = bal
bal = t1 + 10
t2 = bal
bal = t2 - 10
Thread 1
Thread 2
int bal = 0;
bal == 0
Concurrency and Race Conditions
Thread 1
t1 = bal
bal = t1 + 10
Thread 2
t2 = bal
bal = t2 - 10
t1 = bal
bal = t1 + 10
t2 = bal
bal = t2 - 10
Thread 1
Thread 2
int bal = 0;
bal == -10
Concurrency and Race Conditions
Thread 1
synchronized(m) {
t1 = bal
bal = t1 + 10
}
Thread 2
synchronized(m) {
t2 = bal
bal = t2 - 10
release(m)
acquire(m)
bal = t1 + 10
acquire(m)
bal = t2 - 10
Thread 1
Thread 2
Lock m = new Lock();
int bal = 0;
t1 = bal
release(m)
t2 = bal
release(m)
Account Monitor [Hoare]
class Account {
private int balance;
public synchronized void add(int n) {
balance += n;
}
public synchronized String toString() {
return "balance = " + balance;
}
}
acquire lock of
receiver object
Thread States (more to come...)
New
Runnable
Running
Done
run() ends
start
scheduled
descheduled
Self control
JVM Scheduler
External control
Blocked
join
lock acquire
wait
sleep
join complete
lock released
notifyAll
sleep
interrupted
interrupted
interrupted
Producer-Consumer
Thread 1
Thread 2
data
Amazon.com
network
Thread 3
Thread 4
Others
Using Buffers
class Example {
public static void main(String[] args) {
Buffer<Character> buffer = � new Buffer<Character>(5);
Producer prod = new Producer(buffer);
Consumer cons1 = new Consumer(buffer);
Consumer cons2 = new Consumer(buffer);
prod.start();
cons1.start();
cons2.start();
}
}
Producers
class Producer extends Thread {
private final Buffer<Character> buffer;
public Producer(Buffer<Character> b) {
buffer = b;
}
public void run() {
while (moreData()) {
char c = next();
buffer.insert(c);
}
}
}
Consumers
class Consumer extends Thread {
private final Buffer<Character> buffer;
public Consumer(Buffer<Character> b) {
buffer = b;
}
public void run() {
while (true) {
char c = buffer.delete();
System.out.print(c);
}
}
}
Java Buffer
public class Buffer<T> {
private T[] elementData;
private int elementCount;
private int start;
private int end;
X
Y
Z
0 1 2 3 4 5
start
end
elementCount = 3
Java Buffer
public class Buffer<T> {
private T[] elementData;
private int elementCount;
private int start;
private int end;
X
Y
Z
start
end
elementCount = 3
b.insert("A");
0 1 2 3 4 5
Java Buffer
public class Buffer<T> {
private T[] elementData;
private int elementCount;
private int start;
private int end;
elementCount = 4
X
Y
Z
A
start
end
0 1 2 3 4 5
Java Buffer
public class Buffer<T> {
private T[] elementData;
private int elementCount;
private int start;
private int end;
elementCount = 4
X
Y
Z
A
start
end
0 1 2 3 4 5
s = b.delete()
Java Buffer
public class Buffer<T> {
private T[] elementData;
private int elementCount;
private int start;
private int end;
elementCount = 3
Y
Z
A
start
end
0 1 2 3 4 5
Unsafe Buffer Ops
public class Buffer<T> {
private T[] elementData;
private int elementCount;
private int start;
private int end;
public void insert(T t) {
end = (end + 1) % elementData.length;
elementData[end] = t;
elementCount++;
}
public T delete() {
T elem = elementData[start];
start = (start + 1) % elementData.length;
elementCount--;
return elem;
}
...
Safe Buffer Ops
public class Buffer<T> {
private T[] elementData;
private int elementCount;
private int start;
private int end;
public synchronized void insert(T t) throws InterruptedException {
while (elementCount == elementData.length) wait();
end = (end + 1) % elementData.length;
elementData[end] = t;
elementCount++;
notifyAll();
}
public synchronized T delete() throws InterruptedException {
while (elementCount == 0) wait();
T elem = elementData[start];
start = (start + 1) % elementData.length;
elementCount--;
notifyAll();
return elem;
}
...
Consumers With Handler
class Consumer extends Thread {
private final Buffer<Character> buffer;
public Consumer(Buffer<Character> b) {
buffer = b;
}
public void run() {
try {
while (true) {
char c = buffer.delete();
System.out.print(c);
}
} catch (InterruptedException e) {
// thread interrupted, so stop loop
}
}
}
Interrupting Threads
class Example {
public static void main(String[] args) {
Buffer<String> buffer = new Buffer<String>(5);
Producer prod = new Producer(buffer);
Consumer cons = new Consumer(buffer);
prod.start();
cons.start();
try {
prod.join();
cons.interrupt();
} catch (InterruptedException e) {
System.out.println("...");
}
}
}
Account Monitor, redux
class Account {
int balance;
synchronized void add(int n) {
balance += n;
}
synchronized void transfer(Account other,
int n) {
balance -= n;
other.add(n);
}
Deadlock
class Account {
int balance;
synchronized void add(int n) {
balance += n;
}
synchronized void transfer(Account other,
int n) {
balance -= n;
other.add(n);
}
acquire(a)
acquire(b)
b.bal -= n
Thread 1
Thread 2
a.bal -= n
wait for b
wait for a
java.lang.StringBuffer
public final class StringBuffer {
int count;
char chars[];
synchronized int length() { return count; }
synchronized int getChars(...) { ... }
synchronized void clear() { ... }
synchronized StringBuffer append(StringBuffer sb) {
int len = sb.length();
...
sb.getChars(0, len, value, count);
...
}
}
Atomicity Demo
java.util.StringBuffer
public final class StringBuffer {
int count;
char chars[];
atomic int length() { return count; }
atomic int getChars(...) { ... }
atomic StringBuffer append(StringBuffer sb) {
int len = sb.length();
...
sb.getChars(0, len, value, count);
...
}
}
Atomic As a Language Feature
class Account {
int balance;
atomic void add(int n) {
balance += n;
}
atomic void transfer(Account other, int n) {
balance -= n;
other.add(n);
}
Pessimistic Atomicity
class Account {
int balance;
void add(int n) {
synchronized(global_lock) {
balance += n;
}
}
void transfer(Account other, int n) {
synchronized(global_lock) {
balance -= n;
other.add(n);
}
}
What's good?
What's bad? Alternatives?