- The reactor possesses its own thread to wait for messages in an event loop.
- Users should be able to start or stop the reactor at any point. These two operations are not required to be thread-safe.
- Users should be able to asynchronously register or remove message handlers from the reactor.
- It is possible that users may register or remove any handler within the message handling routine of a handler.
NOTE: I use arbitrary delays to simulate blocking on input. The test code lives in main() rather than in separate CppUnit tests, since unit tests are not a requirement. As far as I know, this code works fine, but I could test it more thoroughly. The asynchronous behaviour assumes that the user of the reactor will invoke the register/remove operations via std::async(func-name).
My implementation:
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <utility>
namespace reactor
{

/// A single message delivered to the reactor.
struct Message
{
    int32_t type;    ///< Message type; the reactor compares it against MessageTypes.
    char data[256];  ///< Payload; bytes 0 and 1 carry the user ID, low byte first.
};

// Define a few message types to simulate User event behaviour.
enum MessageTypes
{
    OPEN = 0,
    READ,
    WRITE,
    CLOSE,
    REGISTER_HANDLER,
    REMOVE_HANDLER
};

/// Interface for anything that can be handed an incoming message.
class IEventHandler
{
public:
    virtual ~IEventHandler() = default;

    /// Delivers @p msg to the handler.
    virtual void OnMessage(const Message* msg) = 0;
};

/// Reactor that owns one worker thread and dispatches the most recently
/// published message to the callback registered for (user ID, message type).
///
/// RegisterEventHandler(), RemoveEventHandler(), OnMessage(), getHandler() and
/// setHandler() may be called from any thread, including from inside a
/// callback running on the reactor thread: callbacks are always invoked with
/// no internal lock held. startEventHandlerThread() and
/// stopEventHandlerThread() are not synchronised against each other.
class EventHandler : public IEventHandler
{
public:
    /// Creates an idle reactor: no message published and no thread running.
    EventHandler() = default;

    /// Publishes @p msg and starts the event loop immediately.
    /// Only the pointer is stored, so @p msg must outlive its use by the loop.
    EventHandler(const Message *msg)
        : fresh_msg(msg), event_loop_start(true)
    {
        // Every member is initialised before the body runs, so the loop never
        // observes a half-constructed object.
        reactor_thread = std::thread(&EventHandler::MessageLoop, this);
    }

    // The worker thread captures `this`; copying would leave it pointing at
    // the wrong object.
    EventHandler(const EventHandler&) = delete;
    EventHandler& operator=(const EventHandler&) = delete;

    /// Registers (or replaces) the callback for @p user_id / @p message_type.
    /// @return the registration status after the call, which is always true.
    bool RegisterEventHandler(int user_id, int message_type, std::function<void (void)> func)
    {
        std::lock_guard<std::mutex> lock(map_mutex);
        handler_map[user_id][message_type] = std::move(func);
        register_status_map[user_id][message_type] = true;
        return true;
    }

    /// Removes the callback for @p user_id / @p message_type, if there is one.
    /// @return the registration status after the call, which is always false.
    bool RemoveEventHandler(int user_id, int message_type)
    {
        std::lock_guard<std::mutex> lock(map_mutex);
        const auto user_entry = handler_map.find(user_id);
        if (user_entry != handler_map.end())
        {
            user_entry->second.erase(message_type);
        }
        register_status_map[user_id][message_type] = false;
        return false;
    }

    /// Publishes @p msg as the message the event loop processes. The loop
    /// keeps re-processing the published message until another is published.
    /// Only the pointer is stored; the message itself is not copied.
    void OnMessage(const Message* msg) override
    {
        fresh_msg.store(msg);
    }

    /// Starts the event loop unless it is already flagged as running.
    void startEventHandlerThread()
    {
        if (event_loop_start.load())
        {
            return;
        }
        // Reap the previous worker BEFORE raising the flag again: raising it
        // first could keep the old loop alive and block this join forever.
        // The joinable() check also covers a reactor that never had a thread,
        // where an unconditional join() would throw std::system_error.
        if (reactor_thread.joinable())
        {
            reactor_thread.join();
        }
        event_loop_start = true;
        reactor_thread = std::thread(&EventHandler::MessageLoop, this);
    }

    /// Asks the event loop to exit after its current iteration. Does not wait
    /// for the thread; the next start or the destructor joins it.
    void stopEventHandlerThread()
    {
        event_loop_start = false;
    }

    /// Returns a copy of the callback used for REGISTER_HANDLER messages.
    std::function<void(void)> getHandler()
    {
        std::lock_guard<std::mutex> lock(handler_mutex);
        return register_handler;
    }

    /// Sets the callback that a REGISTER_HANDLER message will register.
    void setHandler(std::function<void(void)> reg_handler)
    {
        std::lock_guard<std::mutex> lock(handler_mutex);
        register_handler = std::move(reg_handler);
    }

    /// Stops the event loop (if still running) and joins the worker thread.
    ~EventHandler() override
    {
        // Without clearing the flag the join below would never return when
        // the caller forgot to stop the reactor.
        event_loop_start = false;
        if (reactor_thread.joinable())
        {
            reactor_thread.join();
        }
    }

private:
    /// Decodes the user ID from payload bytes 0 (low) and 1 (high).
    /// The bytes are read as unsigned so values >= 0x80 are not sign-extended.
    static int UserIdOf(const Message& msg)
    {
        const unsigned low = static_cast<unsigned char>(msg.data[0]);
        const unsigned high = static_cast<unsigned char>(msg.data[1]);
        return static_cast<int>((high << 8) | low);
    }

    //Don't expose the actual message parsing logic to the end user.
    /// Body of the worker thread: processes the published message until the
    /// run flag is cleared.
    void MessageLoop()
    {
        while (event_loop_start.load())
        {
            // Take one snapshot so the null check, the delay and the dispatch
            // all refer to the same message even if OnMessage() runs meanwhile.
            const Message* const msg = fresh_msg.load();
            if (msg == nullptr)
            {
                std::cerr << "Invalid incoming message! msg pointer is NULL. " << std::endl;
                continue;
            }
            // Simulate waiting for input, based on the message type, by putting
            // the thread to sleep. Computed in the duration's wide integer type
            // so a large type value cannot overflow int.
            const std::chrono::milliseconds wait_for_input(
                (static_cast<std::chrono::milliseconds::rep>(msg->type) + 1) * 100);
            std::this_thread::sleep_for(wait_for_input);
            ParseMessage(msg);
        }
    }

    /// Dispatches one message. Runs only on the reactor thread.
    void ParseMessage(const Message* msg)
    {
        // Assume that user ID is obtained from the first 2 bytes of the message payload.
        const int user_id = UserIdOf(*msg);
        const int type = msg->type;

        if (type == REGISTER_HANDLER)
        {
            std::function<void(void)> pending = getHandler();
            if (pending)
            {
                RegisterEventHandler(user_id, type, std::move(pending));
            }
            else
            {
                std::cerr << "Cannot register Event handler since it is NULL!" << std::endl;
            }
            return;
        }
        if (type == REMOVE_HANDLER)
        {
            RemoveEventHandler(user_id, type);
            return;
        }

        // Copy the callback out under the lock, then release the lock before
        // calling it, so the callback may itself register or remove handlers.
        std::function<void(void)> handler;
        bool found = false;
        {
            std::lock_guard<std::mutex> lock(map_mutex);
            const auto user_entry = handler_map.find(user_id);
            if (user_entry != handler_map.end())
            {
                const auto type_entry = user_entry->second.find(type);
                if (type_entry != user_entry->second.end())
                {
                    handler = type_entry->second;
                    found = true;
                }
            }
        }
        if (!found)
        {
            // Further analysis can be done on which key failed, exactly.
            std::cout << "One of the Keys " << user_id << " or " << type << " not found" << std::endl;
            return;
        }

        // Call the function registered by a specific user, for a specific
        // message type. An exception escaping the thread function would
        // terminate the process, so report it and keep the loop alive.
        try
        {
            handler();
        }
        catch (const std::exception& error)
        {
            std::cerr << "Handler for user " << user_id << ", message type " << type
                      << " threw: " << error.what() << std::endl;
        }
        catch (...)
        {
            std::cerr << "Handler for user " << user_id << ", message type " << type
                      << " threw an unknown exception" << std::endl;
        }
    }

    std::atomic<const Message*> fresh_msg{ nullptr };       // Message currently published to the loop.
    std::thread reactor_thread;                             // Worker thread running MessageLoop().
    std::function<void(void)> register_handler = nullptr;   // Callback registered by REGISTER_HANDLER messages.
    std::atomic<bool> event_loop_start{ false };            // True while the loop is meant to keep running.
    std::map<int, std::map<int, std::function<void (void)>>> handler_map; // User ID -> (message type -> callback).
    // User IDs can be set from outside.
    std::map<int, std::map<int, bool>> register_status_map; // User ID -> (message type -> flag): true after a
                                                            // registration, false after a removal.
    std::mutex map_mutex;      // Guards handler_map and register_status_map.
    std::mutex handler_mutex;  // Guards register_handler.
};
}
/// Sample callback: prints a read notice for user 1 and flushes std::cout.
/// main() registers it for user ID 256, message type READ.
void TestRead1()
{
    const char* const notice = "Reading from user1. ";
    std::cout << notice << std::endl;
}
/// Sample callback: prints a write notice for user 1 and flushes std::cout.
/// main() registers it for user ID 256, message type WRITE.
void TestWrite1()
{
    const char* const notice = "Writing from user1. ";
    std::cout << notice << std::endl;
}
/// Sample callback: prints a file-open notice for user 1 and flushes std::cout.
/// main() registers it for user ID 256, message type OPEN.
void TestOpen1()
{
    const char* const notice = "Opening a file from user1. ";
    std::cout << notice << std::endl;
}
/// Sample callback: prints a read notice for user 2 and flushes std::cout.
/// main() registers it for user ID 257, message type READ.
void TestRead2()
{
    const char* const notice = "Reading from user2. ";
    std::cout << notice << std::endl;
}
/// Sample callback: prints a write notice for user 2 and flushes std::cout.
void TestWrite2()
{
    const char* const notice = "Writing from user2. ";
    std::cout << notice << std::endl;
}
/// Sample callback: prints a file-open notice for user 2 and flushes std::cout.
void TestOpen2()
{
    const char* const notice = "Opening a file from user2. ";
    std::cout << notice << std::endl;
}
int main()
{
/* Simulating end users of a reactor. */
reactor::Message* msg_ptr;
reactor::Message message1 = { reactor::OPEN, {0,1,2,3} };
msg_ptr = &message1;
reactor::EventHandler event_handler(msg_ptr);
event_handler.startEventHandlerThread();
event_handler.RegisterEventHandler(256, reactor::READ, TestRead1);
event_handler.RegisterEventHandler(256, reactor::OPEN, TestOpen1);
event_handler.RegisterEventHandler(256, reactor::WRITE, TestWrite1);
reactor::Message message2 = { reactor::READ, {1,1,2,3} };
msg_ptr = &message2;
event_handler.RegisterEventHandler(257, reactor::READ, TestRead2);
event_handler.OnMessage(msg_ptr);
int count = 0;
while (1)
{
count++;
std::cout << "count: " << count << std::endl;
event_handler.OnMessage(msg_ptr);
if (count % 2 == 0)
{
message1.type = count % 3;
msg_ptr = &message1;
event_handler.RemoveEventHandler(((message1.data[1] << 8) | message1.data[0]), message1.type);
}
else
{
message2.type = count%7;
msg_ptr = &message2;
}
if (count >= 100 && count <= 200)
{
event_handler.stopEventHandlerThread();
}
if (count > 200)
{
event_handler.startEventHandlerThread();
}
if (count > 500)
{
event_handler.stopEventHandlerThread();
break;
}
std::chrono::milliseconds change_interval(20);
std::this_thread::sleep_for(change_interval);
}
}
// (Stray page markup from the original post; the question was tagged c++11.)