Making a C++ Class a Monitor (In the Concurrent Sense)

You can achieve this with some judicious use of operator-> and modern C++, which gives much cleaner syntax than the previously accepted answer:

#include <mutex>
#include <utility>

template<class T>
class monitor
{
public:
    // Forward any constructor arguments to the wrapped object.
    template<typename ...Args>
    monitor(Args&&... args) : m_cl(std::forward<Args>(args)...) {}

    // Helper that holds the lock for the duration of one (or more) calls.
    struct monitor_helper
    {
        monitor_helper(monitor* mon) : m_mon(mon), m_ul(mon->m_lock) {}
        T* operator->() { return &m_mon->m_cl; }
        monitor* m_mon;
        std::unique_lock<std::mutex> m_ul;
    };

    // Lock, hand out access to the wrapped object, unlock when the helper dies.
    monitor_helper operator->() { return monitor_helper(this); }
    monitor_helper ManuallyLock() { return monitor_helper(this); }
    T& GetThreadUnsafeAccess() { return m_cl; }

private:
    T m_cl;
    std::mutex m_lock;
};

The idea is that you use the arrow operator to access the underlying object, but operator-> returns a helper object that locks the mutex and unlocks it again when the helper is destroyed, bracketing your function call. Because the language keeps applying operator-> until it reaches a raw pointer, the call ends up going to the underlying object.
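
To make the mechanics concrete, here is roughly what one of the calls from the usage example below expands to (the name is only for illustration; in the real call the helper is an unnamed temporary destroyed at the end of the full expression):

// threadSafeVector->push_back(0); behaves roughly like:
{
    auto helper = threadSafeVector.operator->(); // returns monitor_helper, locking m_lock
    helper.operator->()->push_back(0);           // yields T*, so the call goes to the vector
}                                                // helper destroyed here, unlocking m_lock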

Usage:

monitor<std::vector<int>> threadSafeVector {5};

threadSafeVector->push_back(0);
threadSafeVector->push_back(1);
threadSafeVector->push_back(2);

// Create a bunch of threads that hammer the vector
std::vector<std::thread> threads;
for(int i=0; i<16; ++i)
{
    threads.push_back(std::thread([&]()
    {
        for(int i=0; i<1024; ++i)
        {
            threadSafeVector->push_back(i);
        }
    }));
}

// You can explicitly take a lock and then call multiple functions
// without the overhead of relocking each time. The 'lock handle'
// destructor will unlock the lock correctly. This is necessary
// if you want a chain of logically connected operations.
{
    auto lockedHandle = threadSafeVector.ManuallyLock();
    if(!lockedHandle->empty())
    {
        lockedHandle->pop_back();
        lockedHandle->push_back(-3);
    }
}

for(auto& t : threads)
{
    t.join();
}

// And finally, access the underlying object in a raw fashion without a lock.
// Use with caution!
std::vector<int>& rawVector = threadSafeVector.GetThreadUnsafeAccess();
rawVector.push_back(555);

// Should be 16393 (5 + 3 + 16*1024 + 1)
std::cout << threadSafeVector->size() << std::endl;

C++ monitor class/wrapper using condition variables

Here is a working example:

#include <iostream>
#include <mutex>

template<class T>
class Wrapper {
    std::mutex m;
    T* p;

public:
    Wrapper(T& t) : p(&t) {}

    class Proxy {
        std::unique_lock<std::mutex> lock;
        T* p;

    public:
        Proxy(std::mutex& m, T* p)
            : lock(m)
            , p(p)
        {
            // Locked the mutex.
            std::cout << __PRETTY_FUNCTION__ << '\n';
        }

        Proxy(Proxy&& b) = default;

        ~Proxy() {
            std::cout << __PRETTY_FUNCTION__ << '\n';
            // Unlocked the mutex.
        }

        T* operator->() const { return p; }
    };

    Proxy operator->() { return Proxy(m, p); }
};

struct A {
    void f() { std::cout << __PRETTY_FUNCTION__ << '\n'; }
};

int main() {
    A a;
    Wrapper<A> w(a);
    w->f();
}

Outputs:

Wrapper<T>::Proxy::Proxy(std::mutex&, T*) [with T = A]
void A::f()
Wrapper<T>::Proxy::~Proxy() [with T = A]

See Wrapping C++ Member Function Calls by Bjarne Stroustrup for full details.

How does Herb Sutter's monitor class work?

In which ways is Herb Sutter's code better than mine? (A sketch of his monitor follows the list.)

  • Your T should be default constructible and assignable.
  • In Herb Sutter's code, T should be copy constructible.
  • Herb Sutter's code allows you to initialize the member.
  • Your operator() doesn't handle references.
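
For reference, here is a sketch (from memory, so details may differ from his slides) of the monitor<T> Herb Sutter presented: instead of operator->, it takes a callable and runs it against the wrapped object while holding the lock:

#include <mutex>
#include <utility>

template<class T>
class monitor
{
    mutable T t;            // wrapped object, declared before operator() below
    mutable std::mutex m;
public:
    monitor(T t_ = T{}) : t(std::move(t_)) {}   // allows initializing the member

    // Run any callable under the lock; decltype(auto) preserves reference
    // return types from the callable.
    template<class F>
    decltype(auto) operator()(F f) const
    {
        std::lock_guard<std::mutex> lock(m);
        return f(t);
    }
};

Usage looks like this; the whole lambda runs as one critical section:

monitor<std::string> s{"start"};
s([](std::string& str) { str += " more"; });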

How to create a single process mutex within C++?

The answer by @Alex Guteniev is as accurate as one can get (and should be considered the accepted answer). It states that the C++ standard doesn't define a system-wide concept, and that mutexes for all practical purposes are per-process, i.e. for synchronization between threads (execution agents) in a single process (and therefore according to your needs). The C++ standard makes it clear what a thread (std::thread) is (33.3 - ... intended to map one-to-one with OS threads (in my draft, at least... N4687)).

Microsoft has, post VC2015, improved its implementation to use Windows primitives, as stated here. This is also indicated here in the most upvoted answer. I've also looked at the Boost library implementations (which often precede/influence the C++ standard) for Microsoft and (AFAICT) they don't use any inter-process calls.

So, to answer your question: in C++, threads and monitors are practically the same thing, if this definition is to be considered accurate.

Thread-safe locking of instance with multiple member functions

My suggestion per comment:

A better idiom might be a monitor which keeps the lock on your resource and grants access to the owner. To obtain the resource, reserve() could return such a monitor object (something like a proxy for accessing the contents of the resource). Any competing call to reserve() will now block (as the mutex is locked). When the resource-owning thread is done, it just destroys the monitor object, which in turn unlocks the resource. (This allows RAII to be applied to all of this, which makes the code safe and maintainable.)

I modified the OP's code to sketch what this could look like:

#include <iostream> // std::cout
#include <thread>   // std::thread
#include <mutex>    // std::mutex

class information_t {

private:
    std::mutex mtx;
    int importantValue = 0;

public:
    class Monitor {
    private:
        information_t& resource;
        std::lock_guard<std::mutex> lock;

        friend class information_t; // to allow access to constructor.

    private:
        Monitor(information_t& resource):
            resource(resource), lock(resource.mtx)
        { }

    public:
        ~Monitor()
        {
            std::cout << "Result: " << resource.importantValue << '\n';
            resource.reset();
        }

        Monitor(const Monitor&) = delete; // copying prohibited
        Monitor& operator=(const Monitor&) = delete; // copy assign prohibited

    public:
        // exposed resource API for monitor owner:
        void incrementIt() { resource.incrementIt(); }
        void decrementIt() { resource.decrementIt(); }
        void reset() { resource.reset(); }
    };
    friend class Monitor; // to allow access to private members

public:
    Monitor aquire() { return Monitor(*this); }

private:
    // These should only be callable from the thread that currently holds the mutex.
    // Hence, they are private and accessible through a monitor instance only.
    void incrementIt() { importantValue++; }
    void decrementIt() { importantValue--; }
    void reset() { importantValue = 0; }

} protectedResource; // We only have one instance of this that we need to work with

#if 0 // OBSOLETE
// Free the resource so other threads can reserve and use it
void release()
{
    protectedResource.reset();
    protectedResource.mtx.unlock(); // Will this work? Can I guarantee the mtx is locked?
}
#endif // 0

// Supposed to make sure no other thread can reserve or use it now anymore!
information_t::Monitor reserve()
{
    return protectedResource.aquire();
}

using MyResource = information_t::Monitor;

int main()
{
    std::thread threads[3];

    threads[0]
        = std::thread([]
        {
            MyResource protectedResource = reserve();
            protectedResource.incrementIt();
            protectedResource.incrementIt();
            // scope end releases protectedResource
        });

    threads[1]
        = std::thread([]
        {
            try {
                MyResource protectedResource = reserve();
                throw "Haha!";
                protectedResource.incrementIt();
                // scope end releases protectedResource
            } catch(...) { }
        });

    threads[2]
        = std::thread([]
        {
            MyResource protectedResource = reserve();
            protectedResource.decrementIt();
            // scope end releases protectedResource
        });

    for (auto& th : threads) th.join();

    return 0;
}

Output:

Result: 2
Result: -1
Result: 0

Live Demo on coliru

Is it possible to protect a resource like that, when I know it will always call reserve and release in that order?

It's no longer necessary to be concerned about this. The correct usage is baked in:

  • To get access to the resource, you need a monitor.
  • If you get it, you are the exclusive owner of the resource.
  • When you exit the scope (where the monitor is stored as a local variable), the monitor is destroyed and the locked resource is thus automatically released.

The latter happens even for unexpected bail-outs (in the MCVE, the throw "Haha!";).

Furthermore, I made the following functions private:

  • information_t::incrementIt()
  • information_t::decrementIt()
  • information_t::reset()

So, no unauthorized access is possible. To use them properly, an information_t::Monitor instance must be acquired. It provides public wrappers for those functions, which can be used in the scope where the monitor resides, i.e. by the owner thread only.

How might a class like .NET's ConcurrentBag<T> be implemented?

If you look at the details of ConcurrentBag<T>, you'll find that it's, internally, basically a customized linked list.

Since bags can contain duplicates and are not accessible by index, a doubly linked list is a very good option for the implementation. This allows locking to be fairly fine-grained for insertion and removal (you don't have to lock the entire collection, just the nodes around where you're inserting/removing). Since you're not worried about duplicates, no hashing is involved. This makes a doubly linked list a good fit.
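
As a rough illustration of that idea, here is a C++ sketch of the concept (not .NET's actual implementation; it also uses a single lock where a real implementation would lock more finely). Note that add and take only ever touch the nodes at the front of the list:

#include <memory>
#include <mutex>
#include <optional>
#include <utility>

template<class T>
class concurrent_bag
{
    struct node
    {
        T value;
        node* prev = nullptr;           // non-owning back pointer
        std::unique_ptr<node> next;     // owning forward pointer
        explicit node(T v) : value(std::move(v)) {}
    };

    std::unique_ptr<node> head;         // front of the list
    std::mutex m;                       // coarse lock standing in for per-node locks

public:
    void add(T value)
    {
        auto n = std::make_unique<node>(std::move(value));
        std::lock_guard<std::mutex> lock(m);
        n->next = std::move(head);
        if (n->next) n->next->prev = n.get();
        head = std::move(n);
    }

    // Removes an arbitrary element (here simply the most recently added one).
    std::optional<T> try_take()
    {
        std::lock_guard<std::mutex> lock(m);
        if (!head) return std::nullopt;
        T value = std::move(head->value);
        head = std::move(head->next);
        if (head) head->prev = nullptr;
        return value;
    }
};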

How do I notify all threads waiting on another monitor?

How about using java.util.concurrent.locks.ReentrantLock instead of using the language's built-in monitors?

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Account {
    private final Lock lock = new ReentrantLock();
    private final Condition depositOk = lock.newCondition();
    private final Condition withdrawOk = lock.newCondition();

    public void withdraw(int amount) throws InterruptedException {
        lock.lock();
        try {
            while (!accountContainsAtLeast(amount)) {
                withdrawOk.await();
            }
            withdrawLocked(amount);
            depositOk.signal();
        } finally {
            lock.unlock();
        }
    }

    public void deposit(int amount) throws InterruptedException {
        lock.lock();
        try {
            while (!OKtoDeposit(amount)) {
                depositOk.await();
            }
            depositLocked(amount);
            withdrawOk.signal();
        } finally {
            lock.unlock();
        }
    }

    // accountContainsAtLeast, OKtoDeposit, withdrawLocked and depositLocked are
    // the account's own helpers, assumed to exist elsewhere in the class; they
    // must only be called while the lock is held.
}

Semaphore vs. Monitors - what's the difference?

A Monitor is an object designed to be accessed from multiple threads. The member functions or methods of a monitor object will enforce mutual exclusion, so only one thread may be performing any action on the object at a given time. If one thread is currently executing a member function of the object then any other thread that tries to call a member function of that object will have to wait until the first has finished.
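
A minimal C++ illustration of this description (assuming nothing beyond the standard library): every public member function takes the same mutex, so callers never see the object mid-update.

#include <mutex>

class counter_monitor {
    std::mutex m_;
    long value_ = 0;
public:
    void increment() {
        std::lock_guard<std::mutex> lock(m_);
        ++value_;
    }
    long get() {
        std::lock_guard<std::mutex> lock(m_);
        return value_;
    }
};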

A Semaphore is a lower-level object. You might well use a semaphore to implement a monitor. A semaphore essentially is just a counter. When the counter is positive, if a thread tries to acquire the semaphore then it is allowed, and the counter is decremented. When a thread is done then it releases the semaphore, and increments the counter.

If the counter is already zero when a thread tries to acquire the semaphore then it has to wait until another thread releases the semaphore. If multiple threads are waiting when a thread releases a semaphore then one of them gets it. The thread that releases a semaphore need not be the same thread that acquired it.
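
To make the counting behaviour concrete, here is a minimal C++ sketch of such a counter built from a mutex and a condition variable (since C++20 the standard library provides std::counting_semaphore, so this is only an illustration):

#include <condition_variable>
#include <mutex>

// acquire() blocks while the count is zero and then decrements it; release()
// increments the count and wakes one waiting thread. Any thread may call
// release(), not just the one that acquired.
class counting_semaphore {
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
public:
    explicit counting_semaphore(int initial) : count_(initial) {}

    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    void release() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ++count_;
        }
        cv_.notify_one();
    }
};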

A monitor is like a public toilet. Only one person can enter at a time. They lock the door to prevent anyone else coming in, do their stuff, and then unlock it when they leave.

A semaphore is like a bike hire place. They have a certain number of bikes. If you try and hire a bike and they have one free then you can take it, otherwise you must wait. When someone returns their bike then someone else can take it. If you have a bike then you can give it to someone else to return --- the bike hire place doesn't care who returns it, as long as they get their bike back.

Why are there no concurrent collections in C#?

.NET has had relatively "low-level" concurrency support until now - but .NET 4.0 introduces the System.Collections.Concurrent namespace, which contains various collections that are both thread-safe and useful.

Andrew's answer is entirely correct in terms of how to deal with collections before .NET 4.0, of course - and for most uses I'd just lock appropriately when accessing a "normal" shared collection. The concurrent collections, however, make it easy to use a producer/consumer queue, etc.
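
To connect this back to the C++ examples on this page, the pattern a producer/consumer collection packages up is, in essence, a blocking queue. A minimal C++ sketch of the idea (an illustration only, not .NET's implementation) looks like this:

#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// push() never blocks; pop() waits until an item is available.
template<class T>
class blocking_queue {
    std::queue<T> q_;
    std::mutex m_;
    std::condition_variable cv_;
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(value));
        }
        cv_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        T value = std::move(q_.front());
        q_.pop();
        return value;
    }
};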


