Convert from ListCompletableFuture to CompletableFutureList
Use CompletableFuture.allOf(...)
:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
A few comments on your implementation:
Your use of .thenComposeAsync
, .thenApplyAsync
and .thenCombineAsync
is likely not doing what you expect. These ...Async
methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync
methods without a good reason.
Additionally, reduce
should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect
instead.
If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence
method:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));
return result;
If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow();
right after result.completeExceptionally(ex);
. This, of course, assumes that exec
only exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future
individually.
Return a CompletableFuture containing a list of CompletableFutures
The following snippet shows the use of listOfFutures.stream().map(CompletableFuture::join)
to collect the result of allOF
. I have taken this example from this page that states that it wont wait for every Future to finish.
class Test {
public static void main(String[] args) throws Exception {
long millisBefore = System.currentTimeMillis();
List<String> strings = Arrays.asList("1","2", "3", "4", "5", "6", "7", "8");
List<CompletableFuture<String>> listOfFutures = strings.stream().map(Test::downloadWebPage).collect(toList());
CompletableFuture<List<String>> futureOfList = CompletableFuture
.allOf(listOfFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> listOfFutures.stream().map(CompletableFuture::join).collect(toList()));
System.out.println(futureOfList.get()); // blocks here
System.out.printf("time taken : %.4fs\n", (System.currentTimeMillis() - millisBefore)/1000d);
}
private static CompletableFuture<String> downloadWebPage(String webPageLink) {
return CompletableFuture.supplyAsync( () ->{
try { TimeUnit.SECONDS.sleep(4); }
catch (Exception io){ throw new RuntimeException(io); }
finally { return "downloaded : "+ webPageLink; }
});
}
}
Since efficiency seems to be the issue here, I have included a dummy benchmarck to prove it does not take 32 seconds to execute.
Output :
[downloaded : 1, downloaded : 2, downloaded : 3, downloaded : 4, downloaded : 5, downloaded : 6, downloaded : 7, downloaded : 8]
time taken : 8.0630s
EDIT from the original Question-Poster
Thanks to this answer, and through using this website (talks about exception handling related to allOf
), I came up with this completed version:
public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
/* Collecting the list of all the async requests that build a List<Event>. */
List<CompletableFuture<List<Event>>> completableFutures = eventsResearchApis.stream()
.map(api -> getFilteredEventsAsync(api, eventResearch))
.collect(Collectors.toList());
/* Creating a single Future that contains all the Futures we just created ("flatmap"). */
CompletableFuture<Void> allFutures =CompletableFuture.allOf(completableFutures
.toArray(new CompletableFuture[eventsResearchApis.size()]));
/* When all the Futures have completed, we join them to create merged List<Event>. */
CompletableFuture<List<Event>> allCompletableFutures = allFutures
.thenApply(future -> completableFutures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream) // creating a List<Event> from List<List<Event>>
.collect(Collectors.toList())
);
return allCompletableFutures;
}
private CompletableFuture<List<Event>> getFilteredEventsAsync(UniformEventsResearchApi api,
EventResearch eventResearch) {
/* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
return CompletableFuture.supplyAsync(() -> api.getFilteredEvents(eventResearch))
.exceptionally(ex -> {
LOGGER.error("Extraction of events from API went wrong: ", ex);
return Collections.emptyList(); // gets managed in the wrapping Future
});
}
How to convert CollectionCompletableFutureX to CompletableFutureCollectionX without calling join or get?
The solution is to
- Convert
Collection<CompletableFuture<X>>
toCollection<CompletableFuture<Collection<X>>>
- Reduce on this collection using
CompletableFuture.completedFuture(Collections.emptyList())
as initial value.
final CompletableFuture<Collection<X>> joined =
futures
.stream()
.map(f -> f.thenApply(value -> (Collection<X>)Collections.singletonList(value)))
.reduce(
CompletableFuture.completedFuture(Collections.emptyList()),
(f, g) ->
f.thenCompose(
xs -> g.thenApply(
ys -> Stream
.of(xs, ys)
.flatMap(Collection::stream)
.collect(Collectors.toList()))
)
);
Why compiler in given me this cannot convert from CompletableFutureObject to CompletableFutureString
Your main clause is
CompletableFuture<String> allGen = loadFile1().thenApply(params1 -> {
…
});
So the specified function is supposed to return a String
. But your code is trying to return a CompletableFuture<String>
, as Stream.of(gen1, gen2) .map(CompletableFuture::join) .collect(joining(","))
produces a String
and you’re using this expression in return CompletableFuture .allOf(gen1, gen2) .thenApply(r -> …);
The compiler error messages are often very unhelpful in cases of such type mismatches in generic code.
The simplest fix (with the smallest change) is to use thenCompose
instead of thenAppy
, allowing the function to return a CompletableFuture
.
CompletableFuture<String> allGen = loadFile1().thenCompose(params1 -> {
CompletableFuture<String> gen1 = loadFile2().thenApply(params2 -> {
return generateResultFile1(params1, params2);
});
CompletableFuture<String> gen2 = loadFile3().thenApply(params3 -> {
return generateResultFile2(params1, params3);
});
return CompletableFuture.allOf(gen1, gen2)
.thenApply(r -> Stream.of(gen1, gen2)
.map(CompletableFuture::join).collect(joining(",")));
});
There are, however, opportunities to use simplified syntax
CompletableFuture<String> allGen = loadFile1().thenCompose(params1 -> {
CompletableFuture<String> gen1 = loadFile2()
.thenApply(params2 -> generateResultFile1(params1, params2));
CompletableFuture<String> gen2 = loadFile3()
.thenApply(params3 -> generateResultFile2(params1, params3));
return CompletableFuture.allOf(gen1, gen2)
.thenApply(r -> Stream.of(gen1, gen2)
.map(CompletableFuture::join).collect(joining(",")));
});
If the code is always combining exactly two results, you can use the even simpler:
CompletableFuture<String> allGen = loadFile1().thenCompose(params1 ->
loadFile2().thenApply(params2 -> generateResultFile1(params1, params2))
.thenCombine(
loadFile3().thenApply(params3 -> generateResultFile2(params1, params3)),
(s1, s2) -> String.join(",", s1, s2))
);
Despite the different nesting, loadFile2().thenApply(…)
and loadFile3().thenApply(…)
are still two independent operations and only the final (s1, s2) -> String.join(",", s1, s2)
depends on both.
If you want to make this more obvious, keep the local variables
CompletableFuture<String> allGen = loadFile1().thenCompose(params1 -> {
CompletableFuture<String> gen1
= loadFile2().thenApply(params2 -> generateResultFile1(params1, params2));
CompletableFuture<String> gen2
= loadFile3().thenApply(params3 -> generateResultFile2(params1, params3));
return gen1.thenCombine(gen2, (s1, s2) -> s1 + "," + s2);
});
As shown in the last example, you may also replace String.join(",", s1, s2)
with s1 + "," + s2
here. The latter will be slightly more efficient, but since it’s rather unlikely to dominate the overall performance, it’s rather a matter of taste.
Related Topics
Garbage Collector in Java - Set an Object Null
Are "While(True)" Loops So Bad
Return Generated PDF Using Spring MVC
Instanceof - Incompatible Conditional Operand Types
How to Get Data Between Quotes in Java
Installed Java 7 on MAC Os X But Terminal Is Still Using Version 6
Isn't the Size of Character in Java 2 Bytes
Why Is Class.Newinstance() "Evil"
How to Create Sparksession with Hive Support (Fails with "Hive Classes Are Not Found")
Converting a Java Keystore into Pem Format
Java Dynamic Binding and Method Overriding
Embed a Jre in a Windows Executable
What Is the Breakdown for Java's Lambda Syntax
How to Avoid Constructor Code Redundancy in Java