Transform Java Future into a Completablefuture

Transform Java Future into a CompletableFuture

There is a way, but you won't like it. The following method transforms a Future<T> into a CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}

Obviously, the problem with this approach is, that for each Future, a thread will be blocked to wait for the result of the Future--contradicting the idea of futures. In some cases, it might be possible to do better. However, in general, there is no solution without actively wait for the result of the Future.

Transform Scala Future into CompletableFuture

Use the scala-java8-compat library:

import java.util.concurrent.CompletableFuture
import scala.concurrent.Future
import scala.compat.java8.FutureConverters._

val scalaFuture = Future.apply("Hello")
val javaFuture: CompletableFuture[String] = scalaFuture.toJava.toCompletableFuture

Refactoring: Converting future.get() to CompletionStage.whenComplete()?

You can not avoid blocking for a method that is supposed to return the final result, i.e. the List<Object>. The only way to make this method non-blocking, is by returning a future for the final result.

Then, the most straight-forward way to deal with a list of futures, is to keep the loop, but perform it as a chained operation, when all futures are known to be completed, so querying the result won’t block.

for(List<Long> batch: batches) {
futureHolder.add(ask(batch));
}

return CompletableFuture.allOf(futureHolder.toArray(new CompletableFuture[0]))
.thenApply(justVoid -> {
List<Object> combinedStatus = new ArrayList<>();
for(CompletableFuture<Object> future: futureHolder) {
combinedStatus.add(future.join().result());
}
return combinedStatus;
});

The returned future will be completed exceptionally if any of the operation failed with an exception, so you don’t need to deal with exceptions within this code. The caller of you method can decide how to deal with exceptional results.

How do you transform a CompletableFuture of one type to another?

You're effectively trying to transform the completed value of your CompletableFuture into a value of type Void. Presumably you want to propagate any exception if that future was completed exceptionally.

CompletableFuture provides thenApply for this basic transformation, but other methods can also be used.

In your case, you'll want to ignore the value from the source future and return null, since null is the only possible value for the type Void. However, there needs to be some hint for the compiler that you're targeting the type Void.

Either be explicit by providing an explicit type argument to the invocation of thenApply

public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).<Void> thenApply(c -> null);
}

or be explicit by casting to the appropriate type in the lambda expression

public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).thenApply(c -> (Void) null);
}

Your solution achieves the same result, since the value is known to be of the correct type, but it involves an extra method invocation

@Override
public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).thenApply(c -> empty());
}

All of these solutions will propagate the exception, if any, of the origial CompletableFuture.

Thank you to Luis, you could also just use thenAccept with a Consumer doing nothing:

public CompletableFuture<Void> packetEncrypted(ByteBuffer engineToSocketData) {
return realChannel.write(engineToSocketData).thenAccept(c -> {}):
}

The behavior is the same for any other type. thenApply lets you perform any Function on the result of a CompletableFuture.

For example, I can have a future that's meant to complete with String that's meant to be converted to an Integer.

public static void main(String[] args) throws Exception {
CompletableFuture<String> futureLine = CompletableFuture.supplyAsync(() -> "1234");
CompletableFuture<Integer> theNumber = futureLine.thenApply(Integer::parseInt);
System.out.println(theNumber.get());
}

thenApply receives the completed value and transforms it by passing it to an invocation of Integer#parseInt(String). Since parseInt has a return type of int, the return type of thenApply is inferred to CompletableFuture<Integer>.

Best way to convert a Java Future to a Twitter Future

If your Java future is just a java.util.concurrent.Future, then what you have is basically the best you can do, because it's a very poor type. If you have a java.util.concurrent.CompletableFuture, you basically combine the answers to convert it to a scala.concurrent.Future and convert Scala future to a Twitter future:

import java.util.concurrent.{Future => JFuture, CompletableFuture}
import com.twitter.util.{Future => TwitterFuture, Promise => TwitterPromise}
import scala.util.{Success, Failure}

implicit class FutureWrapper[A](private val jf: JFuture[A]) extends AnyVal {
def asTwitter: TwitterFuture[A] = jf match {
case jf: CompletableFuture[A] =>
val promise = new TwitterPromise[A]()
jf.whenComplete { (result, exception) =>
if (exception != null)
promise.setException(exception)
else
promise.setValue(result)
}
promise
case _ =>
FuturePool.unboundedPool{ jf.get }
}
}

Convert from ListCompletableFuture to CompletableFutureList

Use CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}

A few comments on your implementation:

Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync methods without a good reason.

Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect instead.

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);

com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));

return result;

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow(); right after result.completeExceptionally(ex);. This, of course, assumes that exec only exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future individually.

CompletableFuture: transformation vs. composition

Yes, the thenCompose and supplyAsync achieve the same as using thenApplyAsync directly.

I haven't read the book, but it might be that some example code is focused on some topic or feature rather than the most concise or fastest code. As such, I leave some suggestions assuming you're considering using similar code.


One more suggestion about this code is that it's kind of weird to chain each CompletableFuture through successive calls to map. It seems the current example was built on top of a previous Stream based method with multiple calls, and left as-is but with the use of CompletableFuture.

I prefer one single map and chaining each CompletableFuture directly, which also allows refactoring it out into a method of its own.

So this:

            .map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

Would become this:

            .map(shop ->
CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
.thenApply(Quote::parse)
.thenApplyAsync(Discount::applyDiscount, executor))

This lambda is easily turned into a method, which can be reused in cases without a Stream, it can be composed with another CompletableFuture, it can be tested, it can be mocked, etc.


Another suggestion would be to make your code asynchronous all the way, such that findPrices doesn't block on join (or get, for that matter).

The problem with blocking is that it might block the last available thread on an executor, thus provoking a deadlock by thread exhaustion. The asynchronous code on which your code depends on, that eventually needs to run on the executor, might never run.

public CompletableFuture<List<String>> findPricesAsync(String product) {
// List<CompletableFuture<String>> priceFutures = ...

CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
return all.thenRun(() -> priceFutures.stream()
.map(CompletableFuture::join));
}

Note that the return type changed from List<String> to CompletableFuture<List<String>>. Also note that the last call to join will not block, as every CompletableFuture on which it will be called has completed.


Finally, I tend to return CompletionStage, as it allows hypothetical implementations other than CompletableFuture. I also make the assumption that the returned object also implements Future, which allows to use get on the result, but not join, the difference being the declared thrown exception types.

In one case where I made NIO-like methods return CompletionStage for asynchronous I/O, I've implemented a subclass of CompletableFuture overriding the default executor used in each *Async method that doesn't have the executor parameter. This was made easier since Java 9, still by subclassing, but it only needs overriding defaultExecutor. The main reason I subclassed was that the alternative using composition would result in much more code (wrapping results and what not). Another reason, but not what really worried me, was having an extra object to be garbage collected for each instance.

This was just to demonstrate that there may be cases where having custom CompletionStage implementations is actually needed, which might or might not be subclasses of CompletableFuture.

How to convert the code to use CompletableFuture?

supplyAsync() expects a Supplier<U> and you are giving it a Callable.

The error message is telling you that the compiler has tried to find a type to use for U such that your SampleTask "is a" Supplier<U>, but it can't find one.

Java will implicitly "promote" a lambda to a functional interface such as Callable or Supplier. But it won't treat functional interfaces as interchangeable -- that is, you can't use a Callable where a Supplier is expected.

You can make a suitable lambda in-place:

SimpleTask task = new SimpleTask();
CompletableFuture.supplyAsync(() -> task.call());

Note that this works if SimpleTask's call() is:

 public Double call() {  // note no exception declared
return 0d;
}

The fact that SimpleTask happens to implement Callable is not relevant to the code above.


If you want this to work with an arbitrary Callable, or if you declare task as a Callable:

Callable callable = new SimpleTask();
CompletableFuture.supplyAsync(() -> callable.call());

... then you will get a compiler error about the uncaught exception. Your lambda will need to catch the exception and handle it (perhaps rethrowing as an unchecked exception, as described in other answers).


Or you could make SampleTask implement Supplier<Double>.


Part of the motivation for lambdas is that writing things like Callable was too verbose. So you might well leave out the intermediate class and go directly for:

CompleteableFuture<Double> future = CompletableFuture.supplyAsync(() -> 0d);

This applies for more complicated suppliers too:

CompleteableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
Foo foo = slowQuery();
return transformToDouble(foo);
});

Converting Java CompletableFuture to a Scala Future

Try

libraryDependencies += "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"

which is the same as

libraryDependencies += "org.scala-lang.modules" % "scala-java8-compat_2.12" % "0.9.0"

Pay attention to %% or _2.12.

CompletionStage is a supertype of java.util.concurrent.CompletableFuture.
So if you have a CompletableFuture that's it.



Related Topics



Leave a reply



Submit