Transform Java Future into a CompletableFuture
There is a way, but you won't like it. The following method transforms a Future<T>
into a CompletableFuture<T>
:
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
Obviously, the problem with this approach is, that for each Future, a thread will be blocked to wait for the result of the Future--contradicting the idea of futures. In some cases, it might be possible to do better. However, in general, there is no solution without actively wait for the result of the Future.
Transform Scala Future into CompletableFuture
Use the scala-java8-compat library:
import java.util.concurrent.CompletableFuture
import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
val scalaFuture = Future.apply("Hello")
val javaFuture: CompletableFuture[String] = scalaFuture.toJava.toCompletableFuture
Refactoring: Converting future.get() to CompletionStage.whenComplete()?
You can not avoid blocking for a method that is supposed to return the final result, i.e. the List<Object>
. The only way to make this method non-blocking, is by returning a future for the final result.
Then, the most straight-forward way to deal with a list of futures, is to keep the loop, but perform it as a chained operation, when all futures are known to be completed, so querying the result won’t block.
for(List<Long> batch: batches) {
futureHolder.add(ask(batch));
}
return CompletableFuture.allOf(futureHolder.toArray(new CompletableFuture[0]))
.thenApply(justVoid -> {
List<Object> combinedStatus = new ArrayList<>();
for(CompletableFuture<Object> future: futureHolder) {
combinedStatus.add(future.join().result());
}
return combinedStatus;
});
The returned future will be completed exceptionally if any of the operation failed with an exception, so you don’t need to deal with exceptions within this code. The caller of you method can decide how to deal with exceptional results.
How do you transform a CompletableFuture of one type to another?
You're effectively trying to transform the completed value of your CompletableFuture
into a value of type Void
. Presumably you want to propagate any exception if that future was completed exceptionally.
CompletableFuture
provides thenApply
for this basic transformation, but other methods can also be used.
In your case, you'll want to ignore the value from the source future and return null
, since null
is the only possible value for the type Void
. However, there needs to be some hint for the compiler that you're targeting the type Void
.
Either be explicit by providing an explicit type argument to the invocation of thenApply
public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).<Void> thenApply(c -> null);
}
or be explicit by casting to the appropriate type in the lambda expression
public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).thenApply(c -> (Void) null);
}
Your solution achieves the same result, since the value is known to be of the correct type, but it involves an extra method invocation
@Override
public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).thenApply(c -> empty());
}
All of these solutions will propagate the exception, if any, of the origial CompletableFuture
.
Thank you to Luis, you could also just use thenAccept
with a Consumer
doing nothing:
public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).thenAccept(c -> {}):
}
The behavior is the same for any other type. thenApply
lets you perform any Function
on the result of a CompletableFuture
.
For example, I can have a future that's meant to complete with String
that's meant to be converted to an Integer
.
public static void main(String[] args) throws Exception {
CompletableFuture<String> futureLine = CompletableFuture.supplyAsync(() -> "1234");
CompletableFuture<Integer> theNumber = futureLine.thenApply(Integer::parseInt);
System.out.println(theNumber.get());
}
thenApply
receives the completed value and transforms it by passing it to an invocation of Integer#parseInt(String)
. Since parseInt
has a return type of int
, the return type of thenApply
is inferred to CompletableFuture<Integer>
.
Best way to convert a Java Future to a Twitter Future
If your Java future is just a java.util.concurrent.Future
, then what you have is basically the best you can do, because it's a very poor type. If you have a java.util.concurrent.CompletableFuture
, you basically combine the answers to convert it to a scala.concurrent.Future
and convert Scala future to a Twitter future:
import java.util.concurrent.{Future => JFuture, CompletableFuture}
import com.twitter.util.{Future => TwitterFuture, Promise => TwitterPromise}
import scala.util.{Success, Failure}
implicit class FutureWrapper[A](private val jf: JFuture[A]) extends AnyVal {
def asTwitter: TwitterFuture[A] = jf match {
case jf: CompletableFuture[A] =>
val promise = new TwitterPromise[A]()
jf.whenComplete { (result, exception) =>
if (exception != null)
promise.setException(exception)
else
promise.setValue(result)
}
promise
case _ =>
FuturePool.unboundedPool{ jf.get }
}
}
Convert from ListCompletableFuture to CompletableFutureList
Use CompletableFuture.allOf(...)
:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
A few comments on your implementation:
Your use of .thenComposeAsync
, .thenApplyAsync
and .thenCombineAsync
is likely not doing what you expect. These ...Async
methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync
methods without a good reason.
Additionally, reduce
should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect
instead.
If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence
method:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));
return result;
If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow();
right after result.completeExceptionally(ex);
. This, of course, assumes that exec
only exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future
individually.
CompletableFuture: transformation vs. composition
Yes, the thenCompose
and supplyAsync
achieve the same as using thenApplyAsync
directly.
I haven't read the book, but it might be that some example code is focused on some topic or feature rather than the most concise or fastest code. As such, I leave some suggestions assuming you're considering using similar code.
One more suggestion about this code is that it's kind of weird to chain each CompletableFuture
through successive calls to map
. It seems the current example was built on top of a previous Stream
based method with multiple calls, and left as-is but with the use of CompletableFuture
.
I prefer one single map
and chaining each CompletableFuture
directly, which also allows refactoring it out into a method of its own.
So this:
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))
Would become this:
.map(shop ->
CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
.thenApply(Quote::parse)
.thenApplyAsync(Discount::applyDiscount, executor))
This lambda is easily turned into a method, which can be reused in cases without a Stream
, it can be composed with another CompletableFuture
, it can be tested, it can be mocked, etc.
Another suggestion would be to make your code asynchronous all the way, such that findPrices
doesn't block on join
(or get
, for that matter).
The problem with blocking is that it might block the last available thread on an executor, thus provoking a deadlock by thread exhaustion. The asynchronous code on which your code depends on, that eventually needs to run on the executor, might never run.
public CompletableFuture<List<String>> findPricesAsync(String product) {
// List<CompletableFuture<String>> priceFutures = ...
CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
return all.thenRun(() -> priceFutures.stream()
.map(CompletableFuture::join));
}
Note that the return type changed from List<String>
to CompletableFuture<List<String>>
. Also note that the last call to join
will not block, as every CompletableFuture
on which it will be called has completed.
Finally, I tend to return CompletionStage
, as it allows hypothetical implementations other than CompletableFuture
. I also make the assumption that the returned object also implements Future
, which allows to use get
on the result, but not join
, the difference being the declared thrown exception types.
In one case where I made NIO-like methods return CompletionStage
for asynchronous I/O, I've implemented a subclass of CompletableFuture
overriding the default executor used in each *Async
method that doesn't have the executor parameter. This was made easier since Java 9, still by subclassing, but it only needs overriding defaultExecutor
. The main reason I subclassed was that the alternative using composition would result in much more code (wrapping results and what not). Another reason, but not what really worried me, was having an extra object to be garbage collected for each instance.
This was just to demonstrate that there may be cases where having custom CompletionStage
implementations is actually needed, which might or might not be subclasses of CompletableFuture
.
How to convert the code to use CompletableFuture?
supplyAsync()
expects a Supplier<U>
and you are giving it a Callable
.
The error message is telling you that the compiler has tried to find a type to use for U
such that your SampleTask
"is a" Supplier<U>
, but it can't find one.
Java will implicitly "promote" a lambda to a functional interface such as Callable
or Supplier
. But it won't treat functional interfaces as interchangeable -- that is, you can't use a Callable
where a Supplier
is expected.
You can make a suitable lambda in-place:
SimpleTask task = new SimpleTask();
CompletableFuture.supplyAsync(() -> task.call());
Note that this works if SimpleTask
's call()
is:
public Double call() { // note no exception declared
return 0d;
}
The fact that SimpleTask
happens to implement Callable
is not relevant to the code above.
If you want this to work with an arbitrary Callable
, or if you declare task
as a Callable
:
Callable callable = new SimpleTask();
CompletableFuture.supplyAsync(() -> callable.call());
... then you will get a compiler error about the uncaught exception. Your lambda will need to catch the exception and handle it (perhaps rethrowing as an unchecked exception, as described in other answers).
Or you could make SampleTask
implement Supplier<Double>
.
Part of the motivation for lambdas is that writing things like Callable
was too verbose. So you might well leave out the intermediate class and go directly for:
CompleteableFuture<Double> future = CompletableFuture.supplyAsync(() -> 0d);
This applies for more complicated suppliers too:
CompleteableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
Foo foo = slowQuery();
return transformToDouble(foo);
});
Converting Java CompletableFuture to a Scala Future
Try
libraryDependencies += "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"
which is the same as
libraryDependencies += "org.scala-lang.modules" % "scala-java8-compat_2.12" % "0.9.0"
Pay attention to %%
or _2.12
.
CompletionStage
is a supertype of java.util.concurrent.CompletableFuture
.
So if you have a CompletableFuture
that's it.
Related Topics
Junit Confusion: Use 'Extends Testcase' or '@Test'
How to Compile and Deploy a Java Class at Runtime
Java 8, Streams to Find the Duplicate Elements
How to Load/Reference a File as a File Instance from the Classpath
Convert Arraylist into 2D Array Containing Varying Lengths of Arrays
Painted Content Invisible While Resizing in Java
How Does Java Decide When to Import
Java Desktop Application: Swt VS. Swing
How to Obtain the Last Path Segment of a Uri
Create MySQL Database from Java
Peer Not Authenticated While Importing Gradle Project in Eclipse
Should a "Static Final Logger" Be Declared in Upper-Case
How to Get the Current Date and Time of Your Timezone in Java
Integer Arithmetic in Java with Char and Integer Literal
Last Row Always Removed from Defaulttablemodel, Regardless of Index