Parallel programming using Message passing paradigm
Georges Da Costa
georges.da-costa@irit.fr
Context
[Diagram: a distributed memory computer. Several nodes, each with RAM, a multi-core CPU and a NIC (network interface card), are linked by one or more interconnection networks. Zoom on a single node: RAM, CPU cores, NIC.]
Goals of this lecture
Know the basic mechanisms of programming parallel machines with distributed memory
Learn how to parallelize a code
Know how to evaluate the quality of parallelization
Know the MPI (Message Passing Interface) library
Core characteristics
No shared memory:
Core properties of message passing systems
Explicit parallel processes
Executions are asynchronous, so processes must synchronize when they exchange messages
Each process executes potentially different instructions
Processes can be dynamic (in number and therefore in communications)
Explicit communications by passing messages (what? to whom?)
Parallelization
for (i=0; i<N; i++) a[i] = i^3;
b[0] = 1;
for (i=1; i<N; i++) b[i] = b[i-1] * 3;
Ensure proper load balancing
If we have P processors (numbered from 0 to P-1), how do we parallelize the following code?
for (i=0; i<N; i++) a[i] = i^3;
Solution 1
Solution 2
Solution 3
What is the time for each solution? (in number of multiplications)
What if P>N ?
[Figure: an array of N = 12 elements distributed over P = 3 processors in three different ways (Solutions 1, 2 and 3).]
Time on the most loaded processor, in multiplications:
T1 = 4*2, T2 = 10*2, T3 = 4*2
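As an illustration, a block distribution of this loop could look like the following sketch (plain Python, simulating the P processes in a single program; not taken from the original slides):

N, P = 12, 3
chunk = N // P                        # assumes P divides N
a = [0] * N
for rank in range(P):                 # with MPI, each process would only run its own slice
    for i in range(rank * chunk, (rank + 1) * chunk):
        a[i] = i ** 3                 # i cubed: 2 multiplications per element
print(a)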
Take into account communication time
If we have P processors, how can we parallelize the following code?
b[0] = 1;
for (i=1; i<N; i++) b[i] = b[i-1] * 3;
How many network messages in the three following cases ?
[Figure: the same three distributions of the array over P = 3 processors.]
Number of network messages:
M1 = 3, M2 = 3, M3 = 12
Formally
Time of a parallel program = Computation time + communication time
Speed-up
Let T = the time of the sequential program
Let T//(N) be the time of the program in parallel with N processors
Speedup(N) = S(N) = T//(1) / T//(N)
Example :
S(5) = 12 / 4 = 3 (here T//(1) = 12 and T//(5) = 4): the program runs 3 times faster with 5 processors than with 1 processor.
Speedup taxonomy
Linear speedup if the ratio between the sequential time and T//(N) is equal to N
Sublinear speedup if the ratio is < N
Slowdown if the ratio is < 1
Superlinear speedup if the ratio is > N
More complicated situations exist, where the characterization depends on N (for example: superlinear, then sublinear...)
Speed-up
Amdahl’s Law
Not all the code can be parallelized
Let s be the fraction of the execution time that is purely sequential (cannot be parallelized).
T = sT + (1-s)T
so T//(N) = sT + (1-s) T / N (with N the number of processors)
then S(N) = T / T//(N) = 1 / (s + (1-s) / N)
Example: the sequential part takes 3/12 of the total time.
What is the speedup with 5 processors?
T = (3/12) T + (9/12) T → T//(N) = (3/12) T + (9/12) T / N
→ S(5) = 1 / (3/12 + (9/12) / 5) = 2.5
Example: N = 32. If the speedup is linear on the parallel part, and if s = 10%,
then S(32) = 1 / (0.10 + 0.90 / 32) ≈ 7.8
lim N→+∞ S(N) = 1/s
S(N) ≤ 1/s
[Figure: Amdahl's law speedup curves as a function of the number of processors, from Wikipedia]
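A small Python check of the two examples above (not part of the original slides):

def amdahl_speedup(s, n):
    # Amdahl's law: speedup with a sequential fraction s on n processors
    return 1 / (s + (1 - s) / n)

print(amdahl_speedup(3/12, 5))    # 2.5
print(amdahl_speedup(0.10, 32))   # about 7.8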
Data parallelism versus task parallelism
Data parallelism: the data are distributed on the different processors (example: the array a of the previous slides)
Task parallelism: the tasks are distributed on the different processors
P1:
while True:
    r1 = compute_sum(tab)
    send_to(2, r1)
    r2 = recv_from(2)
    tab = update(tab, r1*r2)

P2:
while True:
    r1 = compute_product(tab)
    r2 = recv_from(1)
    send_to(1, r1)
    tab = update(tab, r1+r2)
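A minimal mpi4py sketch of this exchange pattern (ranks 0 and 1 play the roles of P1 and P2; the sum, product and update steps are simple stand-ins for compute_sum, compute_product and update, which are not defined in the slides):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
tab = np.arange(1.0, 5.0)          # toy data, identical on both processes

for step in range(3):              # a few iterations instead of "while True"
    if rank == 0:                  # role of P1
        r1 = np.array([tab.sum()])
        comm.Send(r1, dest=1)
        r2 = np.empty(1)
        comm.Recv(r2, source=1)
        tab = tab + r1[0] * r2[0]  # stands in for update(tab, r1*r2)
    elif rank == 1:                # role of P2
        r1 = np.array([tab.prod()])
        r2 = np.empty(1)
        comm.Recv(r2, source=0)
        comm.Send(r1, dest=0)
        tab = tab + r1[0] + r2[0]  # stands in for update(tab, r1+r2)

print(rank, tab)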
Parallelization of a code
Trapezoidal method for estimating an integral
The integral of the function f in blue is approximated by the area under the line in red
Parallelization of a code
Trapezoidal method for estimating an integral
To improve the estimation, intermediate points are added, for a total of N+1 points
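With h = (b-a)/N and x_i = a + i*h, the resulting composite trapezoidal rule (the quantity computed by the integrateRange function later in this document) is:

\int_a^b f(x)\,dx \approx h \left( \frac{f(x_0) + f(x_N)}{2} + \sum_{i=1}^{N-1} f(x_i) \right)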
Parallelization of a code
It is assumed that we have P processors and N trapezoids, with P dividing N.
Algorithm 1: each process computes the area of N/P consecutive trapezoids on its own sub-interval; process 0 then collects and sums the P partial results.
Example: P=2, N=10, y=x², x in [0,1]
Parallelization of a code
It is assumed that we have P processors and N trapezoids, with P dividing N.
Algorithm 1: each process computes the area of N/P consecutive trapezoids on its own sub-interval; process 0 then collects and sums the P partial results.
Comparison with Synchronization
Synchronization
Parallelism
MPI : Message Passing Interface
MPI : Message Passing Interface
How to use MPI
MPI is used by running the mpirun command
mpirun -n <nb> <cmd> starts <nb> processes, each running <cmd>
Reminder: processes do not share memory. Communication must be explicit !
Simplest mpi program
echo Hi!
Execution:
mpirun -n 2 echo Hi!
Will print
Hi!
Hi!
mpirun -n 2 python3 hello.py
Both processes (Rank = 0 and Rank = 1) run the same code:
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
print('Hello from', rank, "of", size)
Output:
Hello from 0 of 2
Hello from 1 of 2
mpirun -n 2 python3 demo.py
Both processes (Rank = 0 and Rank = 1) run the same code:
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
    data = np.array([24, 17], dtype='i')
else:
    data = None
l_data = np.empty(1, dtype='i')
comm.Scatter(data, l_data, root=0)
print(rank, "of", size, ":", l_data)
Output:
1 of 2 : [17]
0 of 2 : [24]
mpirun -n 2 python3 demo.py
Both processes (Rank = 0 and Rank = 1) run the same code:
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
    data = np.array([24, 17, 3, 30], dtype='i')
else:
    data = None
l_data = np.empty(2, dtype='i')
comm.Scatter(data, l_data, root=0)
print(rank, "of", size, ":", l_data)
Output:
1 of 2 : [ 3 30]
0 of 2 : [24 17]
MPI : Communicator
At the beginning, the processes are all in the same group (MPI.COMM_WORLD)
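For illustration, a minimal sketch (not from the original slides) of how MPI.COMM_WORLD can be split into sub-groups with comm.Split:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# even ranks go to one sub-communicator, odd ranks to the other
color = rank % 2
subcomm = comm.Split(color, key=rank)
print("world rank", rank, "-> rank", subcomm.Get_rank(), "of", subcomm.Get_size(), "in its sub-group")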
MPI: Point-to-point communications
Sending and receiving data (comm.Send, comm.Recv)
Blocking and non-blocking variants,
plus ready-send (the send may only start once the matching receive has been posted)
Rarely used
MPI: Collective operations
Communication involving the whole group
https://www.irit.fr/~Georges.Da-Costa/par/mpi_collectives/
Example:
Remark:
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
    data = np.array([24, 17], dtype='i')
    result = np.empty(1, dtype='i')
else:
    data = None
    result = None
l_data = np.empty(1, dtype='i')
comm.Scatter(data, l_data, root=0)
l_data = 3 * l_data
comm.Reduce(l_data, result, op=MPI.SUM, root=0)
if rank == 0:
    print(result)
[123]
MPI: Actions on other processors
Capability to directly change memory on remote processes
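A minimal one-sided (RMA) sketch with mpi4py, assuming an MPI implementation with RMA support (not part of the original slides); run with at least 2 processes:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# each process exposes a one-element window of its own memory
buf = np.zeros(1, dtype='i')
win = MPI.Win.Create(buf, comm=comm)

win.Fence()
if rank == 0:
    value = np.array([42], dtype='i')
    win.Put(value, 1)    # write directly into the memory of process 1
win.Fence()

print(rank, "has", buf)
win.Free()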
MPI: Data types
Interoperable MPI types (usable from all languages), e.g. MPI.INT, used with buffer-like objects such as numpy arrays
A Python-specific mode also exists (easier but slower; not used during the session)
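A short sketch contrasting the two kinds of primitives (the values are illustrative):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # uppercase Send: buffer-like objects (numpy arrays), interoperable MPI types
    comm.Send(np.array([1, 2, 3], dtype='i'), dest=1, tag=0)
    # lowercase send: any picklable Python object (easier, but slower)
    comm.send({'answer': 42}, dest=1, tag=1)
elif rank == 1:
    buf = np.empty(3, dtype='i')
    comm.Recv(buf, source=0, tag=0)
    obj = comm.recv(source=0, tag=1)
    print(buf, obj)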
MPI and Python
mpirun can start processes on several computers at the same time. From inside the code, it is transparent whether the processes run on one computer or on several.
> mpirun -n 4 python3 hello.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
print ("hello world from process ", rank)
hello.py
Starts 4 processes, each being a python interpreter
MPI and Python: Communications
2 types of communication primitives: blocking (Send/Recv) and non-blocking (Isend/Irecv)
Example 1: Python objects
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
randNum = np.random.randint(100, size=1, dtype='i')
if rank == 1:
    print ("Process", rank, "drew the number", randNum)
    comm.Send(randNum, dest=0)
if rank == 0:
    print ("Process", rank, "before receiving has the number", randNum)
    comm.Recv(randNum, source=1)
    print ("Process", rank, "received the number", randNum)
1stMPI.py
Blocking sending and receiving primitives
Send: what to send (the object), and to whom (dest)
Recv: from whom (source); the received data is written into the provided buffer
The behaviour of each process is distinguished by its rank
Process 0 before receiving has the number [17]
Process 1 drew the number [27]
Process 0 received the number [27]
Example 1bis: Python objects (more complex)
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 1:
    randNum = np.random.randint(100, size=10, dtype='i')
    comm.Send(randNum, dest=0, tag=11)
if rank == 0:
    results = np.empty(10, dtype='i')
    comm.Recv(results, source=1, tag=11)
    print ("Process", rank, "received", results)
1stBis.py
If source = MPI.ANY_SOURCE, the message is accepted from any source
tag: allows to identify the messages (optional)
Here the data is a numpy array; any numpy dtype can be used.
Process 0 received [47 64 68 77 37 73 30 65 10 54]
Asynchronous primitives
1stTer.py
Non-blocking send and receive primitives.
req.wait(): waits for the completion of the send/receive
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 1:
    randNum = np.random.randint(100, size=10, dtype='i')
    req = comm.Isend(randNum, dest=0, tag=11)
    # This process can continue working during the communication
    req.wait()
if rank == 0:
    results = np.empty(10, dtype='i')
    req = comm.Irecv(results, source=1, tag=11)
    # This process can continue working during the communication
    req.wait()
    print ("Process", rank, "received", results)
Type management
2stMPI.py
***************** VERSION 2 ****************
[1 2 3 4 5 6 7 8 9]
***************** VERSION 2 ****************
[1. 2. 3. 4. 5. 6. 7. 8. 9.]
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# passing MPI datatypes explicitly
if rank == 0:
    data = numpy.arange(1000, dtype='i')
    comm.Send([data, MPI.INT], dest=1, tag=77)
elif rank == 1:
    data = numpy.empty(1000, dtype='i')
    comm.Recv([data, MPI.INT], source=0, tag=77)
    print(data[1:10])

print("***************** VERSION 2 ****************")

# automatic MPI datatype discovery
if rank == 0:
    data = numpy.arange(1000, dtype=numpy.float64)
    comm.Send(data, dest=1)
elif rank == 1:
    data = numpy.empty(1000, dtype=numpy.float64)
    comm.Recv(data, source=0)
    print(data[1:10])
Trapezoid method (sequential)
import sys
a, b, n = float(sys.argv[1]), float(sys.argv[2]), int(sys.argv[3])

def f(x):
    return x*x

def integrateRange(a, b, n):
    integral = -(f(a) + f(b))/2.0
    x, h = a, (b-a)/n
    while (x<b):
        integral = integral + f(x)
        x = x + h
    integral = integral * h
    return integral

integral = integrateRange(a, b, n)
print ("With n =", n, "trapezoids, our estimate of the integral from", a, "to", b, "is", integral)
with h = (b-a) / N
trapezeSerial.py
$ python3 trapezeSerial.py 0.0 10.0 100000
With n = 100000 trapezoids, our estimate of the integral from 0.0 to 10.0 is 333.333333349691
Trapezoid method (parallel)
import numpy
import sys
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#takes in command-line arguments [a,b,n]
a = float(sys.argv[1])
b = float(sys.argv[2])
n = int(sys.argv[3])

#we arbitrarily define a function to integrate
def f(x):
    return x*x

#this is the serial version of the trapezoidal rule
def integrateRange(a, b, n):
    integral = -(f(a) + f(b))/2.0
    # n+1 endpoints, but n trapezoids
    for x in numpy.linspace(a, b, n+1):
        integral = integral + f(x)
    integral = integral * (b-a)/n
    return integral

#h is the step size. n is the total number of trapezoids
h = (b-a)/n
#local_n is the number of trapezoids each process will calculate
#note that size must divide n
local_n = n//size

#we calculate the interval that each process handles
#local_a is the starting point and local_b is the endpoint
local_a = a + rank*local_n*h
local_b = local_a + local_n*h

#initializing variables. mpi4py requires that we pass numpy objects.
integral = numpy.zeros(1)
recv_buffer = numpy.zeros(1)

# perform local computation. Each process integrates its own interval
integral[0] = integrateRange(local_a, local_b, local_n)

# communication
# root node receives results from all processes and sums them
if rank == 0:
    total = integral[0]
    for i in range(1, size):
        comm.Recv(recv_buffer, ANY_SOURCE)
        total += recv_buffer[0]
else:
    # all other processes send their result
    comm.Send(integral, dest=0)

# root process prints results
if comm.rank == 0:
    print ("With n =", n, "trapezoids, our estimate of the integral from", a, "to", b, "is", total)
trapezeParallel-1-Numpy.py
$ mpirun -n 4 python3 trapezeParallel-1-Numpy.py 0.0 10.0 100000
With n = 100000 trapezoids, our estimate of the integral from 0.0 to 10.0 is 333.3302083498043
Trapezoid method (parallel V2)
import numpy
import sys
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#takes in command-line arguments [a,b,n]
a = float(sys.argv[1])
b = float(sys.argv[2])
n = int(sys.argv[3])

def f(x):
    return x*x

def integrateRange(a, b, n):
    integral = -(f(a) + f(b))/2.0
    # n+1 endpoints, but n trapezoids
    for x in numpy.linspace(a, b, n+1):
        integral = integral + f(x)
    integral = integral * (b-a)/n
    return integral

h = (b-a)/n
local_n = n//size

#we calculate the interval that each process handles
local_a = a + rank*local_n*h
local_b = local_a + local_n*h

#initializing variables. mpi4py requires that we pass numpy objects.
integral = numpy.zeros(1)
total = numpy.zeros(1)

# perform local computation. Each process integrates its own interval
integral[0] = integrateRange(local_a, local_b, local_n)

# communication
# root node receives results with a collective "reduce"
comm.Reduce(integral, total, op=MPI.SUM, root=0)

# root process prints results
if comm.rank == 0:
    print ("With n =", n, "trapezoids, our estimate of the integral from", a, "to", b, "is", total)
trapezeParallel-2-Numpy.py
Collective operation: Reduce
$ mpirun -n 4 python3 reduce.py
Reduced on 1 : rank= 3 somme= 0
Reduced on 1 : rank= 0 somme= 0
Reduced on 1 : rank= 2 somme= 0
Reduced on 1 : rank= 1 somme= 6
Allreduce on 1 maximum= 3
Allreduce on 3 maximum= 3
Allreduce on 2 maximum= 3
Allreduce on 0 maximum= 3
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = numpy.zeros(1, dtype='i')
rank[0] = comm.Get_rank()
somme = numpy.zeros(1, dtype='i')
comm.Reduce(rank, somme, op=MPI.SUM, root=1)
print ("Reduced on 1 : rank=", rank[0], "somme=", somme[0])
maxi = numpy.zeros(1, dtype='i')
comm.Allreduce(rank, maxi, op=MPI.MAX)
print ("Allreduce on ", rank[0], "maximum=", maxi[0])
reduce.py
Possible operations: MPI.SUM, MPI.MAX, MPI.MIN, …
Collective operation: Broadcast
from mpi4py import MPI
comm = MPI.COMM_WORLD
import numpy as np
rank = comm.Get_rank()
if rank == 0:
    A = np.arange(0, 10, dtype='f')
else:
    A = np.zeros(10, dtype = 'f')
comm.Bcast(A, root=0)
print ("A on ", rank, "=", A)
bcast.py
$ mpirun -n 3 python3 bcast.py
A on 1 = [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
A on 0 = [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
A on 2 = [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
Only process 0 knows A
After the Broadcast, all processes know A
Collective operation: Scatter
from mpi4py import MPI
comm = MPI.COMM_WORLD
import numpy as np
rank = comm.Get_rank()
if rank == 0:
    A = np.arange(0, 9, dtype='f')
else:
    A = None
local_A = np.zeros(3, dtype = 'f')
comm.Scatter(A, local_A, root=0)
print ("local_A on ", rank, "=", local_A)
scatter.py
$ mpirun -n 3 python3 scatter.py
local_A on 2 = [6. 7. 8.]
local_A on 0 = [0. 1. 2.]
local_A on 1 = [3. 4. 5.]
At first, only process 0 knows A
After Scatter, each process knows its own part of A
Important: the number of elements of A must be a multiple of the number of processes
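When the size of A is not a multiple of the number of processes, the variant comm.Scatterv can distribute chunks of different sizes. A minimal sketch (the array size and the way counts are computed are illustrative, not from the original slides):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

N = 10                                  # e.g. 10 elements for 3 processes
counts = [N // size + (1 if r < N % size else 0) for r in range(size)]
displs = [sum(counts[:r]) for r in range(size)]

if rank == 0:
    A = np.arange(0, N, dtype='f')
else:
    A = None

local_A = np.zeros(counts[rank], dtype='f')
comm.Scatterv([A, counts, displs, MPI.FLOAT], local_A, root=0)
print("local_A on", rank, "=", local_A)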
Collective operation: Gather
from mpi4py import MPI
comm = MPI.COMM_WORLD
import numpy as np
rank = comm.Get_rank()
size = comm.Get_size()
local_rank = np.array([rank], dtype='i')
A = np.empty(size, dtype = 'i')
print ("local_rank on ", rank, "=", local_rank)
comm.Gather(local_rank, A, root=0)
print ("A After Gather on ", rank, "=", A)
comm.Allgather(local_rank, A)
print ("A After AllGather on ", rank, "=", A)
gather.py
$ mpirun -n 3 python3 gather.py
local_rank on 1 = [1]
local_rank on 2 = [2]
local_rank on 0 = [0]
A After Gather on 1 = [1600851200 32511 17736512]
A After Gather on 2 = [-1852859136 32694 42946160]
A After Gather on 0 = [0 1 2]
A After AllGather on 2 = [0 1 2]
A After AllGather on 0 = [0 1 2]
A After AllGather on 1 = [0 1 2]
With Gather, only the buffer of the root (process 0) is filled; the other processes keep the uninitialized content of A. Allgather fills A on every process.
Measure performance
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
comm.Barrier()
start = MPI.Wtime()
A = numpy.array([[1.,2.,3.],[4.,5.,6.],[7.,8.,9.]])
local_a = numpy.zeros(3)
comm.Scatter(A,local_a)
print ("process", rank, "has", local_a)
comm.Barrier()
end = MPI.Wtime()
print ("Durée sur", rank, ": ", end - start)
timeNumpy.py
$ mpirun -n 3 python3 timeNumpy.py
process 0 has [1.0, 2.0, 3.0]
Time on 0 : 0.00011200000000000099
process 1 has [4.0, 5.0, 6.0]
Time on 1 : 0.00012699999999998823
process 2 has [7.0, 8.0, 9.0]
Time on 2 : 0.00011099999999999999
The barriers are only needed to synchronize the start of all processes and to increase the precision of the measurement.