MergeSort Example using Python Multiprocessing

Sorting algorithms are everywhere: in filesystems, in databases, and in the sort methods of the JavaScript and Ruby Array objects or the Python list object.
There are many algorithms, and merge sort is one of the best-known methods of sorting.

Merge sort divides the list (array) of elements into sublists of a single element each and then merges the sublists repeatedly to produce the sorted list. Wikipedia offers great pseudocode, which I implemented in my example. It uses recursion for the division into sublists, and because of its divide-and-conquer logic it can be parallelized.

merge_sort function

We start by writing the divider function:

def merge_sort(a):
  length_a = len(a)
  if length_a <= 1: return a
  m = int(math.floor(length_a / 2))
  a_left = a[0:m]
  a_right = a[m:]
  a_left = merge_sort(a_left)
  a_right = merge_sort(a_right)
  return merge(a_left, a_right)

Let us go through it step by step:

  if length_a <= 1: return a
Return immediately if the list has at most one item, since such a list is already sorted.
  m = int(math.floor(length_a / 2))
  a_left = a[0:m]
  a_right = a[m:]
Find the middle of the list (m) and divide the input list a into 2 sublists, a_left & a_right.
  a_left = merge_sort(a_left)
  a_right = merge_sort(a_right)
The recursive part: call merge_sort for the new sublists, a_left & a_right. The new unsorted sublists are perfect input for our function. Let’s say that we have an initial list of 10 elements that needs to be sorted; in the first call of merge_sort the list will be divided into 2 sublists of 5 elements, and merge_sort will be called again for each of the 2 sublists. The program actually blocks until merge_sort(a_left) returns and only then proceeds with merge_sort(a_right). Wait a minute… neither call depends on the other's result, so the 2 calls could run separately, in parallel! Based on this simple observation, we’ll see how Python multiprocessing can be harnessed to take advantage of this potential for parallelization.
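
To make the recursion concrete, here is a small sketch that prints how a list of 10 elements gets split, using the same midpoint rule as merge_sort. The helper name trace_splits is hypothetical and is not part of the original script; it only records sublist lengths and does no sorting.

```python
def trace_splits(n, depth=0, out=None):
    """Record the sublist lengths merge_sort would produce for n items.

    Hypothetical helper for illustration only: it mirrors the midpoint
    rule m = n // 2 used by merge_sort, but sorts nothing.
    """
    if out is None:
        out = []
    out.append((depth, n))
    if n <= 1:
        return out
    m = n // 2                           # same split point as merge_sort
    trace_splits(m, depth + 1, out)      # left half: a[0:m]
    trace_splits(n - m, depth + 1, out)  # right half: a[m:]
    return out


if __name__ == '__main__':
    # Indentation shows the recursion depth of each call.
    for depth, size in trace_splits(10):
        print('  ' * depth + str(size))
```

Sibling calls at the same depth never share data, which is what leaves room for running them in parallel.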

I intentionally left the merge function out of the previous explanation. It is the last step of merge_sort: the 2 sorted sublists are merged into a single sorted list.

merge function

And here is the merge function, which merges 2 sorted sublists:

def merge(left, right):
  a = [] #create an empty list
  while len(left) > 0 or len(right) > 0:
    if len(left) > 0 and len(right) > 0:
      if left[0] <= right[0]:
        a.append(left.pop(0))
      else:
        a.append(right.pop(0))
    elif len(left) > 0:
      a.append(left.pop(0))
    elif len(right) > 0:
      a.append(right.pop(0))
  return a

Again, step by step:

while len(left) > 0 or len(right) > 0:
We need to exhaust both lists. Remember that the sublists may differ in length (by 1 element when they come from merge_sort), so one of them can run out before the other.
  if len(left) > 0 and len(right) > 0: 
If neither list is exhausted yet:
if left[0] <= right[0]:
  a.append(left.pop(0))
else:
  a.append(right.pop(0))
Compare the first element of each list. If the element of the “left” sublist is lower than or equal to that of the “right” sublist, remove it from the “left” sublist and append it to our initially empty list, a. Otherwise take the first element from the “right” sublist. Remember that both sublists are sorted, so the first element of each is its smallest.
elif len(left) > 0:
  a.append(left.pop(0))
elif len(right) > 0:
  a.append(right.pop(0))
If one of the 2 lists is exhausted, pick up the elements from the non-empty list. On second thought, this can be done in one step, which saves some computation time, because the remaining elements are already in order:

elif len(left) > 0:
  a.extend(left)
  break
elif len(right) > 0:
  a.extend(right)
  break

Finally, the function returns the merged list.
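
One thing worth noting about the merge above: list.pop(0) has to shift every remaining element, so each call costs time proportional to the length of the list. A possible alternative, sketched below, walks both lists with indices instead and leaves the inputs unmodified. The name merge_indexed is hypothetical; it is not the function benchmarked in this post.

```python
def merge_indexed(left, right):
    """Merge two sorted lists without mutating them (illustrative sketch)."""
    merged = []
    i = j = 0
    # Take the smaller head element until one list runs out.
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # At most one of these two slices is non-empty.
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged


if __name__ == '__main__':
    print(merge_indexed([1, 4, 9], [2, 3, 10, 11]))
```

The standard library's heapq.merge does a similar job lazily and returns an iterator rather than a list.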

Observations

For small lists this procedure is trivial work for a single core of a modern CPU. But what happens when a list has a significant size? We talked about parallelization. Without having to modify the functions described previously, we can divide our list into a number of sublists equal to the number of cores we are able to occupy, and assign each sublist to a different process.
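
As a rough sketch of that division step, the helper below cuts a list into one contiguous chunk per core, with the last chunk absorbing any remainder. The name split_for_cores is hypothetical; the sketch only shows the slicing and starts no processes.

```python
def split_for_cores(a, cores):
    """Cut list a into `cores` contiguous chunks (illustrative sketch).

    The last chunk takes whatever is left over when len(a) is not an
    exact multiple of cores.
    """
    step = len(a) // cores
    chunks = [a[n * step:(n + 1) * step] for n in range(cores - 1)]
    chunks.append(a[(cores - 1) * step:])   # remaining elements
    return chunks


if __name__ == '__main__':
    data = list(range(10))
    for chunk in split_for_cores(data, 4):
        print(chunk)
```

Each chunk can then be handed to its own process, and the sorted chunks merged afterwards.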

Caveat: the final merge is performed by a single core and takes the largest fraction of the multi-core calculation time. Nevertheless, the process speeds up significantly for large lists, whereas for small lists the cost of spawning new processes notably delays the overall procedure. The time required for creating the objects and handling the multiple processes (let’s call this MP) becomes negligible for large lists: the ratio of MP to the total sorting time (let’s call this S) shrinks as the denominator S grows.
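
A quick piece of arithmetic illustrates the MP over S argument. The numbers below are made up for illustration (they are not measurements), and the sketch assumes that MP stays roughly constant while S grows with the list size.

```python
def overhead_fraction(mp, s):
    """Return MP / S: process-handling time relative to sorting time."""
    return mp / float(s)


if __name__ == '__main__':
    mp = 0.05                            # hypothetical process-handling cost, seconds
    for s in (0.1, 1.0, 10.0, 100.0):    # hypothetical sorting times, seconds
        print('S = %6.1f s -> MP/S = %.2f%%' % (s, 100 * overhead_fraction(mp, s)))
```

With these assumed values the overhead is half of the sorting time for the smallest case and a tiny fraction of it for the largest.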

Putting it all together

Here is an annotated version of the code:

#!/usr/bin/python
import os, sys, time
import math, random
from multiprocessing import Process, Manager

# a wrapper function which appends the result of "merge_sort" to the "responses" list
def merge_sort_multi( list_part ):
  responses.append( merge_sort( list_part ) )

# a wrapper function which appends the result of "merge" to the "responses" list
def merge_multi( list_part_left, list_part_right ):
  responses.append( merge(list_part_left, list_part_right ) )

# explained earlier
def merge_sort(a):
  length_a = len(a)
  if length_a <= 1: return a
  m = int(math.floor(length_a / 2))
  a_left = a[0:m]
  a_right = a[m:]
  a_left = merge_sort(a_left)
  a_right = merge_sort(a_right)
  return merge(a_left, a_right)

# ... also explained earlier
def merge(left, right):
  a = []
  while len(left) > 0 or len(right) > 0:
    if len(left) > 0 and len(right) > 0:
      if left[0] <= right[0]:
        a.append(left.pop(0))
      else:
        a.append(right.pop(0))
    elif len(left) > 0:
      a.extend(left)
      break
    elif len(right) > 0:
      a.extend(right)
      break
  return a

if __name__ == '__main__':
  try:
    cores = int(sys.argv[1]) #get the number of cores
    if cores > 1:
      if cores % 2 != 0:  # restrict core count to even numbers
        cores -= 1
    print 'Using %d cores'%cores
  except:
    cores = 1
  
  '''instantiate a multiprocessing.Manager object to store the output of each process, 
  see example here http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes '''
  manager = Manager() 
  responses = manager.list()
  
  # randomize the length of our list
  l = random.randint(3*10**4, 3*10**5)
  print 'List length : ', l

  # create an unsorted list with random numbers 
  start_time = time.time()
  a = [ random.randint(0, n*100) for n in range(0, l) ]
  print 'Random list generated in ', time.time() - start_time
  # start timing the single-core procedure 
  start_time = time.time()
  single = merge_sort(a)
  single_core_time = time.time() - start_time
  a_sorted = a[:]
  a_sorted.sort()
  ''' comparison with Python list's "sort" method, validation of the algorithm 
  (it has to work right, doesn't it??) '''
  print 'Verification of sorting algorithm', a_sorted == single
  print 'Single Core: %4.6f sec'%( single_core_time )
  if cores > 1:
    ''' we collect the list element count and the time taken 
    for each of the procedures in a file '''
    f = open('mergesort-'+str(cores)+'.dat', 'a')
    print 'Starting %d-core process'%cores
    start_time = time.time()
    # divide the list in "cores" parts
    step = int( math.floor( l / cores ) )
    offset = 0
    p = []
    for n in range(0, cores):
      ''' we create a new Process object and assign the "merge_sort_multi" function to it,
      using as input a sublist '''
      if n < cores - 1:
        proc = Process( target=merge_sort_multi, args=( a[n*step:(n+1)*step], ) )
      else:
        # get the remaining elements in the list
        proc = Process( target=merge_sort_multi, args=( a[n*step:], ) )
      p.append(proc)

    # start each Process, then join each Process; see
    # http://docs.python.org/library/multiprocessing.html#multiprocessing.Process.start and
    # http://docs.python.org/library/multiprocessing.html#multiprocessing.Process.join
    for proc in p:
      proc.start()
    # join only after every process has been started, otherwise they run one at a time
    for proc in p:
      proc.join()
    print 'Performing final merge...'
    start_time_final_merge = time.time()
    p = []
    ''' For a core count greater than 2, we can use multiprocessing 
    again to merge sublists in parallel; repeat until only 2 sublists remain '''
    while len(responses) > 2:
      p = []
      while len(responses) > 1:
        ''' we remove sublists from the "responses" list and pass them as input to the 
        "merge_multi" wrapper function of "merge" '''
        proc = Process( target=merge_multi, args=(responses.pop(0),responses.pop(0)) )
        p.append( proc )
      # again starting and joining ( this seems like a pattern, doesn't it ... ? )
      for proc in p:
        proc.start()
      for proc in p:
        proc.join()
    # finally we have 2 sublists which need to be merged 
    a = merge(responses[0], responses[1])
    # ... anddd time!
    final_merge_time = time.time() - start_time_final_merge
    print 'Final merge duration : ', final_merge_time
    multi_core_time = time.time() - start_time
    # of course we double-check that we did everything right
    print 'Sorted arrays equal : ', (a == single)
    print '%d-Core ended: %4.6f sec'%(cores, multi_core_time)
    # we write down the results in our log file
    f.write("%d %4.3f %4.3f %4.2f %4.3f\n"%(l, single_core_time, multi_core_time,
            multi_core_time/single_core_time, final_merge_time))
    f.close()
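
The script above relies on the module-level responses list being visible inside the child processes, which holds when processes are forked but may not hold on platforms that start a fresh interpreter for each child (Windows, for instance). A possible variant, sketched here under the assumption that multiprocessing.Pool is acceptable in place of hand-built Process objects, has each worker return its sorted chunk instead. The names parallel_merge_sort and merge_all are hypothetical, the merge is an index-based rewrite, and the final merging runs in the parent process; this is not the code that produced the benchmarks.

```python
import random
from multiprocessing import Pool


def merge(left, right):
    """Merge two sorted lists into a new sorted list."""
    merged, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged


def merge_sort(a):
    """Plain recursive merge sort; runs inside each worker process."""
    if len(a) <= 1:
        return a
    m = len(a) // 2
    return merge(merge_sort(a[:m]), merge_sort(a[m:]))


def merge_all(parts):
    """Merge sorted lists pairwise until one remains (any number of parts)."""
    parts = list(parts)
    while len(parts) > 1:
        merged = [merge(parts[k], parts[k + 1])
                  for k in range(0, len(parts) - 1, 2)]
        if len(parts) % 2:           # odd one out is carried to the next round
            merged.append(parts[-1])
        parts = merged
    return parts[0] if parts else []


def parallel_merge_sort(a, cores):
    """Sort chunks of a in `cores` worker processes, then merge the results."""
    step = max(1, len(a) // cores)
    # May yield one chunk more than `cores` when len(a) is not a multiple of cores.
    chunks = [a[k:k + step] for k in range(0, len(a), step)]
    pool = Pool(processes=cores)
    try:
        sorted_chunks = pool.map(merge_sort, chunks)   # results are returned, not shared
    finally:
        pool.close()
        pool.join()
    return merge_all(sorted_chunks)


if __name__ == '__main__':
    data = [random.randint(0, 10 ** 6) for _ in range(20000)]
    result = parallel_merge_sort(data, 4)
    print('Sorted correctly: %s' % (result == sorted(data)))
```

Keeping the Pool creation behind the __main__ guard matters on platforms that re-import the module in each child.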

I have done some test runs on my VPS, which uses a 4-core CPU, an Intel Xeon L5520 @ 2.27GHz according to /proc/cpuinfo. However, the test runs led me to the (easy and obvious) conclusion that performance is not stable (surprise-surprise), since I am using a virtual machine. All time measurements are in seconds. The 2 charts depict the change in processing time (y-axis) as the number of list elements (x-axis) increases.

Abbreviations:

SP: Single Process Time
MP: Multiple Process Time
MPMT: Multi-process Final Merge Time

Using 2 cores

Elements   SP (s)    MP (s)    MP/SP (%)   MPMT (s)   MPMT/MP (%)
464,365    132.278   104.410   78.93        66.288    63.49
479,585    141.993   107.780   75.91        64.916    60.23
628,561    244.518   183.056   74.86       114.838    62.73
680,722    285.694   229.938   80.48       142.639    62.03
703,865    305.931   234.565   76.67       148.318    63.23
729,973    330.974   254.887   77.01       162.184    63.63
762,260    372.593   277.687   74.53       173.243    62.39
787,132    401.388   296.826   73.95       189.222    63.75
827,259    432.098   335.185   77.57       212.990    63.54
831,855    445.834   338.542   75.93       217.752    64.32

Using 4 cores

Elements   SP (s)    MP (s)    MP/SP (%)   MPMT (s)   MPMT/MP (%)
321,897     68.038    45.711   67.18        38.777    84.83
347,426     82.131    54.614   66.50        46.272    84.73
479,979    153.305   101.320   66.09        87.410    86.27
574,747    215.026   147.384   68.54       127.574    86.56
657,324    272.733   196.504   72.05       173.142    88.11
693,975    311.907   219.233   70.29       191.414    87.31
703,379    332.943   232.885   69.95       196.705    84.46
814,617    440.827   312.758   70.95       273.334    87.39
837,964    446.744   323.348   72.38       283.348    87.63
858,084    476.886   363.580   76.24       320.736    88.22
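
To read these numbers as speed-ups, the MP/SP column can be inverted. The sketch below does that arithmetic for the first row of each table; the function names speedup and final_merge_share are hypothetical, and the input values are copied from the tables above.

```python
def speedup(sp, mp):
    """Speed-up of the multi-process run over the single-process run."""
    return sp / mp


def final_merge_share(mpmt, mp):
    """Fraction of the multi-process time spent in the final merge."""
    return mpmt / mp


if __name__ == '__main__':
    # (cores, SP, MP, MPMT) from the first row of each table, in seconds
    rows = [(2, 132.278, 104.410, 66.288),
            (4, 68.038, 45.711, 38.777)]
    for cores, sp, mp, mpmt in rows:
        print('%d cores: speed-up %.2fx, final merge %.1f%% of MP'
              % (cores, speedup(sp, mp), 100 * final_merge_share(mpmt, mp)))
```

For these two rows the speed-up comes out at roughly 1.27x on 2 cores and 1.49x on 4 cores, well short of the core count, which is consistent with the final merge taking most of the multi-process time.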

You can download the code here.
I’d be more than happy to read any comments!

Cheers.


Edits:

  1. I have corrected the code: when sorting the list with the Python sort method for verification, the sort also operated on the original list, because an assignment creates a reference to the same object and not a copy. I will re-run the benchmarks some time soon. The code is also corrected in the tar archive available for download.
  2. Benchmarks and charts are corrected!
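
The pitfall behind edit 1 can be reproduced in a few lines. This is a minimal sketch with made-up 3-element lists, not the benchmark code:

```python
a = [3, 1, 2]

alias = a          # assignment: both names refer to the same list object
alias.sort()       # sorts in place, so `a` is now sorted as well
print(a)           # [1, 2, 3]

b = [3, 1, 2]
copy_of_b = b[:]   # slicing creates a new (shallow) copy
copy_of_b.sort()
print(b)           # [3, 1, 2], the original is untouched
```

This is why the script builds a_sorted with a[:] before calling sort on it.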

7 thoughts on “MergeSort Example using Python Multiprocessing”

  1. Amr says:

    Good stuff 🙂

    but instead of :

    for proc in p:
      proc.start()
      proc.join()

    you should do:

    for proc in p:
      proc.start()
    for proc in p:
      proc.join()

    actually without this, only one core would run, because proc.join() blocks until the process is done.

  2. harish says:

    is it not at your line 74 your a is already sorted and hence use of merge sort reduces time… I tried using b = list(a) and used merge_sort first but now parallel is showing more time than the serial one

  3. alex says:

    Hello,
    I’m running this program using Python3.4 on a Windows 8.1 machine. All works fine until this line: a = merge(responses[0], responses[1]) #which merges the two sub lists.
    I get the following error message:

    IndexError: list index out of range

    Not sure why this is happening? Any suggestions?
