Multiprocessing by Message Passing MPI
Example 1.1 Integration with MPI Blocking Send/Receive
Numerical integration is chosen as the instructional example because its algorithm is trivially simple
to parallelize; the task is narrowly confined yet computationally relevant.
For this example, the integrand is the cosine function and the range of
integration, from a to b, has length b-a. The work is divided evenly
among the p processors, so that each processor is responsible for
integrating over a partial range of length (b-a)/p.
Upon completion of the local integration on each processor, processor
0 is designated to collect the local
integral sums of all processors to form the total sum.
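Written out in the notation of the codes in this example (n increments per processor, increment length h, and lower limit a_k for the processor of rank k, i.e. myid = k), the decomposition and the midpoint rule that each processor applies read:

$$
h = \frac{b-a}{n\,p}, \qquad a_k = a + k\,n\,h, \qquad k = 0, 1, \dots, p-1
$$

$$
\int_a^b \cos x \, dx = \sum_{k=0}^{p-1} \int_{a_k}^{a_k + n h} \cos x \, dx
\approx \sum_{k=0}^{p-1} \sum_{j=0}^{n-1} \cos\!\left(a_k + \left(j + \tfrac{1}{2}\right) h\right) h
$$

Each processor evaluates one inner sum over j; processor 0 then adds the p results. Since a_{p-1} + nh = a + pnh = b, the partial ranges tile [a, b] exactly. For a = 0 and b = π/2, as in the codes, the exact value is sin(π/2) - sin 0 = 1.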
First, MPI_Init is invoked to initialize the MPI environment for all
participating processes. The MPI parallel paradigm used throughout this
tutorial is SPMD (Single Program Multiple Data): the identical program is
executed on multiple processors, each working on its own data. The number of
processors is determined by the user at runtime with
my-machine% mpirun -np 4 a.out
The mpirun command runs the MPI executable a.out on 4 processors.
This is the most frequently used command to launch an MPI
job. However, it is system dependent; consult your local system documentation
for the proper way to run your MPI job.
The number of processors, 4 in this example, is passed to your executable
as the variable p through a call
to MPI_Comm_size. To distinguish one processor from
another, MPI_Comm_rank is called to query for myid,
the rank of the current process (0 through p-1). Essentially, myid lets
each processor work on different data. The blocking MPI_Send and MPI_Recv
pair (MPI_Send is also referred to as the "standard" mode send) is used to
pass the local integral sum from the individual processors to processor 0,
which calculates the total sum. The MPI standard requires that a
blocking send call blocks (and hence does NOT return to the caller) until the
send buffer is safe to be reused. Similarly, the standard requires that
a blocking receive call blocks until the receive buffer actually contains
the intended message.
At the end, a call to MPI_Finalize permits an orderly shutdown of MPI
before exiting the program.
For detailed explanations of the MPI subroutines and functions, please read
MPI: The Complete Reference.
Example 1.1 Fortran Code
      Program Example1_1
c##################################################################################
c# #
c# This is an MPI example on parallel integration to demonstrate the use of: #
c# #
c# * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize #
c# * MPI_Recv, MPI_Send #
c# #
c# Dr. Kadin Tseng #
c# Scientific Computing and Visualization #
c# Boston University #
c# 1998 #
c# #
c##################################################################################
      implicit none
      integer n, p, i, j, proc, ierr, master, myid, tag, comm
      real h, a, b, integral, pi, ai, my_int, integral_sum
      include "mpif.h"                 ! brings in pre-defined MPI constants, ...
      integer status(MPI_STATUS_SIZE)  ! size defined in mpif.h
      data master/0/     ! processor 0 collects integral sums from other processors
      comm = MPI_COMM_WORLD
      call MPI_Init(ierr)                   ! starts MPI
      call MPI_Comm_rank(comm, myid, ierr)  ! get current proc ID
      call MPI_Comm_size(comm, p, ierr)     ! get number of procs
      pi = acos(-1.0)     ! = 3.14159...
      a = 0.0             ! lower limit of integration
      b = pi/2.           ! upper limit of integration
      n = 500             ! number of increments within each process
      tag = 123           ! set the tag to identify this particular job
      h = (b-a)/n/p       ! length of increment
      ai = a + myid*n*h   ! lower limit of integration for partition myid
      my_int = integral(ai, h, n)
      write(*,"('Process ',i2,' has the partial sum of',f10.6)")
     &     myid,my_int
      call MPI_Send(
     &     my_int, 1, MPI_REAL,   ! buffer, size, datatype
     &     master,                ! where to send message
     &     tag,                   ! message tag
     &     comm, ierr)
      if(myid .eq. master) then   ! do following only on master ...
        integral_sum = 0.0        ! initialize integral_sum to zero
        do proc=0,p-1             ! loop on processors to collect local sum
          call MPI_Recv(
     &         my_int, 1, MPI_REAL,
     &         proc,                  ! message source
     &         tag,                   ! message tag
     &         comm, status, ierr)    ! status reports source, tag
          integral_sum = integral_sum + my_int   ! sum my_int from processors
        enddo
        print *,'The integral =',integral_sum
      endif
      call MPI_Finalize(ierr)     ! MPI finish up ...
      end

      real function integral(ai, h, n)
      implicit none
      integer n, j
      real h, ai, aij
      integral = 0.0              ! initialize integral
      do j=0,n-1                  ! sum integrals
        aij = ai +(j+0.5)*h       ! abscissa mid-point
        integral = integral + cos(aij)*h
      enddo
      return
      end
Example 1.1 C Code
#include <mpi.h>
#include <math.h>
#include <stdio.h>
/* Prototype */
float integral(float ai, float h, int n);
int main(int argc, char* argv[])
{
/*###############################################################################
# #
# This is an MPI example on parallel integration to demonstrate the use of: #
# #
# * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize #
# * MPI_Recv, MPI_Send #
# #
# Dr. Kadin Tseng #
# Scientific Computing and Visualization #
# Boston University #
# 1998 #
# #
###############################################################################*/
    int n, p, myid, tag, proc, ierr;
    float h, integral_sum, a, b, ai, pi, my_int;
    int master = 0;                   /* processor performing total sum */
    MPI_Comm comm;
    MPI_Status status;

    comm = MPI_COMM_WORLD;
    ierr = MPI_Init(&argc,&argv);     /* starts MPI */
    MPI_Comm_rank(comm, &myid);       /* get current process id */
    MPI_Comm_size(comm, &p);          /* get number of processes */

    pi = acos(-1.0);                  /* = 3.14159... */
    a = 0.;                           /* lower limit of integration */
    b = pi*1./2.;                     /* upper limit of integration */
    n = 500;                          /* number of increments within each process */
    tag = 123;                        /* set the tag to identify this particular job */
    h = (b-a)/n/p;                    /* length of increment */
    ai = a + myid*n*h;                /* lower limit of integration for partition myid */
    my_int = integral(ai, h, n);      /* 0<=myid<=p-1 */
    printf("Process %d has the partial integral of %f\n", myid, my_int);

    MPI_Send(
        &my_int, 1, MPI_FLOAT,
        master,                       /* message destination */
        tag,                          /* message tag */
        comm);

    if (myid == master) {             /* Receives serialized */
        integral_sum = 0.0;
        for (proc = 0; proc < p; proc++) {
            MPI_Recv(
                &my_int, 1, MPI_FLOAT,
                proc,                 /* message source */
                tag,                  /* message tag */
                comm, &status);       /* status reports source, tag */
            integral_sum += my_int;
        }
        printf("The Integral =%f\n", integral_sum);  /* sum of my_int */
    }

    MPI_Finalize();                   /* let MPI finish up ... */
    return 0;
}

float integral(float ai, float h, int n)
{
    int j;
    float aij, integ;

    integ = 0.0;                      /* initialize */
    for (j = 0; j < n; j++) {         /* sum integrals */
        aij = ai + (j+0.5)*h;         /* mid-point */
        integ += cos(aij)*h;
    }
    return integ;
}
Discussion
A number of points can be drawn from the above code:
- The choice of master is arbitrary; the selection
of "0" is, however, a strategic one because rank 0 is the
only rank that exists for every p, including p = 1 (a single-processor job).
- MPI_Send is used to send the message my_int from all
processors to the master.
- This send is performed concurrently on all processors.
- For each point-to-point send, a matching receive on the master is expected.
- The matching receive requires the master to call the
point-to-point receive MPI_Recv p times, each to receive
the my_int sent by a processor.
- This effectively serializes an
otherwise parallel procedure and is computationally less efficient.
- Usually, there is no problem for processors to send messages to the
master and for the master to receive messages from other
processors. In this example, however, there exists a problematic
situation on processor 0, where MPI_Send and MPI_Recv (keep in
mind that both are blocking) are called in sequence. Processor 0 first
sends my_int to itself with MPI_Send, and the matching MPI_Recv is posted
only after that send returns. If the send cannot complete until a matching
receive is posted, processor 0 waits forever and a deadlock arises, at least
theoretically. In practice, many MPI implementations provide a system memory
buffer even though the MPI standard does not require a blocking send to be
buffered; the send then completes by copying my_int into that buffer, and
deadlock may not occur. Because the outcome depends on the implementation,
this situation is considered "unsafe".
- Some MPI implementations allow the user to control the memory buffer.
On the IBM pSeries, the memory buffer can be defined via the
environment variable MP_EAGER_LIMIT. Setting it to 0 provides
no buffer, which enforces the strict definition of point-to-point
blocking communication. The present example deadlocks on the IBM pSeries
with setenv MP_EAGER_LIMIT 0. Incidentally, this
is a good trick for checking whether your code is "safe".
- tag is one of the parameters used to help define a
message uniquely. In this application, myid is sufficient
to identify a message. tag is not needed as an additional
parameter to define the message uniquely.
If multiple messages are sent and received by the
same processor, the receiver might need the tag to distinguish
among the messages to determine what to do with the message.
- In both the send and receive subroutine/function, the communicator
argument comm is set to the constant MPI_COMM_WORLD, defined in mpif.h
(for Fortran) or mpi.h (for C). MPI_COMM_WORLD means that no processor
allocated for this job is excluded from the task (i.e., send or
receive) at hand. In some situations, perhaps only a restricted group of
processors (e.g., odd- or even-numbered ones) takes part in a
message passing operation. In that case, one would create a tailored
communicator to take the place of MPI_COMM_WORLD.
In all the numerical integration examples, we use only
the MPI_COMM_WORLD communicator.
The shortcomings stated above will be remedied in the next example, Example 1.2.