Skip to main content
edited tags
Link
toolic
  • 15.8k
  • 6
  • 29
  • 217
Source Link
watkipet
  • 173
  • 1
  • 4

Producer consumer with shared buffer in C++11

My C++ knowledge is outdated and I'm trying to learn about C++11 threading. I'm working on a SoapySDR driver where the client acquires a buffer it wants to write to and the hardware transmits from that same buffer once it's filled. Rather than post the actual code here, I've distilled it down to an example using std::cout as my "transmitter".

//
//  main.cpp
//  ThreadingExample
//
//  Demonstrates sharing a common buffer between two threads. One
//  is the consumer (see tx_loop()), the other is the producer.
//

#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status
#include <iomanip>

std::mutex _buf_mutex;
std::condition_variable _buf_cond;
char _buff[10];
std::atomic<ssize_t>    _buf_count;
bool running;
std::chrono::system_clock::time_point start;

// The transmit loop. Every 500 ms, it tries to get a lock on the buffer,
// then "transmits" (cout) that buffer
void tx_loop()
{
    while (running)
    {
        std::unique_lock <std::mutex> lock(_buf_mutex);
        if (_buf_cond.wait_for(lock, std::chrono::seconds(1), []{return _buf_count > 0;}) == false)
        {
            // We hit this if we've had an underflow or if the producer thread has stopped.
            std::cout
                << std::setfill('0') << std::setw(5)
                << std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - start).count()
                << ": tx_loop timed out\n";
            return;
        }
        
        // Decrement the number of filled buffers.
        _buf_count--;
        
        // "Transmit" the packet
        std::cout
            << std::setfill('0') << std::setw(5)
            << std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - start).count()
            << ": "
            << std::string(_buff, sizeof(_buff) / sizeof(_buff[0]))
            << "\n";
        
        std::this_thread::sleep_for (std::chrono::milliseconds(500));
        
        // Notify the other threads that transmission has finished
        _buf_cond.notify_one();
    }
}

// Aqcuires a buffer that the client can write to. Blocks until the buffer becomes
// available for writing.
size_t acquireWriteBuffer(void **buf)
{
    std::cout
        << std::setfill('0') << std::setw(5)
        << std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - start).count()
        << ": acquireWriteBuffer _buf_count="
        << (int) _buf_count
        << "\n";
    
    // Increment the number of buffers about to be filled
    _buf_count++;
    
    // Notify the transmission thread that there's a buffer to transmit
    if (_buf_count > 0)
    {
        _buf_cond.notify_one();
    }

    // Wait for the buffer to become available
    std::unique_lock <std::mutex> lock(_buf_mutex);
    if (_buf_cond.wait_for(lock, std::chrono::seconds(1), []{return _buf_count < 1;}) == false)
    {
        // We hit this if we can't TX fast enough
        std::cout
            << std::setfill('0') << std::setw(5)
            << std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - start).count()
            << ": acquireWriteBuffer timed out\n";
        
        return 0;
    }
    
    *buf = _buff;
    
    return sizeof(_buff) / sizeof(_buff[0]);
}

// The main program. Sends 10 "packets" of 10 characters each. Each
// character is one ASCII value greater than the last.
int main (int argc, const char * argv[])
{
    start = std::chrono::system_clock::now();
    running = true;
    
    // Note that _buf_count starts at -1. This is because we not only need
    // to acquire a buffer, we need to write to it prior to the TX thread
    // sending it. When _buf_count is 0, we've acquired a buffer. When it's
    // 1, we've filled it.
    _buf_count = -1;
    std::thread th (tx_loop);
    
    // Send 10 chunks
    char presentChar  = ' ';
    for (int i = 0; i < 10; i++)
    {
        char *buf;
        size_t numElements = acquireWriteBuffer((void **) &buf);
        for (int j = 0; j < numElements; j++)
        {
            buf[j] = presentChar;
            presentChar < 127 ? presentChar++ : presentChar = ' ';
        }
    }


    running = false;
    th.join();

    return 0;
}

The output from running my program is as follows:

00000: acquireWriteBuffer _buf_count=-1
00000: acquireWriteBuffer _buf_count=0
00000:  !"#$%&'()
00503: acquireWriteBuffer _buf_count=0
00503: *+,-./0123
01006: acquireWriteBuffer _buf_count=0
01006: 456789:;<=
01511: acquireWriteBuffer _buf_count=0
01511: >?@ABCDEFG
02014: acquireWriteBuffer _buf_count=0
02014: HIJKLMNOPQ
02518: acquireWriteBuffer _buf_count=0
02518: RSTUVWXYZ[
03019: acquireWriteBuffer _buf_count=0
03019: \]^_`abcde
03519: acquireWriteBuffer _buf_count=0
03519: fghijklmno
04024: acquireWriteBuffer _buf_count=0
04024: pqrstuvwxy
05532: tx_loop timed out

The odd thing about my situation is that I know when the client wants to acquire a buffer, but I don't know when it has actually written to it. So, all I can do is assume that it's written to the last buffer it acquired once it requests another one. You'll see this in the comments about _buf_count starting at -1.

There are a few things I'm unsure about in my code:

  1. Is my use of _buf_count thread-safe?
  2. Is my code prone to deadlocks?
  3. Is my use of a single condition variable appropriate?
  4. Am I managing my mutexes well? Since I'm using wait_for, they automatically get unlocked, right?

Please indicate in a comment if you'd like me to change anything (i.e. add more comments, etc.) before you answer.