Boost Thread Throwing Exception "Thread_Resource_Error: Resource Temporarily Unavailable"


There's a limit on the number of threads you can create per process.

On linux, for example,

cat /proc/sys/kernel/threads-max

tells you the current maximum. The default is the number of memory pages divided by 4, so on my system it's 513785, but it may be much lower on another box. E.g., on my mail server box (512 MB RAM) it's only 7295.
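For what it's worth, you can observe where that ceiling is from code by catching boost::thread_resource_error, which is exactly the exception from the title. A deliberately wasteful sketch (don't run this on a box you care about):

#include <boost/thread.hpp>
#include <iostream>
#include <vector>

int main()
{
    std::vector<boost::thread> threads;
    try {
        for (;;) // keep creating threads until creation fails
            threads.emplace_back([] {
                boost::this_thread::sleep_for(boost::chrono::minutes(1));
            });
    } catch (boost::thread_resource_error const& e) {
        std::cerr << "creation failed after " << threads.size()
                  << " threads: " << e.what() << "\n";
    }

    for (auto& t : threads)
        t.interrupt(); // sleep_for is an interruption point, so wake the sleepers
    for (auto& t : threads)
        t.join();
}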

You could raise the limit, but in practice that is pointless because the OS can't schedule that many threads effectively anyway. So, instead, try using a thread pool.

Oh, and PS: detach()-ing the threads will help (a lot) with conserving resources. pthreads may refuse to create new threads well before the OS limit is reached, because it needs to allocate bookkeeping for every active thread; detach() frees that up (and removes the error of not joining all threads before program exit).
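To illustrate the difference, a minimal sketch (do_work is just a stand-in):

#include <boost/thread.hpp>
#include <iostream>

static void do_work(int id) { std::cout << "job " << id << " done\n"; }

int main()
{
    // fire-and-forget: the thread keeps running, but the bookkeeping needed
    // for a later join() is released as soon as the thread finishes
    boost::thread(do_work, 1).detach();

    // keeping the handle: the bookkeeping stays allocated until we join()
    boost::thread t(do_work, 2);
    t.join();

    // give the detached thread a moment to finish before main returns
    boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
}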

UPDATE Crazy Friday bonus: a thread pool that auto-scales to the number of cores your system has:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <boost/atomic.hpp>
#include <deque>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

boost::atomic_size_t counter(0ul);

class thread_pool
{
  private:
    mutex mx;
    condition_variable cv;

    typedef function<void()> job_t;
    std::deque<job_t> _queue;

    thread_group pool;

    boost::atomic_bool shutdown;
    static void worker_thread(thread_pool& q)
    {
        while (auto job = q.dequeue())
            (*job)();
    }

  public:
    thread_pool() : shutdown(false) {
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
            pool.create_thread(bind(worker_thread, ref(*this)));
    }

    void enqueue(job_t job)
    {
        lock_guard<mutex> lk(mx);
        _queue.push_back(std::move(job));

        cv.notify_one();
    }

    optional<job_t> dequeue()
    {
        unique_lock<mutex> lk(mx);
        namespace phx = boost::phoenix;

        cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

        if (_queue.empty())
            return none;

        auto job = std::move(_queue.front());
        _queue.pop_front();

        return std::move(job);
    }

    ~thread_pool()
    {
        shutdown = true;
        {
            lock_guard<mutex> lk(mx);
            cv.notify_all();
        }

        pool.join_all();
    }
};

static constexpr size_t bignumber = 1 << 20;

class myClass
{
    //unsigned char readbuffer[bignumber];
    //unsigned char writebuffer[bignumber];
    void functiondostuff() { }
    void functiondomorestuff() { }

    thread_pool pool; // uses 1 thread per core

  public:
    void wreak_havoc()
    {
        std::cout << "enqueuing jobs... " << std::flush;
        for (size_t i = 0; i < bignumber; ++i)
        {
            functiondostuff();
            for (int j = 0; j < 2; ++j) {
                functiondomorestuff();
                pool.enqueue(bind(&myClass::myFunction, this, j, i));
            }
        }
        std::cout << "done\n";
    }

  private:
    void myFunction(int i, int j)
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        counter += 1;
    }
};

int main()
{
    myClass instance;
    instance.wreak_havoc();

    size_t last = 0;
    while (counter < (2 * bignumber))
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        if ((counter >> 4u) > last)
        {
            std::cout << "Progress: " << counter << "/" << (bignumber * 2) << "\n";
            last = counter >> 4u;
        }
    }
}

How many threads can I spawn using Boost in C++?

Remember that each thread needs to reserve stack space. That is why there is a limit on the number of threads you can spawn. It looks like you are either hitting this limit, or Boost is stopping you just before you hit it.

Here is a link to the most recent Boost documentation describing the behavior you are seeing (the exception thrown): the Boost.Thread docs (search for boost::thread_resource_error on that page).
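If per-thread stack reservation is what is limiting you, Boost.Thread lets you request a smaller stack through boost::thread::attributes. A minimal sketch, assuming a 64 KiB stack is enough for your workload (the OS may round the value up):

#include <boost/thread.hpp>
#include <iostream>

int main()
{
    boost::thread::attributes attrs;
    attrs.set_stack_size(64 * 1024); // request a small per-thread stack

    boost::thread t(attrs, [] { std::cout << "running on a reduced stack\n"; });
    t.join();
}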

How to properly close boost thread

As per the comment, it's not enough to simply call io_service.stop(). The thread on which the service is running must be joined to allow it to exit gracefully.
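A minimal sketch of that shutdown order, assuming the service was started on its own boost::thread (the names are illustrative):

#include <boost/asio.hpp>
#include <boost/thread.hpp>

int main()
{
    boost::asio::io_service io;
    boost::asio::io_service::work work(io); // keeps run() from returning early

    boost::thread runner([&io] { io.run(); });

    // ... post handlers to io and let them run ...

    io.stop();     // ask run() to return as soon as possible
    runner.join(); // then wait for the service thread to actually exit
}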

asio::io_service and thread_group lifecycle issue

while (condition)
{
    //... stuff
    threadpool.join_all();

    //...
}

This doesn't make any sense, because you can only join threads once. Once joined, they are gone. You don't want to be starting new threads all the time (use a thread pool + task queue¹).

Since you don't want to actually stop the threads, you probably don't want to destroy the work object. If you insist, a shared_ptr<work> or optional<work> works nicely (just my_work.reset() it); a minimal sketch follows.
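A sketch of the optional<work> variant, assuming the service runs on a thread_group and you want run() to return only once the posted work has drained (the names are illustrative):

#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>

int main()
{
    boost::asio::io_service io;

    // keep run() from returning while we still intend to post work
    boost::optional<boost::asio::io_service::work> my_work;
    my_work.emplace(io);

    boost::thread_group pool;
    for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
        pool.create_thread([&io] { io.run(); });

    // ... post tasks to io here ...

    my_work.reset();  // let run() return once the posted work is done
    pool.join_all();  // join exactly once, at the very end
}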

¹ Update Suggestion:

  • a simple thread_pool with a task queue: see boost thread throwing exception "thread_resource_error: resource temporarily unavailable" (above)
  • a queue based on io_service itself (using work): see c++ work queues with blocking

UPDATE

A simple extension to "SOLUTION #2" would make it possible to wait for all tasks to have been completed, without joining the workers/destroying the pool:

void drain() {
    unique_lock<mutex> lk(mx);
    namespace phx = boost::phoenix;
    cv.wait(lk, phx::empty(phx::ref(_queue)));
}

Note that for reliable operation, one needs to signal the condition variable on de-queue as well:

      cv.notify_all(); // in order to signal drain

CAVEATS

  1. It's an interface inviting race conditions (the queue could accept jobs from many threads, so once drain() returns, another thread could have posted a new task already)

  2. This signals when the queue is empty, not when the tasks have completed. The queue cannot know about that; if you need it, use a barrier or signal a condition variable from within the task (the_work in this example). The mechanism for queuing/scheduling is not relevant there. A minimal sketch follows, before the demo.
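For caveat 2, a minimal sketch of signaling completion from inside the tasks themselves, using an outstanding-task counter guarded by a mutex/condition_variable pair (the names are illustrative; this works with any pool or queue):

#include <boost/thread.hpp>
#include <cstddef>

struct completion_tracker {
    boost::mutex mx;
    boost::condition_variable cv;
    std::size_t outstanding = 0;

    void task_started()  { boost::lock_guard<boost::mutex> lk(mx); ++outstanding; }

    void task_finished() {
        boost::lock_guard<boost::mutex> lk(mx);
        if (--outstanding == 0)
            cv.notify_all();
    }

    void wait_all() {
        boost::unique_lock<boost::mutex> lk(mx);
        cv.wait(lk, [this] { return outstanding == 0; });
    }
};

// usage sketch: call tracker.task_started() before enqueueing each job,
// have the job itself call tracker.task_finished() as its last step,
// and call tracker.wait_all() instead of (or in addition to) drain()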

DEMO

Live On Coliru

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <boost/atomic.hpp>
#include <deque>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
  private:
    mutex mx;
    condition_variable cv;

    typedef function<void()> job_t;
    std::deque<job_t> _queue;

    thread_group pool;

    boost::atomic_bool shutdown;
    static void worker_thread(thread_pool& q)
    {
        while (auto job = q.dequeue())
            (*job)();
    }

  public:
    thread_pool() : shutdown(false) {
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
            pool.create_thread(bind(worker_thread, ref(*this)));
    }

    void enqueue(job_t job)
    {
        lock_guard<mutex> lk(mx);
        _queue.push_back(std::move(job));

        cv.notify_one();
    }

    void drain() {
        unique_lock<mutex> lk(mx);
        namespace phx = boost::phoenix;
        cv.wait(lk, phx::empty(phx::ref(_queue)));
    }

    optional<job_t> dequeue()
    {
        unique_lock<mutex> lk(mx);
        namespace phx = boost::phoenix;

        cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

        if (_queue.empty())
            return none;

        auto job = std::move(_queue.front());
        _queue.pop_front();

        cv.notify_all(); // in order to signal drain

        return std::move(job);
    }

    ~thread_pool()
    {
        shutdown = true;
        {
            lock_guard<mutex> lk(mx);
            cv.notify_all();
        }

        pool.join_all();
    }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::milliseconds(2));
    std::cout << "worker " << id << " done\n";
}

int main()
{
    thread_pool pool; // uses 1 thread per core

    for (auto i = 0ull; i < 20; ++i) {
        for (int i = 0; i < 10; ++i)
            pool.enqueue(bind(the_work, i));

        pool.drain(); // make the queue empty, leave the threads
        std::cout << "Queue empty\n";
    }

    // destructing pool joins the worker threads
}

Generic thread pool class is not working properly

I'm really curious what the semaphore is about.

io_service is already a task queue. It's thread safe and you don't need a semaphore.

For comparison, here's io_service based thread pool:

  • A thread pool like yours, without Asio, using a condition variable in much the same way but without calling it "semaphore"
    boost thread throwing exception "thread_resource_error: resource temporarily unavailable"

  • The same thing, but rewritten around io_service like your ThreadPool, which shows that you don't need the semaphore anymore: Solution #1 in c++ work queues with blocking (Solution #2 uses the same thread pool as before).

(Even better, recent Asio versions have a built-in thread-pool).

Where Is The Error

This is unsafe:

template <class type, class T, class T1, class... Args>
auto post(type T::*f, T1& obj, Args... args) {
    this->m_sem.take();
    this->m_io_service.post([&]() {
        T o = static_cast<T&&>(obj);
        (o.*f)(std::forward<Args>(args)...);
        this->m_sem.give();
    });
}

Specifically:


  1. The line

    T o = static_cast<T&&>(obj);

    doesn't copy T (which is HelloWorld). You knew that because that wouldn't be possible. What happens is WORSE: the object is MOVED from obj.

    Incidentally, this assumes that T is move-constructible from T1.

    You specifically ask for it by explicitly casting the right-hand side to an rvalue reference.

    This is what std::move is specified to do, actually: "In particular, std::move produces an xvalue expression that identifies its argument t. It is exactly equivalent to a static_cast to an rvalue reference type."

    The effect is that the HelloWorld instance in main is no longer valid, yet you keep moving from it for subsequent tasks. (A tiny standalone demonstration follows right after this list.)

  2. The other arguments are captured by reference. This means they're out of scope before the task actually executes (including f).
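Here is that standalone demonstration, using std::string in place of HelloWorld so nothing else from the question is needed:

#include <iostream>
#include <string>

int main()
{
    std::string obj = "Hola mundo";
    std::string o = static_cast<std::string&&>(obj); // exactly what std::move does

    // obj is now in a valid-but-unspecified (typically empty) state, yet the
    // original post() keeps "copying" from it for every subsequent task
    std::cout << "o:   '" << o << "'\n";
    std::cout << "obj: '" << obj << "'\n";
}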

To make this safe, you have to capture the arguments in local copies:

template <class type, class T, class... Args>
auto post(type T::*f, T&& obj, Args... args) {
    this->m_sem.take();
    this->m_io_service.post([=, o = std::move(obj)]() mutable {
        try {
            (o.*f)(args...);
        } catch (...) {
            this->m_sem.give();
            throw;
        }
        this->m_sem.give();
    });
}

Notes:


  1. now obj is taken by rvalue reference. This means that post won't compile unless obj is an rvalue.

    Note this is not a universal reference because T is deduced as part of f.

  2. the lambda is now mutable (because otherwise only const member functions could be run on the captured o)

  3. all other args are copied - this is roughly how std::bind would operate (but you could optimize for movable arguments).

  4. We handle exceptions - in your code, if f threw, you would never give() the semaphore.

Of course, main needs to adapt so that multiple HelloWorld instances are actually created and passed as rvalues:

for (int ii = 0; ii < 5; ii++) {
    HelloWorld hw("Hola mundo");
    tp.post(&HelloWorld::greetings, std::move(hw), ii);
}

BUT - IT WON'T WORK

At least, for me it doesn't compile. Asio requires handlers to be copyable (why must a Boost.Asio handler be copy constructible?, How to trick boost::asio to allow move-only handlers).

Also, we've hardly scratched the surface. By hardcoding type T::*f you made it so that you need new post overloads for many things: static methods, const member functions, ...

Instead, why not do it the C++ way:

template <class F, class... Args>
auto post(F&& f, Args&&... args) {
    this->m_sem.take();
    this->m_io_service.post(
        [this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)]
        {
            try { f(); }
            catch (...) {
                this->m_sem.give();
                throw;
            }
            this->m_sem.give();
        });
}

Actually, in more modern C++ you'd write (assuming C++17 here):

    //...
    [this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)]
    {
        try { std::apply(f, args); }
    //...

Oh, and we still need

#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1

because of the move-only handler type.

Full Fixed Version Demo

NOTE: Also added an output mutex (s_outputmx) to avoid intermixed console output.

Live On Coliru

#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>

#include <mutex>
#include <condition_variable>

class Semaphore {
    std::mutex lock;
    std::condition_variable cond;
    int count;

  public:
    Semaphore() { count = 0; }

    void wait() {
        std::unique_lock<std::mutex> m(lock);
        while (count > 0)
            cond.wait(m, [this] { return count == 0; });
    }

    void take() {
        std::unique_lock m(lock);
        count++;
    }

    void give() {
        std::unique_lock m(lock);
        count--;
        if (count == 0) {
            cond.notify_one();
        }
    }
};

class ThreadPool {
  private:
    boost::asio::io_service m_io_service;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    boost::thread_group m_threads;
    Semaphore m_sem;

  public:
    ThreadPool(size_t n) {
        this->m_work =
            std::make_unique<boost::asio::io_service::work>(m_io_service);
        for (size_t ii = 0; ii < n; ii++) {
            m_threads.create_thread(boost::bind(&boost::asio::io_service::run,
                                                &this->m_io_service));
        }
    }
    ThreadPool(const ThreadPool& v) = delete;
    ThreadPool(ThreadPool&& v) = delete;
    ~ThreadPool() { m_io_service.stop(); }

    template <class F, class... Args>
    auto post(F&& f, Args&&... args) {
        this->m_sem.take();
        this->m_io_service.post(
#if 1 // pre-c++17
            [this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)]
            {
                try { f(); }
#else // https://en.cppreference.com/w/cpp/utility/apply
            [this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)]
            {
                try { std::apply(f, args); }
#endif
                catch (...) {
                    this->m_sem.give();
                    throw;
                }
                this->m_sem.give();
            });
    }

    void wait() { this->m_sem.wait(); }
};

struct HelloWorld {
    std::string m_str;

    HelloWorld(std::string str) : m_str(str){};
    HelloWorld(const HelloWorld& v) = delete;
    HelloWorld(HelloWorld&& v) = default;
    ~HelloWorld() = default;

    void greetings(int ii) const {
        for (int jj = 0; jj < 5; jj++) {
            {
                static std::mutex s_outputmx;
                std::lock_guard<std::mutex> lk(s_outputmx);
                std::cout << this->m_str << " " << ii << std::endl;
            }
            boost::this_thread::sleep_for(boost::chrono::seconds(1));
        }
    }
};

int main()
{
    ThreadPool tp(8);

    for (int ii = 0; ii < 5; ii++) {
        HelloWorld hw("Hola mundo");
        tp.post(&HelloWorld::greetings, std::move(hw), ii);
    }

    tp.wait();
}

Prints

Hola mundo 0
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 4
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 1
Hola mundo 3

BONUS: Drop semaphore

Dropping the semaphore and actually using work:

class ThreadPool {
    boost::asio::io_service m_io_service;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    boost::thread_group m_threads;

  public:
    ThreadPool(size_t n)
        : m_work(std::make_unique<boost::asio::io_service::work>(m_io_service))
    {
        while (n--) {
            m_threads.create_thread([this] { m_io_service.run(); });
        }
    }

    ~ThreadPool() { wait(); }

    void wait() {
        m_work.reset();
        m_threads.join_all();
    }

    template <class F, class... Args> void post(F&& f, Args&&... args) {
        m_io_service.post(
            [f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] {
                std::apply(f, args);
            });
    }
};

That's 28 lines of code, compared to 90 lines in your original. And it actually does more things.

See it Live On Coliru as well.

What's Left?

We didn't handle exceptions from io_service::run properly (see Should the exception thrown by boost::asio::io_service::run() be caught?). A minimal sketch of the usual pattern follows.
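A sketch of the catch-and-resume loop, assuming the bonus ThreadPool above (the logging is only illustrative). run() may be called again after a handler throws, so the worker keeps serving the remaining handlers:

// inside ThreadPool's constructor, instead of calling m_io_service.run() directly:
m_threads.create_thread([this] {
    for (;;) {
        try {
            m_io_service.run();
            break;  // run() returned normally: no work left
        } catch (std::exception const& e) {
            std::cerr << "handler threw: " << e.what() << "\n"; // log and resume
        }
    }
});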

Also, if you have "recent" Boost, you can enjoy an improved interface to work (make_work_guard and .reset() so you don't need unique_ptr), and a ready-made thread_pool (so you don't need ... basically anything anymore):

Live On Coliru

#include <boost/asio.hpp>
#include <mutex>
#include <iostream>
static std::mutex s_outputmx;
using namespace std::chrono_literals;

struct HelloWorld {
    std::string const m_str;
    void greetings(int ii) const;
};

int main() {
    boost::asio::thread_pool tp(8);

    for (int ii = 0; ii < 5; ii++)
        //post(tp, [hw=HelloWorld{"Hola mundo"}, ii] { hw.greetings(ii); });
        post(tp, std::bind(&HelloWorld::greetings, HelloWorld{"Hola mundo"}, ii));

    tp.join();
}

void HelloWorld::greetings(int ii) const {
    for (int jj = 0; jj < 5; jj++) {
        std::this_thread::sleep_for(1s);

        std::lock_guard<std::mutex> lk(s_outputmx);
        std::cout << m_str << " " << ii << std::endl;
    }
}

Boost Thread_Group in a loop is very slow

The answer is simple. Don't start that many threads. Consider starting as many threads as you have logical CPU cores. Starting threads is very expensive.

Certainly never start a thread just to do one tiny job. Keep the threads and give them lots of (small) tasks using a task queue.

See here for a good example where the number of threads was similarly the issue: boost thread throwing exception "thread_resource_error: resource temporarily unavailable"

In this case I'd think you can gain a lot of performance by increasing the size of each task (don't create one task per pixel, but one per scan line, for example).
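A hedged sketch of that batching idea, using the thread_pool class from the answer linked above; process_pixel, width and height are hypothetical stand-ins for your actual image code:

// Hypothetical stand-ins for the actual image code:
const int width = 1920, height = 1080;
void process_pixel(int x, int y) { /* ... per-pixel work ... */ }

// One task per scan line instead of one per pixel,
// using the thread_pool class from the linked answer.
void enqueue_scanlines(thread_pool& pool)
{
    for (int y = 0; y < height; ++y)
        pool.enqueue([y] {
            for (int x = 0; x < width; ++x)
                process_pixel(x, y);
        });
}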


