Multithreaded Quicksort or Mergesort

How to implement a multi-threaded MergeSort in Java

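The most convenient multi-threading paradigm for a merge sort is the fork-join paradigm. The fork/join framework (ForkJoinPool, RecursiveTask) has been part of Java since Java 7; ForkJoinPool.commonPool(), which is used below, was added in Java 8. The following code demonstrates a merge sort using fork-join.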

import java.util.*;
import java.util.concurrent.*;

public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
private List<N> elements;

public MergeSort(List<N> elements) {
this.elements = new ArrayList<>(elements);
}

@Override
protected List<N> compute() {
if(this.elements.size() <= 1)
return this.elements;
else {
final int pivot = this.elements.size() / 2;
MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));

leftTask.fork();
rightTask.fork();

List<N> left = leftTask.join();
List<N> right = rightTask.join();

return merge(left, right);
}
}

private List<N> merge(List<N> left, List<N> right) {
List<N> sorted = new ArrayList<>();
while(!left.isEmpty() || !right.isEmpty()) {
if(left.isEmpty())
sorted.add(right.remove(0));
else if(right.isEmpty())
sorted.add(left.remove(0));
else {
if( left.get(0).compareTo(right.get(0)) < 0 )
sorted.add(left.remove(0));
else
sorted.add(right.remove(0));
}
}

return sorted;
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(Arrays.asList(7,2,9,10,1)));
System.out.println("result: " + result);
}
}

While much less straightforward, the following variant of the code eliminates the excessive copying of the ArrayList. The initial unsorted list is only created once, and the calls to subList do not need to perform any copying themselves. Before, we copied the array list each time the algorithm forked. Also, when merging lists, instead of creating a new list and copying values into it each time, we now reuse the left list and insert our values into it. By avoiding the extra copy step we improve performance. We use a LinkedList here because inserts are rather cheap compared to an ArrayList. We also eliminate the call to remove, which can be expensive on an ArrayList as well.

import java.util.*;
import java.util.concurrent.*;

public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
private List<N> elements;

public MergeSort(List<N> elements) {
this.elements = elements;
}

@Override
protected List<N> compute() {
if(this.elements.size() <= 1)
return new LinkedList<>(this.elements);
else {
final int pivot = this.elements.size() / 2;
MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));

leftTask.fork();
rightTask.fork();

List<N> left = leftTask.join();
List<N> right = rightTask.join();

return merge(left, right);
}
}

private List<N> merge(List<N> left, List<N> right) {
int leftIndex = 0;
int rightIndex = 0;
while(leftIndex < left.size() || rightIndex < right.size()) {
if(leftIndex >= left.size())
left.add(leftIndex++, right.get(rightIndex++));
else if(rightIndex >= right.size())
return left;
else {
if( left.get(leftIndex).compareTo(right.get(rightIndex)) < 0 )
leftIndex++;
else
left.add(leftIndex++, right.get(rightIndex++));
}
}

return left;
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(Arrays.asList(7,2,9,-7,777777,10,1)));
System.out.println("result: " + result);
}
}

We can also improve the code one step further by using iterators instead of calling get directly when performing the merge. The reason for this is that get on a LinkedList by index has poor time performance (linear) so by using an iterator we eliminate slow-down caused by internally iterating the linked list on each get. The call to next on an iterator is constant time as opposed to linear time for the call to get. The following code is modified to use iterators instead.

import java.util.*;
import java.util.concurrent.*;

public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
private List<N> elements;

public MergeSort(List<N> elements) {
this.elements = elements;
}

@Override
protected List<N> compute() {
if(this.elements.size() <= 1)
return new LinkedList<>(this.elements);
else {
final int pivot = this.elements.size() / 2;
MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));

leftTask.fork();
rightTask.fork();

List<N> left = leftTask.join();
List<N> right = rightTask.join();

return merge(left, right);
}
}

private List<N> merge(List<N> left, List<N> right) {
ListIterator<N> leftIter = left.listIterator();
ListIterator<N> rightIter = right.listIterator();
while(leftIter.hasNext() || rightIter.hasNext()) {
if(!leftIter.hasNext()) {
leftIter.add(rightIter.next());
rightIter.remove();
}
else if(!rightIter.hasNext())
return left;
else {
N rightElement = rightIter.next();
if( leftIter.next().compareTo(rightElement) < 0 )
rightIter.previous();
else {
leftIter.previous();
leftIter.add(rightElement);
}
}
}

return left;
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(Arrays.asList(7,2,9,-7,777777,10,1)));
System.out.println("result: " + result);
}
}

Finally, the most complex version of the code: this iteration operates entirely in place. Only the initial ArrayList is created, and no additional collections are ever created. As a result the logic is particularly difficult to follow (so I saved it for last), but it should be as close to an ideal implementation as we can get.

import java.util.*;
import java.util.concurrent.*;

public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
private List<N> elements;

public MergeSort(List<N> elements) {
this.elements = elements;
}

@Override
protected List<N> compute() {
if(this.elements.size() <= 1)
return this.elements;
else {
final int pivot = this.elements.size() / 2;
MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));

leftTask.fork();
rightTask.fork();

List<N> left = leftTask.join();
List<N> right = rightTask.join();

merge(left, right);
return this.elements;
}
}

private void merge(List<N> left, List<N> right) {
int leftIndex = 0;
int rightIndex = 0;
while(leftIndex < left.size() ) {
if(rightIndex == 0) {
if( left.get(leftIndex).compareTo(right.get(rightIndex)) > 0 ) {
swap(left, leftIndex++, right, rightIndex++);
} else {
leftIndex++;
}
} else {
if(rightIndex >= right.size()) {
if(right.get(0).compareTo(left.get(left.size() - 1)) < 0 )
merge(left, right);
else
return;
}
else if( right.get(0).compareTo(right.get(rightIndex)) < 0 ) {
swap(left, leftIndex++, right, 0);
} else {
swap(left, leftIndex++, right, rightIndex++);
}
}
}

if(rightIndex < right.size() && rightIndex != 0)
merge(right.subList(0, rightIndex), right.subList(rightIndex, right.size()));
}

private void swap(List<N> left, int leftIndex, List<N> right, int rightIndex) {
// exchange left[leftIndex] with right[rightIndex]; List.set returns the element it replaced
left.set(leftIndex, right.set(rightIndex, left.get(leftIndex)));
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(new ArrayList<>(Arrays.asList(5,9,8,7,6,1,2,3,4))));
System.out.println("result: " + result);
}
}

Multi threaded quick sort much slower than expected

I don't know Java that well, so the example code below may be an awkward usage of Runnable for the multiple threads. This example code uses 8 threads: qsortmt() does a partition and starts two instances of qsort0(). Each instance of qsort0() does a partition and invokes two instances of qsort1(). Each instance of qsort1() does a partition and invokes two instances of qsort2(). Each instance of qsort2() calls qsort(). For the 16 million integers used in this example, the 8-threaded sort takes about 1 second, while a non-threaded sort takes about 1.6 seconds, so not a huge savings. Part of the issue is that the partition steps are done before invoking threads to operate on the sub-partitions.

Switching to C++ and Windows native threads, 8 threads took about 0.632 seconds, non-threaded about 1.352 seconds. Switching to merge sort, splitting the array into 8 parts, sorting each part, then merging the 8 parts took about 0.40 seconds, single threaded about 1.45 seconds.

package x;
import java.util.Random;

public class x {

class qsort0 implements Runnable
{
int[] a;
int lo;
int hi;

private qsort0(int[] a, int lo, int hi)
{
this.a = a;
this.lo = lo;
this.hi = hi;
}
@Override
public void run()
{
if(this.lo >= this.hi)
return;
int pi = partition(this.a, this.lo, this.hi);
Thread lt = new Thread(new qsort1(a, this.lo, pi));
Thread rt = new Thread(new qsort1(a, pi+1, this.hi));
lt.start();
rt.start();
try {lt.join();} catch (InterruptedException ex){}
try {rt.join();} catch (InterruptedException ex){}
}
}

class qsort1 implements Runnable
{
int[] a;
int lo;
int hi;

private qsort1(int[] a, int lo, int hi)
{
this.a = a;
this.lo = lo;
this.hi = hi;
}
@Override
public void run()
{
if(this.lo >= this.hi)
return;
int pi = partition(this.a, this.lo, this.hi);
Thread lt = new Thread(new qsort2(a, this.lo, pi));
Thread rt = new Thread(new qsort2(a, pi+1, this.hi));
lt.start();
rt.start();
try {lt.join();} catch (InterruptedException ex){}
try {rt.join();} catch (InterruptedException ex){}
}
}

class qsort2 implements Runnable
{
int[] a;
int lo;
int hi;
private qsort2(int[] a, int lo, int hi)
{
this.a = a;
this.lo = lo;
this.hi = hi;
}
@Override
public void run() {
if(this.lo >= this.hi)
return;
qsort(this.a, this.lo, this.hi);
}
}

// quicksort multi-threaded
@SuppressWarnings("empty-statement")
public static void qsortmt(int[] a, int lo, int hi)
{
if(lo >= hi)
return;
int pi = partition(a, lo, hi);
Thread lt = new Thread(new x().new qsort0(a, lo, pi));
Thread rt = new Thread(new x().new qsort0(a, pi+1, hi));
lt.start();
rt.start();
try {lt.join();} catch (InterruptedException ex){}
try {rt.join();} catch (InterruptedException ex){}
}

@SuppressWarnings("empty-statement")
public static int partition(int []a, int lo, int hi)
{
int md = lo+(hi-lo)/2;
int ll = lo-1;
int hh = hi+1;
int t;
int p = a[md];
while(true){
while(a[++ll] < p);
while(a[--hh] > p);
if(ll >= hh)
break;
t = a[ll];
a[ll] = a[hh];
a[hh] = t;
}
return hh;
}

@SuppressWarnings("empty-statement")
public static void qsort(int[] a, int lo, int hi)
{
while(lo < hi){
int ll = partition(a, lo, hi);
int hh = ll+1;
// recurse on smaller part, loop on larger part
if((ll - lo) <= (hi - hh)){
qsort(a, lo, ll);
lo = hh;
} else {
qsort(a, hh, hi);
hi = ll;
}
}
}

public static void main(String[] args)
{
int[] a = new int[16*1024*1024];
Random r = new Random(0);
for(int i = 0; i < a.length; i++)
a[i] = r.nextInt();
long bgn, end;
bgn = System.currentTimeMillis();
qsortmt(a, 0, a.length-1);
end = System.currentTimeMillis();
for(int i = 1; i < a.length; i++){
if(a[i-1] > a[i]){
System.out.println("failed");
break;
}
}
System.out.println("milliseconds " + (end-bgn));
}
}
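
The merge sort variant mentioned above (split the array into parts, sort each part in its own thread, then merge the parts) was only described for C++. As a rough Java sketch of the same idea: the class name ChunkedMergeSort, the use of Arrays.sort for each part, and the single-threaded bottom-up merge are my own assumptions, not the code that produced the timings quoted above.

```java
import java.util.Arrays;

final class ChunkedMergeSort {

    // Sorts `a` by sorting up to `parts` contiguous chunks in separate threads,
    // then merging neighbouring sorted runs until one run remains.
    static void sort(int[] a, int parts) {
        if (a.length < 2) return;
        parts = Math.max(1, Math.min(parts, a.length));
        final int chunk = (a.length + parts - 1) / parts; // ceiling division
        Thread[] workers = new Thread[parts];
        int started = 0;
        for (int lo = 0; lo < a.length; lo += chunk) {
            final int from = lo;
            final int to = Math.min(lo + chunk, a.length);
            workers[started] = new Thread(() -> Arrays.sort(a, from, to));
            workers[started++].start();
        }
        try {
            for (int i = 0; i < started; i++) workers[i].join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sorting", e);
        }

        // Single-threaded bottom-up merge; run width doubles on every pass.
        // (Sketch only: assumes a.length is well below Integer.MAX_VALUE / 2.)
        int[] src = a;
        int[] dst = new int[a.length];
        for (int width = chunk; width < a.length; width *= 2) {
            for (int lo = 0; lo < a.length; lo += 2 * width) {
                int mid = Math.min(lo + width, a.length);
                int hi = Math.min(lo + 2 * width, a.length);
                merge(src, dst, lo, mid, hi);
            }
            int[] t = src; src = dst; dst = t;
        }
        if (src != a) System.arraycopy(src, 0, a, 0, a.length);
    }

    // Merges the sorted runs src[lo, mid) and src[mid, hi) into dst[lo, hi).
    private static void merge(int[] src, int[] dst, int lo, int mid, int hi) {
        int i = lo, j = mid, k = lo;
        while (i < mid && j < hi) dst[k++] = (src[i] <= src[j]) ? src[i++] : src[j++];
        while (i < mid) dst[k++] = src[i++];
        while (j < hi) dst[k++] = src[j++];
    }

    public static void main(String[] args) {
        int[] a = {5, 9, 8, 7, 6, 1, 2, 3, 4};
        sort(a, 4);
        System.out.println(Arrays.toString(a)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Unlike the quicksort above, no partition step has to finish before the worker threads can start, so all parts are sorted concurrently from the beginning; only the final merging is serial in this sketch.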

Why is my multi threaded sorting algorithm not faster than my single threaded mergesort

The problem is not multi-threading: I've written a correctly multi-threaded QuickSort in Java and it beats the default Java sort. I did this after watching a gigantic dataset being processed with only one core of a 16-core machine working.

One of your issues (a huge one) is that you're busy looping:

 // Wait for the two other threads to finish 
while(!ma.finished || !mb.finished) ;

This is a HUGE no-no: it is called busy looping (busy waiting), and it destroys performance.

(Another issue is that your code is not spawning any new threads, as has already been pointed out to you.)

You need to use another way to synchronize: an example would be to use a CountDownLatch.

Another thing: there's no need to spawn two new threads when you divide the workload: spawn only one new thread, and do the other half in the current thread.

Also, you probably don't want to create more threads than there are cores available.
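
The last two points (run only one half in a new task, and keep the number of worker threads at the core count) can be sketched with the fork/join framework instead of raw threads. This is an illustration under my own assumptions, not the proprietary code mentioned in this answer: the class name ForkOneSort and the SEQUENTIAL_CUTOFF value are made up, and it sorts by merging rather than by partitioning.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class ForkOneSort extends RecursiveAction {
    // Below this size we sort sequentially; the value is an assumption, tune it by measuring.
    private static final int SEQUENTIAL_CUTOFF = 1 << 13;

    private final int[] a;
    private final int lo; // inclusive
    private final int hi; // exclusive

    ForkOneSort(int[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= SEQUENTIAL_CUTOFF) {
            Arrays.sort(a, lo, hi);
            return;
        }
        int mid = lo + (hi - lo) / 2;
        ForkOneSort left = new ForkOneSort(a, lo, mid);
        left.fork();                            // hand one half to the pool
        new ForkOneSort(a, mid, hi).compute();  // sort the other half in the current thread
        left.join();                            // wait for the forked half without spinning
        merge(mid);
    }

    // Merges the sorted halves a[lo, mid) and a[mid, hi) using a copy of the left half.
    private void merge(int mid) {
        int[] leftCopy = Arrays.copyOfRange(a, lo, mid);
        int i = 0, j = mid, k = lo;
        while (i < leftCopy.length && j < hi) a[k++] = (leftCopy[i] <= a[j]) ? leftCopy[i++] : a[j++];
        while (i < leftCopy.length) a[k++] = leftCopy[i++];
        // any remaining right-hand elements are already in their final position
    }

    static void sort(int[] a) {
        // One worker per available core, as suggested above.
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            pool.invoke(new ForkOneSort(a, 0, a.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] a = new java.util.Random(0).ints(1_000_000).toArray();
        int[] expected = a.clone();
        Arrays.sort(expected);
        sort(a);
        System.out.println("sorted correctly: " + Arrays.equals(a, expected));
    }
}
```

The pool bounds the number of threads no matter how many tasks are created, so the recursion does not need its own thread counter.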

See my question here (asking for a good Open Source multithreaded mergesort/quicksort/whatever). The one I'm using is proprietary, I can't paste it.

Multithreaded quicksort or mergesort

I haven't implemented Mergesort but QuickSort and I can tell you that there's no array copying going on.

What I do is this:

  • pick a pivot
  • exchange values as needed
  • have we reached the thread limit? (depending on the number of cores)

    • yes: sort first part in this thread
    • no: spawn a new thread
  • sort second part in current thread
  • wait for first part to finish if it's not done yet (using a CountDownLatch).

The code spawning a new thread and creating the CountDownLatch may look like this:

final CountDownLatch cdl = new CountDownLatch( 1 );
final Thread t = new Thread( new Runnable() {
    public void run() {
        quicksort( a, i+1, r );
        // signal the waiting thread that this half is sorted
        cdl.countDown();
    }
} );
t.start();

The advantage of using synchronization facilities like the CountDownLatch is that it is very efficient and that you're not wasting time dealing with low-level Java synchronization idiosyncrasies.

In your case, the "split" may look like this (untested, it is just to give an idea):

if ( threads.getAndIncrement() < 4 ) {
final CountDownLatch innerLatch = new CountDownLatch( 1 );
final Thread t = new Merger( innerLatch, b );
t.start();
mergeSort( a );
while ( innerLatch.getCount() > 0 ) {
try {
innerLatch.await( 1000, TimeUnit.SECONDS );
} catch ( InterruptedException e ) {
// Up to you to decide what to do here
}
}
} else {
mergeSort( a );
mergeSort( b );
}

(don't forget to "countdown" the latch when each merge is done)

Here you'd replace the thread limit (4 in this example) with the number of available cores. You may use the following (once, say, to initialize some static variable at the beginning of your program; the number of cores is unlikely to change [unless you're on a machine that allows CPU hot-swapping, as some Sun systems do]):

Runtime.getRuntime().availableProcessors()
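
Putting the pieces of this answer together, a self-contained version could look roughly like the following. It is only a sketch: the class name LatchQuickSort, the helper names, and the way the thread budget is reserved are my own assumptions, and the partition step is the Hoare-style one from the quicksort example earlier on this page.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchQuickSort {
    // Extra threads we allow ourselves; the calling thread is not counted.
    private static final int MAX_EXTRA_THREADS =
            Math.max(0, Runtime.getRuntime().availableProcessors() - 1);
    private static final AtomicInteger extraThreads = new AtomicInteger();

    // Sorts a[lo..hi], both bounds inclusive.
    static void quicksort(int[] a, int lo, int hi) {
        if (lo >= hi) return;
        int p = partition(a, lo, hi);
        if (tryReserveThread()) {
            CountDownLatch done = new CountDownLatch(1);
            new Thread(() -> {
                try {
                    quicksort(a, lo, p);      // first part in the new thread
                } finally {
                    done.countDown();         // always release the waiting thread
                }
            }).start();
            quicksort(a, p + 1, hi);          // second part in the current thread
            awaitUninterruptibly(done);       // blocks instead of busy looping
        } else {
            quicksort(a, lo, p);              // thread limit reached: do both here
            quicksort(a, p + 1, hi);
        }
    }

    // Claims one slot of the thread budget, or returns false if none is left.
    private static boolean tryReserveThread() {
        while (true) {
            int n = extraThreads.get();
            if (n >= MAX_EXTRA_THREADS) return false;
            if (extraThreads.compareAndSet(n, n + 1)) return true;
        }
    }

    private static void awaitUninterruptibly(CountDownLatch latch) {
        boolean interrupted = false;
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true;           // remember it and keep waiting
            }
        }
        if (interrupted) Thread.currentThread().interrupt();
    }

    // Hoare partition around the middle element; returns the last index of the left part.
    private static int partition(int[] a, int lo, int hi) {
        int p = a[lo + (hi - lo) / 2];
        int ll = lo - 1;
        int hh = hi + 1;
        while (true) {
            while (a[++ll] < p) { }
            while (a[--hh] > p) { }
            if (ll >= hh) return hh;
            int t = a[ll];
            a[ll] = a[hh];
            a[hh] = t;
        }
    }

    public static void main(String[] args) {
        int[] a = new java.util.Random(0).ints(1_000_000).toArray();
        quicksort(a, 0, a.length - 1);
        boolean sorted = true;
        for (int i = 1; i < a.length; i++) {
            if (a[i - 1] > a[i]) { sorted = false; break; }
        }
        System.out.println("sorted: " + sorted);
    }
}
```

In this sketch the budget is never handed back once a thread finishes, which matches the simple counter in the snippet above; a longer-lived program would more likely reuse threads through an executor.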

When merge sort is preferred over Quick sort?

I should probably start off by mentioning that both quicksort and mergesort can work just fine if you can't fit everything into memory at once. You can implement quicksort by choosing a pivot, then streaming elements in from disk into memory and writing elements into one of two different files based on how that element compares to the pivot. If you use a double-ended priority queue, you can actually do this even more efficiently by fitting the maximum number of possible elements into memory at once.

Others have mentioned the benefit that mergesort is worst-case O(n log n), which is definitely true. That said, you can easily modify quicksort to produce the introsort algorithm, a hybrid between quicksort, insertion sort, and heapsort, that's worst-case O(n log n) but retains the speed of quicksort in most cases.

It might be helpful to see why quicksort is usually faster than mergesort, since if you understand the reasons you can pretty quickly find some cases where mergesort is a clear winner. Quicksort usually is better than mergesort for two reasons:

  1. Quicksort has better locality of reference than mergesort, which means that the accesses performed in quicksort are usually faster than the corresponding accesses in mergesort.

  2. Quicksort uses worst-case O(log n) memory (if implemented correctly), while mergesort requires O(n) memory due to the overhead of merging.

There's one scenario, though, where these advantages disappear. Suppose you want to sort a linked list of elements. The linked list elements are scattered throughout memory, so advantage (1) disappears (there's no locality of reference). Second, linked lists can be merged with only O(1) space overhead instead of O(n) space overhead, so advantage (2) disappears. Consequently, you usually will find that mergesort is a superior algorithm for sorting linked lists, since it makes fewer total comparisons and isn't susceptible to a poor pivot choice.
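
To make the linked-list case concrete, here is a minimal Java sketch of a merge sort on a singly linked list. The Node class and the helper names are invented for this illustration. The merge step only relinks existing nodes, so it needs O(1) extra space; the recursion itself still uses O(log n) stack frames.

```java
final class LinkedListMergeSort {

    static final class Node {
        final int value;
        Node next;

        Node(int value, Node next) {
            this.value = value;
            this.next = next;
        }
    }

    // Sorts the list starting at head and returns the new head.
    static Node sort(Node head) {
        if (head == null || head.next == null) return head;
        // Find the middle with a slow and a fast pointer, then cut the list in two.
        Node slow = head;
        Node fast = head.next;
        while (fast != null && fast.next != null) {
            slow = slow.next;
            fast = fast.next.next;
        }
        Node second = slow.next;
        slow.next = null;
        return merge(sort(head), sort(second));
    }

    // Merges two sorted lists by relinking nodes; no new nodes besides one dummy head.
    static Node merge(Node a, Node b) {
        Node dummy = new Node(0, null);
        Node tail = dummy;
        while (a != null && b != null) {
            if (a.value <= b.value) {
                tail.next = a;
                a = a.next;
            } else {
                tail.next = b;
                b = b.next;
            }
            tail = tail.next;
        }
        tail.next = (a != null) ? a : b;
        return dummy.next;
    }

    // Builds a list from the given values, in order.
    static Node of(int... values) {
        Node head = null;
        for (int i = values.length - 1; i >= 0; i--) head = new Node(values[i], head);
        return head;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (Node n = sort(of(7, 2, 9, -7, 777777, 10, 1)); n != null; n = n.next) {
            sb.append(n.value).append(' ');
        }
        System.out.println(sb.toString().trim());
    }
}
```

Because `<=` takes from the left list on ties, equal elements keep their original order, so this sketch is also stable.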

Hope this helps!

Implementing a parallel / multithreaded merge sort on Vec

Since your question is about parallelizing and not sorting I've omitted the implementations for the serial_sort and merge functions in the example below but you can easily fill them in yourself using what code you have already:

#![feature(is_sorted)]

use crossbeam; // 0.8.0
use rand; // 0.7.3
use rand::Rng;

fn random_vec(capacity: usize) -> Vec<i64> {
let mut vec = vec![0; capacity];
rand::thread_rng().fill(&mut vec[..]);
vec
}

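fn parallel_sort(data: &mut [i64], threads: usize) {
    // An empty slice would otherwise lead to a division by zero below.
    if data.is_empty() {
        return;
    }
    let chunks = std::cmp::min(data.len(), threads).max(1);
    // Round up so that no more than `chunks` slices (and threads) are created.
    let chunk_len = (data.len() + chunks - 1) / chunks;
    let _ = crossbeam::scope(|scope| {
        for slice in data.chunks_mut(chunk_len) {
            scope.spawn(move |_| serial_sort(slice));
        }
    });
    // Number of sorted slices actually produced (may be fewer than `chunks`).
    merge(data, (data.len() + chunk_len - 1) / chunk_len);
}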

fn serial_sort(data: &mut [i64]) {
// actual implementation omitted for conciseness
data.sort()
}

fn merge(data: &mut [i64], _sorted_chunks: usize) {
// actual implementation omitted for conciseness
data.sort()
}

fn main() {
let mut vec = random_vec(10_000);
parallel_sort(&mut vec, 4);
assert!(vec.is_sorted());
}


parallel_sort breaks the data into n chunks, sorts each chunk in its own thread, and then merges the sorted chunks together before finally returning.


