1 of 45

Parallel programming using Message passing paradigm

Georges Da Costa�georges.da-costa@irit.fr

2 of 45

Context

Interconnexion Network

Distributed memory computer

RAM

CPU

NIC

RAM

CPU

NIC

RAM

CPU

NIC

RAM

CPU

NIC

RAM

CPU

NIC

RAM

CPU

NIC

RAM

NIC

Cores

Network 1

Network 3

Network 2

Interconnexion Network

RAM

CPU

NIC

Cores

3 of 45

Goals of this lecture

Know the basic mechanisms of programming parallel machines with distributed memory

Learn how to parallelize a code

Know how to evaluate the quality of parallelization

Know the MPI (Message Passing Interface) library

4 of 45

Core characteristics

No shared memory :

  • All communications between processes are explicit:
    • message passing
  • Or use of a distributed file system
    • (e.g. NFS, Network File System),
    • which is in fact a message passing

5 of 45

Core properties of message passing systems

Explicits parallel processes

Executions are asynchronous, so they must synchronize to send messages to each other

Each process executes potentially different instructions

Processes can be dynamic (in number and therefore in communications)

Explicit communications by passing messages (what?, to whom?)

6 of 45

Parallelization

  • Identification of pieces of code that can be executed independently of each other
    • example: (with N processes)

for (i=0; i<N; i++) a[i] = i^3;

  • Identify what needs to be communicated between processes
    • example : (with N processes)

b[0] = 1�for (i=1; i<N; i++) b[i] = b[i-1] * 3;

7 of 45

Ensure proper load balancing

If we have P processors (numbered from 0 to P-1), how do we parallelize the following code?

for (i=0; i<N; i++) a[i] = i^3;

Solution 1

Solution 2

Solution 3

What is the time for each solution? (in number of multiplications)

What if P>N ?

T1 = 4*2

T2 = 10*2

T3 = 4*2

8 of 45

Take into account communication time

If we have P processors, how can we parallelize the following code?

b[0] = 1; ��for (i=1; i<N; i++) b[i] = b[i-1] * 3;

How many network messages in the three following cases ?

M1 = 3

T2 = 3

T3 = 12

9 of 45

Formally

Time of a parallel program = Computation time + communication time

  • Reduce the computation time:
    • Add processors, but at the risk of increasing communications
  • Reduce the communication time:
    • Asynchronous communications
    • Overlap calculation-communication
    • Avoid waiting between processors (barriers, synchro)

10 of 45

Speed-up

Let T = the time of the sequential program

Let T2 = T//(N) the time of the program in parallel with N processors

Speedup(N) = S(N) = T//(1) / T//(N)

Example :

  • A sequential program runs in 12s.
  • The same program adapted to run in parallel runs in 4s when it is run in parallel on 5 processors
  • Speedup(5)?

S(5) = 12 / 4 = 3. It goes 3 times faster with 5 processors than with 1 processor.

11 of 45

Speedup taxonomy

Linear acceleration if the ratio between Ts and T//(N) is equal to N

Sublinear acceleration if the ratio is < N

Slowdown if the ratio is < 1

Over-linear acceleration if the ratio is > N

More complicated situation, the characterization depends on N (example: over-linear then sublinear...)

Speed-up

12 of 45

Amdahl’s Law

Not all the code can be parallelized

  • For example a read on a parameter file at the beginning of the program.

Let s be the percentage of time of the purely sequential program.

T = sT + (1-s)T

so T//(N) = sT + (1-s) T / N (with N the number of processors)

then S(N) = T / T//(N) = 1 / (s + (1-s) / N)

Example:

  • A sequential program runs in 12s.
  • It starts by reading a parameter file for 3s
  • The remaining is parallelizable.
  • The parallel part benefits linearly from the addition of processors.

What is the acceleration with 5 processors?

T = 3/12 T + 9/12 T → T// = 3/12 T + 9/12 T / N

→ S(5) = 1 / (3/12 + (9/12) / 5) = 2.5

13 of 45

Example: N = 32. If the acceleration is linear on the parallel part, and if s=10%,

then S(32) = 1 / (0.10 + 0.90 / 32) = 7.8

limN→+∞S(N)=1/s

S(N)<=1/s

from Wikipedia

14 of 45

Data parallelism versus task parallelism

Data parallelism: data are distributed on the different processors (example of the previous table a)

Parallelism of tasks : the tasks are distributed on the different processors

  • Example : at the initialization, an array tab is copied on 2 machines P1 and P2.

P1:

while True:

r1 = compute_sum(tab)

send_to(2, r1)

r2 = recv_from(2)

tab = update(tab, r1*r2)

P2:while True� r1 = compute_product(tab)

r2 = recv_from(1)

send_to(1, r1)

tab = update(tab, r1+r2)

15 of 45

Parallelization of a code

Trapezoidal method for estimating an integral

The integral of the function f in blue is approximated by the area under the line in red

16 of 45

Parallelization of a code

Trapezoidal method for estimating an integral

To improve the estimation, intermediate points are added, for a total of N+1 points

17 of 45

Parallelization of a code

It is assumed that we have P processors and N trapezoids with P that divides N.

Algorithm 1:

  1. The process P0 calculates the interval [ai,bi] that each process [0...P-1] will have to calculate, and the number of trapezoids Ti in this interval [ai,bi].
  2. P0 sends the interval [ai, bi] and Ti to each process Pi, with 0<i<P
  3. Each Pi, 0<i<P receives its interval [ai, bi] and Ti
  4. Each Pi, 0≤i<P, calculates the function on its interval
  5. Each Pi, 0<i<P, sends its local result to P0
  6. P0 makes the final sum

Example: P=2, N=10, y=x², x in [0,1]

  • P0 computes intervals for all:
    • P0: [0, 0.5], 5 Trap; P1: [0.5, 1], 5 Trap
  • P0 sends [0.5, 1], T1=5 to P1
  • P1 receives them
  • Both processors compute in parallel
    • P0 computes 0.0425
    • P1 computes 0.2925
  • P1 sends 0.2925 to P0
  • P0 computes the final result
    • result = 0.0425 + 0.2925 = 0.335

18 of 45

Parallelization of a code

It is assumed that we have P processors and N trapezoids with P that divides N.

Algorithm 1:

  • The process P0 calculates the interval [ai,bi] that each process [0...P-1] will have to calculate, and the number of trapezoids Ti in this interval [ai,bi].
  • P0 sends the interval [ai, bi] and Ti to each process Pi, with 0<i<P
  • Each Pi, 0<i<P receives its interval [ai, bi] and Ti
  • Each Pi, 0≤i<P, calculates the function on its interval
  • Each Pi, 0<i<P, sends its local result to P0
  • P0 makes the final sum
  1. What are the sequential steps?
    1. Is it possible to remove them?

  • What if P does not divide N?
    • Example: N=109, P=10
    • How to modify the program to tackle this?
    • This is called load balancing

19 of 45

Comparison with Synchronization

Synchronization

  • Multiple workflow at the same time: MIMD
  • Each process works at its own speed
  • Uses events / shared data to cooperate

Parallelism

  • One synchronized workflow: SIMD
  • Each process alternates parallel tasks and synchronization
  • One process coordinates the tempo

20 of 45

MPI : Message Passing Interface

21 of 45

MPI : Message Passing Interface

  • MPI: a standard defining a library of functions for message passing (including on shared memory machines) and parallel I/O�
  • Specialized implementations optimized for parallel machines (OpenMPI, MPICHv2)�
  • C, C++, Fortran, Java, Perl, OCaml, Python interfaces (mpi4py)�
  • In MPI, a process is executed on a single processor (or on a single core).

22 of 45

How to use MPI

MPI is used by running the mpirun command

  • mpirun -n <nb> <cmd>
  • <nb> is the number of processes
  • <cmd> is the command to run

mpirun starts <nb> processes each running <cmd>

Reminder: processes do not share memory. Communication must be explicit !

Simplest mpi program

echo !

Execution:

mpirun -n 2 echo Hi!

Will print

Hi!�Hi!

23 of 45

mpirun -n 2 python3 hello.py

from mpi4py import MPI

comm = MPI.COMM_WORLD�size = comm.Get_size()�rank = comm.Get_rank()

print('Hello from', rank, "of", size)

from mpi4py import MPI

comm = MPI.COMM_WORLD�size = comm.Get_size()�rank = comm.Get_rank()

print('Hello from', rank, "of", size)

Hello from 0 of 2

Hello from 1 of 2

Rank = 0

Rank = 1

24 of 45

mpirun -n 2 python3 demo.py

from mpi4py import MPI�import numpy as np

comm = MPI.COMM_WORLD�size = comm.Get_size()�rank = comm.Get_rank()

if rank == 0:� data = np.array([24, 17], dtype='i')�else:� data = None

l_data = np.empty(1, dtype='i')

comm.Scatter(data, l_data, root=0)

print(rank, "of", size, ":", l_data)

from mpi4py import MPI�import numpy as np

comm = MPI.COMM_WORLD�size = comm.Get_size()�rank = comm.Get_rank()

if rank == 0:� data = np.array([24, 17], dtype='i')�else:� data = None

l_data = np.empty(1, dtype='i')

comm.Scatter(data, l_data, root=0)

print(rank, "of", size, ":", l_data)

1 of 2 : [17]

0 of 2 : [24]

Rank = 0

Rank = 1

25 of 45

mpirun -n 2 python3 demo.py

from mpi4py import MPI�import numpy as np

comm = MPI.COMM_WORLD�size = comm.Get_size()�rank = comm.Get_rank()

if rank == 0:� data = np.array([24, 17, 3, 30], dtype='i')�else:� data = None

l_data = np.empty(2, dtype='i')

comm.Scatter(data, l_data, root=0)

print(rank, "of", size, ":", l_data)

from mpi4py import MPI�import numpy as np

comm = MPI.COMM_WORLD�size = comm.Get_size()�rank = comm.Get_rank()

if rank == 0:� data = np.array([24, 17, 3, 30], dtype='i')�else:� data = None

l_data = np.empty(2, dtype='i')

comm.Scatter(data, l_data, root=0)

print(rank, "of", size, ":", l_data)

1 of 2 : [ 3 30]

0 of 2 : [24 17]

Rank = 0

Rank = 1

26 of 45

MPI : Communicator

  • Communicator: connects a group of processes together.
  • They are linked by a certain topology.
  • Communication operations are done:
    • within a group (intracommunicator) or
    • between groups (intercommunicator)

At the beginning, the processes are all in the same group (MPI.COMM_WORLD)

27 of 45

MPI: Point-to-point communications

Sending and receiving data (com.Send, com.Receive)

Blocking and non-blocking

and ready-send (sending is only done if the associated receiving has also been done)

Rarely used

28 of 45

MPI: Collective operations

Communication for all the group�https://www.irit.fr/~Georges.Da-Costa/par/mpi_collectives/

  1. Diffusion (broadcast): 1 to all
  2. Reduction (reduce): all to 1
  3. Full communication: all to all

Example:

  • Scatter: takes elements from one root and scatter them on all servers

Remark:

  • All processes must execute the command

from mpi4py import MPI�import numpy as np�comm = MPI.COMM_WORLD�size = comm.Get_size()�rank = comm.Get_rank()

if rank == 0:� data = np.array([24, 17], dtype='i')� result = np.empty(1, dtype='i')�Else:� data = None� result = None

l_data = np.empty(1, dtype='i')�comm.Scatter(data, l_data, root=0)

l_data = 3 * l_data

comm.Reduce(l_data, result, op=MPI.SUM, root=0)

if rank == 0:� print(result)

[123]

29 of 45

MPI: Actions on other processors

Capability to directly change memory on remote processes

  • Write on a remote memory
    • Put
  • Read on a remote memory
    • Get
  • 3 methods to synchronize these communications
    • global, peer to peer, or with remote locks

30 of 45

MPI: Data types

Interoperable MPI type (can be used on all languages):

  • Basic types: integers, real (single or double precision, characters, ...): MPI.INT, MPI.CHAR, MPI.FLOAT, …
  • The derived types (MPI.Datatype):
    • contiguous type (MPI_Type_contiguous) : homogeneous and contiguous data in memory
    • vector type (MPI_Type_vector) : homogeneous data spaced by a constant space in memory
    • indexed type (MPI_Type_indexed) : same but variable space in memory
    • structure type (MPI_Type_create_struct) : for heterogeneous data

Python-specific type also exists (easier but slower, not used during the session)

31 of 45

MPI and Python

mpirun can start processes on several computers at the same time. From inside the code, being on one computer or multiple one is transparent

> mpirun -n 4 python3 hello.py

from mpi4py import MPI

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

print ("hello world from process ", rank)

hello.py

Starts 4 processes, each being a python interpreter

32 of 45

MPI and Python: Communications

2 types of communication primitives :

  • Generic python object communication: lowercase primitives are used (ex: comm.send()). Simple use.

  • Communication of objects organized in an array, for example from the Numpy digital library. Primitives used have their first letter in uppercase (ex: comm.Send()). Very fast, optimized! The one we will use.

33 of 45

Example 1: Python objects

import numpy as np

from mpi4py import MPI

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

randNum = np.random.randint(100, size=1, dtype='i')

if rank == 1:

print ("Process", rank, "drew the number", randNum)

comm.Send(randNum, dest=0)

if rank == 0:

print ("Process", rank, "before receiving has the number", randNum)

comm.Recv(randNum, source=1)

print ("Process", rank, "received the number", randNum)

1stMPI.py

Blocking sending and receiving primitives

Send : the object to send, where ?

Receiving : from whom ? returns the received object

The treatment on each process is distinguished by its rank

Process 0 before receiving has the number [17]

Process 1 drew the number [27]

Process 0 received the number [27]

34 of 45

Example 1bis: Python objects (more complex)

import numpy as np

from mpi4py import MPI

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

if rank == 1:

randNum = np.random.randint(100, size=10, dtype='i')

comm.Send(randNum, dest=0, tag=11)

if rank == 0:

results = np.empty(10, dtype='i')

comm.Recv(results, source=1, tag=11)

print ("Process", rank, "received", results)

1stBis.py

If source = MPI.ANY_SOURCE

—> receives from any source

tag: allows to identify the messages (optional)

Here data is an array. Can be any type of Python numpy data.

Process 0 received [47 64 68 77 37 73 30 65 10 54]

35 of 45

Asynchronous primitives

1stTer.py

Non-blocking send and receive primitives.

wait(): waits for the end of the sending/receiving

import numpy as np

from mpi4py import MPI

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

if rank == 1:

randNum = np.random.randint(100, size=10, dtype='i')

req = comm.Isend(randNum, dest=0, tag=11)

# This process can continue working during the communication

req.wait()

if rank == 0:

results = np.empty(10, dtype='i')

req = comm.Irecv(results, source=1, tag=11)

# This process can continue working during the communication

req.wait()

print ("Process", rank, "received", results)

36 of 45

Type management

2stMPI.py

***************** VERSION 2 ****************

[1 2 3 4 5 6 7 8 9]

***************** VERSION 2 ****************

[1. 2. 3. 4. 5. 6. 7. 8. 9.]

from mpi4py import MPI

import numpy

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

# passing MPI datatypes explicitly

if rank == 0:

data = numpy.arange(1000, dtype='i')

comm.Send([data, MPI.INT], dest=1, tag=77)

elif rank == 1:

data = numpy.empty(1000, dtype='i')

comm.Recv([data, MPI.INT], source=0, tag=77)

print(data[1:10])

print("***************** VERSION 2 ****************")

# automatic MPI datatype discovery

if rank == 0:

data = numpy.arange(1000, dtype=numpy.float64)

comm.Send(data, dest=1)

elif rank == 1:

data = numpy.empty(1000, dtype=numpy.float64)

comm.Recv(data, source=0)

print(data[1:10])

37 of 45

Trapezoid method (sequential)

import sys

a, b, n = float(sys.argv[1]), float(sys.argv[2]), int(sys.argv[3])

def f(x):

return x*x

def integrateRange(a, b, n):

integral = -(f(a) + f(b))/2.0

x, h = a, (b-a)/n

while (x<b):

integral = integral + f(x)

x = x + h

integral = integral * h

return integral

integral = integrateRange(a, b, n)

print ("With n =", n, "trapezoids, our estimate of the integral from", a, "to", b, "is", integral)

with h = (b-a) / N

trapezeSerial.py

$ python3 trapezeSerial.py 0.0 10.0 100000

With n = 100000 trapezoids, our estimate of the integral from 0.0 to 10.0 is 333.333333349691

38 of 45

Trapezoid method (parallel)

#h is the step size. n is the total number of trapezoids

h = (b-a)/n

#local_n is the number of trapezoids each process will calculate

#note that size must divide n

local_n = n//size

#we calculate the interval that each process handles

#local_a is the starting point and local_b is the endpoint

local_a = a + rank*local_n*h

local_b = local_a + local_n*h

#initializing variables. mpi4py requires that we pass numpy objects.

integral = numpy.zeros(1)

recv_buffer = numpy.zeros(1)

# perform local computation. Each process integrates its own interval

integral[0] = integrateRange(local_a, local_b, local_n)

# communication

# root node receives results from all processes and sums them

if rank == 0:

total = integral[0]

for i in range(1, size):

comm.Recv(recv_buffer, ANY_SOURCE)

total += recv_buffer[0]

else:

# all other process send their result

comm.Send(integral, dest=0)

# root process prints results

if comm.rank == 0:

print ("With n =", n, "trapezoids, our estimate of the integral from" , a, "to", b, "is", total)

import numpy

import sys

from mpi4py import MPI

from mpi4py.MPI import ANY_SOURCE

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

size = comm.Get_size()

#takes in command-line arguments [a,b,n]

a = float(sys.argv[1])

b = float(sys.argv[2])

n = int(sys.argv[3])

#we arbitrarily define a function to integrate

def f(x):

return x*x

#this is the serial version of the trapezoidal rule

def integrateRange(a, b, n):

integral = -(f(a) + f(b))/2.0

# n+1 endpoints, but n trapazoids

for x in numpy.linspace(a,b,n+1):

integral = integral + f(x)

integral = integral* (b-a)/n

return integral

trapezeParallel-1-Numpy.py

$ mpirun -n 4 python3 trapezeParallel-1-numy.py 0.0 10.0 100000

With n = 100000 trapezoids, our estimate of the integral from 0.0 to 10.0 is 333.3302083498043

39 of 45

Trapezoid method (parallel V2)

h = (b-a)/n

local_n = n//size

#we calculate the interval that each process handles

local_a = a + rank*local_n*h

local_b = local_a + local_n*h

#initializing variables. mpi4py requires that we pass numpy objects.

integral = numpy.zeros(1)

total = numpy.zeros(1)

# perform local computation. Each process integrates its own interval

integral[0] = integrateRange(local_a, local_b, local_n)

# communication

# root node receives results with a collective "reduce"

comm.Reduce(integral, total, op=MPI.SUM, root=0)

# root process prints results

if comm.rank == 0:

print ("With n =", n, "trapezoids, our estimate of the integral from", a, "to", b, "is", total)

import numpy

import sys

from mpi4py import MPI

from mpi4py.MPI import ANY_SOURCE

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

size = comm.Get_size()

#takes in command-line arguments [a,b,n]

a = float(sys.argv[1])

b = float(sys.argv[2])

n = int(sys.argv[3])

def f(x):

return x*x

def integrateRange(a, b, n):

integral = -(f(a) + f(b))/2.0

# n+1 endpoints, but n trapazoids

for x in numpy.linspace(a,b,n+1):

integral = integral + f(x)

integral = integral* (b-a)/n

return integral

trapezeParallel-2-Numpy.py

40 of 45

Collective operation: Reduce

$ mpirun -n 4 python3 reduce.py

Reduced on 1 : rank= 3 somme= 0

Reduced on 1 : rank= 0 somme= 0

Reduced on 1 : rank= 2 somme= 0

Reduced on 1 : rank= 1 somme= 6

Allreduce on 1 maximum= 3

Allreduce on 3 maximum= 3

Allreduce on 2 maximum= 3

Allreduce on 0 maximum= 3

from mpi4py import MPI

import numpy

comm = MPI.COMM_WORLD

rank = numpy.zeros(1, dtype='i')

rank[0] = comm.Get_rank()

somme = numpy.zeros(1, dtype='i')

comm.Reduce(rank, somme, op=MPI.SUM, root=1)

print ("Reduced on 1 : rank=", rank[0], "somme=", somme[0])

maxi = numpy.zeros(1, dtype='i')

comm.Allreduce(rank, maxi, op=MPI.MAX)

print ("Allreduce on ", rank[0], "maximum=", maxi[0])

reduce.py

Possible operations: MPI.SUM, MPI.MAX, MPI.MIN, …

41 of 45

Collective operation: Broadcast

from mpi4py import MPI

comm = MPI.COMM_WORLD

import numpy as np

rank = comm.Get_rank()

if rank == 0:

A = np.arange(0, 10, dtype='f')

else:

A = np.zeros(10, dtype = 'f')

comm.Bcast(A, root=0)

print ("A on ", rank, "=", A)

bcast.py

$ mpirun -n 3 python3 bcast.py

A on 1 = [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]

A on 0 = [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]

A on 2 = [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]

Only the processus 0 knows A

After the Broadcast all nodes know A

42 of 45

Collective operation: Scatter

from mpi4py import MPI

comm = MPI.COMM_WORLD

import numpy as np

rank = comm.Get_rank()

if rank == 0:

A = np.arange(0, 9, dtype='f')

else:

A = None

local_A = np.zeros(3, dtype = 'f')

comm.Scatter(A, local_A, root=0)

print ("local_A on ", rank, "=", local_A)

scatter.py

$ mpirun -n 3 python3 scatter.py

local_A on 2 = [6. 7. 8.]

local_A on 0 = [0. 1. 2.]

local_A on 1 = [3. 4. 5.]

At first, only process 0 know A

After Scatter, all processes know their element from A

�Important: The number of elements of A must be the a multiple of the number of processes

43 of 45

Collective operation: Gather

from mpi4py import MPI

comm = MPI.COMM_WORLD

import numpy as np

rank = comm.Get_rank()

size = comm.Get_size()

local_rank = np.array([rank], dtype='i')

A = np.empty(size, dtype = 'i')

print ("local_rank on ", rank, "=", local_rank)

comm.Gather(local_rank, A, root=0)

print ("A After Gather on ", rank, "=", A)

comm.Allgather(local_rank, A)

print ("A After AllGather on ", rank, "=", A)

gather.py

$ mpirun -n 3 python3 gather.py

local_rank on 1 = [1]

local_rank on 2 = [2]

local_rank on 0 = [0]

A After Gather on 1 = [1600851200 32511 17736512]

A After Gather on 2 = [-1852859136 32694 42946160]

A After Gather on 0 = [0 1 2]

A After AllGather on 2 = [0 1 2]

A After AllGather on 0 = [0 1 2]

A After AllGather on 1 = [0 1 2]

44 of 45

Measure performance

from mpi4py import MPI

import numpy

comm = MPI.COMM_WORLD

rank = comm.Get_rank()

comm.Barrier()

start = MPI.Wtime()

A = numpy.array([[1.,2.,3.],[4.,5.,6.],[7.,8.,9.]])

local_a = numpy.zeros(3)

comm.Scatter(A,local_a)

print ("process", rank, "has", local_a)

comm.Barrier()

end = MPI.Wtime()

print ("Durée sur", rank, ": ", end - start)

timeNumpy.py

$ mpirun -n 3 python3 time.py

process 0 has [1.0, 2.0, 3.0]

Time on 0 : 0.00011200000000000099

process 1 has [4.0, 5.0, 6.0]

Time on 1 : 0.00012699999999998823

process 2 has [7.0, 8.0, 9.0]

Time on 2 : 0.00011099999999999999

Only needed to synchronize

the start of all processes and

increase precision of measure.

45 of 45

Credits