3.2 Shared Queue

Author: Steven Bogaerts, DePauw University

To Cite:

Bogaerts, Steven. “Shared Queue”. PDC for Beginners, edited by CSinParallel. 2022. Available Online. https://doi.org/10.55682/ONJD7789

Introduction

In this section, we will see how to make a process and how to communicate between processes via a shared queue. Recall that a queue is a first-in-first-out (FIFO) data structure. The queue is shared in that each process has access to the same queue structure. The Python multiprocessing module uses the shared queue data structure to allow processes to pass messages to each other.

After giving an overview of the Python multiprocessing module, we will then practice computing the greatest common divisor (GCD) of many pairs of numbers through an unplugged activity, and finally develop code using the multiprocessing module to perform this task more quickly with parallelism.

Note

The multiprocessing library is not yet supported in Runestone activeCode blocks, so you will not be able to run code interactively in this section. We recommend that you copy these code examples to your computer and run them locally.

The Python multiprocessing module

The Python multiprocessing module comes with the standard Python distribution. It allows for parallelism through the creation and management of multiple processes in a Python program. To create a worker process, we instantiate the Process class, specifying the function to execute via the target formal parameter and the arguments to that function via the args formal parameter. Consider the “hello world”-style code below, which specifies a target function called sayHi that receives the single argument i:

 1from multiprocessing import Process
 2
 3def sayHi(id):
 4    print("Hello from {}!".format(id))
 5
 6def main():
 7    numProcesses = 20
 8    for i in range(numProcesses):
 9        p = Process(target=sayHi, args=(i, ))
10        p.start()
11
12if __name__ == '__main__':
13    main()

Note

  • In line 9, there are no parentheses after sayHi, because we are not actually calling the function there. Rather, we are passing the sayHi function as an argument to the Process constructor, taking advantage of the fact that functions are first-class in Python.

  • In line 9, the args formal parameter must be a tuple. A tuple in Python is an immutable list, and is created using parentheses. To make a tuple with just one value (i in this case), we must include the trailing comma. Otherwise, we would just have (i), which the interpreter would consider a parenthesized version of the expression i, rather than a tuple.

  • It is essential that we include lines 12 and 13 in this code. If we only had line 13 (without the if), then each worker process would also try to execute main, leading to workers creating workers, which would create more workers, and so on…

  • After creating Process object p, we simply call the start method on it to start the process. Each process, once started, runs the sayHi function with the provided id value passed in to the constructor via the args formal parameter.

Creating a shared queue

One way that processes can communicate is by putting and getting values into and out of a shared queue data structure. A shared queue can be created by calling the Queue constructor with no arguments. It can then be passed to each process via the Process constructor’s args formal parameter. The blocking put and get methods can then be used to put data on the queue and remove data from the queue, respectively.

For example, consider the code below. Here, two worker processes play the roles of two parrots, speaking (printing) sentences provided by the main process. The sentences are passed to the worker processes via a shared queue.

 1from multiprocessing import Process, Queue
 2import time
 3
 4def parrotSpeak(id, q):
 5    msg = q.get()
 6    while(msg != "DONE"):
 7        print("Parrot {} says: *squawk* *whistle* {} *squawk*".format(id, msg))
 8        time.sleep(0.1)
 9        msg = q.get()
10
11def main():
12    print("START")
13    numProcesses = 2
14    q = Queue()
15    messages = ["Time for a cracker!",
16                "I need a vacation.",
17                "This perch is comfy!",
18                "Time for dinner!",
19                "What'd you say?",
20                "Where did you go today?",
21                "Did you bring me any treats?",
22                "I wonder what the other birds are doing right now.",
23                "DONE",
24                "DONE"]
25
26    p1 = Process(target=parrotSpeak, args=(1, q))
27    p2 = Process(target=parrotSpeak, args=(2, q))
28    p1.start()
29    p2.start()
30
31    print("Main process will now sleep, to show that the child processes block on q.get()...")
32    time.sleep(3)
33    print("Main process is done sleeping. Ready to put messages on the queue!")
34    for i in range(len(messages)):
35        q.put(messages[i])
36
37if __name__ == '__main__':
38    main()

The main process creates the Queue object and passes it to each of two worker processes. When the workers start executing the parrotSpeak function, there is not yet anything on the queue, and so they both block on the q.get() call on line 5. After a pause on line 32, the main process begins putting data on the queue. The two workers race to get each message. The 0.1 second sleep in the parrotSpeak function makes it more likely that the race winner will vary even with such a small set of messages.

Since this is a shared queue, values are retrieved with get in the same order they were added with put. Note, however, that due to a race condition, the worker processes may not finish processing the values in that same order.

Note also the two "DONE" strings at the end of the messages list. These two sentinels are used to tell each worker when to stop calling get to obtain more messages. There are other approaches that can be used instead, but this approach is simple and effective.

View the video below in full-screen to see this program in action.

The Greatest Common Divisor (GCD)

Recall that the GCD of two positive integers is the largest number that divides both of them evenly. For example, the GCD of 24 and 36 is 12, because 24/12 = 2 and 36/12 = 3, and no number larger than 12 divides both numbers evenly. If you’re unsure how to compute a GCD, one simple way for now is to try relatively small positive integers and check with a calculator whether they divide both numbers.

A function that implements the GCD using Euclid’s algorithm is shown below. In short, Euclid’s algorithm is based on the observation that the GCD of two numbers does not change if the larger number is replaced by the difference of the two numbers. The modulo operation simply finds what remains after repeatedly subtracting the smaller number from the larger one (i.e., division). Make sure you understand this code before continuing on. Pay close attention to the line a, b = b%a, a. This line simultaneously assigns a the value of b%a (i.e., the remainder when b is divided by a) and assigns b the old value of a.
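
Here is a minimal version of such a function, along with a small test that uses the variables x and y (a sketch; the exact code in this chapter's files may differ slightly):

def gcd(a, b):
    # Repeatedly replace the pair (a, b) with (b%a, a); when a reaches 0, b holds the GCD.
    while a != 0:
        a, b = b%a, a
    return b

x = 24
y = 36
print("The GCD of {} and {} is {}".format(x, y, gcd(x, y)))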

Run this code with different values of x and y to learn how this function works. What happens when you use numbers that are not divisible by each other? Try tracing the execution of the code in both cases.

Computing the GCD with a Shared Queue

Suppose the goal is to compute the GCD of many pairs of numbers. Using a shared queue allows many processes to participate in this task.

An Unplugged Activity

Let’s first practice computing the GCD with a shared queue through an unplugged activity. In this activity, a main process will generate numbers and then put them on a shared queue that workers can access for GCD calculations. To compare parallel and serial computation, the main process will also compute these GCDs itself.

This activity requires at least five people. Three of them take the following roles:
  • One person will play the main process and will need one sheet of paper and one writing utensil.

  • One person will play the input queue and will need one writing utensil.

  • One person will play the output queue and will need one sheet of paper and one writing utensil.

At least two additional people are needed. Each will play a separate worker and will need one sheet of paper, one writing utensil, and one calculator.

Each of the workers should sit as far apart as possible in the room. The main process and both queues should be in a central location in the room.

Main process procedure:
  1. Fill out one line at a time on a sheet of paper. Each line should contain two randomly chosen two-digit positive integers. Write 7 lines of numbers in this manner.

  2. Give this sheet of paper to the input queue.

  3. Tell the workers to begin their procedures.

  4. Compute all GCDs yourself as well, reading one pair of numbers at a time from the input queue’s paper. This serves as an example of serial calculation.

Worker procedure:
  1. Say “get” to the input queue and wait until there is a response from the input queue (ignore verbal responses to other workers).

  2. While the input queue has not said “done”:
    1. Write down the pair of numbers received

    2. Compute the greatest common divisor (GCD) of the two numbers.

    3. Say “put” to the output queue and wait until the output queue acknowledges you.

    4. Tell the output queue the pair of numbers and their GCD.

    5. Say “get” to the input queue and wait until there is a response from the input queue (ignore verbal responses to other workers).

    6. Loop back up to step 2

Input queue procedure:
  1. Receive a sheet of paper from the main process, with pairs of numbers. Keep the paper in a location where the main process can see it too, since it will need to access the paper for the serial calculations.

  2. Wait for a “get” message from a worker.
    1. When received, give the worker a pair of non-crossed-out numbers, and cross out the numbers given.

    2. If multiple requests come in simultaneously, handle them one at a time, with distinct pairs of numbers for each request.

    3. If there are no more numbers when a worker requests some, say “done” to that worker.

Output queue procedure:
  1. Start with a blank sheet of paper.

  2. Wait for messages:
    1. “put” (from a worker): Receive the worker’s three numbers and write them on the highest blank line on the paper.

    2. “get” (from the main process): Tell the main process the three values on the highest non-crossed-out line of the paper, and then cross out that line.
      1. If a “get” message arrives when no values are on the paper, tell the main process to wait until you can respond with three numbers.

    3. If multiple requests come in simultaneously, handle them one at a time.

In the unplugged activity above, very little synchronization between the workers is needed. Only one worker can access a queue at a time, but otherwise each worker is free to work as quickly or slowly as needed. If each task (computing the GCD) is enough work to justify potentially waiting briefly for queue access, the parallel approach will show speedup over the sequential approach. This will most likely be the case in this activity: the parallel workers will complete all GCD calculations before the main process is able to do the same.

Note

The unplugged activity could be implemented directly in code. However, computing the GCD, even of very large numbers, is a fast operation. To see why this is a bit of a problem, imagine if the unplugged activity asked workers to add 1 to each of the provided numbers, instead of computing the GCD. The workers would be saying “get” and “put” much more frequently, and so the proportion of time spent waiting for queue access, instead of calculating, would grow. In other words, the overhead of process communication would outweigh any benefits of parallelism.

A similar situation would occur if we were to program the unplugged activity as is. GCD is easy for a computer to compute quickly. To make the overhead of the shared queue worthwhile, then, we need to increase the amount of work required per task. We can easily accomplish this by having each worker request not just a single pair of numbers, but rather a block of many pairs of numbers. Only when the worker has handled the entire block will it put all results on the output queue and get another block from the input queue. This increases the amount of work that each process performs, reducing the number of times workers would be saying “get” and “put”, thus also reducing communication overhead.
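
For example, with PAIRS_PER_BLOCK set to 3 (a value chosen only for illustration), a single item on the input queue might be the list [[84, 36], [51, 17], [90, 60]]. A worker retrieves the entire block with one get call, computes all three GCDs, and then puts a single list of three results on the output queue.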

Translating it to code: Serial Version

Let’s now write a shared queue implementation of GCD. In the serial version, one process will “put” all of the pairs of numbers on the input queue, then “get” one block of pairs at a time, find the GCD of each pair, and place the results on the output queue.

Download the file: gcdWithBlanksSerial.py.

The code begins running in the main namespace at the bottom.

if __name__ == '__main__':
    main()

The main process then runs the gcdSequential function:

def gcdSequential():
    print("----------\ngcdSequential")
    inputQ = Queue()
    outputQ = Queue()

    buildInputs(inputQ)
    startTime = timeit.default_timer()
    computeGCDs(0, inputQ, outputQ)
    processOutputs(outputQ)
    elapsedTime = timeit.default_timer() - startTime
    print("Elapsed time (s):", elapsedTime)
    return elapsedTime

Specifically, gcdSequential executes the following steps:

  • The buildInputs function is called, which creates NUM_BLOCKS blocks of numbers, with PAIRS_PER_BLOCK pairs of numbers in each block. The numbers range from MIN_VALUE to MAX_VALUE.

  • computeGCDs is then called, which gets one block at a time from inputQ, then one pair at a time from the current block. It computes the GCD by calling the provided gcd function (an implementation of Euclid’s algorithm), and appends the results to a blockOutput variable. When the block is fully processed, the results for the block are put on the output, and a new block is obtained.

  • processOutputs is then called, which simply obtains one output at a time. If VERBOSE is set to True, then all results will be printed. Otherwise, nothing is really done to “process” the outputs beyond simply obtaining them, but this function simulates what would be done as the next step in a larger application. A sketch of this function appears below.
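
A minimal sketch of processOutputs consistent with this description (it assumes that each input block produces exactly one block of results; the code in the downloadable file may differ in details):

def processOutputs(outputQ):
    # One block of results is expected for each block of inputs.
    for i in range(NUM_BLOCKS):
        blockOutput = outputQ.get()
        if VERBOSE:
            for result in blockOutput:
                print(result)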

Try it out: the buildInputs function

A copy of the buildInputs function from gcdWithBlanksSerial.py is shown below:

def buildInputs(inputQ):
    for i in range(_______________):  #a
        block = []
        for i in range(_______________): #b
            block.append([random.randint(MIN_VALUE, MAX_VALUE), random.randint(MIN_VALUE, MAX_VALUE)])
        inputQ.put(block)

    # use sentinels to avoid possible race condition (multiple checks for not empty, then one gets, other hangs on get)
    for i in range(_______________): #c
        inputQ.put("DONE")

A single variable is needed for each of the range() calls in the code snippet above. Can you figure out what they are? Reread the description of the buildInputs function above if you are having trouble.

Try it out: the computeGCDs function

Now, let’s look at the computeGCDs function.

def computeGCDs(id, inputQ, outputQ):
    block = _______________ #d
    while(block != _______________): #e
        blockOutput = []
        for i in range(_______________): #f
            pair = block[i]
            blockOutput.append([id, pair[0], pair[1], gcd(pair[0], pair[1])])
        outputQ.put(blockOutput)
        block = _______________ #d

Try it out: timing the serial version

If you haven’t done it yet, download this incomplete version of gcd: gcdWithBlanksSerial.py

Fill in the blanks in your local copy of gcdWithBlanksSerial.py. Once you are done, add a print() statement to the main() function to display the amount of time it takes to run the code:

def main():
    print("NUM_BLOCKS: {}\nPAIRS_PER_BLOCK: {}\nMIN_VALUE: {}\nMAX_VALUE: {}\nNUM_WORKERS: {}".format(NUM_BLOCKS, PAIRS_PER_BLOCK, MIN_VALUE, MAX_VALUE, NUM_WORKERS))
    seqTime = gcdSequential()
    print("Sequential Time is: {} seconds".format(seqTime)) #add this line

2.7-7: Run the code a few times. What kind of performance do you see?

Study the sequential code as best you can before moving forward.

Translating it to code: Parallel Version

To create a parallel version of this program, first copy your completed gcdWithBlanksSerial.py file to a new file called gcdWithBlanks.py. Next, modify the main function so that a new function, gcdParallel(), is called:

def main():
    print("NUM_BLOCKS: {}\nPAIRS_PER_BLOCK: {}\nMIN_VALUE: {}\nMAX_VALUE: {}\nNUM_WORKERS: {}".format(NUM_BLOCKS, PAIRS_PER_BLOCK, MIN_VALUE, MAX_VALUE, NUM_WORKERS))
    seqTime = gcdSequential()
    print("Sequential Time is: {} seconds".format(seqTime))
    parTime = gcdParallel() #add this line
    print("Parallel Time is: {} seconds".format(seqTime)) #add this line too

The great news is that the parallel version of this code uses exactly the same functions! The only difference is that the main process creates NUM_WORKERS child processes that will each do the work in parallel. Copy and paste the following fill-in-the-blank function to your gcdWithBlanks.py file:

def gcdParallel():
    print("----------\ngcdParallel")
    inputQ = _______________ #g
    outputQ = _______________ #g

    buildInputs(inputQ)

    startTime = timeit.default_timer()
    for i in range(1, NUM_WORKERS+1):
        #h: create and start a process here

    ________________________________ #i
    elapsedTime = timeit.default_timer() - startTime
    print("Elapsed time (s):", elapsedTime)
    return elapsedTime

The code contains many blanks. Let’s fill in the blanks together to complete the implementation.

Initializing new queues
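
Blank #g can be filled exactly as in gcdSequential: each queue is created by calling the Queue constructor with no arguments.

    inputQ = Queue()    #g
    outputQ = Queue()   #g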

Completing the for loop:

The for loop in gcdParallel() is where all the parallelization happens. Essentially, we want each worker to compute GCDs using the input and output queues we just created. Let’s build this code together:

for i in range(1, NUM_WORKERS+1):
     #h: create and start a process here
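
One reasonable way to fill in blank #h, following the same pattern as the earlier “hello world” example, is to create a Process whose target is computeGCDs and whose args tuple contains the worker's id and the two queues, and then start it (the variable name p is just one possible choice):

for i in range(1, NUM_WORKERS+1):
    p = Process(target=computeGCDs, args=(i, inputQ, outputQ))  #h
    p.start()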

Finishing up the function
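
Blank #i mirrors the final step of gcdSequential: the main process still needs to process the outputs. Calling processOutputs(outputQ) is one way to complete the blank; it also acts as the synchronization point, because the main process blocks on outputQ.get() until the workers have produced every block of results.

    processOutputs(outputQ)  #i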

Putting it all together

Using the solutions to the previous exercises, complete the gcdParallel() function.
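
One possible completed version, assembling the pieces above (a sketch; the solution in gcd.py may differ in details):

def gcdParallel():
    print("----------\ngcdParallel")
    inputQ = Queue()    #g
    outputQ = Queue()   #g

    buildInputs(inputQ)

    startTime = timeit.default_timer()
    for i in range(1, NUM_WORKERS+1):
        # h: create and start a worker process
        p = Process(target=computeGCDs, args=(i, inputQ, outputQ))
        p.start()

    processOutputs(outputQ)  # i: the main process consumes all results
    elapsedTime = timeit.default_timer() - startTime
    print("Elapsed time (s):", elapsedTime)
    return elapsedTime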

Timing the parallel version

2.7-14: Run the code a few times, modifying the NUM_WORKERS variable. What kind of times do you see?

2.7-15: Add code to the main function to compute the speedup of the parallel version over the serial version. Re-run your code, modifying the NUM_WORKERS variable each time. What kind of speedup do you see?
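
For example, the speedup is simply the ratio of the two times, which could be computed at the end of main (a minimal sketch):

    speedup = seqTime / parTime
    print("Speedup: {:.2f}".format(speedup))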

Complete version

A fully working program with both sequential and parallel versions timed can be downloaded from this link: gcd.py.
