@@ -227,9 +227,6 @@ de265_image::de265_image()
nThreadsBlocked = 0;
nThreadsFinished = 0;
nThreadsTotal = 0;
-
- de265_mutex_init(&mutex);
- de265_cond_init(&finished_cond);
}
@@ -489,9 +486,6 @@ de265_image::~de265_image()
if (ctb_progress) {
delete[] ctb_progress;
}
-
- de265_cond_destroy(&finished_cond);
- de265_mutex_destroy(&mutex);
}
@@ -685,7 +679,7 @@ void de265_image::exchange_pixel_data_with(de265_image& b)
void de265_image::thread_start(int nThreads)
{
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
//printf("nThreads before: %d %d\n",nThreadsQueued, nThreadsTotal);
@@ -693,51 +687,47 @@ void de265_image::thread_start(int nThreads)
nThreadsTotal += nThreads;
//printf("nThreads after: %d %d\n",nThreadsQueued, nThreadsTotal);
-
- de265_mutex_unlock(&mutex);
}
void de265_image::thread_run(const thread_task* task)
{
+ std::unique_lock<std::mutex> lock(mutex);
+
//printf("run thread %s\n", task->name().c_str());
- de265_mutex_lock(&mutex);
nThreadsQueued--;
nThreadsRunning++;
- de265_mutex_unlock(&mutex);
}
void de265_image::thread_blocks()
{
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
+
nThreadsRunning--;
nThreadsBlocked++;
- de265_mutex_unlock(&mutex);
}
void de265_image::thread_unblocks()
{
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
+
nThreadsBlocked--;
nThreadsRunning++;
- de265_mutex_unlock(&mutex);
}
void de265_image::thread_finishes(const thread_task* task)
{
//printf("finish thread %s\n", task->name().c_str());
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
nThreadsRunning--;
nThreadsFinished++;
assert(nThreadsRunning >= 0);
if (nThreadsFinished==nThreadsTotal) {
- de265_cond_broadcast(&finished_cond, &mutex);
+ finished_cond.notify_all();
}
-
- de265_mutex_unlock(&mutex);
}
void de265_image::wait_for_progress(thread_task* task, int ctbx,int ctby, int progress)
@@ -772,11 +762,11 @@ void de265_image::wait_for_progress(thread_task* task, int ctbAddrRS, int progre
void de265_image::wait_for_completion()
{
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
+
while (nThreadsFinished!=nThreadsTotal) {
- de265_cond_wait(&finished_cond, &mutex);
+ finished_cond.wait(lock);
}
- de265_mutex_unlock(&mutex);
}
bool de265_image::debug_is_completed() const
int de265_thread_create(de265_thread* t, void *(*start_routine) (void *), void *arg) { return pthread_create(t,NULL,start_routine,arg); }
void de265_thread_join(de265_thread t) { pthread_join(t,NULL); }
void de265_thread_destroy(de265_thread* t) { }
-void de265_mutex_init(de265_mutex* m) { pthread_mutex_init(m,NULL); }
-void de265_mutex_destroy(de265_mutex* m) { pthread_mutex_destroy(m); }
-void de265_mutex_lock(de265_mutex* m) { pthread_mutex_lock(m); }
-void de265_mutex_unlock(de265_mutex* m) { pthread_mutex_unlock(m); }
-void de265_cond_init(de265_cond* c) { pthread_cond_init(c,NULL); }
-void de265_cond_destroy(de265_cond* c) { pthread_cond_destroy(c); }
-void de265_cond_broadcast(de265_cond* c,de265_mutex* m) { pthread_cond_broadcast(c); }
-void de265_cond_wait(de265_cond* c,de265_mutex* m) { pthread_cond_wait(c,m); }
-void de265_cond_signal(de265_cond* c) { pthread_cond_signal(c); }
#else // _WIN32
#define THREAD_RESULT_TYPE DWORD
@@ -66,37 +57,16 @@ int de265_thread_create(de265_thread* t, LPTHREAD_START_ROUTINE start_routine,
}
void de265_thread_join(de265_thread t) { WaitForSingleObject(t, INFINITE); }
void de265_thread_destroy(de265_thread* t) { CloseHandle(*t); *t = NULL; }
-void de265_mutex_init(de265_mutex* m) { *m = CreateMutex(NULL, FALSE, NULL); }
-void de265_mutex_destroy(de265_mutex* m) { CloseHandle(*m); }
-void de265_mutex_lock(de265_mutex* m) { WaitForSingleObject(*m, INFINITE); }
-void de265_mutex_unlock(de265_mutex* m) { ReleaseMutex(*m); }
-void de265_cond_init(de265_cond* c) { win32_cond_init(c); }
-void de265_cond_destroy(de265_cond* c) { win32_cond_destroy(c); }
-void de265_cond_broadcast(de265_cond* c,de265_mutex* m)
-{
- de265_mutex_lock(m);
- win32_cond_broadcast(c);
- de265_mutex_unlock(m);
-}
-void de265_cond_wait(de265_cond* c,de265_mutex* m) { win32_cond_wait(c,m); }
-void de265_cond_signal(de265_cond* c) { win32_cond_signal(c); }
#endif // _WIN32
-
-
de265_progress_lock::de265_progress_lock()
{
mProgress = 0;
-
- de265_mutex_init(&mutex);
- de265_cond_init(&cond);
}
de265_progress_lock::~de265_progress_lock()
{
- de265_mutex_destroy(&mutex);
- de265_cond_destroy(&cond);
}
void de265_progress_lock::wait_for_progress(int progress)
@@ -105,34 +75,29 @@ void de265_progress_lock::wait_for_progress(int progress)
return;
}
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
while (mProgress < progress) {
- de265_cond_wait(&cond, &mutex);
+ cond.wait(lock);
}
- de265_mutex_unlock(&mutex);
}
void de265_progress_lock::set_progress(int progress)
{
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
if (progress>mProgress) {
mProgress = progress;
- de265_cond_broadcast(&cond, &mutex);
+ cond.notify_all();
}
-
- de265_mutex_unlock(&mutex);
}
void de265_progress_lock::increase_progress(int progress)
{
- de265_mutex_lock(&mutex);
+ std::unique_lock<std::mutex> lock(mutex);
mProgress += progress;
- de265_cond_broadcast(&cond, &mutex);
-
- de265_mutex_unlock(&mutex);
+ cond.notify_all();
}
int de265_progress_lock::get_progress() const
@@ -191,42 +156,42 @@ static THREAD_RESULT_TYPE THREAD_CALLING_CONVENTION worker_thread(THREAD_PARAM_T
thread_pool* pool = (thread_pool*)pool_ptr;
- de265_mutex_lock(&pool->mutex);
-
while(true) {
- // wait until we can pick a task or until the pool has been stopped
+ thread_task* task = nullptr;
- for (;;) {
- // end waiting if thread-pool has been stopped or we have a task to execute
+ {
+ std::unique_lock<std::mutex> lock(pool->mutex);
- if (pool->stopped || pool->tasks.size()>0) {
- break;
- }
+ // wait until we can pick a task or until the pool has been stopped
- //printf("going idle\n");
- de265_cond_wait(&pool->cond_var, &pool->mutex);
- }
+ for (;;) {
+ // end waiting if thread-pool has been stopped or we have a task to execute
- // if the pool was shut down, end the execution
+ if (pool->stopped || pool->tasks.size()>0) {
+ break;
+ }
- if (pool->stopped) {
- de265_mutex_unlock(&pool->mutex);
- return (THREAD_RESULT_TYPE)0;
- }
+ //printf("going idle\n");
+ pool->cond_var.wait(lock);
+ }
+ // if the pool was shut down, end the execution
- // get a task
+ if (pool->stopped) {
+ return (THREAD_RESULT_TYPE)0;
+ }
- thread_task* task = pool->tasks.front();
- pool->tasks.pop_front();
- pool->num_threads_working++;
+ // get a task
- //printblks(pool);
+ task = pool->tasks.front();
+ pool->tasks.pop_front();
- de265_mutex_unlock(&pool->mutex);
+ pool->num_threads_working++;
+ //printblks(pool);
+ }
// execute the task
@@ -234,11 +199,11 @@ static THREAD_RESULT_TYPE THREAD_CALLING_CONVENTION worker_thread(THREAD_PARAM_T
// end processing and check if this was the last task to be processed
- de265_mutex_lock(&pool->mutex);
+ // TODO: the num_threads_working can probably be an atomic integer
+ std::unique_lock<std::mutex> lock(pool->mutex);
pool->num_threads_working--;
}
- de265_mutex_unlock(&pool->mutex);
return (THREAD_RESULT_TYPE)0;
}
@@ -257,13 +222,12 @@ de265_error start_thread_pool(thread_pool* pool, int num_threads)
pool->num_threads = 0; // will be increased below
- de265_mutex_init(&pool->mutex);
- de265_cond_init(&pool->cond_var);
+ {
+ std::unique_lock<std::mutex> lock(pool->mutex);
- de265_mutex_lock(&pool->mutex);
- pool->num_threads_working = 0;
- pool->stopped = false;
- de265_mutex_unlock(&pool->mutex);
+ pool->num_threads_working = 0;
+ pool->stopped = false;
+ }
// start worker threads
@@ -283,32 +247,30 @@ de265_error start_thread_pool(thread_pool* pool, int num_threads)
void stop_thread_pool(thread_pool* pool)
{
- de265_mutex_lock(&pool->mutex);
- pool->stopped = true;
- de265_mutex_unlock(&pool->mutex);
+ {
+ std::unique_lock<std::mutex> lock(pool->mutex);
+ pool->stopped = true;
+ }
- de265_cond_broadcast(&pool->cond_var, &pool->mutex);
+ pool->cond_var.notify_all();
for (int i=0;i<pool->num_threads;i++) {
de265_thread_join(pool->thread[i]);
de265_thread_destroy(&pool->thread[i]);
}
-
- de265_mutex_destroy(&pool->mutex);
- de265_cond_destroy(&pool->cond_var);
}
void add_task(thread_pool* pool, thread_task* task)
{
- de265_mutex_lock(&pool->mutex);
+ std::unique_lock<std::mutex> lock(pool->mutex);
+
if (!pool->stopped) {
pool->tasks.push_back(task);
// wake up one thread
- de265_cond_signal(&pool->cond_var);
+ pool->cond_var.notify_one();
}
- de265_mutex_unlock(&pool->mutex);
}
#include <pthread.h>
typedef pthread_t de265_thread;
-typedef pthread_mutex_t de265_mutex;
-typedef pthread_cond_t de265_cond;
#else // _WIN32
#if !defined(NOMINMAX)
@@ -49,10 +47,14 @@ typedef pthread_cond_t de265_cond;
#endif
typedef HANDLE de265_thread;
-typedef HANDLE de265_mutex;
-typedef win32_cond_t de265_cond;
#endif // _WIN32
+#include <mutex>
+#include <condition_variable>
+//#include <thread>
+
+//typedef pthread_t de265_thread;
+
#ifndef _WIN32
int de265_thread_create(de265_thread* t, void *(*start_routine) (void *), void *arg);
#else
@@ -60,16 +62,6 @@ int de265_thread_create(de265_thread* t, LPTHREAD_START_ROUTINE start_routine,
#endif
void de265_thread_join(de265_thread t);
void de265_thread_destroy(de265_thread* t);
-void de265_mutex_init(de265_mutex* m);
-void de265_mutex_destroy(de265_mutex* m);
-void de265_mutex_lock(de265_mutex* m);
-void de265_mutex_unlock(de265_mutex* m);
-void de265_cond_init(de265_cond* c);
-void de265_cond_destroy(de265_cond* c);
-void de265_cond_broadcast(de265_cond* c, de265_mutex* m);
-void de265_cond_wait(de265_cond* c,de265_mutex* m);
-void de265_cond_signal(de265_cond* c);
-
class de265_progress_lock
{
@@ -88,8 +80,8 @@ private:
// private data
- de265_mutex mutex;
- de265_cond cond;
+ std::mutex mutex;
+ std::condition_variable cond;
};
@@ -131,8 +123,8 @@ class thread_pool
int ctbx[MAX_THREADS]; // the CTB the thread is working on
int ctby[MAX_THREADS];
- de265_mutex mutex;
- de265_cond cond_var;
+ std::mutex mutex;
+ std::condition_variable cond_var;
};