MergeSort Example using Python Multiprocessing

Sorting algorithms are everywhere: in filesystems, in databases, in the sort methods of the JavaScript and Ruby Array classes and of the Python list type.
There are many sorting algorithms, but merge sort is, I believe, one of the best-known.

Merge sort divides the list (array) of elements into sub-lists and then merges the sorted sub-lists to produce the complete sorted sequence. Wikipedia offers great pseudo-code, and my example is based on it. The algorithm uses recursion for the division into sub-lists, and thanks to its divide-and-conquer logic it can also be parallelised.
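As a quick illustration of the idea on a four-element list:

merge_sort([38, 27, 43, 3])
# divide:       [38, 27]  and  [43, 3]
# sort halves:  [27, 38]  and  [3, 43]
# final merge:  [3, 27, 38, 43]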

Implementation

We start by writing the main function that divides our list into two sub-lists and then proceeds with sorting each individual sub-list:

def merge_sort(a_list):
    length = len(a_list)
    if length <= 1:
        return a_list
    # Integer (floor) division, so the index is an int in Python 3 as well.
    middle_index = length // 2
    left = a_list[0:middle_index]
    right = a_list[middle_index:]
    left = merge_sort(left)
    right = merge_sort(right)
    return merge(left, right)

Let us take it step-by-step:

    if length <= 1:
        return a_list

Return immediately if the list has at most one element, which is a trivial case for sorting. Note that this is also the termination condition for the recursion!
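The trivial cases behave as expected:

>>> merge_sort([7])
[7]
>>> merge_sort([])
[]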

    middle_index = length // 2
    left = a_list[0:middle_index]
    right = a_list[middle_index:]

Find the middle of the list (middle_index) and divide the input list into two distinct sub-lists, left & right.
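For example, with a list of 5 elements the two sub-lists differ in length by one:

a_list = [5, 2, 9, 1, 7]
middle_index = 2      # 5 // 2
left = [5, 2]
right = [9, 1, 7]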


    left = merge_sort(left)
    right = merge_sort(right)

The recursive part: call merge_sort for each of the new sub-lists, left & right. Say we have an initial list of 10 elements that needs to be sorted; in the first call of merge_sort the list will be divided into 2 sub-lists of 5 elements each, and merge_sort will then be called for each of them. To be exact, in the serial version the left sub-list is completely sorted before we proceed to the right one. Clearly, the two sorting operations are independent of each other and are therefore good candidates for parallelisation.

Based on this simple deduction, we will later see how Python multiprocessing can be utilised to take advantage of this algorithmic property. As a final step in our algorithm, the two sorted sub-lists must be merged into a single list, in which all elements appear in sorted order.

Merging the distinct sub-lists

And here is the merge function, which merges two sorted sub-lists:

def merge(left, right):
    sorted_list = []
    # We create shallow copies so that we do not mutate
    # the original objects.
    left = left[:]
    right = right[:]
    # We do not have to actually inspect the length,
    # as an empty list's truth value evaluates to False.
    # This is for algorithm demonstration purposes.
    while len(left) > 0 or len(right) > 0:
        if len(left) > 0 and len(right) > 0:
            # Compare the heads of the two lists and pop the smaller one.
            if left[0] <= right[0]:
                sorted_list.append(left.pop(0))
            else:
                sorted_list.append(right.pop(0))
        elif len(left) > 0:
            sorted_list.append(left.pop(0))
        elif len(right) > 0:
            sorted_list.append(right.pop(0))
    return sorted_list

Taking it step-by-step again:

    while len(left) > 0 or len(right) > 0:

We need to exhaust both lists. Remember that their lengths may differ by one element when the length of the initial list is odd.

        if len(left) > 0 and len(right) > 0:

If both lists are not exhausted, compare their first elements and pop the smaller one into the result:

            if left[0] <= right[0]:
                sorted_list.append(left.pop(0))
            else:
                sorted_list.append(right.pop(0))

If either one of the two lists is exhausted, continue extracting elements from the non-empty one:

        elif len(left) > 0:
            sorted_list.append(left.pop(0))
        elif len(right) > 0:
            sorted_list.append(right.pop(0))

On second thought, the exhausted-list case could be handled in a single step per list, which saves some computational time:

        elif len(left) > 0:
            sorted_list.extend(left)
            break
        elif len(right) > 0:
            sorted_list.extend(right)
            break
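For completeness, here is how that optimisation slots into the function; I am calling the variant merge_optimized here so that the original merge above stays untouched:

def merge_optimized(left, right):
    sorted_list = []
    left = left[:]
    right = right[:]
    while len(left) > 0 or len(right) > 0:
        if len(left) > 0 and len(right) > 0:
            if left[0] <= right[0]:
                sorted_list.append(left.pop(0))
            else:
                sorted_list.append(right.pop(0))
        elif len(left) > 0:
            # The right list is exhausted: the remainder of left is
            # already sorted, so we can copy it over in one step.
            sorted_list.extend(left)
            break
        elif len(right) > 0:
            sorted_list.extend(right)
            break
    return sorted_list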

Finally, the function returns the merged list, which now contains all initial elements in ascending order.
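A quick sanity check of the function:

>>> merge([1, 4], [2, 3, 5])
[1, 2, 3, 4, 5]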

Observations

For small lists this procedure is rather trivial for a single core of a modern CPU. But what happens when a list is significantly large? We talked about parallelisation earlier. Without modifying the functions described previously, we can simply divide our list into a number of sub-lists equal to the number of cores at our disposal and assign each sub-list to a different process.
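To make this concrete for the two-core case, here is a minimal sketch (names such as sort_worker and two_process_merge_sort are mine, for illustration only) that reuses merge_sort and merge from above and collects results through a multiprocessing.Queue; the full Pool-based implementation follows in the next section:

from multiprocessing import Process, Queue

def sort_worker(queue, sublist):
    # Sort one chunk and hand the result back to the parent process.
    queue.put(merge_sort(sublist))

def two_process_merge_sort(a_list):
    middle_index = len(a_list) // 2
    queue = Queue()
    workers = [
        Process(target=sort_worker, args=(queue, a_list[:middle_index])),
        Process(target=sort_worker, args=(queue, a_list[middle_index:])),
    ]
    for worker in workers:
        worker.start()
    # Read both results before joining; merging two sorted lists
    # works no matter which half arrives first.
    halves = [queue.get(), queue.get()]
    for worker in workers:
        worker.join()
    return merge(halves[0], halves[1])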

Caveat: the final merging is performed by a single core and accounts for the largest fraction of the algorithm's running time. Nevertheless, the process speeds up significantly for large lists. One additional note: spawning new processes (fork system calls) noticeably delays the overall procedure for small lists. The time spent creating and managing the multiple processes (call it MP) becomes a less significant factor for large lists, because the ratio of MP to the total sorting time (call it S) diminishes as S grows. For example, half a second of process-management overhead is 10% of a 5-second sort, but only 1% of a 50-second one.

Putting it all together

I have written an annotated version of parallel merge sort using Python multiprocessing:

from __future__ import print_function
import random
import sys
import time
from contextlib import contextmanager
from multiprocessing import Manager, Pool


class Timer(object):
    """
    Record timing information.
    """
    def __init__(self, *steps):
        self._time_per_step = dict.fromkeys(steps)

    def __getitem__(self, item):
        return self.time_per_step[item]

    @property
    def time_per_step(self):
        return {
            step: elapsed_time
            for step, elapsed_time in self._time_per_step.items()
            if elapsed_time is not None and elapsed_time > 0
        }

    def start_for(self, step):
        self._time_per_step[step] = -time.time()

    def stop_for(self, step):
        self._time_per_step[step] += time.time()


def merge_sort_multiple(results, array):
    results.append(merge_sort(array))


def merge_multiple(results, array_part_left, array_part_right):
    results.append(merge(array_part_left, array_part_right))


def merge_sort(array):
    array_length = len(array)
    if array_length <= 1:
        return array
    middle_index = array_length // 2
    left = array[0:middle_index]
    right = array[middle_index:]
    left = merge_sort(left)
    right = merge_sort(right)
    return merge(left, right)


def merge(left, right):
    sorted_list = []
    # We create shallow copies so that we do not mutate
    # the original objects.
    left = left[:]
    right = right[:]
    # We do not have to actually inspect the length,
    # as an empty list's truth value evaluates to False.
    # This is for algorithm demonstration purposes.
    while len(left) > 0 or len(right) > 0:
        if len(left) > 0 and len(right) > 0:
            if left[0] <= right[0]:
                sorted_list.append(left.pop(0))
            else:
                sorted_list.append(right.pop(0))
        elif len(left) > 0:
            sorted_list.append(left.pop(0))
        elif len(right) > 0:
            sorted_list.append(right.pop(0))
    return sorted_list


@contextmanager
def process_pool(size):
    """Create a process pool and block until
    all processes have completed.
    Note: see also concurrent.futures.ProcessPoolExecutor"""
    pool = Pool(size)
    yield pool
    pool.close()
    pool.join()


def parallel_merge_sort(array, process_count):
    timer = Timer('sort', 'merge', 'total')
    timer.start_for('total')
    timer.start_for('sort')

    # Divide the list into chunks.
    step = len(array) // process_count

    # Instantiate a multiprocessing.Manager object to
    # store the output of each process.
    # See example here:
    # http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes
    manager = Manager()
    results = manager.list()

    with process_pool(size=process_count) as pool:
        for n in range(process_count):
            # We submit a new task that applies the
            # merge_sort_multiple function to one sub-list.
            if n < process_count - 1:
                chunk = array[n * step:(n + 1) * step]
            else:
                # Get the remaining elements in the list.
                chunk = array[n * step:]
            pool.apply_async(merge_sort_multiple, (results, chunk))

    timer.stop_for('sort')

    print('Performing final merge.')
    timer.start_for('merge')

    # For a core count greater than 2, we can use multiprocessing
    # again to merge sub-lists in parallel.
    while len(results) > 1:
        with process_pool(size=process_count) as pool:
            pool.apply_async(
                merge_multiple,
                (results, results.pop(0), results.pop(0))
            )

    timer.stop_for('merge')
    timer.stop_for('total')

    final_sorted_list = results[0]
    return timer, final_sorted_list


def get_command_line_parameters():
    # Note: see argparse for better argument handling and interface.
    if len(sys.argv) > 1:
        # Check if the desired number of concurrent processes
        # has been given as a command-line parameter.
        total_processes = int(sys.argv[1])
        if total_processes > 1:
            # Restrict process count to even numbers.
            if total_processes % 2 != 0:
                print('Process count should be an even number.')
                sys.exit(1)
        print('Using {} cores'.format(total_processes))
    else:
        total_processes = 1
    return {'process_count': total_processes}


if __name__ == '__main__':
    parameters = get_command_line_parameters()
    process_count = parameters['process_count']

    main_timer = Timer('single_core', 'list_generation')
    main_timer.start_for('list_generation')
    # Randomize the length of our list.
    length = random.randint(3 * 10**4, 3 * 10**5)
    # Create an unsorted list with random numbers.
    randomized_array = [random.randint(0, n * 100) for n in range(length)]
    main_timer.stop_for('list_generation')
    print('List length: {}'.format(length))
    print('Random list generated in %4.6f' %
          main_timer['list_generation'])

    # Start timing the single-core procedure.
    main_timer.start_for('single_core')
    single = merge_sort(randomized_array)
    main_timer.stop_for('single_core')

    # Create a copy first due to mutation.
    randomized_array_sorted = randomized_array[:]
    randomized_array_sorted.sort()
    # Comparison with the Python list sort method
    # serves also as validation of our implementation.
    print('Verification of sorting algorithm:',
          randomized_array_sorted == single)
    print('Single Core elapsed time: %4.6f sec' %
          main_timer['single_core'])

    print('Starting parallel sort.')
    parallel_timer, parallel_sorted_list = \
        parallel_merge_sort(randomized_array, process_count)
    print('Final merge duration: %4.6f sec' % parallel_timer['merge'])
    print('Sorted arrays equal:',
          parallel_sorted_list == randomized_array_sorted)
    print(
        '%d-Core elapsed time: %4.6f sec' % (
            process_count,
            parallel_timer['total']
        )
    )

    # We collect the list element count and the time taken
    # for each of the procedures in a file.
    filename = 'mergesort-{}.txt'.format(process_count)
    with open(filename, 'a') as result_log:
        benchmark_data = (
            length,
            main_timer['single_core'],
            parallel_timer['total'],
            parallel_timer['total'] / main_timer['single_core'],
            parallel_timer['merge']
        )
        result_log.write("%d %4.3f %4.3f %4.2f %4.3f\n" % benchmark_data)
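Assuming the script above is saved as merge_sort.py, a run with four worker processes is started like this (each run also appends a line of benchmark data to mergesort-4.txt):

$ python merge_sort.py 4
Using 4 cores
...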

Benchmarks

I have executed some test runs on a VPS with a 4-core Intel Xeon L5520 @ 2.27GHz CPU. Note, however, that the available CPU time fluctuates, as other VMs consume resources from time to time. All time measurements are in seconds. The two charts display the change in processing time (y-axis) as a function of the number of list elements (x-axis).

Abbreviations:

SP: Single Process Time
MP: Multiple Process Time
MPMT: Multi-Process Final Merge Time

Using 2 cores

Elements   SP        MP        MP/SP (%)   MPMT      MPMT/MP (%)
464,365    132.278   104.410   78.93       66.288    63.49
479,585    141.993   107.780   75.91       64.916    60.23
628,561    244.518   183.056   74.86       114.838   62.73
680,722    285.694   229.938   80.48       142.639   62.03
703,865    305.931   234.565   76.67       148.318   63.23
729,973    330.974   254.887   77.01       162.184   63.63
762,260    372.593   277.687   74.53       173.243   62.39
787,132    401.388   296.826   73.95       189.222   63.75
827,259    432.098   335.185   77.57       212.990   63.54
831,855    445.834   338.542   75.93       217.752   64.32

[Chart: Benchmarks using 2 CPU cores]

Using 4 cores

Elements   SP        MP        MP/SP (%)   MPMT      MPMT/MP (%)
321,897    68.038    45.711    67.18       38.777    84.83
347,426    82.131    54.614    66.50       46.272    84.73
479,979    153.305   101.320   66.09       87.410    86.27
574,747    215.026   147.384   68.54       127.574   86.56
657,324    272.733   196.504   72.05       173.142   88.11
693,975    311.907   219.233   70.29       191.414   87.31
703,379    332.943   232.885   69.95       196.705   84.46
814,617    440.827   312.758   70.95       273.334   87.39
837,964    446.744   323.348   72.38       283.348   87.63
858,084    476.886   363.580   76.24       320.736   88.22

[Chart: Benchmarks using 4 CPU cores]

I would be more than happy to read any comments or answer any questions!

11 thoughts on “MergeSort Example using Python Multiprocessing”

  1. Hi, the test code runs nicely on my lab’s Intel(R) Xeon(R) CPU E5-2643 0 @ 3.30GHz (4 physical cores, 4 virtual) CentOS machine. However, I run into trouble when asking the code to run with more than 4 cores.

    Is this because the script is already assuming that I am specifying the number of physical cores (not the total number of available cores)?

    cheers,
    aaron

    Result with 4 cores:
    (Validation passes.)

    $ python merge_sort.py 4
    Using 4 cores
    List length : 238553
    Random list generated in 0.327092885971
    Verification of sorting algorithm True
    Single Core: 15.029496 sec
    Starting 4-core process
    Performing final merge…
    Final merge duration : 9.20412397385
    Sorted arrays equal : True

    Result with 8 cores:
    (Validation fails.)

    $ python merge_sort.py 8
    Using 8 cores
    List length : 108100
    Random list generated in 0.164412975311
    Verification of sorting algorithm True
    Single Core: 3.292262 sec
    Starting 8-core process
    Performing final merge…
    Final merge duration : 0.463366031647
    Sorted arrays equal : False
    8-Core ended: 0.688715 sec

    • Hello Aaron,

      sorry for the late reply. Just to let you know, I have done several updates in the source code.
      Let me know what the results are if you give this another try, thanks!

  2. Hello,
    I’m running this program using Python 3.4 on a Windows 8.1 machine. All works fine until this line: a = merge(responses[0], responses[1]) # which merges the two sub-lists.
    I get the following error message:

    IndexError: list index out of range

    Not sure why this is happening? Any suggestions?

  3. Is it not that at your line 74 your a is already sorted, and hence the use of merge sort reduces time? I tried using b = list(a) and used merge_sort first, but now the parallel version is showing more time than the serial one.

  4. Good stuff 🙂

    but instead of:

    for proc in p:
        proc.start()
        proc.join()

    you should do:

    for proc in p:
        proc.start()
    for proc in p:
        proc.join()

    Actually, without this only one process would run at a time, because proc.join() blocks until the process is done.
