Waiting on a List of Future

Waiting on a list of Future

You can use a CompletionService to receive the futures as soon as they are ready and if one of them throws an exception cancel the processing. Something like this:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService =
new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
completionService.submit(new Callable<SomeResult>() {
public SomeResult call() {
...
return result;
}
});
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
try {
SomeResult result = resultFuture.get();
received ++;
... // do something with the result
}
catch(Exception e) {
//log
errors = true;
}
}

I think you can further improve to cancel any still executing tasks if one of them throws an error.

Wait for every Future in a List Future to be done

Your code can be simplified a lot. An equivalent version could be written as follows, unless you have requirements that you didn't specify in the question.

List<Principal> users = // fill users
List<Future<UserRecord>> futures = getAllTheFutures(users);
List<UserRecord> results = new ArrayList<>();

for (int i = 0; i < futures.size(); i++) {
try {
results.add(futures.get(i).get());
} catch (InterruptedException | ExecutionException e) {
// Handle appropriately, results.add(null) or just leave it out
}
}
}

How to use a loop to give `Future.wait()` a list o Futures

Why can't you just use a for loop and wait 1 second in between to not exceed the rate limit.

for(int i = 1; i <= 4; i++){
await Future.delayed(const Duration(seconds: 1));
await getPartNumber(i).then((value) => addPartToList(value.data));
}

This also ensures that no more than one request per second comes in, as the time in between is waited.
Or if you want the waiting time to be handled the same way as in your code, the following also works:

await for(var i in Stream.periodic(const Duration(seconds: 1), (i)=>i).take(5)){
if(i == 0) continue;
await getPartNumber(i).then((value) => addPartToList(value.data));
}

Or if you really want to do it with Future.wait(), which I personally don't like, here's how to do it:

await Future.wait(
Iterable.generate(4, (i)=>Future.delayed(const Duration(seconds: i + 1))
.then((value) => getPartNumber(i + 1))
.then((value) => addPartToList(value.data))
);

or

await Future.wait([
for(int i = 1; i <= 4; i++)
Future.delayed(const Duration(seconds: i))
.then((value) => getPartNumber(i)
.then((value) => addPartToList(value.data))
]);

How to await a future in a List.map() function and return a List of strings

Use Future.wait to wait for an iterable of futures to complete:

Future<List<String>> getAllDownloadUrlByDirectory(String item_id) async {
List<String> links = List.empty(growable: true);

Reference reference = storage.ref().child('item').child(item_id);
ListResult results = await reference.listAll();

final futures = results.items.map((e) {
return e.getDownloadURL();
});

return Future.wait(futures);
}

You should also remove the unnecessary async statement in your map call.

Get all the Futures at the same time from a list

Await termination of your executor service

Your comments indicate that your real goal is to wait until all your submitted tasks have been completed (or cancelled).

You said:

I want somehow to call get() after all the Futures are completed

A Future does not get completed. A Future represents:

  • The status of a Callable/Runnable that may have yet to start execution, may be currently executing, or may have ended its execution.
  • The result of a Callable object's work, returned by its call method. The result payload on the Future` is empty until the task has completed and returned an object.

To wait until all the submitted tasks are done, you simply need to call shutdown and awaitTermination on your executor service.

And by the way, you can submit a collection of Callable objects to your executor service, if that suits your situation. You will get back a collection of Future objects. Getting those Future objects does not mean the tasks are done, it means the tasks were successfully submitted to an executor service. By the time you get a Future back, the task may or may not have started execution.

Here is some example code.

Set up a collection of Callable objects. Our example here returns a UUID.

int countTasks = 10;
List < Callable < UUID > > tasks = new ArrayList <>( countTasks );
for ( int i = 0 ; i < countTasks ; i++ )
{
Callable < UUID > c = ( ) -> { return UUID.randomUUID(); };
tasks.add( c );
}

Set up an executor service to perform our tasks on background threads. Establish a List to be filled with Future objects representing each of our task’s work.

ExecutorService executorService = Executors.newCachedThreadPool();
List < Future < UUID > > futures = null;

Ask our executor service to perform those tasks. The service collects the instantiated Future objects, returning to us the collection.

try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }

We immediately ask the executor service to go into shutdown mode. This stops further tasks from being submitted, but allows already-submitted tasks to continue. We then tell the executor service to wait a certain amount of time for all submitted tasks to complete. We check to see if the termination completed or timed-out.

executorService.shutdown();
try
{
boolean terminationComleted = executorService.awaitTermination( 1 , TimeUnit.MINUTES );
if ( ! terminationComleted ) System.out.println( "ERROR - Submitted tasks not completed before time-out. " + Instant.now() );
}
catch ( InterruptedException e ) { e.printStackTrace(); }

The call to awaitTermination blocks until either all tasks are done or the time-out is reached. So beyond that call means the executor service is shutdown. So we can now examine our collection of Future objects to see the fruits of our threaded work.

// At this point all submitted tasks are done, and the executor service has ended.
for ( Future < UUID > future : futures )
{
if ( future.isCancelled() )
{
System.out.println( "Oops, task was cancelled. " );
} else
{
try { System.out.println( "future = " + future + " | " + future.get() ); } catch ( InterruptedException e ) { e.printStackTrace(); }catch ( ExecutionException e ) { e.printStackTrace(); }
}
}

Same example, in a complete class, ready to run.

package work.basil.demo.threads;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

public class App4
{
public static void main ( String[] args )
{
App4 app = new App4();
app.demo();
}

private void demo ( )
{
System.out.println( "INFO - Starting method `demo`. " + Instant.now() );

int countTasks = 10;
List < Callable < UUID > > tasks = new ArrayList <>( countTasks );
for ( int i = 0 ; i < countTasks ; i++ )
{
Callable < UUID > c = ( ) -> { return UUID.randomUUID(); };
tasks.add( c );
}

ExecutorService executorService = Executors.newCachedThreadPool();
List < Future < UUID > > futures = null;
try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }

executorService.shutdown();
try
{
boolean terminationComleted = executorService.awaitTermination( 1 , TimeUnit.MINUTES );
if ( ! terminationComleted ) System.out.println( "ERROR - Submitted tasks not completed before time-out. " + Instant.now() );
}
catch ( InterruptedException e ) { e.printStackTrace(); }
// At this point all submitted tasks are done, and the executor service has ended.
for ( Future < UUID > future : futures )
{
if ( future.isCancelled() )
{
System.out.println( "Oops, task was cancelled. " );
} else
{
try { System.out.println( "future = " + future + " | " + future.get() ); } catch ( InterruptedException e ) { e.printStackTrace(); }catch ( ExecutionException e ) { e.printStackTrace(); }
}
}

System.out.println( "INFO - Starting method `demo`. " + Instant.now() );
}
}

You asked:

How can I make that all the futures to be executed and completed at the same time ?

You cannot.

The first rule of threaded work is that you do not control the threads. When a thread is scheduled for execution on a core, and for how long that execution runs before suspension, all depends on the JVM, the host OS, and the momentary runtime conditions.

Any of your threads may be started at any time, suspended at any time, and completed at any time. The threads may complete in any order at any time.

You said:

I want to be able to call the get() function for all the Futures

You can get all the Future objects as a collection, as shown in the code above. But each Future represents the task to be executed, not the fact that it has been executed. Hence the name Future.

You can ask each Future object if its task is done or not by calling Future#isDone. If the task has not yet begun execution, you get false. If the task is underway (either actively executing on a core or suspended), you get false. If the task has completed its work, been cancelled, or failed from a thrown exception, then you get true.

How to wait for list of `Future`s created using different `ExecutorServices`

Per Louis' comment, what I was looking for was Futures.successfulAsList

This allows me to wait for all to complete and then check for any futures that failed.

Guava RULES!

How to wait until list is not empty? - Flutter futures

For this you've to use a FutureBuilder widget

FutureBuilder(
future: createIpcList(resp),
builder: (context, snapshot){
if (snapshot.hasData)
return ResultList(formattedData: snapshot.data);
}
);

If you want you can show a loading indicator while the data is being fethced from the server.



Related Topics



Leave a reply



Submit