7.1. Collective Communication: scatter and gather lists
When several processes need to work on portions of a data structure, such as a list of lists or a 1-d or 2-d array, a common approach is to have one process, usually the conductor, divide the data structure and send a portion to each of the other processes, often keeping one portion for itself. Each process works on its portion of the data, and the conductor then collects the completed portions back. This type of coordination is so common that MPI has special patterns for it, called scatter and gather.
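To make the pattern concrete before looking at the full programs, here is a minimal sketch of that round trip, assuming mpi4py is installed and the script is launched with mpirun (for example via run.py, as in the examples below); the names chunks, my_chunk, and doubled are illustrative only:

from mpi4py import MPI

comm = MPI.COMM_WORLD
id = comm.Get_rank()
numProcesses = comm.Get_size()

if id == 0:
    # conductor divides the data: one small list per process (illustrative values)
    chunks = [[i, i + 1] for i in range(numProcesses)]
else:
    chunks = None

my_chunk = comm.scatter(chunks, root=0)   # every process receives one small list
doubled = [2 * x for x in my_chunk]       # each process works on its own portion
gathered = comm.gather(doubled, root=0)   # the conductor collects the results

if id == 0:
    print("Conductor gathered:", gathered)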
7.1.1. Scatter Lists
The following diagrams illustrate how scatter works with Python list structures. The conductor holds a list of lists, and all processes participate in the scatter:
After the scatter is complete, each process has one of the smaller lists to work on, like this:
In this example, the conductor creates a list of small lists whose length equals the number of processes.
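As a preview, the following plain-Python sketch (no MPI needed, and assuming 4 processes) builds the same structure that genListOfLists creates in the program below and shows which sublist each rank would receive from the scatter:

# Plain-Python preview, assuming 4 processes; in the scatter, rank i receives data[i].
numProcesses = 4
data = [[(i + 1) * j for j in range(1, 4)] for i in range(numProcesses)]
print(data)    # [[1, 2, 3], [2, 4, 6], [3, 6, 9], [4, 8, 12]]
for rank in range(numProcesses):
    print("rank", rank, "would receive", data[rank])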
Program file: 14scatter.py
Example usage:
python run.py ./14scatter.py N
Here N specifies the number of processes to start up in MPI.
run.py executes this program within mpirun using the number of processes given.
Exercises:
Run, using N from 2 through 8 processes.
If you want to study the code, explain to yourself what genListOfLists does in the code below.
7.1.1.1. Explore the code
Note
In the code below, note how all processes must call the scatter function.
from mpi4py import MPI

# Create a list of lists to be scattered.
def genListOfLists(numElements):
    data = [[0]*3 for i in range(numElements)]
    for i in range(numElements):
        # make small lists of 3 distinct elements
        smallerList = []
        for j in range(1, 4):
            smallerList = smallerList + [(i+1)*j]
        # place the small list in the larger list
        data[i] = smallerList
    return data

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()                    # number of the process running the code
    numProcesses = comm.Get_size()          # total number of processes running
    myHostName = MPI.Get_processor_name()   # machine name running the code

    # in mpi4py, the lowercase scatter method only works on lists whose size
    # is the total number of processes.
    numElements = numProcesses   # total elements in list created by conductor process
    # however, the list can contain lists, for example this list of four
    # 3-element lists:
    #   [[1, 2, 3], [2, 4, 6], [3, 6, 9], [4, 8, 12]]

    if id == 0:
        data = genListOfLists(numElements)
        print("Conductor {} of {} on {} has created list: {}"\
              .format(id, numProcesses, myHostName, data))
    else:
        data = None
        print("Worker Process {} of {} on {} starts with {}"\
              .format(id, numProcesses, myHostName, data))

    # scatter one small list in the large list on node 0 to each of the processes
    result = comm.scatter(data, root=0)

    print("Process {} of {} on {} has result after scatter {}"\
          .format(id, numProcesses, myHostName, result))

    if id == 0:
        print("Conductor {} of {} on {} has original list after scatter: {}"\
              .format(id, numProcesses, myHostName, data))

########## Run the main function
main()
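For example, with N = 4 the conductor builds the list [[1, 2, 3], [2, 4, 6], [3, 6, 9], [4, 8, 12]]; after the scatter, process 0 holds [1, 2, 3], process 1 holds [2, 4, 6], and so on. The scatter does not change the conductor's original data list, which is what the final print statement shows. The order in which the print lines from the different processes appear will vary from run to run.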
7.1.2. Gather Lists
Once several processes have their own lists of data, those lists can also be gathered back together into a list of lists, usually in the conductor process. All processes participate in a gather, like this:
The gather creates a list of lists in the conductor, like this:
In this example, each process creates its own small list. A gather is then used to assemble these small lists into a list of lists on the conductor process.
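The following plain-Python sketch (no MPI needed, and assuming 4 processes with SMALL_LIST_SIZE = 3, as in the program below) previews the list of lists that the gather assembles on the conductor:

# Plain-Python preview of the gathered result, assuming 4 processes and SMALL_LIST_SIZE = 3.
numProcesses = 4
gathered = [[(rank * 10) * j for j in range(1, 4)] for rank in range(numProcesses)]
print(gathered)   # [[0, 0, 0], [10, 20, 30], [20, 40, 60], [30, 60, 90]]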
Program file: 15gather.py
Example usage:
python run.py ./15gather.py N
Here N specifies the number of processes to start up in MPI.
run.py executes this program within mpirun using the number of processes given.
Exercises:
Run, using N from 2 through 8 processes.
Try different values of SMALL_LIST_SIZE, perhaps changing how result is printed for readability.
7.1.2.1. Explore the code
Note
In the code below, note how all processes must call the gather function.
from mpi4py import MPI

SMALL_LIST_SIZE = 3

# create a small list whose values contain id times multiples of 10
def genSmallList(id):
    smallerList = []
    for j in range(1, SMALL_LIST_SIZE+1):
        smallerList = smallerList + [(id * 10)*j]
    return smallerList

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()                    # number of the process running the code
    numProcesses = comm.Get_size()          # total number of processes running
    myHostName = MPI.Get_processor_name()   # machine name running the code

    # all processes create small lists
    sendData = genSmallList(id)

    print("Process {} of {} on {} starts with {}"\
          .format(id, numProcesses, myHostName, sendData))

    # gather the small lists at the conductor node:
    # final result is a list whose length == the number of processes
    result = comm.gather(sendData, root=0)

    # only the conductor node has all of the small lists
    if id == 0:
        print("Process {} of {} on {} has result after gather {}"\
              .format(id, numProcesses, myHostName, result))

########## Run the main function
main()
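Note that on the worker processes the lowercase gather returns None; only the conductor (process 0) receives the assembled list of lists, which for N = 4 would be [[0, 0, 0], [10, 20, 30], [20, 40, 60], [30, 60, 90]].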