2.7 Collective Communication¶
There are many cases when a conductor process obtains or creates data that needs to be sent to or received from all of the other processes. Indeed, we have already seen examples of this in previous sections. In this section, we will discuss some special communication constructs specifically designed to make this collective communication among processes easier.
Broadcast¶
A broadcast sends data from one process to all other processes. In this example, we use a library to parse the command line arguments (this is used later in section 2.9 when we introduce a full application). The arguments are placed in a list to be broadcast, i.e. sent to all workers.
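Below is a minimal sketch of this broadcast pattern; the list being broadcast here is built from sys.argv as a stand-in for the argument-parsing library used in the full example:

from mpi4py import MPI
import sys

comm = MPI.COMM_WORLD
id = comm.Get_rank()

if id == 0:
    # the conductor builds the list to share (stand-in for parsed arguments)
    argList = sys.argv[1:]
else:
    argList = None      # workers start with no data

# every process, conductor and workers alike, calls bcast()
argList = comm.bcast(argList, root=0)
print("process", id, "has", argList)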
2.7-1: All processes, conductor and workers (sender and receivers), execute the comm.bcast() function.
- True. (Correct: yes, every process must execute the comm.bcast() function.)
- False. (Incorrect: note how the comm.bcast() function is called above.)
Exercises
- Run, using -np from 1 through 8 processes.
- Trace the code, comparing it to the output that you see.
2.7-2: The order in which each process gets its data and reports it is not guaranteed.
- True. (Correct: yes, this feature of message passing systems is shown here also.)
- False. (Incorrect: run several times. Is the output the same?)
Scatter and Gather¶
There are often cases when each process can work on some portion of a larger data structure. This can be carried out by having the conductor process maintain the larger structure and send parts to each of the worker processes, keeping one part of the structure on the conductor. Each process then works on its portion of the data, and the conductor can get the completed portions back afterward.
This is so common in message passing parallel processing that there are two special collective communication functions, called Scatter() and Gather(), that handle it.
In mpi4py, the uppercase Scatter method works on arrays generated by numpy routines. The following examples use numpy arrays.
Here the conductor will use a function called genArray to create a single array, called data, using the numpy function linspace (introduced in section 2.4). The code is designed to then scatter 3 elements of it into a smaller array on each process (both conductor and workers).
The mpi4py Scatter function, with a capital S, can be used to send portions of a larger array on the conductor to the workers. In the following figure, the left side depicts the data array on the conductor, with portions of it colored to show which sections will go to each of 4 processes (conductor and workers) during the Scatter that is about to take place. The result of the Scatter is shown on the right, where each process has a portion of the original data in an array called smallerPart that they can then work on:
Figure: the data array on the conductor before the Scatter (left), and the smallerPart array on each of the 4 processes after the Scatter (right).
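A minimal sketch of the Scatter step pictured above might look like the following, assuming 3 elements per process (genArray here is a simple stand-in for the book's helper):

from mpi4py import MPI
import numpy as np

def genArray(totalSize):
    # stand-in for the book's helper: evenly spaced values via linspace
    return np.linspace(1.0, totalSize, totalSize)

comm = MPI.COMM_WORLD
id = comm.Get_rank()
numProcesses = comm.Get_size()

numElements = 3                      # portion that each process will receive
if id == 0:
    data = genArray(numProcesses * numElements)   # full array on the conductor only
else:
    data = None

# every process supplies a buffer to receive its portion of data
smallerPart = np.empty(numElements, dtype=np.float64)
comm.Scatter(data, smallerPart, root=0)
print("process", id, "received", smallerPart)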
The reverse of this process can be done using the Gather function.
In this example, a 1-D array is created by the conductor, then scattered using Scatter (capital S). After each process changes its smaller array and places the result in another array called newValues, the Gather (capital G) function brings the full array, with the changes, back to the conductor.
Note
In the code below, note these important points:
- All processes have a variable called data, but only the conductor creates an actual data array; the workers set data to None.
- Scatter and Gather require arrays that can be split evenly among the processes. We ensure this in the code below by setting the smallerPart array size to 3 as an illustration. In the next section you will see how we normally ensure this by checking the size of the data array to be scattered against the number of processes chosen.
- All processes must call the Scatter and Gather functions, similar to what we did for broadcast above.
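If you want a compact picture of the whole pattern, here is a sketch of scatter, local work, then gather; the names data, smallerPart, and newValues follow the description above, and the doubling step is only a placeholder for real work:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
id = comm.Get_rank()
numProcesses = comm.Get_size()

numElements = 3                                    # portion per process
if id == 0:
    # only the conductor creates the full data array
    data = np.linspace(1.0, numProcesses * numElements, numProcesses * numElements)
else:
    data = None                                    # workers set data to None

# all processes call Scatter to receive their portion into smallerPart
smallerPart = np.empty(numElements, dtype=np.float64)
comm.Scatter(data, smallerPart, root=0)

# each process changes its portion, placing the result in newValues
newValues = smallerPart * 2.0

# all processes call Gather; only the conductor needs the full-size buffer
if id == 0:
    gathered = np.empty(numProcesses * numElements, dtype=np.float64)
else:
    gathered = None
comm.Gather(newValues, gathered, root=0)

if id == 0:
    print("conductor gathered:", gathered)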
Exercises:
- Run, using -np from 2 through 8 processes.
- If you want to study the numpy part of the code, look up the numpy method linspace() used in genArray().
Applying Gather to PopulateArray¶
Let’s use the Gather() function to simplify the code in the PopulateArray program. The revised code is shown below:
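(A rough sketch of its shape is given here; the way each process fills its local array, using its own rank, is only an illustration, and N is assumed to divide evenly among the processes.)

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
myId = comm.Get_rank()
numProcesses = comm.Get_size()

N = 12                                   # total array size; assumed divisible
numLocal = N // numProcesses             # portion computed by each process

# each process populates its own local portion (filling with myId is
# only an illustration of "local work")
local = np.full(numLocal, float(myId))

# only the conductor needs the full-size receive buffer
if myId == 0:
    global_array = np.empty(N)
else:
    global_array = None

# gather the local results into global_array on the conductor
comm.Gather(local, global_array, root=0)

if myId == 0:
    print(global_array)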
If you compare this code to when we first introduced this problem in section 2.4, we have been able to use comm.Gather() in place of the send-receive-manual-merge technique used there. Let’s compare them below. The first code block contains the portion of that first version, which in essence does the gather manually, and the second code block is the code that replaces it using Gather. Because Gather places each process’s contribution into the receive buffer in rank order, the manual position bookkeeping is no longer needed.
if (myId != 0):
    comm.send(local, dest=0)
else:
    #initialize global array
    global_array = np.empty(N)
    for i in range(len(local)):
        global_array[i] = local[i]
    pos = len(local)
    #merge into one array
    for i in range(1, numProcesses):
        loc = comm.recv(source=i)
        for j in range(len(loc)):
            global_array[pos+j] = loc[j]
        pos += len(loc)
if myId == 0:
    global_array = np.empty(N)
else:
    global_array = None

#gather the local results into global_array
comm.Gather(local, global_array, root=0)
Exercise:
Modify the above program to also sum the values of the resulting array by computing the sum locally on each process and using a reduce to get the overall sum back to the conductor. (We often do this when working with large amounts of data because a reduce is faster than a sequential sum on the conductor.) Comment out line 42 in the above code. Each process should compute a local sum of the local array it produces. In addition to the Gather() function, perform a reduce to get the total overall sum into the variable called total.
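As a starting point, the reduce itself can look like the following fragment, added to the program above after each process has built its local array (localSum is a new name introduced here):

# each process sums only its own portion
localSum = np.sum(local)

# all processes call reduce; the combined sum arrives only on the conductor
total = comm.reduce(localSum, op=MPI.SUM, root=0)

if myId == 0:
    print("total of all values:", total)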
Warning
If you have errors in your code, you may simply get output that looks like this:
===== NO STANDARD OUTPUT =====
===== NO STANDARD ERROR =====
You will need to try a fix. It is best to work incrementally: add single lines of code and debug prints, then run again to be sure you haven’t introduced an error.