Convert from List<Completablefuture> to Completablefuture<List>

Convert from ListCompletableFuture to CompletableFutureList

Use CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}

A few comments on your implementation:

Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync methods without a good reason.

Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect instead.

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);

com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));

return result;

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow(); right after result.completeExceptionally(ex);. This, of course, assumes that exec only exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future individually.

Return a CompletableFuture containing a list of CompletableFutures

The following snippet shows the use of listOfFutures.stream().map(CompletableFuture::join) to collect the result of allOF. I have taken this example from this page that states that it wont wait for every Future to finish.

class Test {

public static void main(String[] args) throws Exception {

long millisBefore = System.currentTimeMillis();

List<String> strings = Arrays.asList("1","2", "3", "4", "5", "6", "7", "8");
List<CompletableFuture<String>> listOfFutures = strings.stream().map(Test::downloadWebPage).collect(toList());
CompletableFuture<List<String>> futureOfList = CompletableFuture
.allOf(listOfFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> listOfFutures.stream().map(CompletableFuture::join).collect(toList()));

System.out.println(futureOfList.get()); // blocks here
System.out.printf("time taken : %.4fs\n", (System.currentTimeMillis() - millisBefore)/1000d);
}

private static CompletableFuture<String> downloadWebPage(String webPageLink) {
return CompletableFuture.supplyAsync( () ->{
try { TimeUnit.SECONDS.sleep(4); }
catch (Exception io){ throw new RuntimeException(io); }
finally { return "downloaded : "+ webPageLink; }
});
}

}

Since efficiency seems to be the issue here, I have included a dummy benchmarck to prove it does not take 32 seconds to execute.

Output :

[downloaded : 1, downloaded : 2, downloaded : 3, downloaded : 4, downloaded : 5, downloaded : 6, downloaded : 7, downloaded : 8]
time taken : 8.0630s

EDIT from the original Question-Poster

Thanks to this answer, and through using this website (talks about exception handling related to allOf), I came up with this completed version:

    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {

/* Collecting the list of all the async requests that build a List<Event>. */
List<CompletableFuture<List<Event>>> completableFutures = eventsResearchApis.stream()
.map(api -> getFilteredEventsAsync(api, eventResearch))
.collect(Collectors.toList());

/* Creating a single Future that contains all the Futures we just created ("flatmap"). */
CompletableFuture<Void> allFutures =CompletableFuture.allOf(completableFutures
.toArray(new CompletableFuture[eventsResearchApis.size()]));

/* When all the Futures have completed, we join them to create merged List<Event>. */
CompletableFuture<List<Event>> allCompletableFutures = allFutures
.thenApply(future -> completableFutures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream) // creating a List<Event> from List<List<Event>>
.collect(Collectors.toList())
);

return allCompletableFutures;
}

private CompletableFuture<List<Event>> getFilteredEventsAsync(UniformEventsResearchApi api,
EventResearch eventResearch) {
/* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
return CompletableFuture.supplyAsync(() -> api.getFilteredEvents(eventResearch))
.exceptionally(ex -> {
LOGGER.error("Extraction of events from API went wrong: ", ex);
return Collections.emptyList(); // gets managed in the wrapping Future
});
}

How to convert CollectionCompletableFutureX to CompletableFutureCollectionX without calling join or get?

The solution is to

  1. Convert Collection<CompletableFuture<X>> to Collection<CompletableFuture<Collection<X>>>
  2. Reduce on this collection using CompletableFuture.completedFuture(Collections.emptyList())
    as initial value.
final CompletableFuture<Collection<X>> joined = 
futures
.stream()
.map(f -> f.thenApply(value -> (Collection<X>)Collections.singletonList(value)))
.reduce(
CompletableFuture.completedFuture(Collections.emptyList()),
(f, g) ->
f.thenCompose(
xs -> g.thenApply(
ys -> Stream
.of(xs, ys)
.flatMap(Collection::stream)
.collect(Collectors.toList()))
)
);

Why compiler in given me this cannot convert from CompletableFutureObject to CompletableFutureString

Your main clause is

CompletableFuture<String> allGen = loadFile1().thenApply(params1 -> {

});

So the specified function is supposed to return a String. But your code is trying to return a CompletableFuture<String>, as Stream.of(gen1, gen2) .map(CompletableFuture::join) .collect(joining(",")) produces a String and you’re using this expression in return CompletableFuture .allOf(gen1, gen2) .thenApply(r -> …);

The compiler error messages are often very unhelpful in cases of such type mismatches in generic code.

The simplest fix (with the smallest change) is to use thenCompose instead of thenAppy, allowing the function to return a CompletableFuture.

CompletableFuture<String> allGen = loadFile1().thenCompose(params1 -> {

CompletableFuture<String> gen1 = loadFile2().thenApply(params2 -> {
return generateResultFile1(params1, params2);
});

CompletableFuture<String> gen2 = loadFile3().thenApply(params3 -> {
return generateResultFile2(params1, params3);
});

return CompletableFuture.allOf(gen1, gen2)
.thenApply(r -> Stream.of(gen1, gen2)
.map(CompletableFuture::join).collect(joining(",")));
});

There are, however, opportunities to use simplified syntax

CompletableFuture<String> allGen = loadFile1().thenCompose(params1 -> {
CompletableFuture<String> gen1 = loadFile2()
.thenApply(params2 -> generateResultFile1(params1, params2));

CompletableFuture<String> gen2 = loadFile3()
.thenApply(params3 -> generateResultFile2(params1, params3));

return CompletableFuture.allOf(gen1, gen2)
.thenApply(r -> Stream.of(gen1, gen2)
.map(CompletableFuture::join).collect(joining(",")));
});

If the code is always combining exactly two results, you can use the even simpler:

CompletableFuture<String> allGen = loadFile1().thenCompose(params1 ->
loadFile2().thenApply(params2 -> generateResultFile1(params1, params2))
.thenCombine(
loadFile3().thenApply(params3 -> generateResultFile2(params1, params3)),
(s1, s2) -> String.join(",", s1, s2))
);

Despite the different nesting, loadFile2().thenApply(…) and loadFile3().thenApply(…) are still two independent operations and only the final (s1, s2) -> String.join(",", s1, s2) depends on both.

If you want to make this more obvious, keep the local variables

CompletableFuture<String> allGen = loadFile1().thenCompose(params1 -> {
CompletableFuture<String> gen1
= loadFile2().thenApply(params2 -> generateResultFile1(params1, params2));
CompletableFuture<String> gen2
= loadFile3().thenApply(params3 -> generateResultFile2(params1, params3));
return gen1.thenCombine(gen2, (s1, s2) -> s1 + "," + s2);
});

As shown in the last example, you may also replace String.join(",", s1, s2) with s1 + "," + s2 here. The latter will be slightly more efficient, but since it’s rather unlikely to dominate the overall performance, it’s rather a matter of taste.



Related Topics



Leave a reply



Submit