"Parallel.For" for Java

Java 8: Parallel FOR loop

Read up on streams, they're all the new rage.

Pay especially close attention to the bit about parallelism:

"Processing elements with an explicit for-loop is inherently serial. Streams facilitate parallel execution by reframing the computation as a pipeline of aggregate operations, rather than as imperative operations on each individual element. All streams operations can execute either in serial or in parallel."

So to recap, there are no parallel for-loops, they're inherently serial. Streams however can do the job. Take a look at the following code:

    Set<Server> servers = getServers();
Map<String, String> serverData = new ConcurrentHashMap<>();

servers.parallelStream().forEach((server) -> {
serverData.put(server.getIdentifier(), server.fetchData());
});

How to parallelize for loop in java?

If you replace your for loops with Int Streams, it is very easy to make it run in parallel:

Example:

  IntStream.range(0, 10).parallel().forEach(i -> {
System.out.println(i);
});

Parallel.For for Java?

I guess the closest thing would be:

ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
for (final Object o : list) {
exec.submit(new Runnable() {
@Override
public void run() {
// do stuff with o.
}
});
}
} finally {
exec.shutdown();
}

Based on TheLQ's comments, you would set SUM_NUM_THREADS to Runtime.getRuntime().availableProcessors();

Edit: Decided to add a basic "Parallel.For" implementation

public class Parallel {
private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();

private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));

public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
try {
// invokeAll blocks for us until all submitted tasks in the call complete
forPool.invokeAll(createCallables(elements, operation));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
for (final T elem : elements) {
callables.add(new Callable<Void>() {
@Override
public Void call() {
operation.perform(elem);
return null;
}
});
}

return callables;
}

public static interface Operation<T> {
public void perform(T pParameter);
}
}

Example Usage of Parallel.For

// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
elems.add(i);
}
Parallel.For(elems,
// The operation to perform with each item
new Parallel.Operation<Integer>() {
public void perform(Integer param) {
System.out.println(param);
};
});

I guess this implementation is really more similar to Parallel.ForEach

Edit
I put this up on GitHub if anyone is interested. Parallel For on GitHub

Java 8: How can I convert a for loop to run in parallel?

I used the following code in Java 8 and it did the work. I was able to reduce the batch job to run from 28 minutes to 3:39 minutes.

IntStream.range(0, 100000).parallel().forEach(i->{
restTemplate.exchange(url, HttpMethod.GET, request, String.class);
}
});

Parallel Processing or Multithreading inside a for loop Java 7

This is how it would typically be done in Java 7. A CountDownLatch is used for waiting till all the sites are called.

    int numOfThreads = 5;
List<String> permissionsList = Collections.synchronizedList(new ArrayList<>());
List<Integer> sites = getSites();
CountDownLatch countDownLatch = new CountDownLatch(sites.size());
ExecutorService es = Executors.newFixedThreadPool(numOfThreads);
for (Integer site : sites) {
es.submit(new Runnable() {
@Override
public void run() {
try {
String perm = authenticationService.getUserPermissionsForSite(site);
permissionsList.add(perm);
} finally {
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();

Parallel counting - Java

Whenever more than one threads can access a value that is mutable then the system goes out of sync meaning the kind of problem that you are facing. No one can be sure what the result will be, and many a times the result will be wrong. You cannot guarantee which thread will write the value last.

Therefore, you need to synchronize the access to the shared resource (the integer you are incrementing) so that all threads get the latest updated value and the answer is always correct.

Coming to your program you can try making the parallelCount variable an Atomic Integer like AtomicInteger parallelCount = new AtomicInteger(); An Atomic Integer is thread safe meaning that they can be concurrently updated without running the system out of sync.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class Counter {

private static AtomicInteger parallelCount = new AtomicInteger();
private static int sequentialCount = 0;

public static void main(String[] args) {

int n = 1000;

// I count in parallel:
IntStream.range(0, n).parallel().forEach(i -> {
parallelCount.getAndIncrement();
});

// I count sequentially:
for (int i = 0; i < n; i++) {
sequentialCount++;
}

System.out.println("parallelCount = " + parallelCount);
System.out.println("sequentialCount = " + sequentialCount);

}

}

How do I run something parallel in Java?

Some of the tasks you are forking attempt to use the same array for evaluating different combinations. You can solve the issue by creating a distinct array for each task or by limiting the parallelism to those tasks which already have an array on their own, i.e. those with different length.

But there’s another possibility; don’t use arrays at all. You can store combinations into int values, as each int value is a combination of bits. This does not only save a lot of memory, but you can also easily iterate over all possible combinations by just incrementing the value, as iterating over all int numbers also iterates over all possible bit combinations¹. The only thing we need to implement is generating the right string for a particular int value by interpreting the bits as numbers according to their position.

For a first attempt, we can take the easy way and use already existing classes:

public static void main(String[] args) {
long t0 = System.nanoTime();
combinations(10, 25);
long t1 = System.nanoTime();
System.out.println((t1 - t0)/1_000_000+" ms");
System.out.flush();
}
static void combinations(int start, int end) {
for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
System.out.println(
BitSet.valueOf(new long[]{i}).stream()
.mapToObj(b -> String.valueOf(b + start))
.collect(Collectors.joining(", ", "[", "]"))
);
}
}

The method uses an exclusive end, so for your example, you have to call it like combinations(0, 3) and it will print

[0]
[1]
[0, 1]
[2]
[0, 2]
[1, 2]
[0, 1, 2]
3 ms


of course, timing may vary

For the combinations(10, 25) example above, it prints all combinations, followed by 3477 ms on my machine. This sounds like an opportunity to optimize, but we should first think about which operations impose which costs.

Iterating over the combinations has been reduced to a trivial operation here. Creating the string is an order of magnitude more expensive. But this is still nothing compared to the actual printing which includes a data transfer to the operating system and, depending on the system, the actual rendering may add to our time. Since this is done while holding a lock within PrintStream, all threads attempting to print at the same time would be blocked, making it a nonparallelizable operation.

Let’s identify the fraction of the cost, by creating a new PrintStream, disabling the auto-flush on line breaks and using an insanely large buffer, capable of holding the entire output:

public static void main(String[] args) {
System.setOut(new PrintStream(
new BufferedOutputStream(new FileOutputStream(FileDescriptor.out),1<<20),false));
long t0 = System.nanoTime();
combinations(10, 25);
long t1 = System.nanoTime();
System.out.flush();
long t2 = System.nanoTime();
System.out.println((t1 - t0)/1_000_000+" ms");
System.out.println((t2 - t0)/1_000_000+" ms");
System.out.flush();
}
static void combinations(int start, int end) {
for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
System.out.println(
BitSet.valueOf(new long[]{i}).stream()
.mapToObj(b -> String.valueOf(b + start))
.collect(Collectors.joining(", ", "[", "]"))
);
}
}

On my machine, it prints something in the order of

93 ms
3340 ms

Showing that the code spent more than three seconds on the nonparallelizable printing and only about 100 milliseconds on the calculation. For completeness, the following code goes a level down for the String generation:

static void combinations(int start, int end) {
for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
System.out.println(bits(i, start));
}
}
static String bits(int bits, int offset) {
StringBuilder sb = new StringBuilder().append('[');
for(;;) {
int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit);
sb.append(num + offset);
bits -= bit;
if(bits == 0) break;
sb.append(", ");
}
return sb.append(']').toString();
}

which halves the calculation time on my machine, while having no noticable impact on the total time, which shouldn’t come as a surprise now.


But for education purposes, ignoring the lack of potential acceleration, let’s discuss how we would parallelize this operation.

The sequential code did already bring the task into a form which boils down to an iteration from a start value to an end value. Now, we rewrite this code to a ForkJoinTask (or suitable subclass) which represents an iteration with a start and end value. Then, we add the ability to split this operation into two, by splitting the range in the middle, so we get two tasks iterating over each half of the range. This can be repeated until we decide to have enough potentially parallel jobs and perform the current iteration locally. After the local processing we have to wait for the completion of any task we split off, to ensure that the completion of the root task implies the completion of all subtasks.

public class Combinations extends RecursiveAction {
public static void main(String[] args) {
System.setOut(new PrintStream(new BufferedOutputStream(
new FileOutputStream(FileDescriptor.out),1<<20),false));
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
long t0 = System.nanoTime();
Combinations job = Combinations.get(10, 25);
pool.execute(job);
job.join();
long t1 = System.nanoTime();
System.out.flush();
long t2 = System.nanoTime();
System.out.println((t1 - t0)/1_000_000+" ms");
System.out.println((t2 - t0)/1_000_000+" ms");
System.out.flush();
}

public static Combinations get(int min, int max) {
return new Combinations(min, 1, (1 << (max - min)) - 1);
}

final int offset, from;
int to;

private Combinations(int offset, int from, int to) {
this.offset = offset;
this.from = from;
this.to = to;
}

@Override
protected void compute() {
ArrayDeque<Combinations> spawned = new ArrayDeque<>();
while(getSurplusQueuedTaskCount() < 2) {
int middle = (from + to) >>> 1;
if(middle == from) break;
Combinations forked = new Combinations(offset, middle, to);
forked.fork();
spawned.addLast(forked);
to = middle - 1;
}
performLocal();
for(;;) {
Combinations forked = spawned.pollLast();
if(forked == null) break;
if(forked.tryUnfork()) forked.performLocal(); else forked.join();
}
}

private void performLocal() {
for(int i = from, stop = to; i <= stop; i++) {
System.out.println(bits(i, offset));
}
}

static String bits(int bits, int offset) {
StringBuilder sb = new StringBuilder().append('[');
for(;;) {
int bit=Integer.lowestOneBit(bits), num=Integer.numberOfTrailingZeros(bit);
sb.append(num + offset);
bits -= bit;
if(bits == 0) break;
sb.append(", ");
}
return sb.append(']').toString();
}
}

The getSurplusQueuedTaskCount() provides us with a hint about the saturation of the worker threads, in other words, whether forking more jobs might be beneficial. The returned number is compared with a threshold that is typically a small number, the more heterogeneous the jobs and hence, the expected workload, the higher should be the threshold to allow more work-stealing when jobs complete earlier than others. In our case, the workload is expected to be very well balanced.

There are two ways of splitting. Examples often create two or more forked subtasks, followed by joining them. This may lead to a large number of tasks just waiting for others. The alternative is to fork a subtask and alter the current task, to represent the other. Here, the forked task represents the [middle, to] range whereas the current task is modified to represent the [from, middle] range.

After forking enough tasks, the remaining range is processed locally in the current thread. Then, the task will wait for all forked subtasks, with one optimization: it will try to unfork the subtasks, to process them locally if no other worker thread has stolen them yet.

This works smoothly, but unfortunately, as expected, it does not accelerate the operation, as the most expensive part is the printing.


¹ Using an int to represent all combinations reduces the supported range length to 31, but keep in mind that such a range length implies 2³¹ - 1 combinations, which is quite a lot to iterate over. If that still feels like a limitation, you may change the code to use long instead. The then-supported range length of 63, in other words 2⁶³ - 1 combinations, is enough to keep to computer busy until the end of the universe.

Parallel FOR Iteration using Generics and Iterables on Java

This operation is usually called zip. If you want to check one implementation, the JOOL library provides a quite nice Seq.zip() method which zips the elements of two collections in tuples.

For example

Seq.of(1, 2, 3).zip(Seq.of("a", "b", "c"));

returns a

Seq.of(tuple(1, "a"), tuple(2, "b"), tuple(3, "c"));

I miss how simple Kotlin makes these operations :D.



Related Topics



Leave a reply



Submit