Waiting for Multiple Futures

Waiting for multiple futures?

Thread support in C++11 was just a first pass, and while std::future rocks, it does not support multiple waiting as yet.

You can fake it relatively inefficiently, however. You end up creating a helper thread for each std::future (ouch, very expensive), then gathering their "this future is ready" into a synchronized many-producer single-consumer message queue, then setting up a consumer task that dispatches the fact that a given std::future is ready.

The std::future in this system doesn't add much functionality, and having tasks that directly state that they are ready and sticks their result into the above queue would be more efficient. If you go this route, you could write wrapper that match the pattern of std::async or std::thread, and return a std::future like object that represents a queue message. This basically involves reimplementing a chunk of the the concurrency library.

If you want to stay with std::future, you could create shared_futures, and have each dependent task depend on the set of shared_futures: ie, do it without a central scheduler. This doesn't permit things like abort/shutdown messages, which I consider essential for a robust multi threaded task system.

Finally, you can wait for C++2x, or whenever the concurrency TS is folded into the standard, to solve the problem for you.

Future.wait() for multiple futures

You can use Future.wait to wait for several Future to be completed.

body: FutureBuilder<List<FlashCardList>>(
future: Future.wait([
fetchFlashCardList(),
fetchFlashCardListFromDB(),
]),

Dart Future.wait for multiple futures and get back results of different types

You need to adapt each of your Future<T>s to a common type of Future. You could use Future<void> and assign the results instead of relying on return values:

late List<Foo> foos;
late List<Bar> bars;
late List<FooBars> foobars;

await Future.wait<void>([
downloader.getFoos().then((result) => foos = result),
downloader.getBars().then((result) => bars = result),
downloader.getFooBars().then((result) => foobars = result),
]);

processData(foos, bars, foobars);

Or if you prefer await to .then(), the Future.wait call could be:

await Future.wait<void>([
(() async => foos = await downloader.getFoos())(),
(() async => bars = await downloader.getBars())(),
(() async => foobars = await downloader.getFooBars())(),
]);

How can I await multiple futures, at once, that are nested inside a map?

One reason to complete all the futures immediately would be that they already exist, and if you don't await all of them immediately, one of them might complete with an (unhandled!) error, which is bad.

The standard provided way to wait for more than one future at a time is Future.wait, which takes an Iterable<Future<X>> and returns a Future<List<X>>.

That will directly help you with the individual lists, but then you'll have a Future<List<Foo>> per key in the map. You'll have to convert those to a list too.
So, maybe something like:

Future<Map<String, List<Foo>>> waitAll(
Map<String, Iterable<Future<Foo>>> map) async =>
Map.fromIterables(map.keys.toList(), await Future.wait(map.values.map(Future.wait));

Waiting on multiple futures borrowing mutable self

You are not allowed to have multiple mutable references to an object and there's a good reason for that.
Imagine you pass an object mutably to 2 different functions and they edited the object out of sync since you don't have any mechanism for that in place. then you'd end up with something called a race condition.

To prevent this bug rust allows only one mutable reference to an object at a time but you can have multiple immutable references and often you see people use internal mutability patterns.

In your case, you want data not to be able to be modified by 2 different threads at the same time so you'd wrap it in a Lock or RwLock then since you want multiple threads to be able to own this value you'd wrap that in an Arc.

here you can read about interior mutability in more detail.

Alternatively, while declaring the type of your function you could add proper lifetimes to indicate the resulting Future will be waited on in the same context by giving it a lifetime since your code waits for the future before the next iteration that would do the trick as well.

Waiting on a list of Future

You can use a CompletionService to receive the futures as soon as they are ready and if one of them throws an exception cancel the processing. Something like this:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService =
new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
completionService.submit(new Callable<SomeResult>() {
public SomeResult call() {
...
return result;
}
});
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
try {
SomeResult result = resultFuture.get();
received ++;
... // do something with the result
}
catch(Exception e) {
//log
errors = true;
}
}

I think you can further improve to cancel any still executing tasks if one of them throws an error.

How to wait for several Futures?

You could use a for-comprehension as follows instead:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

In this example, futures 1, 2 and 3 are kicked off in parallel. Then, in the for comprehension, we wait until the results 1 and then 2 and then 3 are available. If either 1 or 2 fails, we will not wait for 3 anymore. If all 3 succeed, then the aggFut val will hold a tuple with 3 slots, corresponding to the results of the 3 futures.

Now if you need the behavior where you want to stop waiting if say fut2 fails first, things get a little trickier. In the above example, you would have to wait for fut1 to complete before realizing fut2 failed. To solve that, you could try something like this:

  val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}

def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)

fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)

case Success(value) =>
processFutures(futures - value, value :: values, prom)

case Failure(ex) => prom.failure(ex)
}
prom.future
}

val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}

Now this works correctly, but the issue comes from knowing which Future to remove from the Map when one has been successfully completed. As long as you have some way to properly correlate a result with the Future that spawned that result, then something like this works. It just recursively keeps removing completed Futures from the Map and then calling Future.firstCompletedOf on the remaining Futures until there are none left, collecting the results along the way. It's not pretty, but if you really need the behavior you are talking about, then this, or something similar could work.



Related Topics



Leave a reply



Submit