I'm writing several local servers that have almost the same code in main.cpp. I would appreciate your comments and improvement suggestions, and especially notes on potential memory leaks, since the services are supposed to run 24/7 and process large requests. Thanks!
#include <algorithm>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <iterator>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" //MS::log()
// Forward declaration of the worker-thread entry point, defined after main().
void pullRequests();
// Filesystem path that main() hands to UdsServer::createServer().
const std::string pathToSocket = "/var/run/SomeServer.sock";
// Service name passed as the first argument to every MS::log() call in this file.
const std::string SERVICE_NAME = "SomeServer";
RequestManager service; // Does the actual processing of a request; workers call service.pullRequests()
// Pending requests, keyed by the client connection id returned from
// UdsServer::newConnectionEstablished(); the mapped value is the command text.
// main() inserts, workers take the whole map. Access only while holding requestQueue_mutex.
std::unordered_map<int, std::string> requestQueue;
// Guards requestQueue and gotNewRequests.
std::mutex requestQueue_mutex;
// main() notifies this after queueing a request; idle workers wait on it.
std::condition_variable processorsThreadSwitch;
// Set to true by main() when a request is queued, reset to false by the worker
// that takes the queue. Read and written under requestQueue_mutex.
bool gotNewRequests = false;
int main()
{
UdsServer app; //Server listening on a Unix Domain Socket
try
{
app.createServer(pathToSocket);
}
catch (const std::string & err)
{
MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);
return -1;
}
unsigned n_concThreads = std::thread::hardware_concurrency();
if (!n_concThreads) //if the query failed...
{
std::ifstream cpuinfo("/proc/cpuinfo");
n_concThreads = std::count(std::istream_iterator<std::string>(cpuinfo),
std::istream_iterator<std::string>(),
std::string("processor"));
if (!n_concThreads)
n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
}
for (int i = 0; i < n_concThreads; ++i)
{
std::thread t (pullRequests);
t.detach();
}
while ((int clientConnection = app.newConnectionEstablished()) > -1) //Uses accept() internally
{
std::string command = app.getMsg (clientConnection); //Uses read() internally
if (command.empty())
app.closeConnection(clientConnection);
else if (command == "SHUTDOWN")
{
app.closeConnection(clientConnection);
return 0;
}
else
{
{ //Anonymous scope just to get rid of the lock before notifying a thread
std::lock_guard<std::mutex> writeLock(requestQueue_mutex);
requestQueue[clientConnection] = std::move(command);
gotNewRequests = true;
}
processorsThreadSwitch.notify_one(); //nothing happens here if all threads are busy
}
}
}
/**
 * Worker-thread entry point; loops forever.
 *
 * Each iteration:
 *   1. waits until gotNewRequests is set, then takes the entire shared
 *      requestQueue and resets the flag (all under requestQueue_mutex);
 *   2. groups the taken requests by command text, so identical commands are
 *      processed by service.pullRequests() only once;
 *   3. sends a non-empty response to every connection that asked for that
 *      command, and closes every connection whether or not a response was sent.
 *
 * Send and close failures are reported through MS::log().
 */
void pullRequests()
{
    UnixDomainSocket uds;
    while (true) // Let the thread run "forever"
    {
        std::unordered_map<int, std::string> queueCopy;
        {
            // The lock is scoped to this block: it is held only while the
            // shared queue and flag are touched, and is always held when
            // wait() is entered on the next iteration.
            std::unique_lock<std::mutex> queueLock(requestQueue_mutex);
            processorsThreadSwitch.wait(queueLock, [] { return gotNewRequests; });
            queueCopy.swap(requestQueue); // leaves requestQueue empty
            gotNewRequests = false;
        }

        // Multiplex: command text -> every connection that sent it.
        // A single queued request is simply a group of one.
        std::unordered_map<std::string, std::vector<int>> multiplexedRequests;
        for (auto & request : queueCopy)
            multiplexedRequests[std::move(request.second)].push_back(request.first);

        for (const auto & request : multiplexedRequests)
        {
            std::string response = service.pullRequests(request.first);
            for (int socket : request.second)
            {
                if (!response.empty())
                {
                    auto sendResult = uds.sendMsg(socket, response);
                    if (!sendResult.isValid())
                        MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);
                }
                // Close unconditionally: skipping this for an empty response
                // would leave the client's descriptor open.
                if (!uds.closeConnection(socket))
                    MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
            }
        }
    }
}
Node.js jumps to mind, as does nginx. Also, why would you not use a standard (HTTP over HTTPS), well-documented, well-understood protocol that already has tools for debugging and maintenance?