Implementing Future::Then() Equivalent for Asynchronous Execution in C++11

c++11 async continuations or attempt at .then() semantics

I find 3 problems with the above implemention:

  • It will only work if you pass std::shared_future as Fut.
  • The continuation might want a chance to handle exceptions.
  • It will not always behave as expected, since if you do not specify std::launch::async it might be deferred, thus the continuation is not invoked as one would expect.

I've tried to address these:

template<typename F, typename W, typename R>
struct helper
{
F f;
W w;

helper(F f, W w)
: f(std::move(f))
, w(std::move(w))
{
}

helper(const helper& other)
: f(other.f)
, w(other.w)
{
}

helper(helper&& other)
: f(std::move(other.f))
, w(std::move(other.w))
{
}

helper& operator=(helper other)
{
f = std::move(other.f);
w = std::move(other.w);
return *this;
}

R operator()()
{
f.wait();
return w(std::move(f));
}
};

}

template<typename F, typename W>
auto then(F f, W w) -> std::future<decltype(w(F))>
{
return std::async(std::launch::async, detail::helper<F, W, decltype(w(f))>(std::move(f), std::move(w)));
}

Used like this:

std::future<int> f = foo();

auto f2 = then(std::move(f), [](std::future<int> f)
{
return f.get() * 2;
});

Creating Task Then function(like std::future.then)

You could wait on a condition variable. Make it a member of the task class and signal it after setting the function.

c++11 async continuations or attempt at .then() semantics

I find 3 problems with the above implemention:

  • It will only work if you pass std::shared_future as Fut.
  • The continuation might want a chance to handle exceptions.
  • It will not always behave as expected, since if you do not specify std::launch::async it might be deferred, thus the continuation is not invoked as one would expect.

I've tried to address these:

template<typename F, typename W, typename R>
struct helper
{
F f;
W w;

helper(F f, W w)
: f(std::move(f))
, w(std::move(w))
{
}

helper(const helper& other)
: f(other.f)
, w(other.w)
{
}

helper(helper&& other)
: f(std::move(other.f))
, w(std::move(other.w))
{
}

helper& operator=(helper other)
{
f = std::move(other.f);
w = std::move(other.w);
return *this;
}

R operator()()
{
f.wait();
return w(std::move(f));
}
};

}

template<typename F, typename W>
auto then(F f, W w) -> std::future<decltype(w(F))>
{
return std::async(std::launch::async, detail::helper<F, W, decltype(w(f))>(std::move(f), std::move(w)));
}

Used like this:

std::future<int> f = foo();

auto f2 = then(std::move(f), [](std::future<int> f)
{
return f.get() * 2;
});

How does a C++ compiler choose between deferred and async execution for std::async?

Compiler vendors basically all chose to make "pick one" mean "deferred".

This sucks.

They are free to do any logic they choose. They chose to make their logic be "always defer".

c++ futures/promises like javascript?

A .then function for std::future has been proposed for the upcoming C++17 standard.

Boost's implementation of future (which is compliant with the current standard, but provides additional features as extensions) already provides parts of that functionality in newer versions (1.53 or newer).

For a more well-established solution, take a look at the Boost.Asio library, which does allow easy implementation of asynchronous control flows as provided by future.then. Asio's concept is slightly more complicated, as it requires access to a central io_service object for dispatching asynchronous callbacks and requires manual management of worker threads. But in principle this is a very good match for what you asked for.

Does a thread really start after std::future::get() is called?

Because you are using std::launch::async, it's up to std::async to determine how to schedule your requests. According to cppreference.com:

The template function async runs the function f asynchronously (potentially in a separate thread which may be part of a thread pool) and returns a std::future that will eventually hold the result of that function call.

It does guarantee that they will be threaded, however, and you can infer that the evaluation of your lambda will be scheduled to happen at the next available opportunity:

If the async flag is set (i.e. policy & std::launch::async != 0), then async executes the callable object f on a new thread of execution (with all thread-locals initialized) as if spawned by std::thread(std::forward<F>(f), std::forward<Args>(args)...), except that if the function f returns a value or throws an exception, it is stored in the shared state accessible through the std::future that async returns to the caller.

For the purposes of your question, however, you just wanted to know when it's executed in relation to your call to get. It's easy to demonstrate that get has nothing to do with the execution of async tasks when launched with std::launch::async:

#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <chrono>

using namespace std;

int main() {
auto start = chrono::steady_clock::now();
auto timestamp = [start]( ostream & s )->ostream& {
auto now = chrono::steady_clock::now();
auto elapsed = chrono::duration_cast<chrono::microseconds>(now - start);
return s << "[" << elapsed.count() << "us] ";
};

vector<future<int>> futures;
for( int i = 0; i < 5; i++ )
{
futures.emplace_back( async(launch::async,
[=](){
timestamp(cout) << "Launch " << i << endl;
return i;
} ) );
}

this_thread::sleep_for( chrono::milliseconds(100) );

for( auto & f : futures ) timestamp(cout) << "Get " << f.get() << endl;

return 0;
}

Output (live example here):

[42us] Launch 4
[85us] Launch 3
[95us] Launch 2
[103us] Launch 1
[109us] Launch 0
[100134us] Get 0
[100158us] Get 1
[100162us] Get 2
[100165us] Get 3
[100168us] Get 4

These operations are trivial, but if you have long-running tasks then you can expect that some or all of those tasks might still be executing when you call std::future<T>::get(). In that case, your thread will be suspended until the promise associated with that future is satisfied. Also, because the async tasks may be pooled it's possible that some will not begin evaluation until after others have completed.

If you use instead std::launch::deferred, then you will get lazy evaluation on the calling thread, and so the output would be something like:

[100175us] Launch 0
[100323us] Get 0
[100340us] Launch 1
[100352us] Get 1
[100364us] Launch 2
[100375us] Get 2
[100386us] Launch 3
[100397us] Get 3
[100408us] Launch 4
[100419us] Get 4
[100430us] Launch 5

Creating a future from intermediate futures?

It occurred to me that I can use std::async with the deferred launch policy to compose the final object:

std::future<Item> get_item()
{
// start async creation of component
// (using shared_future to make it copyable)
std::shared_future<Component> component = get_component();

// deferred launch policy can be used for construction of the final object
return std::async(std::launch::deferred, [=]() {
return Item(component.get());
});
}


Related Topics



Leave a reply



Submit