1 of 49

Parallel and Concurrent Programming

CSCI 334: Principles of Programming Languages

Williams College�Spring 2023

2 of 49

Memory

Program

3 of 49

4 of 49

Concurrent Programming With Threads

5 of 49

5

A.html

B.html

C.html

...

<cow,5>�<moo,3>

<wombat,15>�<cow,23>

<the,3>

<wombat,1>�<purple,3>

<the,11>

cow:[5,23]�

moo:[3]

the:[11,3]

wombat:[1,15]

purple:[3]

cow:28

moo:3

the:35

wombat:16

purple:3

Count

Count

Count

Sort

Sum List

Sum List

Sum List

Sum List

Sum List

Count Words

<word,count>

word:[counts]

6 of 49

Concurrent Programming With Threads

Thread 1

Thread 2

data

Amazon.com

network

Thread 3

Thread 4

7 of 49

Bank�Server

8 of 49

9 of 49

Concurrent Programming with Threads

Thread 1

Thread 2

data

Amazon.com

network

Thread 3

Thread 4

10 of 49

Parallelism and Concurrency

Benefits

Speed

Availability

Distribution

Code structure

Challenges

Hard to write

Not always possible

Specifics

  • communication: send/receive info
  • synchronization: wait for another process
  • atomicity: don't stop in middle

11 of 49

Basic Question for Us

How can programming languages make concurrent and distributed programming easier?

12 of 49

Shared-Memory Deterministic Parallelism

pc

pc

pc

13 of 49

Deterministic Parallelism

14 of 49

do i=1, n

z(i) = x(i) + y(i)

enddo

z(1) = 1

do i=2, n

z(i) = z(i-1)*2

enddo

z(1) = 1

do i=2, n

z(i) = z(1)*power(2,i-1)

enddo

15 of 49

Occam cobegin/end

cobegin

x = x + 1 || y = y + 1

end

16 of 49

MATLAB parfor Loop

clear A

for i = 1:8

A(i) = i;

end

clear A

parfor i = 1:8

A(i) = i;

end

clear A

parfor i = 1:8

A(i) = A(i-1)+1;

end

?

17 of 49

Yep, Java Too...

IntStream range = IntStream.rangeClosed(0, 9);

range.forEach(i -> {

a[i] = i;

});

18 of 49

Fork-Join Parallelism

  • Define class Worker extending Thread
    • override public void run() method
  • Create object o of class Worker
  • Invoke o.start()

ans1

ans2

ans3

ans4

19 of 49

Thread States (more to come...)

New

Runnable

Running

Done

run() ends

start

scheduled

descheduled

Self control

JVM Scheduler

External control

Blocked

join

join

complete

Why is it bad to have many more Threads than processors?

20 of 49

SumArray Speedup vs. Num. Threads

21 of 49

Bank�Server

22 of 49

Non-Deterministic Concurrency

Shared Memory �Concurrency Control

(Java, C, C++, ...)�

mutual exclusion

monitors

signals

transactions

Communication Abstractions

(Go, Scala, Erlang)�

message passing

Actors

23 of 49

Race Condition Demo

24 of 49

Concurrency and Race Conditions

Thread 1

t1 = bal

bal = t1 + 10

Thread 2

t2 = bal

bal = t2 - 10

t1 = bal

bal = t1 + 10

t2 = bal

bal = t2 - 10

Thread 1

Thread 2

int bal = 0;

bal == 0

25 of 49

Concurrency and Race Conditions

Thread 1

t1 = bal

bal = t1 + 10

Thread 2

t2 = bal

bal = t2 - 10

t1 = bal

bal = t1 + 10

t2 = bal

bal = t2 - 10

Thread 1

Thread 2

int bal = 0;

bal == -10

26 of 49

Concurrency and Race Conditions

Thread 1

synchronized(m) {

t1 = bal

bal = t1 + 10

}

Thread 2

synchronized(m) {

t2 = bal

bal = t2 - 10

release(m)

acquire(m)

bal = t1 + 10

acquire(m)

bal = t2 - 10

Thread 1

Thread 2

Lock m = new Lock();

int bal = 0;

t1 = bal

release(m)

t2 = bal

release(m)

27 of 49

Account Monitor [Hoare]

class Account {

private int balance;

public synchronized void add(int n) {

balance += n;

}

public synchronized String toString() {

return "balance = " + balance;

}

}

acquire lock of

receiver object

28 of 49

Thread States (more to come...)

New

Runnable

Running

Done

run() ends

start

scheduled

descheduled

Self control

JVM Scheduler

External control

Blocked

join

lock acquire

wait

sleep

join complete

lock released

notifyAll

sleep

interrupted

interrupted

interrupted

29 of 49

Producer-Consumer

  • Buffer with finite size
    • Producers add values to it
    • Consumers remove values from it
  • Used "everywhere"
    • buffer messages on network
    • OS events
    • events in simulation
    • messages between threads...

Thread 1

Thread 2

data

Amazon.com

network

Thread 3

Thread 4

30 of 49

Others

  • Server request processing

  • Router�� input-> queue -> control -> queue -> output

31 of 49

Using Buffers

class Example {

public static void main(String[] args) {

Buffer<Character> buffer = � new Buffer<Character>(5);

Producer prod = new Producer(buffer);

Consumer cons1 = new Consumer(buffer);

Consumer cons2 = new Consumer(buffer);

prod.start();

cons1.start();

cons2.start();

}

}

32 of 49

Producers

class Producer extends Thread {

private final Buffer<Character> buffer;

public Producer(Buffer<Character> b) {

buffer = b;

}

public void run() {

while (moreData()) {

char c = next();

buffer.insert(c);

}

}

}

33 of 49

Consumers

class Consumer extends Thread {

private final Buffer<Character> buffer;

public Consumer(Buffer<Character> b) {

buffer = b;

}

public void run() {

while (true) {

char c = buffer.delete();

System.out.print(c);

}

}

}

34 of 49

Java Buffer

public class Buffer<T> {

private T[] elementData;

private int elementCount;

private int start;

private int end;

X

Y

Z

0 1 2 3 4 5

start

end

elementCount = 3

35 of 49

Java Buffer

public class Buffer<T> {

private T[] elementData;

private int elementCount;

private int start;

private int end;

X

Y

Z

start

end

elementCount = 3

b.insert("A");

0 1 2 3 4 5

36 of 49

Java Buffer

public class Buffer<T> {

private T[] elementData;

private int elementCount;

private int start;

private int end;

elementCount = 4

X

Y

Z

A

start

end

0 1 2 3 4 5

37 of 49

Java Buffer

public class Buffer<T> {

private T[] elementData;

private int elementCount;

private int start;

private int end;

elementCount = 4

X

Y

Z

A

start

end

0 1 2 3 4 5

s = b.delete()

38 of 49

Java Buffer

public class Buffer<T> {

private T[] elementData;

private int elementCount;

private int start;

private int end;

elementCount = 3

Y

Z

A

start

end

0 1 2 3 4 5

39 of 49

Unsafe Buffer Ops

public class Buffer<T> {

private T[] elementData;

private int elementCount;

private int start;

private int end;

public void insert(T t) {

end = (end + 1) % elementData.length;

elementData[end] = t;

elementCount++;

}

public T delete() {

T elem = elementData[start];

start = (start + 1) % elementData.length;

elementCount--;

return elem;

}

...

40 of 49

Safe Buffer Ops

public class Buffer<T> {

private T[] elementData;

private int elementCount;

private int start;

private int end;

public synchronized void insert(T t) throws InterruptedException {

while (elementCount == elementData.length) wait();

end = (end + 1) % elementData.length;

elementData[end] = t;

elementCount++;

notifyAll();

}

public synchronized T delete() throws InterruptedException {

while (elementCount == 0) wait();

T elem = elementData[start];

start = (start + 1) % elementData.length;

elementCount--;

notifyAll();

return elem;

}

...

41 of 49

Consumers With Handler

class Consumer extends Thread {

private final Buffer<Character> buffer;

public Consumer(Buffer<Character> b) {

buffer = b;

}

public void run() {

try {

while (true) {

char c = buffer.delete();

System.out.print(c);

}

} catch (InterruptedException e) {

// thread interrupted, so stop loop

}

}

}

42 of 49

Interrupting Threads

class Example {

public static void main(String[] args) {

Buffer<String> buffer = new Buffer<String>(5);

Producer prod = new Producer(buffer);

Consumer cons = new Consumer(buffer);

prod.start();

cons.start();

try {

prod.join();

cons.interrupt();

} catch (InterruptedException e) {

System.out.println("...");

}

}

}

43 of 49

Account Monitor, redux

class Account {

int balance;

synchronized void add(int n) {

balance += n;

}

synchronized void transfer(Account other,

int n) {

balance -= n;

other.add(n);

}

44 of 49

Deadlock

class Account {

int balance;

synchronized void add(int n) {

balance += n;

}

synchronized void transfer(Account other,

int n) {

balance -= n;

other.add(n);

}

acquire(a)

acquire(b)

b.bal -= n

Thread 1

Thread 2

a.bal -= n

wait for b

wait for a

45 of 49

java.lang.StringBuffer

public final class StringBuffer {

int count;

char chars[];

synchronized int length() { return count; }

synchronized int getChars(...) { ... }

synchronized void clear() { ... }

synchronized StringBuffer append(StringBuffer sb) {

int len = sb.length();

...

sb.getChars(0, len, value, count);

...

}

}

46 of 49

Atomicity Demo

47 of 49

java.util.StringBuffer

public final class StringBuffer {

int count;

char chars[];

atomic int length() { return count; }

atomic int getChars(...) { ... }

atomic StringBuffer append(StringBuffer sb) {

int len = sb.length();

...

sb.getChars(0, len, value, count);

...

}

}

48 of 49

Atomic As a Language Feature

class Account {

int balance;

atomic void add(int n) {

balance += n;

}

atomic void transfer(Account other, int n) {

balance -= n;

other.add(n);

}

49 of 49

Pessimistic Atomicity

class Account {

int balance;

void add(int n) {

synchronized(global_lock) {

balance += n;

}

}

void transfer(Account other, int n) {

synchronized(global_lock) {

balance -= n;

other.add(n);

}

}

What's good?

What's bad? Alternatives?