Skip to main content
added 2 characters in body
Source Link
toolic
  • 15.8k
  • 6
  • 29
  • 217
#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
#include <random>

using namespace std::chrono_literals;
using engine = std::mt19937;

template <typename Task>
class BlockingQueue
{
public:
    BlockingQueue() = default;

    void put(Task t) {
        {
            std::unique_lock lock{mtx};
            canPut.wait(lock, [this]{return tasks.size() < sizeLimit; });
            tasks.push_front(t);
            std::cout << "Tasks: " << tasks.size() << "\n";
        }
        canTake.notify_one();
    }

    Task take() {
        std::unique_lock lock{mtx};
        canTake.wait(lock, [this]{ return !tasks.empty(); });
        Task taken {std::move(tasks.back())};
        tasks.pop_back();
        // std::cout << "Task: " << taken << " taken by thread: " << std::this_thread::get_id() << "\n";
        std::cout << "Tasks: " << tasks.size() << "\n";
        return taken;
    }

private:
    std::mutex mtx;
    std::condition_variable canPut;
    std::condition_variable canTake;
    int sizeLimit {50};
    std::deque<Task> tasks;
};


int main() {

    std::random_device os_seed;
    const auto seed = os_seed();
    engine generator{seed};
    std::uniform_int_distribution<uint32_t> distribute(0, 10);

    BlockingQueue<int> q;

    std::atomic_int i = 0;
    std::jthread prod {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread prod2 {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread consumer {
        [&]{
            while(true){
                auto task = q.take();
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
    }
    };
    return 0;
}
```
#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
#include <random>

using namespace std::chrono_literals;
using engine = std::mt19937;

template <typename Task>
class BlockingQueue
{
public:
    BlockingQueue() = default;

    void put(Task t) {
        {
            std::unique_lock lock{mtx};
            canPut.wait(lock, [this]{return tasks.size() < sizeLimit; });
            tasks.push_front(t);
            std::cout << "Tasks: " << tasks.size() << "\n";
        }
        canTake.notify_one();
    }

    Task take() {
        std::unique_lock lock{mtx};
        canTake.wait(lock, [this]{ return !tasks.empty(); });
        Task taken {std::move(tasks.back())};
        tasks.pop_back();
        // std::cout << "Task: " << taken << " taken by thread: " << std::this_thread::get_id() << "\n";
        std::cout << "Tasks: " << tasks.size() << "\n";
        return taken;
    }

private:
    std::mutex mtx;
    std::condition_variable canPut;
    std::condition_variable canTake;
    int sizeLimit {50};
    std::deque<Task> tasks;
};


int main() {

    std::random_device os_seed;
    const auto seed = os_seed();
    engine generator{seed};
    std::uniform_int_distribution<uint32_t> distribute(0, 10);

    BlockingQueue<int> q;

    std::atomic_int i = 0;
    std::jthread prod {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread prod2 {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread consumer {
        [&]{
            while(true){
                auto task = q.take();
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
    }
    };
    return 0;
}
```
#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
#include <random>

using namespace std::chrono_literals;
using engine = std::mt19937;

template <typename Task>
class BlockingQueue
{
public:
    BlockingQueue() = default;

    void put(Task t) {
        {
            std::unique_lock lock{mtx};
            canPut.wait(lock, [this]{return tasks.size() < sizeLimit; });
            tasks.push_front(t);
            std::cout << "Tasks: " << tasks.size() << "\n";
        }
        canTake.notify_one();
    }

    Task take() {
        std::unique_lock lock{mtx};
        canTake.wait(lock, [this]{ return !tasks.empty(); });
        Task taken {std::move(tasks.back())};
        tasks.pop_back();
        // std::cout << "Task: " << taken << " taken by thread: " << std::this_thread::get_id() << "\n";
        std::cout << "Tasks: " << tasks.size() << "\n";
        return taken;
    }

private:
    std::mutex mtx;
    std::condition_variable canPut;
    std::condition_variable canTake;
    int sizeLimit {50};
    std::deque<Task> tasks;
};


int main() {

    std::random_device os_seed;
    const auto seed = os_seed();
    engine generator{seed};
    std::uniform_int_distribution<uint32_t> distribute(0, 10);

    BlockingQueue<int> q;

    std::atomic_int i = 0;
    std::jthread prod {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread prod2 {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread consumer {
        [&]{
            while(true){
                auto task = q.take();
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
    }
    };
    return 0;
}
Source Link

A Simple BlockingQueue implementation in C++

I'm just dusting off my C++ knowledge in area of multithreading. I started with implementing a producer-consumer pattern inspired by https://jenkov.com/tutorials/java-util-concurrent/blockingqueue.html.

Here's the code:

#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
#include <random>

using namespace std::chrono_literals;
using engine = std::mt19937;

template <typename Task>
class BlockingQueue
{
public:
    BlockingQueue() = default;

    void put(Task t) {
        {
            std::unique_lock lock{mtx};
            canPut.wait(lock, [this]{return tasks.size() < sizeLimit; });
            tasks.push_front(t);
            std::cout << "Tasks: " << tasks.size() << "\n";
        }
        canTake.notify_one();
    }

    Task take() {
        std::unique_lock lock{mtx};
        canTake.wait(lock, [this]{ return !tasks.empty(); });
        Task taken {std::move(tasks.back())};
        tasks.pop_back();
        // std::cout << "Task: " << taken << " taken by thread: " << std::this_thread::get_id() << "\n";
        std::cout << "Tasks: " << tasks.size() << "\n";
        return taken;
    }

private:
    std::mutex mtx;
    std::condition_variable canPut;
    std::condition_variable canTake;
    int sizeLimit {50};
    std::deque<Task> tasks;
};


int main() {

    std::random_device os_seed;
    const auto seed = os_seed();
    engine generator{seed};
    std::uniform_int_distribution<uint32_t> distribute(0, 10);

    BlockingQueue<int> q;

    std::atomic_int i = 0;
    std::jthread prod {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread prod2 {
        [&]{
            while(true){
                q.put(i++);
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
        }
    };
    std::jthread consumer {
        [&]{
            while(true){
                auto task = q.take();
                auto sleepTime = 
                    std::chrono::seconds(distribute(generator));
                std::this_thread::sleep_for(sleepTime);
            }
    }
    };
    return 0;
}
```