Concurrency and Multi-threaded Programming
CS 240 – Advanced Software Construction
Multi-Threaded Programming Overview
Threads
main()
Threads (2)
Threads (3)
Java Threads Example
/**
 * Starts two counting threads and lets them run alongside the main thread.
 */
public class JavaThreadExample {

    /**
     * Creates an ascending and a descending counter, starts both, and
     * returns without waiting for either of them.
     */
    public static void main(String[] args) {
        Thread ascending = new CountingThread("UP", 0, 50, 1);
        Thread descending = new CountingThread("DOWN", 50, 0, -1);

        ascending.start();
        descending.start();

        // Neither thread is joined, so this message is not ordered relative
        // to the counters' output.
        System.out.println("Leaving Main Thread");
    }
}
/**
 * A thread that prints a labelled sequence of integers, stepping from
 * {@code start} towards {@code stop} (exclusive) by {@code increment}.
 */
class CountingThread extends Thread {
    private final String name;
    private final int start;
    private final int stop;
    private final int increment;

    /**
     * @param name      label printed in front of every value
     * @param start     first value printed
     * @param stop      exclusive bound; counting ends once it is reached or passed
     * @param increment signed step; positive counts up, negative counts down,
     *                  zero prints nothing
     */
    public CountingThread(
            String name, int start, int stop, int increment) {
        this.name = name;
        this.start = start;
        this.stop = stop;
        this.increment = increment;
    }

    /**
     * Prints {@code name + ": " + value} for every value in the sequence.
     */
    @Override
    public void run() {
        // A long counter cannot wrap around when the step is added near the
        // int limits, so the bound check below always terminates the loop.
        for (long i = start; withinBounds(i); i += increment) {
            System.out.println(name + ": " + i);
        }
    }

    /**
     * Direction-aware bound check. Comparing with {@code !=} would loop
     * forever whenever the step jumps over {@code stop} or is zero.
     */
    private boolean withinBounds(long value) {
        if (increment > 0) {
            return value < stop;
        }
        if (increment < 0) {
            return value > stop;
        }
        return false;
    }
}
Parallel vs. Concurrent vs. Sequential Execution
Thread Synchronization
(just the basics)
Making a Thread Wait for Another
/**
 * Runs two counting threads one after the other by joining each one
 * before moving on.
 */
public class JavaThreadExample {

    /**
     * Starts the ascending counter and waits for it, then does the same for
     * the descending counter, and only then prints the farewell message.
     *
     * @throws InterruptedException if the main thread is interrupted while
     *                              waiting in {@code join()}
     */
    public static void main(String[] args) throws InterruptedException {
        Thread ascending = new CountingThread("UP", 0, 50, 1);
        Thread descending = new CountingThread("DOWN", 50, 0, -1);

        // join() blocks the main thread until the started thread has finished.
        ascending.start();
        ascending.join();

        descending.start();
        descending.join();

        System.out.println("Leaving Main Thread");
    }
}
/**
 * A thread that prints a labelled sequence of integers, stepping from
 * {@code start} towards {@code stop} (exclusive) by {@code increment}.
 */
class CountingThread extends Thread {
    private final String name;
    private final int start;
    private final int stop;
    private final int increment;

    /**
     * @param name      label printed in front of every value
     * @param start     first value printed
     * @param stop      exclusive bound; counting ends once it is reached or passed
     * @param increment signed step; positive counts up, negative counts down,
     *                  zero prints nothing
     */
    public CountingThread(
            String name, int start, int stop, int increment) {
        this.name = name;
        this.start = start;
        this.stop = stop;
        this.increment = increment;
    }

    /**
     * Prints {@code name + ": " + value} for every value in the sequence.
     */
    @Override
    public void run() {
        // A long counter cannot wrap around when the step is added near the
        // int limits, so the bound check below always terminates the loop.
        for (long i = start; withinBounds(i); i += increment) {
            System.out.println(name + ": " + i);
        }
    }

    /**
     * Direction-aware bound check. Comparing with {@code !=} would loop
     * forever whenever the step jumps over {@code stop} or is zero.
     */
    private boolean withinBounds(long value) {
        if (increment > 0) {
            return value < stop;
        }
        if (increment < 0) {
            return value > stop;
        }
        return false;
    }
}
Thread Pools
Thread Pools
Thread Pools (2)
Java Thread Pools
Java Thread Pools (2)
Runnable vs. Callable
Java Thread Pool Example
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Runs twenty counting tasks on a fixed pool of five worker threads.
 */
public class JavaThreadPoolExample {

    /**
     * Submits ten ascending and ten descending counters, then shuts the pool
     * down and waits up to one minute for the submitted tasks to finish.
     *
     * @throws InterruptedException if interrupted while awaiting termination
     */
    public static void main(String[] args) throws InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; ++i) {
            threadPool.submit(new Counter("UP_" + i, 0, 50, 1));
            threadPool.submit(new Counter("DOWN_" + i, 50, 0, -1));
        }

        // shutdown() stops accepting new tasks; already-submitted tasks still run.
        threadPool.shutdown();

        threadPool.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("Leaving Main Thread");
    }
}


/**
 * A task that prints a labelled sequence of integers, stepping from
 * {@code start} towards {@code stop} (exclusive) by {@code increment}.
 */
class Counter implements Runnable {

    private final String name;
    private final int start;
    private final int stop;
    private final int increment;

    /**
     * @param name      label printed in front of every value
     * @param start     first value printed
     * @param stop      exclusive bound; counting ends once it is reached or passed
     * @param increment signed step; positive counts up, negative counts down,
     *                  zero prints nothing
     */
    public Counter(String name, int start, int stop, int increment) {
        this.name = name;
        this.start = start;
        this.stop = stop;
        this.increment = increment;
    }

    /**
     * Prints {@code name + ": " + value} for every value in the sequence.
     */
    @Override
    public void run() {
        // A long counter cannot wrap around when the step is added near the
        // int limits, so the bound check below always terminates the loop.
        for (long i = start; withinBounds(i); i += increment) {
            System.out.println(name + ": " + i);
        }
    }

    /**
     * Direction-aware bound check. Comparing with {@code !=} would loop
     * forever whenever the step jumps over {@code stop} or is zero.
     */
    private boolean withinBounds(long value) {
        if (increment > 0) {
            return value < stop;
        }
        if (increment < 0) {
            return value > stop;
        }
        return false;
    }
}
Citations (Multi-Threaded Programming)
Diagrams created by course authors: Jerod Wilkerson and Ken Rodham
Race Conditions (or Race Hazards)
Race Condition
In-Memory Data Structure Race Condition Example
/**
 * A minimal LIFO stack backed by a {@link LinkedList}.
 *
 * <p>None of the methods are synchronized, so concurrent callers operate on
 * the underlying list without any coordination.
 *
 * @param <T> element type
 */
public class Stack<T> {
    private final LinkedList<T> values = new LinkedList<>();

    /** Creates an empty stack. */
    public Stack() {
    }

    /** Places {@code value} on top of the stack. */
    public void push(T value) {
        // LinkedList.push inserts at the head of the list.
        values.push(value);
    }

    /**
     * Removes and returns the top element.
     *
     * @throws NoSuchElementException if the stack is empty
     */
    public T pop() throws NoSuchElementException {
        // LinkedList.pop removes the head of the list.
        return values.pop();
    }

    /** Returns the number of stored elements. */
    public int size() {
        return values.size();
    }
}
In-Memory Data Structure Race Condition Example (2)
/**� * Stores web socket connections (Session) for each game� */�public class WebSocketSessions {�� //map of gameID to sessions participating in that game� private final Map<Integer, Set<Session>> gameMap;�� //map of session to gameID� private final Map<Session, Integer> sessionMap;�� private WebSocketSessions() {� this.gameMap = new HashMap<>();� this.sessionMap = new HashMap<>();� }�� public void addSessionForGame(Integer gameID, Session session) {� ...
...� }�� public boolean removeSessionFromGame(Integer gameID, Session session) {� ...
...� }�� public void removeSession(Session session) {
...
...� }�}�
File Race Condition Example
package demo;��import java.io.*;�import java.util.*;�import java.util.concurrent.*;���public class FileRaceConditionExample {�� public static void main(String[] args) throws InterruptedException {�� final File FILE_PATH = new File("count");� final int ITERATIONS = 100;�� try (var output = new PrintWriter(FILE_PATH)) {� output.print(0);� } catch (IOException ex) {� ex.printStackTrace();� }�� ExecutorService threadPool = Executors.newCachedThreadPool();�� for (int i = 0; i < 2; ++i) {� threadPool.submit(new FileCounter(i, FILE_PATH, ITERATIONS));� }�� threadPool.shutdown();�� threadPool.awaitTermination(1, TimeUnit.MINUTES);�� System.out.println("Leaving Main Thread");� }�}
/**
 * A task that repeatedly reads an integer from a file, adds one, and writes
 * the result back. The read-modify-write sequence is not guarded by a lock.
 */
class FileCounter implements Runnable {

    // Left disabled on purpose (together with the block markers in incrementCount):
    //private static Object lockObject = new Object();

    private int _threadNumber;
    private File _filePath;
    private int _iterations;

    /**
     * @param threadNumber number shown in this task's log lines
     * @param filePath     file holding the counter value
     * @param iterations   how many increments to perform
     */
    public FileCounter(int threadNumber, File filePath, int iterations) {
        _threadNumber = threadNumber;
        _filePath = filePath;
        _iterations = iterations;
    }

    /**
     * Performs the configured number of increments. A failed increment is
     * reported via its stack trace and the loop moves on to the next one.
     */
    public void run() {
        int completed = 0;
        while (completed < _iterations) {
            try {
                incrementCount();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            completed++;
        }
    }

    /** Reads the stored count, adds one, writes it back, and logs the new value. */
    private void incrementCount() throws IOException {
        //synchronized (lockObject) {
        int updated = readCount() + 1;
        writeCount(updated);
        System.out.println("Thread_" + _threadNumber + ": " + updated);
        //}
    }

    /** Returns the integer currently stored in the file. */
    private int readCount() throws IOException {
        try (var input = new Scanner(_filePath)) {
            return input.nextInt();
        }
    }

    /** Replaces the file's content with {@code value}. */
    private void writeCount(int value) throws IOException {
        try (var output = new PrintWriter(_filePath)) {
            output.print(value);
        }
    }

}
Database Race Condition Example
Database Race Condition Example (2)
Writing Thread-safe Code Part 1: Database Transactions
Writing Thread-safe Code
Database Transactions
Database Transactions (2)
public Connection openConnection() throws DataAccessException {� try {� if (conn != null){� throw new DataAccessException("Database connection already open");� }�� final String CONNECTION_URL = DATABASE_DRIVER + MYSQL_URL;�
// Open a connection� conn = DriverManager.getConnection(CONNECTION_URL, DB_USERNAME, DB_PASSWORD);�� // Start a transaction� conn.setAutoCommit(false);�� } catch (SQLException e) {� e.printStackTrace();� throw new DataAccessException("Unable to open connection to database");� }
public void closeConnection(boolean commit) {� if (conn == null) {� return;� }�� try {� if (commit) {
// Commit the transaction� conn.commit();� } else {
// Rollback the transaction� conn.rollback();� }�
// Close the connection� conn.close();� conn = null;
� } catch (SQLException e) {� e.printStackTrace();� }�}
Database Transactions (3)
Database Transactions (4)
Database Transactions (5)
Database Transactions (6)
Writing Thread-safe Code Part 2: Synchronized Methods
Critical Sections
Critical Sections
/**
 * A minimal LIFO stack backed by a {@link LinkedList}.
 *
 * <p>None of the methods are synchronized, so concurrent callers operate on
 * the underlying list without any coordination.
 *
 * @param <T> element type
 */
public class Stack<T> {
    private final LinkedList<T> values = new LinkedList<>();

    /** Creates an empty stack. */
    public Stack() {
    }

    /** Places {@code value} on top of the stack. */
    public void push(T value) {
        // LinkedList.push inserts at the head of the list.
        values.push(value);
    }

    /**
     * Removes and returns the top element.
     *
     * @throws NoSuchElementException if the stack is empty
     */
    public T pop() throws NoSuchElementException {
        // LinkedList.pop removes the head of the list.
        return values.pop();
    }

    /** Returns the number of stored elements. */
    public int size() {
        return values.size();
    }
}
Synchronized Methods in Java
/**
 * A LIFO stack backed by a {@link LinkedList} whose operations are all
 * {@code synchronized} on the stack instance, so only one thread at a time
 * can run any of them on the same stack.
 *
 * @param <T> element type
 */
public class Stack<T> {
    private final LinkedList<T> values = new LinkedList<>();

    /** Creates an empty stack. */
    public Stack() {
    }

    /** Places {@code value} on top of the stack. */
    public synchronized void push(T value) {
        // LinkedList.push inserts at the head of the list.
        values.push(value);
    }

    /**
     * Removes and returns the top element.
     *
     * @throws NoSuchElementException if the stack is empty
     */
    public synchronized T pop() throws NoSuchElementException {
        // LinkedList.pop removes the head of the list.
        return values.pop();
    }

    /** Returns the number of stored elements. */
    public synchronized int size() {
        return values.size();
    }
}
Synchronized Methods in Java (2)
public class WebSocketSessions {�� //map of gameID to map of authToken to session� private final Map<Integer, Map<String, Session>> gameMap;�� //map of session to gameID� private final Map<Session, Integer> sessionMap;�� private WebSocketSessions() {� this.gameMap = new HashMap<>();� this.sessionMap = new HashMap<>();� }�� public synchronized void addSessionForGame(Integer gameID, String auth, Session session) {� ...
...� }�� public synchronized boolean removeSessionFromGame(Integer gameID, String auth) {� ...
...� }�� public synchronized void removeSession(Session session) {
...
...� }�}�
Writing Thread-safe Code Part 3: Synchronized Code Blocks
Synchronized Code Blocks in Java
Synchronized Code Blocks in Java (2)
/**
 * A task that repeatedly reads an integer from a file, adds one, and writes
 * the result back.
 *
 * <p>{@code incrementCount} is an instance-level {@code synchronized}
 * method, so the lock it takes belongs to this one object; two different
 * {@code FileCounter} objects do not exclude each other.
 */
class FileCounter implements Runnable {
    private int _threadNumber;
    private File _filePath;
    private int _iterations;

    /**
     * @param threadNumber number shown in this task's log lines
     * @param filePath     file holding the counter value
     * @param iterations   how many increments to perform
     */
    public FileCounter(int threadNumber, File filePath, int iterations) {
        _threadNumber = threadNumber;
        _filePath = filePath;
        _iterations = iterations;
    }

    /**
     * Performs the configured number of increments. A failed increment is
     * reported via its stack trace and the loop moves on to the next one.
     */
    public void run() {
        int completed = 0;
        while (completed < _iterations) {
            try {
                incrementCount();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            completed++;
        }
    }

    /** Reads the stored count, adds one, writes it back, and logs the new value. */
    private synchronized void incrementCount() throws IOException {
        int updated = readCount() + 1;
        writeCount(updated);
        System.out.println("Thread_" + _threadNumber + ": " + updated);
    }

    /** Returns the integer currently stored in the file. */
    private int readCount() throws IOException {
        try (var input = new Scanner(_filePath)) {
            return input.nextInt();
        }
    }

    /** Replaces the file's content with {@code value}. */
    private void writeCount(int value) throws IOException {
        try (var output = new PrintWriter(_filePath)) {
            output.print(value);
        }
    }
}
This won’t work
Synchronized Code Blocks in Java (3)
/**
 * A task that repeatedly reads an integer from a file, adds one, and writes
 * the result back.
 *
 * <p>{@code incrementCount} is an instance-level {@code synchronized}
 * method, so the lock it takes belongs to this one object; two different
 * {@code FileCounter} objects do not exclude each other.
 */
class FileCounter implements Runnable {
    private int _threadNumber;
    private File _filePath;
    private int _iterations;

    /**
     * @param threadNumber number shown in this task's log lines
     * @param filePath     file holding the counter value
     * @param iterations   how many increments to perform
     */
    public FileCounter(int threadNumber, File filePath, int iterations) {
        _threadNumber = threadNumber;
        _filePath = filePath;
        _iterations = iterations;
    }

    /**
     * Performs the configured number of increments. A failed increment is
     * reported via its stack trace and the loop moves on to the next one.
     */
    public void run() {
        int completed = 0;
        while (completed < _iterations) {
            try {
                incrementCount();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            completed++;
        }
    }

    /** Reads the stored count, adds one, writes it back, and logs the new value. */
    private synchronized void incrementCount() throws IOException {
        int updated = readCount() + 1;
        writeCount(updated);
        System.out.println("Thread_" + _threadNumber + ": " + updated);
    }

    /** Returns the integer currently stored in the file. */
    private int readCount() throws IOException {
        try (var input = new Scanner(_filePath)) {
            return input.nextInt();
        }
    }

    /** Replaces the file's content with {@code value}. */
    private void writeCount(int value) throws IOException {
        try (var output = new PrintWriter(_filePath)) {
            output.print(value);
        }
    }
}
This won’t work
because there are
multiple FileCounter
objects, not just one
How can we synchronize
this code across multiple
FileCounter objects?
Synchronized code blocks!
Synchronized Code Blocks in Java (4)
class FileCounter implements Runnable {�� private static Object lockObject = new Object();�� private int _threadNumber;� private File _filePath;� private int _iterations;�� public FileCounter(int threadNumber, File filePath, int iterations) {� _threadNumber = threadNumber;� _filePath = filePath;� _iterations = iterations;� }�� public void run() {� for (int i = 0; i < _iterations; ++i) {� try {� incrementCount();� } catch (Exception ex) {� ex.printStackTrace();� }� }� }�� private void incrementCount() throws IOException {� synchronized (lockObject) {� int count;�� try (var input = new Scanner(_filePath)) {� count = input.nextInt();� }�� ++count;�� try (var output = new PrintWriter(_filePath)) {� output.print(count);� }�� System.out.println("Thread_" + _threadNumber + ": " + count);� }� }�}
Writing Thread-safe Code Part 4: Atomic Variables
Atomic Variables Overview
AtomicInteger Example
Synchronized Example
/**
 * An integer counter whose accessors are both {@code synchronized} on the
 * counter instance.
 */
public class SynchronizedIncrement {
    private int counter = 0;

    /** Adds one to the counter. */
    public synchronized void increment() {
        counter += 1;
    }

    /** Returns the current counter value. */
    public synchronized int getX() {
        return counter;
    }
}
AtomicInteger Example
/**
 * An integer counter stored in an {@link AtomicInteger} instead of being
 * guarded by {@code synchronized} methods.
 */
public class AtomicIncrement {
    // The no-argument constructor starts the value at 0.
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Atomically adds one to the counter.
     *
     * @return the value after the increment
     */
    public int increment() {
        return counter.addAndGet(1);
    }

    /** Returns the current counter value. */
    public int getX() {
        return counter.get();
    }
}
AtomicReference Example (synchronized)
/**
 * Holds a single {@code String} reference and offers a compare-and-set
 * operation implemented with {@code synchronized} methods.
 */
public class StringReference {
    private String current = "Hello";

    /**
     * Replaces the held reference with {@code newValue}, but only when the
     * held reference is the very same object as {@code expectedValue}.
     * The check is an identity comparison ({@code ==}), not {@code equals}.
     */
    public synchronized void compareAndSet(String expectedValue, String newValue) {
        if (current != expectedValue) {
            return;
        }
        current = newValue;
    }

    /** Returns the reference currently held. */
    public synchronized String getStringRef() {
        return current;
    }
}
AtomicReference Example (atomic)
import java.util.concurrent.atomic.AtomicReference;
/**
 * Holds a single {@code String} inside an {@link AtomicReference} and
 * forwards compare-and-set and read operations to it.
 */
public class AtomicStringReference {
    private final AtomicReference<String> holder = new AtomicReference<>("Hello");

    /**
     * Asks the underlying {@code AtomicReference} to swap in {@code newValue}
     * if it currently holds {@code expectedValue}; the outcome of the swap is
     * not reported to the caller.
     */
    public void compareAndSet(String expectedValue, String newValue) {
        holder.compareAndSet(expectedValue, newValue);
    }

    /** Returns the string currently held. */
    public String getStringRef() {
        return holder.get();
    }
}
Atomic Variable Class Summary
Partial List of Atomic Methods
Atomic Boolean
Atomic Integer
Atomic Reference
See Javadoc for the java.util.concurrent.atomic package
Race Conditions in Chess
Race Conditions in Chess Project
Race Conditions in Chess Server
Race Conditions in Chess Client