In Which Thread Do Completablefuture's Completion Handlers Execute

In which thread do CompletableFuture's completion handlers execute?

The policies as specified in the CompletableFuture docs could help you understand better:

  • Actions supplied for dependent completions of non-async methods may be
    performed by the thread that completes the current CompletableFuture,
    or by any other caller of a completion method.

  • All async methods without an explicit Executor argument are performed
    using the ForkJoinPool.commonPool() (unless it does not support a
    parallelism level of at least two, in which case, a new Thread is
    created to run each task
    ). To simplify monitoring, debugging, and
    tracking, all generated asynchronous tasks are instances of the marker
    interface CompletableFuture.AsynchronousCompletionTask.

Update: I would also advice on reading this answer by @Mike as an interesting analysis further into the details of the documentation.

On which thread the callback registered on new CompletableFuture() is executed?

Only Async methods are executed by new thread, so in your case thenApply is executed by main thread

All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).

public class TestMain {

public static void main(String[] args) {

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "hello";
});
CompletableFuture<Integer> intFuture = future.thenApply(s -> {
System.out.println(Thread.currentThread().getName());
return s.length();
});

CompletableFuture<Integer> intFuture2 = future.thenApply(s -> {
System.out.println(Thread.currentThread().getName());
return s.length();
});
future.complete("hello");

}

}

Output

ForkJoinPool.commonPool-worker-1
main
main

Which executor does CompletableFuture.allOf use?

The answer of Ivan Gammel is not exact.

There is indeed no executor associated with the CompletableFuture returned by allOf(), as in fact, there isn't ever an executor associated with any CompletableFuture.

A task is associated with an executor, as it is running inside of it, but the association is inverse: the executor has a list of tasks to execute.

A task can also be associated with a CompletableFuture, which it will complete when the task finishes. The CompletableFuture itself does not keep a reference to the task or executor that were used for its creation. It may however keep references to tasks and optionally executors used in dependent stages.

The CompletableFuture returned by allOf() will be completed by a task, which is a dependant stage of the original CompletableFutures. In your example, this task can be executed by:

  • executor1, if the third task finished first;
  • executor2, if the 2 first tasks finished before the third one; or
  • the original thread, if all tasks finished before you called allOf().

This can be seen by adding a dependent thenRun() stage to the allOf() call:

public class CompletableFutureAllOfCompletion {
private ExecutorService executor1 = Executors.newFixedThreadPool(2);
private ExecutorService executor2 = Executors.newFixedThreadPool(2);
private Random random = new Random();

public static void main(String[] args) {
new CompletableFutureAllOfCompletion().run();
}

public void run() {
CompletableFuture<Integer> cf1 = supplyAsync(this::randomSleepAndReturn, executor1);
CompletableFuture<Integer> cf2 = supplyAsync(this::randomSleepAndReturn, executor1);
CompletableFuture<Integer> cf3 = supplyAsync(this::randomSleepAndReturn, executor2);
randomSleepAndReturn();
CompletableFuture.allOf(cf1, cf2, cf3)
.thenRun(() -> System.out.println("allOf() commpleted on "
+ Thread.currentThread().getName()));

executor1.shutdown();
executor2.shutdown();
}

public int randomSleepAndReturn() {
try {
final long millis = random.nextInt(1000);
System.out.println(
Thread.currentThread().getName() + " waiting for " + millis);
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
}
}

Some possible outputs:

Completing on first executor (third task finished first):

pool-1-thread-1 waiting for 937
pool-1-thread-2 waiting for 631
main waiting for 776
pool-2-thread-1 waiting for 615
allOf() commpleted on pool-1-thread-1

Completing on second executor (first and second task finished before the third one):

pool-1-thread-1 waiting for 308
pool-1-thread-2 waiting for 788
main waiting for 389
pool-2-thread-1 waiting for 863
allOf() commpleted on pool-2-thread-1

Completing on main thread (all tasks finished before allOf().thenRun()):

pool-1-thread-1 waiting for 168
pool-1-thread-2 waiting for 292
main waiting for 941
pool-2-thread-1 waiting for 188
allOf() commpleted on main

How to control the executor that will be used after allOf() (or anyOf())

Since there is no guarantee on the executor that will be used, a call to one of those methods should be followed by a *Async(, executor) call to control which executor will be used.

If you need to return the resulting CompletableFuture of one of those calls, just add a thenApplyAsync(i -> i, executor) before returning it.

What is the difference between thenApply and thenApplyAsync of Java CompletableFuture?

The difference has to do with the Executor that is responsible for running the code. Each operator on CompletableFuture generally has 3 versions.

  1. thenApply(fn) - runs fn on a thread defined by the CompleteableFuture on which it is called, so you generally cannot know where this will be executed. It might immediately execute if the result is already available.
  2. thenApplyAsync(fn) - runs fn on a environment-defined executor regardless of circumstances. For CompletableFuture this will generally be ForkJoinPool.commonPool().
  3. thenApplyAsync(fn,exec) - runs fn on exec.

In the end the result is the same, but the scheduling behavior depends on the choice of method.

Is CompletableFuture guaranteed to run un a new thread?

No, this guarantee does not exist.

What you are doing is unsafe.

If you look of the docs of CompletableFuture.supplyAsync(Supplier):

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

you can see that it uses a ForkJoinPool obtained from ForkJoinPool.commonPool().

From the docs of ForkJoinPool:

all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined. All worker threads are initialized with Thread.isDaemon() set true.
A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool. Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).
For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors

This means that tasks submitted may be executed in an arbituary amount of threads (number of processors by default) and those threads are re-used. If all those threads are busy, the execution may wait for previous executions to finish.

As the common pool may also be used by other parts of the application, submitted tasks should run shortly and should not block so that other tasks can be executed quickly.

While OpenJDK has a special handling for ForkJoinPool in CompletableFuture#get that makes sure that other tasks can be executed during that time, other JDKs might not provide this.

Alternative: Asynchronous handling

Instead of blocking using .get(), you may want to use methods like CompletableFuture#thenAcceptAsync(Consumer). This runs the Consumer after the future finishes.

Also, you can use CompletionStage#exceptionally to handle exceptions in an asynchronous manner.

public static void main (String[] args) throws java.lang.Exception
{
CompletableFuture<Void> s = new CompletableFuture();
CompletableFuture<Void> f = new CompletableFuture();

CompletableFuture<Void> someContext = CompletableFuture.supplyAsync(() ->
{

System.out.println(Thread.currentThread().getId());
CompletableFuture<String> update =
CompletableFuture.supplyAsync(
() -> {
String ans = null;
try {
System.out.println(Thread.currentThread().getId());
ans = "Hello";
} catch (Exception e) {
ans = e.toString();
} finally {
s.complete(null);
return ans;
}
});
s thenSupplyAsync(result->{
System.out.println(s.isDone());
}).exceptionally(e->{
System.out.println("Some error");
return null;
});

return null;
});

System.out.println(f.isDone());
}

Which executor is used when composing Java CompletableFutures?

Side Question: If you assigned the intermediate CompletionStage to a variable and call a method on it, it would get executed on the same thread.

Main Question: Only the first one, so change thenAccept to thenAcceptAsync -- all the following ones will execute their steps on the thread that is used for the accept.

Alternative Question: the thread that completed the future from thenCompose is the same one as was used for the compose.

You should think of the CompletionStages as steps, that are executed in rapid succession on the same thread (by just applying the functions in order), unless you specifically want the step to be executed on a different thread, using async. All next steps are done on that new thread then.

In your current setup the steps would be executed like this:

Thread-1: delete, accept, compose, complete

With the first accept async, it becomes:

Thread-1: delete
Thread-2: accept, compose, complete

As for your last question, about the same thread being used by your callers if they add additional steps -- I don't think there is much you can do about aside from not returning a CompletableFuture, but a normal Future.

Return a CompletableFuture without exposing executor thread

There is no feature to detach your completion from the execution of the dependent action. When the thread chaining the dependent action has already completed the registration and your executor’s thread completes the future, which other thread ought to execute the dependent action if no other executor was given?

Your approach of chaining another action with a different executor seems to be the best you can get. However, it’s important to note that in case of an exceptional completion, the exception gets propagated without evaluating functions passed to thenApply. This exception propagation could again lead to an exposure of the thread, if the caller chained an action like whenComplete, handle, or exceptionally.

On the other hand, you don’t need to specify a secondary executor, as you can use the async method without executor parameter, to get the default (common Fork/Join) pool.

So chaining .whenCompleteAsync((x,y) -> {}) is the best solution to your problem so far.



Related Topics



Leave a reply



Submit