1 of 60

Concurrency and Multi-threaded Programming

CS 240 – Advanced Software Construction

2 of 60

Multi-Threaded Programming Overview

3 of 60

Threads

  • By default, programs do one thing at a time
  • They start executing in main(), and when main() completes, the program terminates

main()

4 of 60

Threads (2)

  • Often, it is desirable for a program to do multiple things at the same time (concurrently)
  • To do this, a program can create multiple “threads” of control, each of which represents something the program is working on

5 of 60

Threads (3)

  • A program starts with one “main” thread
  • Additional threads can be created as needed
  • Each thread has its own runtime stack, so it can run independently from the other threads

6 of 60

Java Threads Example

public class JavaThreadExample {

    public static void main(String[] args) {
        CountingThread countUp = new CountingThread("UP", 0, 50, 1);
        CountingThread countDown = new CountingThread("DOWN", 50, 0, -1);

        countUp.start();
        countDown.start();

        System.out.println("Leaving Main Thread");
    }
}

class CountingThread extends Thread {
    private String name;
    private int start;
    private int stop;
    private int increment;

    public CountingThread(String name, int start, int stop, int increment) {
        this.name = name;
        this.start = start;
        this.stop = stop;
        this.increment = increment;
    }

    public void run() {
        for (int i = start; i != stop; i += increment) {
            System.out.println(name + ": " + i);
        }
    }
}

7 of 60

Parallel vs. Concurrent vs. Sequential Execution

  • When you have multiple tasks and a single CPU (or core), the operating system will swap which task is executing so that each task gets a chance to run. This allows the tasks to run concurrently.

  • If you have multiple CPUs (or cores) then the operating system can actually run the tasks at the same time, or in parallel.

  • If each task has to run to completion before another task can start then tasks are running sequentially.

9 of 60

Thread Synchronization

(just the basics)

10 of 60

Making a Thread Wait for Another

public class JavaThreadExample {

    public static void main(String[] args) throws InterruptedException {
        CountingThread countUp = new CountingThread("UP", 0, 50, 1);
        CountingThread countDown = new CountingThread("DOWN", 50, 0, -1);

        countUp.start();
        countUp.join();

        countDown.start();
        countDown.join();

        System.out.println("Leaving Main Thread");
    }
}

class CountingThread extends Thread {
    private String name;
    private int start;
    private int stop;
    private int increment;

    public CountingThread(String name, int start, int stop, int increment) {
        this.name = name;
        this.start = start;
        this.stop = stop;
        this.increment = increment;
    }

    public void run() {
        for (int i = start; i != stop; i += increment) {
            System.out.println(name + ": " + i);
        }
    }
}

12 of 60

Thread Pools

13 of 60

Thread Pools

  • Often, a program will need to run several or even many threads concurrently
    • Example: Chess server needs a thread to process each incoming request (web API or web socket)
    • Example: A web browser needs to download many files when loading a web page

  • A simple approach would be to create a new thread every time you need to run a background task

  • Thread creation incurs a certain amount of overhead

  • Rather than creating a new thread every time one is needed, it can be more efficient to reuse threads you’ve previously created

14 of 60

Thread Pools (2)

  • A “thread pool” can be used to optimize this scenario. A thread pool works as follows:
    • Create a number of threads at initialization time before you need them, and store them in a list. This list constitutes the “thread pool”
    • When you want to run a task on another thread:
      • If there’s a thread currently available in the pool, run your task on one of the available threads
      • If there’s not a thread currently available in the pool, add your task to a “task queue”
    • When a task finishes running:
      • If there are tasks waiting to run in the task queue, remove the next task from the queue, and run it on the newly available thread
      • If there are no waiting tasks, put the newly available thread back into the pool
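The steps above can be sketched in a few dozen lines. This is only an illustration, not how Java's built-in pools are implemented: the class names SimpleThreadPool and SimpleThreadPoolDemo are hypothetical, and the sketch uses a common variation in which idle worker threads wait on the task queue themselves rather than sitting in a separate list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical minimal thread pool: a fixed set of worker threads shares one task queue. */
class SimpleThreadPool {
    // Sentinel task that tells a worker to leave its loop.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    SimpleThreadPool(int threadCount) {
        // Create the threads up front, before any task needs them.
        for (int i = 0; i < threadCount; ++i) {
            Thread worker = new Thread(this::workerLoop, "worker-" + i);
            workers.add(worker);
            worker.start();
        }
    }

    /** Queues a task; an idle worker picks it up, otherwise it waits in the queue. */
    void submit(Runnable task) {
        taskQueue.add(task);
    }

    /** Lets already-queued tasks finish, then stops every worker and waits for it. */
    void shutdownAndWait() {
        for (int i = 0; i < workers.size(); ++i) {
            taskQueue.add(STOP); // one sentinel per worker, queued behind the real tasks
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    private void workerLoop() {
        try {
            while (true) {
                Runnable task = taskQueue.take(); // blocks while the queue is empty
                if (task == STOP) {
                    return;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    e.printStackTrace(); // keep the worker alive if one task fails
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class SimpleThreadPoolDemo {
    /** Runs the given number of trivial tasks and returns how many completed. */
    static int runTasks(int threadCount, int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        SimpleThreadPool pool = new SimpleThreadPool(threadCount);
        for (int i = 0; i < taskCount; ++i) {
            pool.submit(completed::incrementAndGet);
        }
        pool.shutdownAndWait();
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3, 20) + " tasks completed");
    }
}
```

In real code you would normally use the thread pools in java.util.concurrent instead of writing your own.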

15 of 60

Java Thread Pools

16 of 60

Java Thread Pools (2)

  • Java’s Executors class has several methods for creating pre-configured thread pool instances
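As a rough illustration, the sketch below creates three of the pre-configured pools and runs one task on each. The factory methods are part of java.util.concurrent.Executors; the class ExecutorFactoriesDemo and its helper methods are hypothetical names used only for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorFactoriesDemo {

    /** Runs one task on the pool, shuts it down, and reports whether it finished in time. */
    static boolean runOn(String label, ExecutorService pool) {
        pool.submit(() ->
                System.out.println(label + " pool ran a task on " + Thread.currentThread().getName()));
        pool.shutdown(); // accept no new tasks; already-submitted tasks still run
        try {
            return pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean runAll() {
        // Fixed number of threads; extra tasks wait in a queue.
        boolean fixed = runOn("fixed", Executors.newFixedThreadPool(4));
        // Creates threads as needed and reuses idle ones.
        boolean cached = runOn("cached", Executors.newCachedThreadPool());
        // Exactly one thread; tasks run one after another.
        boolean single = runOn("single", Executors.newSingleThreadExecutor());
        return fixed && cached && single;
    }

    public static void main(String[] args) {
        System.out.println("All pools terminated: " + runAll());
    }
}
```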

17 of 60

Runnable vs. Callable

  • Two ways to write tasks that can be executed by an ExecutorService:
    • Write a class that implements the Runnable interface
      • Your code goes in the public void run() method
      • Preferred when you don’t need to return a result
    • Write a class that implements the Callable<V> interface
      • Your code goes in the public V call() method
      • Preferred when you need to return a result
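A minimal sketch of the Callable side, since the other examples in these slides use Runnable. SumTask and CallableDemo are hypothetical names; submit, Future, and get are the standard java.util.concurrent API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical task that sums the integers from 1 to n and returns the total. */
class SumTask implements Callable<Integer> {
    private final int n;

    SumTask(int n) {
        this.n = n;
    }

    @Override
    public Integer call() {
        int total = 0;
        for (int i = 1; i <= n; ++i) {
            total += i;
        }
        return total;
    }
}

class CallableDemo {

    /** Runs SumTask on a background thread and waits for its result. */
    static int sumInBackground(int n) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> result = pool.submit(new SumTask(n));
            return result.get(); // blocks until call() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // call() threw an exception; the original is available as the cause
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInBackground(100)); // prints 5050
    }
}
```

Submitting a Runnable also returns a Future, but its get() only signals completion and yields null.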

18 of 60

Java Thread Pool Example

import java.util.concurrent.*;

public class JavaThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; ++i) {
            threadPool.submit(new Counter("UP_" + i, 0, 50, 1));
            threadPool.submit(new Counter("DOWN_" + i, 50, 0, -1));
        }

        threadPool.shutdown();

        threadPool.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("Leaving Main Thread");
    }
}

class Counter implements Runnable {

    private String _name;
    private int _start;
    private int _stop;
    private int _increment;

    public Counter(String name, int start, int stop, int increment) {
        _name = name;
        _start = start;
        _stop = stop;
        _increment = increment;
    }

    public void run() {
        for (int i = _start; i != _stop; i += _increment) {
            System.out.println(_name + ": " + i);
        }
    }
}

19 of 60

Citations (Multi-Threaded Programming)

Diagrams created by course authors: Jerod Wilkerson and Ken Rodham

20 of 60

Race Conditions (or Race Hazards)

21 of 60

Race Condition

  • The correctness of a program depends on the relative timing or interleaving of multiple threads or processes
    • Program’s behavior is non-deterministic
    • Sometimes it works, sometimes it doesn’t
    • Threads interleave differently each time the program runs
  • Caused by shared resources accessed concurrently by multiple threads
    • In-memory data structures
    • Input sources / Output destinations
      • Files
      • Databases
      • Sockets (e.g., web sockets)
      • Terminal/screen

22 of 60

In-memory Data Structure Race Condition Example

  • What could happen if two threads tried to push a value onto a stack at the same time?

import java.util.LinkedList;
import java.util.NoSuchElementException;

public class Stack<T> {
    private LinkedList<T> values;

    public Stack() {
        values = new LinkedList<T>();
    }

    public void push(T value) {
        values.addFirst(value);
    }

    public T pop() throws NoSuchElementException {
        return values.removeFirst();
    }

    public int size() {
        return values.size();
    }
}

23 of 60

In-memory Data Structure Race Condition Example (2)

  • What could happen if two users join or leave a game at the same time (even different games)?
    • Data structures that track web socket sessions could be corrupted

/**
 * Stores web socket connections (Session) for each game
 */
public class WebSocketSessions {

    //map of gameID to sessions participating in that game
    private final Map<Integer, Set<Session>> gameMap;

    //map of session to gameID
    private final Map<Session, Integer> sessionMap;

    private WebSocketSessions() {
        this.gameMap = new HashMap<>();
        this.sessionMap = new HashMap<>();
    }

    public void addSessionForGame(Integer gameID, Session session) {
        ...
    }

    public boolean removeSessionFromGame(Integer gameID, Session session) {
        ...
    }

    public void removeSession(Session session) {
        ...
    }
}

24 of 60

File Race Condition Example

package demo;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class FileRaceConditionExample {

    public static void main(String[] args) throws InterruptedException {

        final File FILE_PATH = new File("count");
        final int ITERATIONS = 100;

        try (var output = new PrintWriter(FILE_PATH)) {
            output.print(0);
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        ExecutorService threadPool = Executors.newCachedThreadPool();

        for (int i = 0; i < 2; ++i) {
            threadPool.submit(new FileCounter(i, FILE_PATH, ITERATIONS));
        }

        threadPool.shutdown();

        threadPool.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("Leaving Main Thread");
    }
}

class FileCounter implements Runnable {

    //private static Object lockObject = new Object();

    private int _threadNumber;
    private File _filePath;
    private int _iterations;

    public FileCounter(int threadNumber, File filePath, int iterations) {
        _threadNumber = threadNumber;
        _filePath = filePath;
        _iterations = iterations;
    }

    public void run() {
        for (int i = 0; i < _iterations; ++i) {
            try {
                incrementCount();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    private void incrementCount() throws IOException {
        //synchronized (lockObject) {
            int count;

            try (var input = new Scanner(_filePath)) {
                count = input.nextInt();
            }

            ++count;

            try (var output = new PrintWriter(_filePath)) {
                output.print(count);
            }

            System.out.println("Thread_" + _threadNumber + ": " + count);
        //}
    }
}

25 of 60

Database Race Condition Example

  • What could happen if two users register with the same user name ‘allie’ at the same time?

26 of 60

Database Race Condition Example (2)

  • What could happen if two users try to claim the same color in the same game at the same time?

28 of 60

Writing Thread-safe Code Part 1: Database Transactions

29 of 60

Writing Thread-safe Code

  • How can we write multi-threaded programs that don’t have race conditions and work reliably?

  • Complicated subject

  • We will discuss a few basic ideas
    • Database transactions
    • Synchronized methods in Java
    • Synchronized code blocks in Java
    • Atomic operations

  • CS 324 teaches the subject more deeply

30 of 60

Database Transactions

  • By default, the effects of a single SQL UPDATE or DELETE statement are “committed” (made permanent) immediately
  • Database transactions allow a sequence of multiple SQL statements to be executed as an atomic unit
    • All SQL statements in the transaction are either ALL executed or NONE are executed (partial execution is prevented)
    • This preserves the integrity of the data in the database (keeps it consistent)
  • Database transactions “serialize” transactions that are concurrently executed by different threads or processes
    • The overall effect of executing transactions T1 and T2 concurrently will be equivalent to one of the following:
      • T1 executed first, T2 executed second
      • T2 executed first, T1 executed second

31 of 60

Database Transactions (2)

public Connection openConnection() throws DataAccessException {
    try {
        if (conn != null) {
            throw new DataAccessException("Database connection already open");
        }

        final String CONNECTION_URL = DATABASE_DRIVER + MYSQL_URL;

        // Open a connection
        conn = DriverManager.getConnection(CONNECTION_URL, DB_USERNAME, DB_PASSWORD);

        // Start a transaction
        conn.setAutoCommit(false);
    } catch (SQLException e) {
        e.printStackTrace();
        throw new DataAccessException("Unable to open connection to database");
    }

    // Hand the open connection (with its transaction started) back to the caller
    return conn;
}

public void closeConnection(boolean commit) {
    if (conn == null) {
        return;
    }

    try {
        if (commit) {
            // Commit the transaction
            conn.commit();
        } else {
            // Rollback the transaction
            conn.rollback();
        }

        // Close the connection
        conn.close();
        conn = null;
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

32 of 60

Database Transactions (3)

  • Use db transactions to prevent two users from registering with the same user name

33 of 60

Database Transactions (4)

  • Use db transactions to prevent two users from registering with the same user name

34 of 60

Database Transactions (5)

  • Use db transactions to prevent two users from claiming the same side in a game

35 of 60

Database Transactions (6)

  • Use db transactions to prevent two users from claiming the same side in a game

37 of 60

Writing Thread-safe Code Part 2: Synchronized Methods

38 of 60

Critical Sections

  • A critical section of code is a section of code that only one thread should be allowed to run at a time

  • These are sections of code that access shared resources (variables, data structures, files, etc.) that are used by concurrent tasks

Critical Sections

public class Stack<T> {
    private LinkedList<T> values;

    public Stack() {
        values = new LinkedList<T>();
    }

    public void push(T value) {
        values.addFirst(value);
    }

    public T pop() throws NoSuchElementException {
        return values.removeFirst();
    }

    public int size() {
        return values.size();
    }
}

39 of 60

Synchronized Methods in Java

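  • Marking a method synchronized prevents multiple threads from executing it on the same object at the same time

    • A thread must acquire the object’s lock before entering any of its synchronized methods, so threads execute the synchronized methods of one object one at a time

    • Threads can still execute the method at the same time on different objects, because each object has its own lock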

public class Stack<T> {
    private LinkedList<T> values;

    public Stack() {
        values = new LinkedList<T>();
    }

    public synchronized void push(T value) {
        values.addFirst(value);
    }

    public synchronized T pop() throws NoSuchElementException {
        return values.removeFirst();
    }

    public synchronized int size() {
        return values.size();
    }
}
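One way to convince yourself that the synchronized version is safe is to push from several threads at once and check that no pushes are lost. The sketch below is only an illustration: SynchronizedStack is a hypothetical, cut-down copy of the Stack class above, and SynchronizedStackDemo is a hypothetical test harness.

```java
import java.util.LinkedList;

/** Hypothetical cut-down version of the synchronized Stack shown above. */
class SynchronizedStack<T> {
    private final LinkedList<T> values = new LinkedList<>();

    public synchronized void push(T value) {
        values.addFirst(value);
    }

    public synchronized int size() {
        return values.size();
    }
}

class SynchronizedStackDemo {

    /** Pushes from several threads at once and returns the final stack size. */
    static int pushConcurrently(int threadCount, int pushesPerThread) {
        SynchronizedStack<Integer> stack = new SynchronizedStack<>();
        Thread[] threads = new Thread[threadCount];

        for (int t = 0; t < threadCount; ++t) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < pushesPerThread; ++i) {
                    stack.push(i); // only one thread at a time can be inside push()
                }
            });
            threads[t].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return stack.size();
    }

    public static void main(String[] args) {
        System.out.println(pushConcurrently(4, 1000)); // prints 4000
    }
}
```

If the synchronized keywords are removed, the same program may report fewer elements or fail with an exception, and the outcome can change from run to run.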

40 of 60

Synchronized Methods in Java (2)

public class WebSocketSessions {

    //map of gameID to map of authToken to session
    private final Map<Integer, Map<String, Session>> gameMap;

    //map of session to gameID
    private final Map<Session, Integer> sessionMap;

    private WebSocketSessions() {
        this.gameMap = new HashMap<>();
        this.sessionMap = new HashMap<>();
    }

    public synchronized void addSessionForGame(Integer gameID, String auth, Session session) {
        ...
    }

    public synchronized boolean removeSessionFromGame(Integer gameID, String auth) {
        ...
    }

    public synchronized void removeSession(Session session) {
        ...
    }
}

42 of 60

Writing Thread-safe Code Part 3: Synchronized Code Blocks

43 of 60

Synchronized Code Blocks in Java

  • Synchronized methods ensure single-threaded access to a method on a single object
    • What if you want to synchronize threads across multiple objects?
    • What if you want to synchronize only part of a method rather than the entire method?
  • Example: FileRaceConditionExample
    • Making the incrementCount method synchronized in the FileRaceConditionExample will not fix the race condition
    • Why not?

44 of 60

Synchronized Code Blocks in Java (2)

class FileCounter implements Runnable {
    private int _threadNumber;
    private File _filePath;
    private int _iterations;

    public FileCounter(int threadNumber, File filePath, int iterations) {
        _threadNumber = threadNumber;
        _filePath = filePath;
        _iterations = iterations;
    }

    public void run() {
        for (int i = 0; i < _iterations; ++i) {
            try {
                incrementCount();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    private synchronized void incrementCount() throws IOException {
        int count;

        try (var input = new Scanner(_filePath)) {
            count = input.nextInt();
        }

        ++count;

        try (var output = new PrintWriter(_filePath)) {
            output.print(count);
        }

        System.out.println("Thread_" + _threadNumber + ": " + count);
    }
}

This won’t work because there are multiple FileCounter objects, not just one. A synchronized instance method locks only the object it is called on, so each FileCounter acquires its own lock and the two threads never block each other.

How can we synchronize this code across multiple FileCounter objects? Synchronized code blocks!

46 of 60

Synchronized Code Blocks in Java (4)

class FileCounter implements Runnable {

    private static Object lockObject = new Object();

    private int _threadNumber;
    private File _filePath;
    private int _iterations;

    public FileCounter(int threadNumber, File filePath, int iterations) {
        _threadNumber = threadNumber;
        _filePath = filePath;
        _iterations = iterations;
    }

    public void run() {
        for (int i = 0; i < _iterations; ++i) {
            try {
                incrementCount();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    private void incrementCount() throws IOException {
        synchronized (lockObject) {
            int count;

            try (var input = new Scanner(_filePath)) {
                count = input.nextInt();
            }

            ++count;

            try (var output = new PrintWriter(_filePath)) {
                output.print(count);
            }

            System.out.println("Thread_" + _threadNumber + ": " + count);
        }
    }
}
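The same idea can be tried without touching the file system. In the sketch below, a static int stands in for the contents of the count file, and a static lock object is shared by every task instance, just as lockObject is shared by every FileCounter. SharedCounterTask and its members are hypothetical names chosen for this example.

```java
/** Hypothetical in-memory stand-in for FileCounter: a static int replaces the file. */
class SharedCounterTask implements Runnable {

    // One lock shared by every instance of the class.
    private static final Object LOCK = new Object();

    // Shared state that every task reads and writes.
    private static int count = 0;

    private final int iterations;

    SharedCounterTask(int iterations) {
        this.iterations = iterations;
    }

    @Override
    public void run() {
        for (int i = 0; i < iterations; ++i) {
            // Only the read-modify-write sequence needs to be inside the block.
            synchronized (LOCK) {
                int current = count; // read
                count = current + 1; // modify and write
            }
        }
    }

    /** Runs two separate task objects concurrently and returns the final count. */
    static int runTwoTasks(int iterations) {
        synchronized (LOCK) {
            count = 0;
        }

        Thread first = new Thread(new SharedCounterTask(iterations));
        Thread second = new Thread(new SharedCounterTask(iterations));
        first.start();
        second.start();

        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        synchronized (LOCK) {
            return count;
        }
    }

    public static void main(String[] args) {
        System.out.println(runTwoTasks(10000)); // prints 20000
    }
}
```

Because both task objects synchronize on the same LOCK, no increments are lost even though the tasks are different objects.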

48 of 60

Writing Thread-safe Code Part 4: Atomic Variables

49 of 60

Atomic Variables Overview

  • Synchronized code blocks protect critical sections
    • We only need ‘synchronized’ when we have critical sections
    • Protecting critical sections with ‘synchronized’ adds overhead
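  • Some critical sections contain only one simple operation
    • x++
    • x = x + 10 (or x += 10)
    • if (stringRef == "Hi") { stringRef = "Hello"; }
  • Even these are not atomic on an ordinary variable: x++ reads x, adds one, and writes the result back, and another thread can run between those steps
  • Atomic variables perform such read-modify-write operations as a single indivisible step, typically using hardware atomic instructions such as compare-and-swap instead of a lock, thereby eliminating the need for synchronized methods or code blocks (usually more efficient)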

50 of 60

AtomicInteger Example

Synchronized Example

public class SynchronizedIncrement {
    private int x = 0;

    public synchronized void increment() {
        x++;
    }

    public synchronized int getX() {
        return x;
    }
}

AtomicInteger Example

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIncrement {
    private AtomicInteger x = new AtomicInteger(0);

    public int increment() {
        return x.incrementAndGet();
    }

    public int getX() {
        return x.get();
    }
}

51 of 60

AtomicReference Example (synchronized)

public class StringReference {
    private String stringRef = "Hello";

    public synchronized void compareAndSet(String expectedValue, String newValue) {
        // Reference comparison (==), matching how AtomicReference.compareAndSet compares
        if (stringRef == expectedValue) {
            stringRef = newValue;
        }
    }

    public synchronized String getStringRef() {
        return stringRef;
    }
}

52 of 60

AtomicReference Example (atomic)

import java.util.concurrent.atomic.AtomicReference;

public class AtomicStringReference {
    private AtomicReference<String> stringRef = new AtomicReference<>("Hello");

    public void compareAndSet(String expectedValue, String newValue) {
        stringRef.compareAndSet(expectedValue, newValue);
    }

    public String getStringRef() {
        return stringRef.get();
    }
}

53 of 60

Atomic Variable Class Summary

  • AtomicBoolean
  • AtomicInteger
  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReference
  • AtomicReferenceArray
  • Others…

54 of 60

Partial List of Atomic Methods

AtomicBoolean

  • compareAndSet
  • get
  • getAndSet
  • set

AtomicInteger

  • addAndGet
  • compareAndSet
  • decrementAndGet
  • get
  • getAndAdd
  • getAndDecrement
  • getAndIncrement
  • getAndSet
  • incrementAndGet
  • updateAndGet

AtomicReference

  • compareAndSet
  • get
  • getAndSet
  • getAndUpdate
  • set
  • updateAndGet
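The method names follow a pattern: getAndX returns the value from before the change, while XAndGet returns the value after it. The short sketch below walks through a few of the listed methods on a single thread so the return values are easy to follow; AtomicMethodsDemo is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class AtomicMethodsDemo {

    /** Applies several AtomicInteger methods in order and records what each returns. */
    static int[] integerOps() {
        AtomicInteger x = new AtomicInteger(5);

        int a = x.getAndIncrement();              // returns 5, x becomes 6
        int b = x.incrementAndGet();              // x becomes 7, returns 7
        int c = x.addAndGet(10);                  // x becomes 17, returns 17
        int d = x.updateAndGet(v -> v * 2);       // x becomes 34, returns 34
        boolean swapped = x.compareAndSet(34, 0); // x was 34, so it becomes 0; returns true

        return new int[] {a, b, c, d, swapped ? 1 : 0, x.get()};
    }

    /** Applies getAndSet and updateAndGet to an AtomicReference<String>. */
    static String referenceOps() {
        AtomicReference<String> ref = new AtomicReference<>("Hi");

        String previous = ref.getAndSet("Hello"); // returns "Hi", ref now holds "Hello"
        ref.updateAndGet(s -> s + "!");           // ref now holds "Hello!"

        return previous + " -> " + ref.get();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(integerOps())); // prints [5, 7, 17, 34, 1, 0]
        System.out.println(referenceOps());                // prints Hi -> Hello!
    }
}
```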

56 of 60

Race Conditions in Chess

57 of 60

Race Conditions in Chess Project

  • Although your Chess server code does not use Thread objects or thread pools, Javalin does create multiple thread objects behind the scenes to handle multiple incoming client requests/messages concurrently.

  • Similarly, the websocket library used by your Chess client code creates additional threads to handle websocket messages sent by the server.

  • The Chess project has several potential race conditions. We have already seen some of them:
    • Multiple users registering concurrently with the same username
    • Multiple users concurrently claiming the same side in the same game
    • Multiple users concurrently joining or leaving a game (even different ones)

58 of 60

Race Conditions in Chess Server

  • Race Condition: Multiple threads writing to the same web socket at the same time
  • Solution: Use “game locks” so the server processes only one web socket message per game at a time
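One possible shape for "game locks" is a map from game ID to a lock object, with each message handler running inside a synchronized block on its game's lock. This is a sketch under assumptions, not the course's reference solution: GameLocks, lockFor, and withGameLock are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that hands out one lock object per game ID. */
class GameLocks {
    private final Map<Integer, Object> locks = new ConcurrentHashMap<>();

    /** Returns the same lock object every time it is called with the same game ID. */
    Object lockFor(int gameID) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so two threads asking
        // for the same game at the same time still receive the same lock.
        return locks.computeIfAbsent(gameID, id -> new Object());
    }

    /** Runs the handler while holding the lock for the given game. */
    void withGameLock(int gameID, Runnable handler) {
        synchronized (lockFor(gameID)) {
            handler.run();
        }
    }
}
```

A web socket handler could then wrap its work in withGameLock(gameID, ...). Messages for the same game are processed one at a time, while messages for different games can still run concurrently.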

  • Race Condition: A database only supports a limited number of simultaneous open connections. If too many requests/messages come in at a time, the database will run out of connections.
  • Solution: Use a “connection pool” that limits how many connections can be open at a time. If no connections are currently available, a thread will have to wait until one becomes available.
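The waiting behavior of a connection pool can be sketched with a blocking queue. To keep the example runnable without a database, plain strings stand in for real connections; BoundedPool and BoundedPoolDemo are hypothetical names, and a production pool would also need to handle broken connections and timeouts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical pool holding a fixed number of reusable resources. */
class BoundedPool<T> {
    private final BlockingQueue<T> available;

    BoundedPool(List<T> resources) {
        available = new LinkedBlockingQueue<>(resources);
    }

    /** Blocks until a resource is free, then hands it to the caller. */
    T acquire() throws InterruptedException {
        return available.take();
    }

    /** Returns a resource to the pool, which lets one waiting thread proceed. */
    void release(T resource) {
        available.add(resource);
    }
}

class BoundedPoolDemo {

    /** Has many threads share a small pool; returns the most resources in use at once. */
    static int maxInUse(int poolSize, int threadCount) {
        List<String> connections = new ArrayList<>();
        for (int i = 0; i < poolSize; ++i) {
            connections.add("connection-" + i); // stand-ins for real database connections
        }
        BoundedPool<String> pool = new BoundedPool<>(connections);

        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];

        for (int t = 0; t < threadCount; ++t) {
            threads[t] = new Thread(() -> {
                try {
                    String connection = pool.acquire(); // waits if none is available
                    try {
                        int now = inUse.incrementAndGet();
                        maxSeen.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // pretend to run a query
                    } finally {
                        inUse.decrementAndGet();
                        pool.release(connection); // always give the connection back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[t].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("Most connections in use at once: " + maxInUse(2, 8));
    }
}
```

However many threads ask for a connection, the number in use never exceeds the pool size; the extra threads simply wait in acquire().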

59 of 60

Race Conditions in Chess Client

  • Chess client has two threads:
    • Main thread that processes user input
    • Web socket thread that processes messages coming from the server

  • Race Condition: Both threads read/write the local Game object simultaneously
  • Solution: Synchronize all code that reads or writes the Game object so only one thread accesses it at a time

  • Race Condition: Both threads print output to the terminal simultaneously (draw the board and print messages)
  • Solution: Synchronize all code that prints output to the terminal so only one thread writes to the terminal at a time
