Is There an Intra-Process Local Pipe in Qt

Worker threads with shared resources in Qt application

First of all, you don't need explicit multithreading (it's optional). Second, you don't need any manually managed synchronization primitives.

Then, model each procedure using a state machine. Hopefully the communication protocol allows each procedure to recognize the responses to its own commands, so that even though you'd be replicating the incoming data to all of the procedures, they'd ignore the data irrelevant to them.
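To make the "replicate to everyone, let each procedure filter" idea concrete, here is a minimal Qt-free sketch. It assumes a hypothetical line-based protocol in which every response starts with a one-character tag and a colon (e.g. "T:23.5"); the Procedure and Broadcaster names are made up for illustration and are not part of any Qt API.

```cpp
#include <string>
#include <vector>

// Hypothetical protocol: each response line starts with the tag of the
// command family it answers, followed by ':' (e.g. "T:23.5", "V:3.3").
class Procedure {
public:
    explicit Procedure(char tag) : m_tag(tag) {}
    // Called for every incoming line; lines meant for others are ignored.
    void onLine(const std::string &line) {
        if (line.size() >= 2 && line[0] == m_tag && line[1] == ':')
            m_responses.push_back(line.substr(2));
    }
    const std::vector<std::string> &responses() const { return m_responses; }
private:
    char m_tag;
    std::vector<std::string> m_responses;
};

// Replicates each incoming line to all attached procedures.
class Broadcaster {
public:
    void attach(Procedure *p) { m_procedures.push_back(p); }
    void deliver(const std::string &line) {
        for (auto *p : m_procedures) p->onLine(line);
    }
private:
    std::vector<Procedure *> m_procedures;
};
```

In a real application each procedure would be a state machine reacting to its own responses; the vector of responses merely stands in for that.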

This answer has a sketch of a solution that does exactly what you want, sans multiplexing. Multiplexing a QIODevice is trivial when you expose it via local pipes: everything incoming from the port is written to one end of one or more local pipes. Everything incoming from the pipes is written to the port. The pipes will maintain the integrity of the packets as long as you open their procedure end in Unbuffered mode. That way each write will arrive at the serial port as a contiguous block of bytes, and will be written to the port in the same manner.

How would you multiplex? Like so:

class IODeviceMux : public QObject {
    Q_OBJECT
    QVector<QPointer<AppPipe>> m_portPipes;
    QVector<QPointer<AppPipe>> m_userPipes;
    QPointer<QIODevice> m_port; // setPort() accepts any QIODevice, e.g. a QSerialPort
public:
    IODeviceMux(QObject *parent = {}) : QObject(parent) {}
    void setPort(QIODevice *port) {
        if (m_port) {
            // Detach the old port from the mux and from all live user pipes
            disconnect(m_port.get(), 0, this, 0);
            m_userPipes.removeAll({});
            for (auto pipe : qAsConst(m_userPipes))
                disconnect(m_port.get(), 0, pipe.get(), 0);
        }
        m_port = port;
        connect(m_port.get(), &QIODevice::readyRead, this, &IODeviceMux::onPortRead);
    }
    AppPipe *getPipe() {
        QScopedPointer<AppPipe> user(new AppPipe(QIODevice::ReadWrite | QIODevice::Unbuffered));
        auto *port = new AppPipe(QIODevice::ReadWrite | QIODevice::Unbuffered, this);
        user->addOther(port);
        connect(port, &QIODevice::readyRead, this, &IODeviceMux::onPipeRead);
        connect(m_port.get(), &QIODevice::bytesWritten, user.get(), &QIODevice::bytesWritten);
        // QScopedPointer does not convert implicitly to a raw pointer
        connect(user.get(), &QObject::destroyed, port, &QObject::deleteLater);
        m_userPipes.push_back(user.get());
        m_portPipes.push_back(port);
        return user.take();
    }
private:
    void onPortRead() {
        if (!m_port) return;
        auto data = m_port->readAll();
        m_portPipes.removeAll({}); // drop pipes that have been destroyed
        for (auto pipe : qAsConst(m_portPipes))
            pipe->write(data);
    }
    void onPipeRead() {
        auto *pipe = qobject_cast<AppPipe*>(sender());
        QByteArray data;
        if (pipe) data = pipe->readAll();
        if (m_port) m_port->write(data);
    }
};

The procedures would each call getPipe() and treat the pipe as if it were a serial port device. Each write into a pipe gets faithfully executed on the port. Each readyRead on the port is faithfully forwarded, with the same amount of data immediately available to read. Even the port's bytesWritten is forwarded. But bytesToWrite doesn't work: it always returns zero. This could be fixed by adding an option to AppPipe to query this value.

That's about all you need to get it to work, I'd think.

Wait for signal while processing other signals

You're thinking synchronously in a pre-C++1z world. In C++14 (and prior) asynchronous programming, there is mostly no place for a notion of a wait that is implemented as a function that returns when the wait is over (switch-based coroutine hacks excepted). You are also not using the fact that your application is stateful, and the state transitions can be expressed in a state machine.

Instead, you should simply act on data being available. Presumably, your application can be in multiple states. One of the states - the one where you have to wait for input - is simply exited when the input arrives.

The example below uses a simple process-local pipe, but it would work exactly the same if you were using a serial port - both are a QIODevice and emit requisite signals. We start with the project file.

# async-comms-32309737.pro
QT += widgets core-private
TARGET = async-comms-32309737
CONFIG += c++11
TEMPLATE = app
SOURCES += main.cpp

To make things simple, the pipe implementation reuses the QRingBuffer private class from Qt. See this question for more fleshed-out implementation(s).

// main.cpp
#include <QtWidgets>
#include <private/qringbuffer_p.h>

/// A simple point-to-point intra-application pipe. This class is not thread-safe.
class AppPipe : public QIODevice {
    Q_OBJECT
    AppPipe * m_other { nullptr };
    QRingBuffer m_buf;
public:
    AppPipe(AppPipe * other, QObject * parent = 0) : QIODevice(parent), m_other(other) {
        open(QIODevice::ReadWrite);
    }
    void setOther(AppPipe * other) { m_other = other; }
    qint64 writeData(const char * data, qint64 maxSize) Q_DECL_OVERRIDE {
        if (!maxSize) return maxSize;
        m_other->m_buf.append(QByteArray(data, maxSize));
        emit m_other->readyRead();
        return maxSize;
    }
    qint64 readData(char * data, qint64 maxLength) Q_DECL_OVERRIDE {
        return m_buf.read(data, maxLength);
    }
    qint64 bytesAvailable() const Q_DECL_OVERRIDE {
        return m_buf.size() + QIODevice::bytesAvailable();
    }
    bool isSequential() const Q_DECL_OVERRIDE { return true; }
};

We start with a simple UI, with one button to restart the state machine, another to transmit a single byte that will be received by the client, and a label that indicates the current state of the state machine.

screenshot of the example

int main(int argc, char *argv[])
{
    QApplication a { argc, argv };
    QWidget ui;
    QGridLayout grid { &ui };
    QLabel state;
    QPushButton restart { "Restart" }, transmit { "Transmit" };
    grid.addWidget(&state, 0, 0, 1, 2);
    grid.addWidget(&restart, 1, 0);
    grid.addWidget(&transmit, 1, 1);
    ui.show();

We now create the simulated device and the client pipe endpoints.

    AppPipe device { nullptr };
    AppPipe client { &device };
    device.setOther(&client);

The state machine has three states. The s_init is the initial state, and is exited after a 1.5s delay. The s_wait state is only exited when we receive some data (a byte or more) from the device in that state. In this example, receiving the data in other states has no effect. The machine is set to restart automatically when stopped.

    QStateMachine sm;
    QState
        s_init { &sm }, // Exited after a delay
        s_wait { &sm }, // Waits for data to arrive
        s_end { &sm }; // Final state
    QTimer timer;
    timer.setSingleShot(true);

    sm.setInitialState(&s_init);
    QObject::connect(&sm, &QStateMachine::stopped, &sm, &QStateMachine::start);
    QObject::connect(&s_init, &QState::entered, [&]{ timer.start(1500); });
    s_init.addTransition(&timer, SIGNAL(timeout()), &s_wait);
    s_wait.addTransition(&client, SIGNAL(readyRead()), &s_end);

To visualize the state machine's progress, we assign the state label's text property in each of the states:

    s_init.assignProperty(&state, "text", "Waiting for timeout.");
    s_wait.assignProperty(&state, "text", "Waiting for data.");
    s_end.assignProperty(&state, "text", "Done.");

Finally, the restart button stops the state machine - it will self-restart then. The transmit button simulates the device sending one byte of data.

    QObject::connect(&restart, &QPushButton::clicked, &sm, &QStateMachine::stop);
    QObject::connect(&transmit, &QPushButton::clicked, [&]{
        device.write("*", 1);
    });

We start the machine, enter the event loop, and let Qt follow our directions onwards from here. The main.moc file is included for it contains the metadata for AppPipe.

    sm.start();
    return a.exec();
}

#include "main.moc"
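The essence of the example is that no function ever waits: each event merely decides whether the current state is exited. A Qt-free sketch of the same three-state logic follows. The Machine class and its method names are illustrative only and stand in for QStateMachine, the timer's timeout and the pipe's readyRead.

```cpp
enum class State { Init, Wait, End };

// Event-driven transitions: nothing blocks; each handler just checks
// whether the event is relevant in the current state.
class Machine {
public:
    State state() const { return m_state; }
    // The single-shot timer fired: only meaningful in the initial state.
    void onTimeout() {
        if (m_state == State::Init) m_state = State::Wait;
    }
    // Data arrived: only meaningful while waiting, ignored otherwise.
    void onReadyRead() {
        if (m_state == State::Wait) m_state = State::End;
    }
    // Corresponds to stopping the machine and letting it restart itself.
    void restart() { m_state = State::Init; }
private:
    State m_state = State::Init;
};
```

Calling onReadyRead() while in State::Init has no effect, which mirrors the behavior described above: data received in other states is ignored.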

Is Qt's QBuffer thread safe?

To quote Mark Summerfield's book C++ GUI Programming with Qt 4:

Qt's thread-safe classes include QMutex, QMutexLocker, QReadWriteLock,
QReadLocker, QWriteLocker, QSemaphore, QThreadStorage, and
QWaitCondition. In addition, parts of the QThread API and several
other functions are thread-safe, notably QObject::connect(),
QObject::disconnect(), QCoreApplication::postEvent(), and
QCoreApplication::removePostedEvents().

Qt expects that you will use locking mechanisms around most of its classes. The docs will say "All functions are thread-safe" for a class if they are, and individual functions will also be marked "is thread-safe".

Notes on Qt Classes

Many Qt classes are reentrant, but they are not
made thread-safe, because making them thread-safe would incur the
extra overhead of repeatedly locking and unlocking a QMutex. For
example, QString is reentrant but not thread-safe. You can safely
access different instances of QString from multiple threads
simultaneously, but you can't safely access the same instance of
QString from multiple threads simultaneously (unless you protect the
accesses yourself with a QMutex).

Some Qt classes and functions are
thread-safe. These are mainly the thread-related classes (e.g. QMutex)
and fundamental functions (e.g. QCoreApplication::postEvent()).

Because QBuffer is a direct subclass of QIODevice, I would especially expect it not to be thread-safe. There are, however, container classes that are thread-safe for read access but require locking for write access:

Container Classes

The container classes are implicitly shared, they are reentrant, and
they are optimized for speed, low memory consumption, and minimal
inline code expansion, resulting in smaller executables. In addition,
they are thread-safe in situations where they are used as read-only
containers by all threads used to access them.
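If a single buffer really has to be shared between threads, every access must be protected by the same lock. The sketch below shows the pattern without Qt: std::mutex and std::string stand in for QMutex and QByteArray, and the LockedBuffer class is a made-up name, not something Qt provides.

```cpp
#include <cstddef>
#include <mutex>
#include <string>

// A byte buffer whose every access is serialized by one mutex.
class LockedBuffer {
public:
    void append(const std::string &bytes) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_data += bytes;
    }
    // Removes and returns up to maxSize bytes from the front.
    std::string take(std::size_t maxSize) {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::string out = m_data.substr(0, maxSize);
        m_data.erase(0, out.size());
        return out;
    }
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_data.size();
    }
private:
    mutable std::mutex m_mutex; // mutable so that size() can lock
    std::string m_data;
};
```

Note that size() followed by take() is still two separate critical sections, so a caller must not assume the size is unchanged between the two calls.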

How Do I Make My Program in Qt Continually Send A String to My Arduino?

I suggest you expand on your design somewhat:

  • have a repeating QTimer with an interval depending on the rate you want to send the string at, and connect the timer's timeout signal to the function that sends the string
  • connect the button's pressed signal to start the timer
  • connect the button's released signal to stop the timer

Events are sent only once, so the handlers are executed only once. If you want to keep repeating the action, you have to use a timer or some other event-driven mechanism. You cannot use a loop, as that would block the GUI thread and your application would stop responding.

Sure, you could use the button's auto-repeat, and there is the option to adjust the triggering and repeating intervals, but a solution that draws a line between logic and GUI is better. You should not really rely on the GUI for storing data or controlling the internal logic. The GUI should only be a front end.

You need more work on the serial port, though. If you are going to use it from the GUI thread, you will have to use the non-blocking API, which will require extending your implementation a little more. There is a good example of how to achieve that; you only need to modify it to enable sending further payloads once the previous payload has been successfully sent. In pseudocode:

on button press
    start timer
on button release
    stop timer
onTimeout
    if (can send)
        send
        can send = false
onBytesWritten
    accumulate bytes
    if (payload is completed)
        can send = true
        reset payload byte counter

Of course, you will also have to do some error checking; you can't just expect it to work. The example linked contains basic error handling.
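The gating logic from the pseudocode can be sketched without Qt as follows. The GatedSender class is hypothetical; its write callback stands in for a non-blocking port write, and onTimeout/onBytesWritten would be driven by the timer's timeout and the port's bytesWritten signals in a real application. Error handling is left out.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

class GatedSender {
public:
    // `write` is a stand-in for a non-blocking write to the port.
    GatedSender(std::string payload, std::function<void(const std::string &)> write)
        : m_payload(std::move(payload)), m_write(std::move(write)) {}

    // Timer tick: send only if the previous payload has fully gone out.
    void onTimeout() {
        if (!m_canSend) return;
        m_canSend = false;
        m_write(m_payload);
    }
    // Progress report from the port; a payload may need several calls.
    void onBytesWritten(std::size_t bytes) {
        m_written += bytes;
        if (m_written >= m_payload.size()) {
            m_canSend = true;   // payload completed, allow the next send
            m_written = 0;      // reset the payload byte counter
        }
    }
private:
    std::string m_payload;
    std::function<void(const std::string &)> m_write;
    bool m_canSend = true;
    std::size_t m_written = 0;
};
```

Timer ticks that arrive while a payload is still in flight are simply dropped, so the effective send rate never exceeds what the port can sustain.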

ASSERT failure in QListT::operator[]: index out of range

The decoder will perform better if you make the parsing more explicit in the formalism of a parser. A two-state machine will do the job. Below is a full test case:

// https://github.com/KubaO/stackoverflown/tree/master/questions/packet-read-43228728
#include <QtTest>
#include <private/qringbuffer_p.h>

// See http://stackoverflow.com/a/32317276/1329652
/// A simple point-to-point intra-process pipe. The other endpoint can live in any
/// thread.
class AppPipe : public QIODevice {
//...
};

class Decoder : public QObject {
    Q_OBJECT
    QPointer<QIODevice> m_device;
    QByteArray m_data;
    char m_first;
    bool m_isFirst = true;
    // Returns int so that -1 stays negative even where char is unsigned
    static constexpr int fromHex(char c) {
        return
            (c >= '0' && c <= '9') ? (c - '0') :
            (c >= 'A' && c <= 'F') ? (c - 'A' + 10) :
            (c >= 'a' && c <= 'f') ? (c - 'a' + 10) :
            -1;
    }
    void decode(const QByteArray & src) {
        for (auto c : src) {
            auto val = fromHex(c);
            if (val < 0) continue;
            if (m_isFirst)
                m_first = char(val << 4);
            else
                m_data.append(char(m_first | val));
            m_isFirst = !m_isFirst;
        }
    }
    void onReadyRead() {
        // The data has the format "XX XX XX" where X are hex digits.
        // Spaces and invalid digits are skipped
        decode(m_device->readAll());
        // Emit every complete packet; one read may contain several
        while (m_data.size() >= 4) {
            // Treat the length byte as unsigned so values >= 0x80 work
            const int length = 4 + quint8(m_data.at(3));
            if (m_data.size() < length) break;
            emit hasMessage(m_data.left(length));
            m_data.remove(0, length);
        }
    }
public:
    Decoder(QIODevice * dev, QObject * parent = {}) : QObject{parent}, m_device{dev} {
        connect(dev, &QIODevice::readyRead, this, &Decoder::onReadyRead);
    }
    Q_SIGNAL void hasMessage(const QByteArray &);
};

class DecoderTest : public QObject {
    Q_OBJECT
    AppPipe src{nullptr, QIODevice::ReadWrite};
    AppPipe dst{&src, QIODevice::ReadWrite};
    Q_SLOT void initTestCase() {
        src.addOther(&dst);
    }
    Q_SLOT void test1() {
        Decoder dec(&dst, this);
        QSignalSpy spy(&dec, &Decoder::hasMessage);

        src.write("0"); // send a partial header
        QCOMPARE(spy.size(), 0);
        src.write("0 00 00 03 "); // send rest of the header
        QCOMPARE(spy.size(), 0);
        src.write("0A 0B "); // send partial data
        QCOMPARE(spy.size(), 0);
        src.write("0C "); // send rest of data
        QCOMPARE(spy.size(), 1);

        QCOMPARE(dst.bytesAvailable(), 0); // ensure all data has been read

        const QByteArray packet{"\x00\x00\x00\x03\x0A\x0B\x0C", 4+3};
        QCOMPARE(spy.first().size(), 1);
        QCOMPARE(spy.first().first(), {packet});
    }
    Q_SLOT void test2() {
        Decoder dec(&dst, this);
        QSignalSpy spy(&dec, &Decoder::hasMessage);

        src.write("BABE0004 C001 DA7E\n0FAB33"); // send a packet and part of another
        QCOMPARE(spy.size(), 1);
        src.write("01 AB\n");
        QCOMPARE(spy.size(), 2);

        QCOMPARE(spy.at(0).size(), 1);
        QCOMPARE(spy.at(1).size(), 1);
        const QByteArray packet1{"\xBA\xBE\x00\x04\xC0\x01\xDA\x7E", 4+4};
        const QByteArray packet2{"\x0F\xAB\x33\x01\xAB", 4+1};
        QCOMPARE(spy.at(0).first(), {packet1});
        QCOMPARE(spy.at(1).first(), {packet2});
    }
};

QTEST_GUILESS_MAIN(DecoderTest)
#include "main.moc"
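For readers without a Qt test setup at hand, the same two-state hex decoding and length-prefixed framing can be sketched in plain C++. The HexFrameDecoder name and its feed() interface are made up for this illustration. The framing rule (a 4-byte header whose fourth byte gives the payload length) is taken from the test case above.

```cpp
#include <cstddef>
#include <string>
#include <vector>

class HexFrameDecoder {
public:
    // Feeds text such as "0A 0B"; returns the packets completed by it.
    std::vector<std::string> feed(const std::string &text) {
        for (char c : text) {
            const int val = fromHex(c);
            if (val < 0) continue;          // skip spaces, newlines, junk
            if (m_isFirst)
                m_high = val << 4;          // state 1: got the high nibble
            else                            // state 2: got the low nibble
                m_data.push_back(static_cast<char>(m_high | val));
            m_isFirst = !m_isFirst;
        }
        std::vector<std::string> packets;
        while (m_data.size() >= 4) {
            // The fourth header byte is the payload length, read unsigned.
            const std::size_t length = 4 + static_cast<unsigned char>(m_data[3]);
            if (m_data.size() < length) break;
            packets.push_back(m_data.substr(0, length));
            m_data.erase(0, length);
        }
        return packets;
    }
private:
    static int fromHex(char c) {
        if (c >= '0' && c <= '9') return c - '0';
        if (c >= 'A' && c <= 'F') return c - 'A' + 10;
        if (c >= 'a' && c <= 'f') return c - 'a' + 10;
        return -1;
    }
    std::string m_data;   // decoded bytes not yet emitted as a packet
    int m_high = 0;
    bool m_isFirst = true;
};
```

Because the decoder keeps its nibble state and leftover bytes between calls, it does not matter how the incoming text is split across reads.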

Sending a sequence of commands and wait for response

Let's use QStateMachine to make this simple. Let's recall how you wished such code would look:

Serial->write("boot", 1000);
Serial->waitForKeyword("boot successful");
Serial->sendFile("image.dat");

Let's put it in a class that has explicit state members for each state the programmer could be in. We'll also have action generators send, expect, etc. that attach given actions to states.

// https://github.com/KubaO/stackoverflown/tree/master/questions/comm-commands-32486198
#include <QtWidgets>
#include <private/qringbuffer_p.h>
#include <type_traits>

[...]

class Programmer : public StatefulObject {
    Q_OBJECT
    AppPipe m_port { nullptr, QIODevice::ReadWrite, this };
    State s_boot { &m_mach, "s_boot" },
          s_send { &m_mach, "s_send" };
    FinalState s_ok { &m_mach, "s_ok" },
               s_failed { &m_mach, "s_failed" };
public:
    Programmer(QObject * parent = 0) : StatefulObject(parent) {
        connectSignals();
        m_mach.setInitialState(&s_boot);
        send (&s_boot, &m_port, "boot\n");
        expect(&s_boot, &m_port, "boot successful", &s_send, 1000, &s_failed);
        send (&s_send, &m_port, ":HULLOTHERE\n:00000001FF\n");
        expect(&s_send, &m_port, "load successful", &s_ok, 1000, &s_failed);
    }
    AppPipe & pipe() { return m_port; }
};

This is fully functional, complete code for the programmer! Completely asynchronous, non-blocking, and it handles timeouts, too.

It's possible to have infrastructure that generates the states on the fly, so that you don't have to manually create all the states. The code is much smaller and IMHO easier to comprehend if you have explicit states. Only for complex communication protocols with 50-100+ states would it make sense to get rid of explicit named states.

The AppPipe is a simple intra-process bidirectional pipe that can be used as a stand-in for a real serial port:

// See http://stackoverflow.com/a/32317276/1329652
/// A simple point-to-point intra-process pipe. The other endpoint can live in any
/// thread.
class AppPipe : public QIODevice {
[...]
};

The StatefulObject holds a state machine, some basic signals useful for monitoring the state machine's progress, and the connectSignals method used to connect the signals with the states:

class StatefulObject : public QObject {
    Q_OBJECT
    Q_PROPERTY (bool running READ isRunning NOTIFY runningChanged)
protected:
    QStateMachine m_mach { this };
    StatefulObject(QObject * parent = 0) : QObject(parent) {}
    void connectSignals() {
        connect(&m_mach, &QStateMachine::runningChanged, this, &StatefulObject::runningChanged);
        for (auto state : m_mach.findChildren<QAbstractState*>())
            QObject::connect(state, &QState::entered, this, [this, state]{
                emit stateChanged(state->objectName());
            });
    }
public:
    Q_SLOT void start() { m_mach.start(); }
    Q_SIGNAL void runningChanged(bool);
    Q_SIGNAL void stateChanged(const QString &);
    bool isRunning() const { return m_mach.isRunning(); }
};

The State and FinalState are simple named state wrappers in the style of Qt 3. They allow us to declare the state and give it a name in one go.

template <class S> struct NamedState : S {
    NamedState(QState * parent, const char * name) : S(parent) {
        this->setObjectName(QLatin1String(name));
    }
};
typedef NamedState<QState> State;
typedef NamedState<QFinalState> FinalState;

The action generators are quite simple, too. The meaning of an action generator is "do something when a given state is entered". The state to act on is always given as the first argument. The second and subsequent arguments are specific to the given action. Sometimes, an action might need a target state as well, e.g. if it succeeds or fails.

void send(QAbstractState * src, QIODevice * dev, const QByteArray & data) {
    QObject::connect(src, &QState::entered, dev, [dev, data]{
        dev->write(data);
    });
}

QTimer * delay(QState * src, int ms, QAbstractState * dst) {
    auto timer = new QTimer(src);
    timer->setSingleShot(true);
    timer->setInterval(ms);
    QObject::connect(src, &QState::entered, timer, static_cast<void (QTimer::*)()>(&QTimer::start));
    QObject::connect(src, &QState::exited, timer, &QTimer::stop);
    src->addTransition(timer, SIGNAL(timeout()), dst);
    return timer;
}

void expect(QState * src, QIODevice * dev, const QByteArray & data, QAbstractState * dst,
            int timeout = 0, QAbstractState * dstTimeout = nullptr)
{
    addTransition(src, dst, dev, SIGNAL(readyRead()), [dev, data]{
        return hasLine(dev, data);
    });
    if (timeout) delay(src, timeout, dstTimeout);
}

The hasLine test simply checks all lines that can be read from the device for a given needle. This works fine for this simple communications protocol. You'd need more complex machinery if your communications were more involved. It is necessary to read all the lines, even if you find your needle. That's because this test is invoked from the readyRead signal, and in that signal you must read all the data that fulfills a chosen criterion. Here, the criterion is that the data forms a full line.

static bool hasLine(QIODevice * dev, const QByteArray & needle) {
    auto result = false;
    while (dev->canReadLine()) {
        auto line = dev->readLine();
        if (line.contains(needle)) result = true;
    }
    return result;
}

Adding guarded transitions to states is a bit cumbersome with the default API, so we will wrap it to make it easier to use, and to keep the action generators above readable:

template <typename F>
class GuardedSignalTransition : public QSignalTransition {
    F m_guard;
protected:
    bool eventTest(QEvent * ev) Q_DECL_OVERRIDE {
        return QSignalTransition::eventTest(ev) && m_guard();
    }
public:
    GuardedSignalTransition(const QObject * sender, const char * signal, F && guard) :
        QSignalTransition(sender, signal), m_guard(std::move(guard)) {}
    GuardedSignalTransition(const QObject * sender, const char * signal, const F & guard) :
        QSignalTransition(sender, signal), m_guard(guard) {}
};

template <typename F> static GuardedSignalTransition<F> *
addTransition(QState * src, QAbstractState *target,
              const QObject * sender, const char * signal, F && guard) {
    auto t = new GuardedSignalTransition<typename std::decay<F>::type>
        (sender, signal, std::forward<F>(guard));
    t->setTargetState(target);
    src->addTransition(t);
    return t;
}

That's about it - if you had a real device, that's all you need. Since I don't have your device, I'll create another StatefulObject to emulate the presumed device behavior:

class Device : public StatefulObject {
    Q_OBJECT
    AppPipe m_dev { nullptr, QIODevice::ReadWrite, this };
    State s_init { &m_mach, "s_init" },
          s_booting { &m_mach, "s_booting" },
          s_firmware { &m_mach, "s_firmware" };
    FinalState s_loaded { &m_mach, "s_loaded" };
public:
    Device(QObject * parent = 0) : StatefulObject(parent) {
        connectSignals();
        m_mach.setInitialState(&s_init);
        expect(&s_init, &m_dev, "boot", &s_booting);
        delay (&s_booting, 500, &s_firmware);
        send (&s_firmware, &m_dev, "boot successful\n");
        expect(&s_firmware, &m_dev, ":00000001FF", &s_loaded);
        send (&s_loaded, &m_dev, "load successful\n");
    }
    Q_SLOT void stop() { m_mach.stop(); }
    AppPipe & pipe() { return m_dev; }
};

Now let's make it all nicely visualized. We'll have a window with a text browser showing the contents of the communications. Below it will be buttons to start/stop the programmer or the device, and labels indicating the state of the emulated device and the programmer:

screenshot

int main(int argc, char ** argv) {
    using Q = QObject;
    QApplication app{argc, argv};
    Device dev;
    Programmer prog;

    QWidget w;
    QGridLayout grid{&w};
    QTextBrowser comms;
    QPushButton devStart{"Start Device"}, devStop{"Stop Device"},
                progStart{"Start Programmer"};
    QLabel devState, progState;
    grid.addWidget(&comms, 0, 0, 1, 3);
    grid.addWidget(&devState, 1, 0, 1, 2);
    grid.addWidget(&progState, 1, 2);
    grid.addWidget(&devStart, 2, 0);
    grid.addWidget(&devStop, 2, 1);
    grid.addWidget(&progStart, 2, 2);
    devStop.setDisabled(true);
    w.show();

We'll connect the device's and programmer's AppPipes. We'll also visualize what the programmer is sending and receiving:

    dev.pipe().addOther(&prog.pipe());
    prog.pipe().addOther(&dev.pipe());
    Q::connect(&prog.pipe(), &AppPipe::hasOutgoing, &comms, [&](const QByteArray & data){
        comms.append(formatData(">", "blue", data));
    });
    Q::connect(&prog.pipe(), &AppPipe::hasIncoming, &comms, [&](const QByteArray & data){
        comms.append(formatData("<", "green", data));
    });

Finally, we'll connect the buttons and labels:

    Q::connect(&devStart, &QPushButton::clicked, &dev, &Device::start);
    Q::connect(&devStop, &QPushButton::clicked, &dev, &Device::stop);
    Q::connect(&dev, &Device::runningChanged, &devStart, &QPushButton::setDisabled);
    Q::connect(&dev, &Device::runningChanged, &devStop, &QPushButton::setEnabled);
    Q::connect(&dev, &Device::stateChanged, &devState, &QLabel::setText);
    Q::connect(&progStart, &QPushButton::clicked, &prog, &Programmer::start);
    Q::connect(&prog, &Programmer::runningChanged, &progStart, &QPushButton::setDisabled);
    Q::connect(&prog, &Programmer::stateChanged, &progState, &QLabel::setText);
    return app.exec();
}

#include "main.moc"

The Programmer and Device could live in any thread. I've left them in the main thread since there's no reason to move them out, but you could put both into a dedicated thread, or each into its own thread, or into threads shared with other objects, etc. It's completely transparent since AppPipe supports communications across the threads. This would also be the case if QSerialPort was used instead of AppPipe. All that matters is that each instance of a QIODevice is used from one thread only. Everything else happens via signal/slot connections.

E.g. if you wanted the Programmer to live in a dedicated thread, you'd add the following somewhere in main:

    // fix QThread brokenness
    struct Thread : QThread { ~Thread() { quit(); wait(); } };

    Thread progThread;
    prog.moveToThread(&progThread);
    progThread.start();

A little helper formats the data to make it easier to read:

static QString formatData(const char * prefix, const char * color, const QByteArray & data) {
    auto text = QString::fromLatin1(data).toHtmlEscaped();
    if (text.endsWith('\n')) text.truncate(text.size() - 1);
    text.replace(QLatin1Char('\n'), QString::fromLatin1("<br/>%1 ").arg(QLatin1String(prefix)));
    return QString::fromLatin1("<font color=\"%1\">%2 %3</font><br/>")
        .arg(QLatin1String(color)).arg(QLatin1String(prefix)).arg(text);
}

