Skip to main content
edited title
Link
Jamal
  • 35.2k
  • 13
  • 134
  • 238

Simple C++ thread pool, looking for possible pitfalls

Source Link
Nelarius
  • 93
  • 1
  • 1
  • 4

Simple C++ thread pool, looking for possible pitfalls

I wrote a minimalistic thread pool using some C++11 features. I've used it on two projects so far and it has worked fine, but when I ran the code on my laptop, I believe that my thread pool might be entering some kind of dead-locked state -- occasionally the CPU activity drops to almost nothing, and the program hangs.

What are your opinions on the code?

My header file:

#include <utils/Uncopyable.h>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>


/// \brief Use this class to run tasks in parallel.
class ThreadPool : private Uncopyable {
public:
    ThreadPool();
    ThreadPool( size_t threads );
    ~ThreadPool();

    /// \brief Initialize the ThreadPool with a number of threads.
    /// This method does nothing if the thread pool is already running,
    /// i.e. ThreadPool( size_t ) was called.
    void initializeWithThreads( size_t threads );

    /// \brief Schedule a task to be executed by a thread immediately.
    void schedule( const std::function<void()>& );

    /// \brief a blocking function that waits until the threads have processed all the tasks in the queue.
    void wait() const;

private:
    std::vector<std::thread>            _workers;
    std::queue<std::function<void()>>   _taskQueue;
    std::atomic_uint                    _taskCount;
    std::mutex                          _mutex;
    std::condition_variable             _condition;
    std::atomic_bool                    _stop;
};

And my implementation is as follows:

#include <utils/ThreadPool.h>
#include <chrono>

ThreadPool::ThreadPool()
    :   _workers(),
        _taskQueue(),
        _taskCount( 0u ),
        _mutex(),
        _condition(),
        _stop( false ) {}

ThreadPool::ThreadPool( size_t threads ) : ThreadPool() {
    initializeWithThreads( threads );
}

ThreadPool::~ThreadPool() {
    _stop = true;
    _condition.notify_all();
    for ( std::thread& w: _workers ) {
        w.join();
    }
}

void ThreadPool::initializeWithThreads( size_t threads ) {
    for ( size_t i = 0; i < threads; i++ ) {
        //each thread executes this lambda
        _workers.emplace_back( [this]() -> void {
            while (true) {
                std::function<void()> task;
                {   //acquire lock
                    std::unique_lock<std::mutex> lock( _mutex );
                    _condition.wait( lock, [this]() -> bool {
                        return !_taskQueue.empty() || _stop;
                    });

                    if ( _stop && _taskQueue.empty() ) {
                        return;
                    }

                    task = std::move( _taskQueue.front() );
                    _taskQueue.pop();
                }   //release lock
                task();
                _taskCount--;
            }   //while
        });
    }   //for
}

void ThreadPool::schedule( const std::function<void()>& task ) {
    {
        std::unique_lock<std::mutex> lock( _mutex );
        _taskQueue.push( task );
    }
    _taskCount++;
    _condition.notify_one();
}

void ThreadPool::wait() const {
    while ( _taskCount != 0u ) {
        std::this_thread::sleep_for( std::chrono::microseconds(1) );
    }
}