I'm dusting off my C++ knowledge in the area of multithreading. I started by implementing a producer-consumer pattern inspired by https://jenkov.com/tutorials/java-util-concurrent/blockingqueue.html.
Here's the code:
#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
#include <random>
using namespace std::chrono_literals;
using engine = std::mt19937;
/// Bounded, thread-safe FIFO queue.
///
/// put() blocks while the queue holds `sizeLimit` elements and take() blocks
/// while it is empty. Elements leave the queue in the order they were put.
/// Both operations log the current queue size to std::cout while holding the
/// internal mutex, so log lines from different threads do not interleave.
template <typename Task>
class BlockingQueue
{
public:
    using size_type = typename std::deque<Task>::size_type;

    /// @param limit Maximum number of queued elements (defaults to 50).
    ///              A limit of 0 makes every put() block forever.
    explicit BlockingQueue(size_type limit = 50) : sizeLimit{limit} {}

    /// Appends `t`, waiting until there is room in the queue.
    void put(Task t) {
        {
            std::unique_lock lock{mtx};
            canPut.wait(lock, [this] { return tasks.size() < sizeLimit; });
            // `t` is our own copy already; move it instead of copying again.
            tasks.push_front(std::move(t));
            std::cout << "Tasks: " << tasks.size() << "\n";
        }
        // Notify after releasing the mutex so the woken consumer does not
        // immediately block on it.
        canTake.notify_one();
    }

    /// Removes and returns the oldest element, waiting until one is available.
    Task take() {
        std::unique_lock lock{mtx};
        canTake.wait(lock, [this] { return !tasks.empty(); });
        Task taken(std::move(tasks.back()));
        tasks.pop_back();
        // std::cout << "Task: " << taken << " taken by thread: " << std::this_thread::get_id() << "\n";
        std::cout << "Tasks: " << tasks.size() << "\n";
        lock.unlock();
        // A slot has just been freed: wake one producer that may be blocked in
        // put(). Without this, a producer that found the queue full would
        // never be woken up again.
        canPut.notify_one();
        return taken;
    }

private:
    std::mutex mtx;                   // guards `tasks`
    std::condition_variable canPut;   // signalled when an element was removed
    std::condition_variable canTake;  // signalled when an element was added
    size_type sizeLimit;              // capacity; unsigned to match tasks.size()
    std::deque<Task> tasks;           // newest at the front, oldest at the back
};
int main() {
std::random_device os_seed;
const auto seed = os_seed();
engine generator{seed};
std::uniform_int_distribution<uint32_t> distribute(0, 10);
BlockingQueue<int> q;
std::atomic_int i = 0;
std::jthread prod {
[&]{
while(true){
q.put(i++);
auto sleepTime =
std::chrono::seconds(distribute(generator));
std::this_thread::sleep_for(sleepTime);
}
}
};
std::jthread prod2 {
[&]{
while(true){
q.put(i++);
auto sleepTime =
std::chrono::seconds(distribute(generator));
std::this_thread::sleep_for(sleepTime);
}
}
};
std::jthread consumer {
[&]{
while(true){
auto task = q.take();
auto sleepTime =
std::chrono::seconds(distribute(generator));
std::this_thread::sleep_for(sleepTime);
}
}
};
return 0;
}
There is no `notify_one` for `canPut`, so the `put` call can enter an infinite wait.