8.1. Dynamic Load Balancing
In this example we combine the conductor-worker pattern with message passing. The conductor has many tasks that need to be completed. The conductor starts by sending some data needed to complete a task to each worker process. Then the conductor loops and waits to hear back from each worker by receiving a message from any of them. When the conductor receives a message from a worker, it sends that worker more data for its next task, unless there are no more tasks to complete, in which case it sends a special message to the worker to stop running.
In this simple example, each worker is sent the number of seconds it should ‘sleep’, a whole number from 1 to 8, to stand in for tasks of different sizes. Because the example is so simple, the number of tasks each worker completes does not vary much. In real applications, one task may take far longer than another. In that case some workers can complete many short tasks while others are busy with a few long ones.
This dynamic approach can sometimes finish sooner than giving every process an equal number of tasks up front.
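The effect of handing out work on demand can be estimated without MPI. The sketch below is a simplified, sequential simulation under stated assumptions: messages take no time, and when two workers finish at the same moment the lower worker id is served first (in a real run the order in which those messages arrive is not fixed). `genTasks` is copied from the program below; `dynamic_schedule` and `static_schedule` are hypothetical helper names introduced here, and the equal-count baseline deals tasks out round-robin.

```python
import heapq

import numpy as np


def genTasks(numTasks):
    np.random.seed(1000)  # same seeded sleep times as 17dynamicLoadBalance.py
    return np.random.randint(low=1, high=9, size=numTasks)  # integers 1..8


def dynamic_schedule(workTimes, numWorkers):
    """Simulate on-demand assignment: each task goes to whichever worker frees up first.

    Returns (finish time in seconds, dict of worker id -> tasks completed).
    Assumes zero message cost; ties go to the lowest worker id.
    """
    # heap entries are (time the worker becomes free, worker id); all start idle
    freeAt = [(0, w) for w in range(1, numWorkers + 1)]
    heapq.heapify(freeAt)
    counts = {w: 0 for w in range(1, numWorkers + 1)}
    finish = 0
    for t in workTimes:
        start, w = heapq.heappop(freeAt)  # earliest-free worker asks for work
        end = start + int(t)
        counts[w] += 1
        finish = max(finish, end)
        heapq.heappush(freeAt, (end, w))
    return finish, counts


def static_schedule(workTimes, numWorkers):
    """Deal tasks out round-robin (equal counts per worker) and return the finish time."""
    loads = [0] * numWorkers
    for i, t in enumerate(workTimes):
        loads[i % numWorkers] += int(t)
    return max(loads)


if __name__ == "__main__":
    numWorkers = 3  # 3 workers, as when running with N = 4 processes
    workTimes = genTasks(numWorkers * 4)
    finish, counts = dynamic_schedule(workTimes, numWorkers)
    print("sleep times:", workTimes)
    print("on demand:   done after {} s, tasks per worker {}".format(finish, counts))
    print("round-robin: done after {} s".format(static_schedule(workTimes, numWorkers)))
```

On-demand assignment is not guaranteed to win: with sleep times [2, 1, 1, 3] and two workers, the simulated on-demand schedule finishes after 5 seconds while the round-robin one finishes after 4.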
Program file: 17dynamicLoadBalance.py
Example usage:
python run.py ./17dynamicLoadBalance.py N
Here N is the number of MPI processes to start.
run.py executes this program within mpirun using that number of processes.
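run.py itself is not shown in this section. The following is a minimal sketch of what such a launcher might do, assuming an MPI launcher named `mpirun` is on the PATH and accepts the `-np` option; `build_command` is a hypothetical name, and the real run.py may work differently.

```python
import shlex
import subprocess
import sys


def build_command(program, numProcesses):
    """Build the launcher command line (assumes `mpirun` accepts -np)."""
    if numProcesses < 1:
        raise ValueError("need at least one process")
    # sys.executable: every MPI process uses the same Python that ran this script
    return ["mpirun", "-np", str(numProcesses), sys.executable, program]


if __name__ == "__main__":
    if len(sys.argv) == 3:
        # e.g. python run.py ./17dynamicLoadBalance.py 4
        cmd = build_command(sys.argv[1], int(sys.argv[2]))
        print(shlex.join(cmd), flush=True)  # echo the command before running it
        sys.exit(subprocess.run(cmd).returncode)
    print("usage: python run.py PROGRAM N", file=sys.stderr)
```

Using `sys.executable` rather than a bare `python` means the program runs under the same interpreter, and therefore the same installed mpi4py, as the launcher.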
Exercises:
Run using N = 4 processes.
Study the execution carefully. Note that with 4 processes, 3 are workers. The total number of tasks is \(3 \times 4\), or 12. Which process does the most work? You can count by looking for the lines that end with “… from X”, where X is a worker process id.
Try with N = 8 (7 workers).
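Counting by hand gets tedious, especially with 7 workers. Assuming the program's output has been saved to a file (for example with `python run.py ./17dynamicLoadBalance.py 4 > out.txt`), a short script such as the hypothetical `tally.py` below counts the “… from X” lines for each worker. Because each worker sends back the sleep time it was given, the number in those lines also lets the script add up how many seconds each worker spent, which is another way to judge which process did the most work.

```python
import re
import sys
from collections import Counter

# Matches the conductor's lines "conductor received 5 from 2" and
# "end: conductor received 5 from 2": worker 2 reported a 5-second task.
RECEIVED = re.compile(r"received (\d+) from (\d+)\s*$")


def tally(lines):
    """Return (tasks, seconds): Counters keyed by worker id."""
    tasks, seconds = Counter(), Counter()
    for line in lines:
        match = RECEIVED.search(line)
        if match:
            workTime, workerId = int(match.group(1)), int(match.group(2))
            tasks[workerId] += 1
            seconds[workerId] += workTime
    return tasks, seconds


if __name__ == "__main__" and len(sys.argv) == 2:
    # usage: python tally.py out.txt
    with open(sys.argv[1]) as f:
        tasks, seconds = tally(f)
    for workerId in sorted(tasks):
        print("worker {}: {} tasks, {} s".format(workerId, tasks[workerId], seconds[workerId]))
```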
8.1.1. Explore the code
Note in this case how the conductor, whose id is 0, handles the assignment of tasks, while the workers simply do what they are sent until they are told to stop.
from mpi4py import MPI
import numpy as np
import time

def genTasks(numTasks):
    np.random.seed(1000)  # run the same set of timed tasks every time
    return np.random.randint(low=1, high=9, size=numTasks)  # integers 1..8

# tags that can be applied to messages
WORKTAG = 1
DIETAG = 2

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()                   # number of the process running the code
    numProcesses = comm.Get_size()         # total number of processes running
    myHostName = MPI.Get_processor_name()  # machine name running the code

    if id == 0:
        # create an arbitrary array of numbers for how long each
        # worker task will 'work', by sleeping that many seconds
        numTasks = (numProcesses - 1) * 4  # avg 4 tasks per worker process
        workTimes = genTasks(numTasks)
        print("conductor created {} values for sleep times:".format(workTimes.size), flush=True)
        print(workTimes, flush=True)
        handOutWork(workTimes, comm, numProcesses)
    else:
        worker(comm)

def handOutWork(workTimes, comm, numProcesses):
    totalWork = workTimes.size
    workcount = 0
    recvcount = 0
    print("conductor sending first tasks", flush=True)
    # send out the first tasks to all workers
    for id in range(1, numProcesses):
        if workcount < totalWork:
            work = workTimes[workcount]
            comm.send(work, dest=id, tag=WORKTAG)
            workcount += 1
            print("conductor sent {} to {}".format(work, id), flush=True)

    # while there is still work,
    # receive result from a worker, which also
    # signals they would like some new work
    while workcount < totalWork:
        # receive next finished result
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1
        workerId = stat.Get_source()
        print("conductor received {} from {}".format(workTime, workerId), flush=True)
        # send next work (record it first so the message below reports
        # the task actually sent, not the last one from the first loop)
        work = workTimes[workcount]
        comm.send(work, dest=workerId, tag=WORKTAG)
        workcount += 1
        print("conductor sent {} to {}".format(work, workerId), flush=True)

    # Receive results for outstanding work requests.
    while recvcount < totalWork:
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1
        workerId = stat.Get_source()
        print("end: conductor received {} from {}".format(workTime, workerId), flush=True)

    # Tell all workers to stop
    for id in range(1, numProcesses):
        comm.send(-1, dest=id, tag=DIETAG)

def worker(comm):
    # keep receiving messages and do work, unless tagged to 'die'
    while True:
        stat = MPI.Status()
        waitTime = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat)
        print("worker {} got {}".format(comm.Get_rank(), waitTime), flush=True)
        if stat.Get_tag() == DIETAG:
            print("worker {} dying".format(comm.Get_rank()), flush=True)
            return
        # simulate work by sleeping
        time.sleep(waitTime)
        # indicate done with work by sending to the conductor
        comm.send(waitTime, dest=0)

########## Run the main function
main()