CompletableFuture Is Not Getting Executed. If I Use the ExecutorService Pool It Works as Expected But Not with the Common ForkJoinPool

The strange behavior of CompletableFuture

From ForkJoinPool

...All worker threads are initialized with Thread.isDaemon() set true.

From Executors.defaultThreadFactory

...Each new thread is created as a non-daemon thread.

From Thread.setDaemon

...The Java Virtual Machine exits when the only threads running are all daemon threads.

In the first case, the thread comes from ForkJoinPool.commonPool and is therefore a daemon thread.

And in the second case, the thread is created by Executors.defaultThreadFactory, so it is non-daemon.

This explains the different behaviour.
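To make the difference concrete, here is a minimal sketch (the class name and pool choice are mine, not from the question) that runs the same kind of task once on the common pool and once on an explicitly created executor:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DaemonVsNonDaemonDemo {
    public static void main(String[] args) {
        // Runs on the common ForkJoinPool, whose worker threads are daemon:
        // the JVM may exit before this line is ever printed.
        CompletableFuture.runAsync(() ->
                System.out.println("common pool: " + Thread.currentThread()));

        // Runs on a pool whose threads come from Executors.defaultThreadFactory
        // and are therefore non-daemon: the JVM stays alive until the task
        // finishes and the pool terminates after shutdown().
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture.runAsync(() ->
                System.out.println("custom pool: " + Thread.currentThread()), pool);
        pool.shutdown();
    }
}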

Why does ExecutorService wait for all threads to complete, but CompletableFuture does not?

From my viewpoint, 'CompletableFuture' does not itself execute anything, so it has no threads to wait for. It relies on other mechanisms for running stages.

'runAsync', without an Executor, runs tasks in the common ForkJoin pool, which is documented as having the behaviour you observe.

That doesn't answer your question of 'why', except to say that it's intentionally designed that way. I can only hand-wave and say that its designers likely considered it to be the best default choice.

(I concur: In code I've written, if I get to the point of program termination, what I want is for everything to just go away. In the rare case I need it to complete, I'd wait for it before exiting.)

CompletableFuture with supplyAsync not working as expected

The result that you see is just a coincidence. The main thread exits before your async tasks are executed/completed. Add CompletableFuture::join at the end and you will always see the result:

CompletableFuture.supplyAsync(() -> {
    return MailUtil.getMailInfo();
}).thenAccept(content -> {
    System.out.println("Mail content: " + content);
}).join();

This is because when you use supplyAsync, your task will be executed in one of the threads from ForkJoinPool.commonPool, and by default threads from this pool are daemon threads, so they do not stop the JVM from exiting. You can verify this by checking the default thread factory associated with commonPool:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
ForkJoinWorkerThread forkJoinWorkerThread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool);

boolean daemon = forkJoinWorkerThread.isDaemon();
System.out.println(daemon);

And the output is true.

Behaviour of ForkJoinPool in CompletableFuture.supplyAsync()

In both cases when you do

CompletableFuture.supplyAsync(action1, executorService)
    .thenApply(action2)
    .thenAccept(res -> System.out.println(res));

you don't wait for task completion. But then your program is going to exit, and there is a difference in how the common fork-join pool:

ForkJoinPool.commonPool()

and regular executor service:

final ExecutorService executorService = Executors.newFixedThreadPool(4);

...react to a call to System.exit(...) or its equivalent.

This is what the docs say about the fork-join common pool; pay attention to this part:

However this pool and any ongoing processing are automatically terminated upon program System.exit(int). Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

And this is from the ExecutorService docs; you may pay attention to:

The shutdown() method will allow previously submitted tasks to execute
before terminating

I think that may be the difference you are asking about.
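A minimal sketch of that ExecutorService behaviour (the sleeping task is mine, just to simulate in-flight work): even though shutdown() is called immediately and main returns, the previously submitted task still runs to completion because the pool's worker thread is non-daemon.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        executorService.submit(() -> {
            TimeUnit.SECONDS.sleep(2);            // simulate slow work
            System.out.println("task finished");  // still printed after main returns
            return null;                          // return value makes the lambda a Callable, allowing the checked exception from sleep
        });
        executorService.shutdown();  // rejects new tasks, but lets the submitted one finish
        System.out.println("main done");
    }
}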

Main thread exits before the completion of CompletableFuture

Your task provided to supplyAsync will execute in the ForkJoinPool#commonPool. And if you take a look at the threads in the common pool, you can see that they are daemon threads, which means the JVM will not wait for daemon threads to complete their execution before shutting down once there is no active non-daemon thread. In your case, you have a sleep in composeMethod; in the meantime, the main thread executes and completes its work, and the JVM no longer has any active non-daemon thread. So the JVM shuts down without waiting for your task to complete. If you run the example below, you can confirm the thread pool and the type of thread.

CompletableFuture feature = CompletableFuture.supplyAsync(() -> {
    System.out.println("Thread : " + Thread.currentThread());
    System.out.println("Is Daemon : " + Thread.currentThread().isDaemon());
    return composeMethod();
}).thenAccept(s -> System.out.println("wassup java" + s))
  .thenRun(() -> System.out.println(" imm immortal"));

Output:

Thread : Thread[ForkJoinPool.commonPool-worker-1,5,main] // This line may be a bit different, but it indicates that your task is executing on the common pool
Is Daemon : true
why should i wait for you

To resolve the issue, you can pass your own executor, as below:

ExecutorService executorService = Executors.newFixedThreadPool(8);
CompletableFuture feature = CompletableFuture.supplyAsync(() -> {
    System.out.println("Thread : " + Thread.currentThread());
    System.out.println("Is Daemon : " + Thread.currentThread().isDaemon());
    return composeMethod();
}, executorService).thenAccept(s -> System.out.println("wassup java" + s))
  .thenRun(() -> System.out.println(" imm immortal"));
executorService.shutdown();
nonblockingmethod();

Output:

Thread : Thread[pool-1-thread-1,5,main]
Is Daemon : false
why should i wait for you
wassup java resultdaa
imm immortal

Why does my CompletableFuture code run in Java 8 but not in Java 11?

TL;DR - add ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS); after your call to CompletableFuture.runAsync, at the end of your code, so that System.exit doesn't stop your runnable. That way you'll get the behavior you expect.


Longer answer:

Okay, so first things first, I tried both examples in Oracle's Java 8, OpenJDK 8, and OpenJDK 11. The behavior was consistent across the board, so my answer is that nothing has changed between these Java versions' implementations that would cause this discrepancy. In both examples, the behavior you see is consistent with what Java tells you it will do.

From the documentation of CompletableFuture.runAsync

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.

Okay... let's see what ForkJoinPool.commonPool will tell us (emphasis mine):

Returns the common pool instance. This pool is statically constructed; its run state is unaffected by attempts to shutdown() or shutdownNow(). However this pool and any ongoing processing are automatically terminated upon program System.exit(int). Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

Aha, so that's why we don't see the countdown when using the common pool: the common pool is terminated upon system exit, which is exactly what happens when we return from the method and exit the program (assuming your example is really as simple as you show it... like with a single method call in main... anyways).

So why does the custom executor work? Because, as you've already noticed, that executor has not been terminated. Its non-daemon worker threads are still alive in the background, although idle, and the JVM will not exit on its own while they are running.


So what can we do now?

One option is to create our own executor and shut it down once we are done, much like you have suggested. I would argue that this approach isn't all that bad to use after all.
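A minimal, self-contained sketch of that first option (the class name and the countDown body are mine; the counting loop just stands in for your runnable):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OwnExecutorDemo {
    static void countDown() {                       // hypothetical stand-in for your async work
        for (int i = 20; i >= 1; i--) System.out.println(i);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture.runAsync(OwnExecutorDemo::countDown, executor);
        // shutdown() rejects new tasks but lets the submitted one finish;
        // the pool's non-daemon thread keeps the JVM alive until then.
        executor.shutdown();
    }
}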

Second option is to follow what the java doc says.

Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

public boolean awaitQuiescence(long timeout, TimeUnit unit)

If called by a ForkJoinTask operating in this pool, equivalent in effect to ForkJoinTask.helpQuiesce(). Otherwise, waits and/or attempts to assist performing tasks until this pool isQuiescent() or the indicated timeout elapses.

So we can call that method and specify a timeout for all pending work in the common pool. My opinion is that this is somewhat business specific, since now you have to answer the question - What the heck should the timeout be now?
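A minimal sketch of that second option, where the counting loop simply stands in for the real async work (the 1000-second timeout is an arbitrary "large enough" value):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class AwaitQuiescenceDemo {
    public static void main(String[] args) {
        CompletableFuture.runAsync(() -> {
            for (int i = 1; i <= 20; i++) {
                System.out.println(i);  // stand-in for the real async work
            }
        });
        // Wait (up to the timeout) until the common pool has no pending tasks,
        // so returning from main / System.exit no longer cuts the work short.
        ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS);
    }
}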

Third option is to use the power of CompletableFutures and hoist this runAsync method to a variable:

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> ...);
// ...
// ... bla bla bla code bla bla bla ...
// ...
voidCompletableFuture.join();
// or if you want to handle exceptions, use get
voidCompletableFuture.get();

and then, right when you need it, you join()/get() whatever you need as a return value. I prefer this the most, since the code is cleanest and most understandable this way. Also, I can chain my CFs all I want and do funky stuff with them.


In the case where you don't need a return value, don't need to do anything else, and just want a simple return of a string and async processing of counting from 1 to 20, then just shove ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS); somewhere convenient and give it some ridiculously large timeout, thus guaranteeing you won't exit until all pending work in the pool has finished (or the timeout elapses).

Why does CompletableFuture not run the task with runAsync?

The ForkJoinPool common pool, which runs your tasks, will shut down when the program exits:

/**
 * Returns the common pool instance. This pool is statically
 * constructed; its run state is unaffected by attempts to {@link
 * #shutdown} or {@link #shutdownNow}. However this pool and any
 * ongoing processing are automatically terminated upon program
 * {@link System#exit}. Any program that relies on asynchronous
 * task processing to complete before program termination should
 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
 * before exit.
 *
 * @return the common pool instance
 * @since 1.8
 */
public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}

If I modify your code to add awaitQuiescence and log the execution of the tasks I get:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;

public class FutureTextData {
    private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    public CompletableFuture<Void> futureForText;

    public void getCharInText(String text) {
        futureForText = CompletableFuture.runAsync(() -> {
            System.out.println("Running first task");
            map.put("foo", 1);
        });
    }

    public void recordCharInText(String outPutFile) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Running second task");

            File file = new File(outPutFile);

            try (BufferedWriter bf = new BufferedWriter(new FileWriter(file))) {
                for (Map.Entry<String, Integer> entry : map.entrySet()) {
                    bf.write(entry.getKey() + "<----->" + entry.getValue());
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTextData futureTextData = new FutureTextData();
        futureTextData.getCharInText("xyz");
        futureTextData.recordCharInText("outFile.txt");
        ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
    }
}

which produces:

Running first task
Running second task

Process finished with exit code 0

Make Thread usable if it waits for other stuff

It sounds like you want to run a bunch of tasks on background threads and wait for them all to complete, then collect their returned results.

Executors

The Executors framework was added to Java 5 to make this kind of work easy.

Define your task as a Runnable or Callable. In your case, a Callable because you want a value returned.

In our task, we simulate work that will take a long while by sleeping the current thread for three seconds. To simulate a result to be returned, we capture the current moment as an Instant object.

Callable < Instant > task = () -> {
Thread.sleep( Duration.ofSeconds( 3 ).toMillis() ); // Simulating work that takes a long while.
return Instant.now(); // Simulating result of work to be returned.
};

Collect a bunch of tasks to be executed on background threads. Here we add the same task object several times. You could alternatively instantiate multiple task objects.

List < Callable < Instant > > tasks = new ArrayList <>();  // Collect tasks to be executed. 
int limit = 5;
for ( int i = 0 ; i < limit ; i++ ) { tasks.add( task ); }

Instantiate an ExecutorService via the Executors utility class.

You have a variety of executor service implementations to choose from, with various behaviors. If you know you will have only a small number of tasks running at a time, you may choose a cached thread pool, which creates threads as needed and reuses idle ones. For a large number of tasks, choose another ExecutorService to avoid overwhelming your machine.

ExecutorService executorService = Executors.newCachedThreadPool();

FYI… In the future, Project Loom may bring virtual threads (fibers) to Java to enable millions of simultaneous threads (if not CPU-bound). Experimental builds are available now, based on early-access Java 18. You would use another executor service:

ExecutorService executorService = Executors.newVirtualThreadExecutor();

If you have a large number of tasks, and are not yet using Project Loom technology, you may want to choose an executor service backed by a limited number of threads.

ExecutorService executorService = Executors.newFixedThreadPool( 3 );

Submit our collection of tasks to be executed. The result of each submitted task is a Future. We track all the expected results as a collection of Future objects.

List < Future < Instant > > futures = null;
try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }

Start the shutdown of our executor service. This prevents the submission of any more tasks.

executorService.shutdown();

Wait for all the submitted tasks to complete. Assign a time-out to throw an exception if the tasks take longer than expected.

try { executorService.awaitTermination( 1 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }

Report results. Loop each Future object. See if the task was cancelled. If not cancelled, then we know it must have completed because we are past waiting for the executor service to shut down.

for ( Future < Instant > future : futures )
{
if ( future.isCancelled() )
{
System.out.println( "Canceled." );
} else
{
try { System.out.println( future.get() ); } catch ( InterruptedException e ) { e.printStackTrace(); } catch ( ExecutionException e ) { e.printStackTrace(); }
}
}

When run, with a pair of System.out.println calls around our executor service’s execution, you can see that all the tasks are submitted just after second 21, they all sleep for 3 seconds, and then they all wake and complete at second 24.

This run was on a MacBook Pro (13-inch, Apple Silicon M1, 2020) with 8 (4 performance and 4 efficiency) cores, using early-access Java 17.

INFO - Starting execution at 2021-08-28T21:33:21.514429Z
INFO - Ending execution at 2021-08-28T21:33:24.525798Z
2021-08-28T21:33:24.524563Z
2021-08-28T21:33:24.522178Z
2021-08-28T21:33:24.522177Z
2021-08-28T21:33:24.522178Z
2021-08-28T21:33:24.522181Z

Scheduling of threads

Your comment is correct. A blocked thread in Java does not block use of a CPU core.

Current implementations of Java (at least through Java 17) use host OS threads as Java threads. This means the scheduling of which thread runs on which CPU core, and for how long, is controlled by the host OS.

Currently, if your Java code blocks, the Java thread blocks, and therefore the host OS thread blocks. Whether a thread, blocked or not, remains executing on the CPU core is up to the host OS.

Be aware that switching between host OS threads on a CPU core is relatively “expensive”, costing CPU cycles in overhead, and each thread may be assigned a significant amount of memory. This is why you should not overburden your machine with too many threads. Roughly the same number of threads as cores is a general guideline, though it varies depending on the nature of the tasks being executed.
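For example, a minimal sketch of sizing a fixed pool to the machine's core count (whether that is actually the right number depends on how CPU-bound your tasks are):

int cores = Runtime.getRuntime().availableProcessors();  // number of cores reported by the JVM
ExecutorService executorService = Executors.newFixedThreadPool( cores );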

“Cheaper” threads in Project Loom

As mentioned above, virtual threads in Project Loom promise to make blocking Java code amongst Java threads much “cheaper”, meaning less memory and less CPU overhead.

A blocked virtual thread in Loom technology will much more quickly and easily yield the CPU core to work on another virtual thread. These virtual threads map to “real” host platform OS threads, many-to-one. These efficiencies mean that even millions of simultaneous threads may be reasonable on conventional hardware.

I am deliberately over-simplifying here. For full details on current threading technology and Project Loom’s changes, see recent presentations and interviews with Ron Pressler and other members of Project Loom.

You said:

i thought that if i assign 5 threads to my threadpool, then i block 5 cpu cores.

As discussed above, a blocked Java thread does not block the CPU core. The host OS may choose to run other threads on that core at any time, whether your Java thread is blocked or not. By “other threads” I mean Java threads or threads of other apps. So keep in mind the bigger picture: Your Java thread may be paused, at any moment, for any length of time, as the host OS sees fit given current operating conditions on that machine.

However, within Java, if you use an executor service backed by a fixed-size thread pool of five threads, with pending tasks submitted but not yet started, and all five current tasks happen to block, then no more work is performed by that executor service until the current blocking clears.
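A minimal sketch of that situation (the class name and sleep durations are mine; the sleeps simulate blocking calls):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolBlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        // Five tasks occupy all five threads and block (here: sleep).
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                TimeUnit.SECONDS.sleep(3);  // simulated blocking call
                return null;                // Callable, so the checked exception from sleep is allowed
            });
        }
        // The sixth task is queued; it cannot start until one of the five
        // blocked threads becomes free, even though the CPU cores are idle.
        pool.submit(() -> System.out.println("sixth task finally ran"));
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}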

This is what changes under Project Loom with virtual threads: Any blocked virtual thread is set aside (“parked”) by the JVM (not the host OS), so that its host OS thread can immediately start executing one of the many other virtual threads sharing that “real” OS thread.


