I've written an MQTT client for embedded systems running an RTOS (i.e. not POSIX; generally FreeRTOS). There's no pthread on the system, and only 32-bit atomic instructions are supported.
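Because the whole design depends on the 32-bit atomics being native, a cheap safeguard is a compile-time check that `std::atomic<u32>` is lock-free on the target. This is a minimal sketch that assumes a C++17 toolchain implementing `std::atomic` for the MCU. `cas32` is a hypothetical helper for illustration, not part of the client:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

using u32 = std::uint32_t;

// Build-time guard (C++17): refuse to compile if the toolchain would emulate
// std::atomic<u32> with a hidden lock instead of native 32-bit instructions.
static_assert(std::atomic<u32>::is_always_lock_free,
              "this design assumes native 32-bit atomics");

// The 32-bit read-modify-write primitive the scheme relies on.
// Returns true if `word` held `expected` and now holds `desired`.
inline bool cas32(std::atomic<u32>& word, u32 expected, u32 desired) {
    return word.compare_exchange_strong(expected, desired);
}
```

If the target only emulates these operations, the build fails at the `static_assert` rather than silently introducing a lock.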
The client's multi-thread/task usage looks like this:
- One thread does the receiving/connecting/subscribing work (it calls a callback whenever a message is received). It's called the event loop thread.
- N threads can publish messages (this can happen in the event loop thread too, but not from within the receiving code).
The client holds a pointer to an implementation (PImpl idiom), and that implementation is deleted upon error. Reconnecting to the server creates a new instance of the implementation.
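This is a minimal sketch of that lifecycle. It assumes a plain owning raw pointer as described above. The `connection_id` member and the `Client::onError()` / `Client::reconnect()` names are hypothetical, and no synchronization is shown:

```cpp
#include <cassert>

// Hypothetical stand-in for the real implementation (socket, buffers, ...).
struct Impl {
    int connection_id;  // illustrative only
};

// The client owns the implementation through a raw pointer, destroys it on
// error and creates a fresh one on reconnect.
class Client {
public:
    Client() = default;
    Client(const Client&) = delete;             // single owner of `impl`
    Client& operator=(const Client&) = delete;
    ~Client() { delete impl; }

    void onError() {
        delete impl;                            // tear down the failed session
        impl = nullptr;
    }

    void reconnect() {
        assert(impl == nullptr && "destroy the old instance first");
        impl = new Impl{++connections};
    }

    Impl* impl = nullptr;

private:
    int connections = 0;
};
```

The pointer that publishing threads would use is this same `impl`. That is why the `delete` in `onError()` has to be coordinated with them.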
In most use cases there's only one thread in the system, so I don't want to protect the implementation's pointer with a lock/mutex/semaphore, because it would have to be taken on every access in the event loop.
So, to handle the case of a publish from another thread, I've reasoned as follows:
- A publishing thread only needs the implementation's pointer to send its (thread-local) data to the implementation's socket. Socket usage is thread-safe here, since only the publishing thread writes to the socket while the client is not in error.
- The event loop thread should detect an error and close/destroy the implementation only when no thread is publishing.
- While it's doing so, no publishing thread can publish.
I've written an implementation along the lines of the following pseudocode (see the link at the bottom of the post for a real, working minimal example):
#include <atomic>
#include <cstdint>
#include <sched.h>  // sched_yield() for the desktop test; taskYIELD() plays this role on FreeRTOS

using u32 = std::uint32_t;
using std::atomic;

struct Impl
{
    // [...]
};

struct A
{
    atomic<u32> usage_count = 0;
    atomic<u32> errored = true;
    Impl * impl = nullptr;

    void exchangeUsage(int step, u32 error)
    {
        u32 previous = usage_count.load();
        while (true)
        {
            u32 next = previous + step;
            // On failure, compare_exchange_strong reloads `previous`, so just retry
            if (usage_count.compare_exchange_strong(previous, next))
            {
                errored |= error;
                break;
            }
        }
    }

    Impl * acquire()
    {
        if (errored.load()) return nullptr; // Precheck: prevent acquiring impl if an error is already signaled
        exchangeUsage(1, false);
        // Postcheck: if the error happened during the CAS, we need to account for it
        if (errored.load()) {
            exchangeUsage(-1, true); // Release our usage since we don't use the implementation anymore
            return nullptr;
        }
        return impl;
    }

    void release(bool withError) { exchangeUsage(-1, withError); }

    void closeIfError(bool errorHappened)
    {
        bool inError = errored.load();
        if (!inError && !errorHappened) return;
        errored.store(1); // Mark the object as errored so that, from now on, no publish can acquire it
        // Wait until the usage count is 1 again, meaning all publishing threads are done with the implementation
        u32 previous = usage_count.load();
        while (previous != 1)
        {
            sched_yield(); // Yield here
            previous = usage_count.load();
        }
        // OK, here it's no longer possible for a publish to acquire, since it's errored
        delete impl; impl = nullptr;
    }

    void resetState() { usage_count = 1; errored = false; }
};
A client;

// Usage is like this for publishing threads:
void publish()
{
    Impl * impl = client.acquire();
    if (impl) { bool error = impl->publish(...); client.release(error); }
}

// Usage is like this in the event loop thread:
void eventLoop()
{
    while (true) {
        bool error = receiveAndCallCallback(client.impl);
        if (error)
        {
            client.closeIfError(error);
            client.impl = reconnect(...);
            client.resetState();
        }
    }
}

// main() calls client.resetState() before creating the threads.
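For comparison, here is a minimal sketch of the same counting rules, where the event loop owns one reference and each publisher adds one. It uses single `fetch_add`/`fetch_sub`/`fetch_or` calls instead of the hand-written CAS loop in `exchangeUsage()`. `UsageGate` and its member names are hypothetical. The wait loop uses `std::this_thread::yield()` as a desktop stand-in for FreeRTOS's `taskYIELD()`. This is not offered as a verdict on whether the scheme is race-free. It only illustrates that each counter update needs nothing beyond a 32-bit atomic read-modify-write:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

using u32 = std::uint32_t;

// Hypothetical gate mirroring struct A's counting rules:
// usage_count == 1 means only the event loop holds a reference.
struct UsageGate {
    std::atomic<u32> usage_count{1};
    std::atomic<u32> errored{0};

    // Publisher side: true if the caller may use the implementation.
    bool acquire() {
        if (errored.load()) return false;   // precheck
        usage_count.fetch_add(1);           // one atomic RMW, no CAS loop
        if (errored.load()) {               // postcheck
            usage_count.fetch_sub(1);
            return false;
        }
        return true;
    }

    // Publisher side: record any error first, then drop the reference.
    void release(bool withError) {
        if (withError) errored.fetch_or(1);
        u32 before = usage_count.fetch_sub(1);
        assert(before >= 2 && "release() without a matching acquire()");
        (void)before;  // avoid an unused-variable warning when NDEBUG is set
    }

    // Event-loop side: block new acquires, then wait for publishers to leave.
    void closeAndWait() {
        errored.store(1);
        while (usage_count.load() != 1) {
            std::this_thread::yield();      // taskYIELD() on FreeRTOS
        }
    }

    // Event-loop side: reopen the gate after a successful reconnect
    // (overwrites the counter, like resetState() in the post).
    void reset() {
        usage_count.store(1);
        errored.store(0);
    }
};
```

One deliberate difference from `exchangeUsage()` is that `release()` sets the error flag before dropping its reference. With the default sequentially consistent ordering, the flag is therefore already visible to the event loop once it observes the count that `fetch_sub` produced.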
I've tested this code with both GCC's ThreadSanitizer and Valgrind's DRD, and it runs without errors (Valgrind only complains about unrelated printf issues, not about this code).
The question is therefore:
- Is this scheme safe to use?
- Do you spot a logic error in the pattern?
A minimal reproducible working example is on Godbolt, with a main() that runs and prints some output.