Here is some mediocre code of mine in which I experiment with four deques, each with a different priority.
Here is how it works: in a while loop I generate a random number between 1 (highest priority) and 4 (lowest priority); this number selects the deque into which the new element is inserted (via insert(int num, int fd)).
Before the while loop, a remover thread running the remove(void) method is started and detached: this thread removes elements from the highest-priority deque, maxQ (if it has any), then from medQ, and so on down to the lowest-priority one, `unknownQ`.
It appears to work. I know there is a starvation problem, since the remover always takes elements with higher priorities first: what if maxQ is always full of elements and minQ holds only a few? In that case the deques with lower priorities will never be serviced.
For now the elements to insert are given randomly generated priorities, which ensures that, sooner or later, all elements from all deques will be removed.
But that is not my problem: I just dislike the way the insert and remove methods are designed. To me they are ugly and redundant: the code is repeated, almost identically, for each deque.
I come from C and do not yet know the true power of C++: what suggestions do you have for improving insert and remove?
Compiled with:
g++ -std=c++11 -o funwithmultideque funwithmultideque.cpp -pthread
The code:
#include <chrono>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <limits>
#include <mutex>
#include <random>
#include <string>
#include <thread>
// Upper bound on the number of elements a single priority deque may hold;
// QueuesManager::insert blocks while the target deque is at this size.
#define MAX_QUEUE_SIZE 100
// Terminal escape sequences used to colour the debug output.
// DEFAULTCOLOR resets the text attributes and is appended by every debug_* macro.
#define DEFAULTCOLOR "\033[0m"
#define RED "\033[22;31m"
#define YELLOW "\033[1;33m"
#define GREEN "\033[0;0;32m"
// Debug print helpers: stream the arguments to std::cout (prefixed by the
// colour named in the macro, except debug_default which has no prefix),
// then reset the colour, end the line and flush.
// The argument is spliced into a `<<` chain, so pass a single stream
// expression such as `a << ' ' << b`.
// NOTE: each expansion already ends with ';', so a call written as
// `debug_green(x);` yields an extra empty statement. That is harmless inside
// braces but would break an unbraced if/else.
#define debug_default(...) std::cout << __VA_ARGS__ << DEFAULTCOLOR << '\n' << std::flush;
#define debug_red(...) std::cout << RED << __VA_ARGS__ << DEFAULTCOLOR << '\n' << std::flush;
#define debug_yellow(...) std::cout << YELLOW << __VA_ARGS__ << DEFAULTCOLOR << '\n' << std::flush;
#define debug_green(...) std::cout << GREEN << __VA_ARGS__ << DEFAULTCOLOR << '\n' << std::flush;
// Element type stored in each of the priority deques.
struct info_connection {
    // Identifier supplied by the caller of QueuesManager::insert.
    int fd;
    // Time at which the element was pushed onto its deque.
    std::chrono::system_clock::time_point start;
};
// Short alias used throughout the rest of the file.
using info_conn = info_connection;
class QueuesManager {
public:
void initWorkerThread(void);
void remove(void);
void insert(int num, int fd);
// for debug
void printQueues(int flag);
private:
std::thread threadRead; // remover thread
std::mutex m1, m2, m3, m4;
// read and write conditions for each deque
std::condition_variable w1, r1, w2, r2, w3, r3, w4, r4;
std::deque<info_conn> maxQ, medQ, minQ, unknownQ;
};
void QueuesManager::printQueues(int flag) {
// show deques after inserting
if (flag == 1) {
debug_green(maxQ.size() << ' ' << medQ.size() << ' ' <<
minQ.size() << ' ' << unknownQ.size());
}
// show deques after removing
else {
debug_yellow(maxQ.size() << ' ' << medQ.size() << ' ' <<
minQ.size() << ' ' << unknownQ.size());
}
}
void QueuesManager::insert(int num, int fd) {
info_conn ic;
switch(num) {
case 1: {
std::unique_lock<std::mutex> locker(m1);
w1.wait(locker, [this] () { return maxQ.size() < MAX_QUEUE_SIZE; });
ic.start = std::chrono::system_clock::now();
ic.fd = fd;
maxQ.push_back(ic);
printQueues(1);
r1.notify_one();
break;
}
case 2: {
std::unique_lock<std::mutex> locker(m2);
w2.wait(locker, [this] () { return medQ.size() < MAX_QUEUE_SIZE; });
ic.start = std::chrono::system_clock::now();
ic.fd = fd;
medQ.push_back(ic);
printQueues(1);
r2.notify_one();
break;
}
case 3: {
std::unique_lock<std::mutex> locker(m3);
w3.wait(locker, [this] () { return minQ.size() < MAX_QUEUE_SIZE; });
ic.start = std::chrono::system_clock::now();
ic.fd = fd;
minQ.push_back(ic);
printQueues(1);
r3.notify_one();
break;
}
case 4: {
std::unique_lock<std::mutex> locker(m4);
w4.wait(locker, [this] () { return unknownQ.size() < MAX_QUEUE_SIZE; });
ic.start = std::chrono::system_clock::now();
ic.fd = fd;
unknownQ.push_back(ic);
printQueues(1);
r4.notify_one();
break;
}
default: {
std::cout << "You shouldn't be here\n" << std::flush;
break;
}
}
}
void QueuesManager::remove(void) {
while(true) {
info_conn ic;
if (maxQ.size() > 0) {
std::unique_lock<std::mutex> lck(m1);
r1.wait(lck, [this] () { return maxQ.size() > 0; });
ic = maxQ.front();
maxQ.pop_front();
printQueues(0);
w1.notify_one();
continue;
}
if (medQ.size() > 0) {
std::unique_lock<std::mutex> lck(m2);
r2.wait(lck, [this] () { return medQ.size() > 0; });
ic = medQ.front();
medQ.pop_front();
printQueues(0);
w2.notify_one();
continue;
}
if (minQ.size() > 0) {
std::unique_lock<std::mutex> lck(m3);
r3.wait(lck, [this] () { return minQ.size() > 0; });
ic = minQ.front();
minQ.pop_front();
printQueues(0);
w3.notify_one();
continue;
}
if (unknownQ.size() > 0) {
std::unique_lock<std::mutex> lck(m4);
r4.wait(lck, [this] () { return unknownQ.size() > 0; });
ic = unknownQ.front();
unknownQ.pop_front();
printQueues(0);
w4.notify_one();
continue;
}
}
}
void QueuesManager::initWorkerThread(void) {
threadRead = std::thread(&QueuesManager::remove, this);
threadRead.detach();
}
// Demo driver: starts the remover thread, then inserts elements forever,
// each with a random priority in [1, 4] and an increasing id.
// The loop never terminates, so the function never returns.
int main(void) {
    QueuesManager qm;
    std::random_device seedSource;
    std::default_random_engine eng(seedSource());
    std::uniform_int_distribution<int> randomPrio(1, 4);

    qm.initWorkerThread();

    int fd = 0;
    while (true) {
        // Wrap back to 1 instead of incrementing past the largest int:
        // signed overflow is undefined behaviour and this loop runs forever.
        fd = (fd == std::numeric_limits<int>::max()) ? 1 : fd + 1;
        const int randomNum = randomPrio(eng);
        qm.insert(randomNum, fd);
    }
}