Description of the code itself:
The code provides asynchronous IPC functionality in c++C++ using the boostBoost libraries and pipes.
Each process asynchronously "listens" on the read-end end of the pipe and the messages are added to a queue, which means you won't have to call readread() manually,: you just have to send messages and they're automatically received on the other end.
I also plan to add a feature which enables you to send the name of a function and it'sits arguments to the other process to call it remotely.
Unrelated question, could this be of use to anyone? shouldShould I put it up on githubGitHub?
example_server.cpp
The code:
example_server.cpp
example_client.cpp
example_client.cpp
Connector.hpp
Connector.hpp
Connector.cpp
Connector.cpp
#include "Connector.hpp"
#include <boost/asio/post.hpp>
#include <boost/bind/bind.hpp>
#include <boost/bind/placeholders.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/read.hpp>
#include <iostream>
using namespace std;
using namespace boost::placeholders;
Connector::Connector(int reader_fd, int writer_fd, int numThreads)
{
// Initialize boost related objects that will help us
// perform asynchronous io operations.
// The io_service objects provides async_io functionality.
io_service = make_shared<boost::asio::io_service>();
// The "work" object doesn't allow io_service to
// exit when it runs out of tasks.
work = make_shared<boost::asio::io_service::work>(*io_service);
// Create pipe objects used for communication, from native pipe
// handles.
InitializeCommMethod(reader_fd, writer_fd);
threads = make_shared<boost::asio::thread_pool>(numThreads);
// Post io_service->run to the thread pool.
boost::asio::post(*threads, [this]()
{
this->io_service->run();
});
// Perform a number of predefined tasks.
PerformTasks();
}
Connector::~Connector()
{
// When the destructor is called, it doesn't allow the main thread
// to exit until all the worker threads have finished.
threads->join();
}
void Connector::CleanUp()
{
// Allow the worker threads to exit by
// removing the work object.
work.reset();
threads->join();
}
void Connector::SendData(const string& data)
{
// First sends the size of the message, then sends
// the message itself.
int32_t size = data.length();
boost::asio::async_write(*writer, boost::asio::buffer((void*)&size, 4), boost::bind(&Connector::SendHandler, this, _1, _2));
boost::asio::async_write(*writer, boost::asio::buffer(data), boost::bind(&Connector::SendHandler, this, _1, _2));
}
void Connector::SendHandler(const boost::system::error_code& ec, size_t bytes)
{
// Gets called whenever we write to the pipe.
return;
}
void Connector::Receive(int32_t bytes)
{
// Allocate an appropriately sized string ("buffer"),
// read "bytes" bytes from the pipe and put it in
// said string.
// OR, read the size of the incoming message and put it
// in "size_of_incoming_buffer".
buffer = string(bytes, '\0');
if(getting_size)
{
boost::asio::async_read(*reader, boost::asio::buffer((void*)&size_of_incoming_buffer, 4), boost::bind(&Connector::ReceiveHandler, this, _1, _2));
return;
}
boost::asio::async_read(*reader, boost::asio::buffer(buffer), boost::bind(&Connector::ReceiveHandler, this, _1, _2));
}
void Connector::ReceiveHandler(const boost::system::error_code& ec, size_t bytes)
{
// If we've already received the size of the incoming message,
// read "size_of_incoming_buffer" bytes from the pipe, which is the
// size of the message.
if(getting_size)
{
getting_size = false;
Receive(size_of_incoming_buffer);
return;
}
// If getting_size is false, then the last thing we received
// was the message, which is currently in "buffer".
// We push the message to "data_queue", and wait for the size
// of the next message.
getting_size = true;
data_queue.push(buffer);
// If the message is "die", exit.
if(buffer == "die")
{
// Tell the other process to exit as well.
SendData("die");
CleanUp();
return;
}
// Call the user provided handler function for when we receive
// a new message.
io_service->post(boost::bind(&Connector::CallHandler, this));
// Wait for the size of the next message.
// (The default value of the "Receive" function is 4 bytes, or an int32_t).
Receive();
}
void Connector::InitializeCommMethod(int reader_fd, int writer_fd)
{
// Create boost pipe objects from native pipe handles.
reader = make_shared<boost::asio::readable_pipe>(*io_service, reader_fd);
writer = make_shared<boost::asio::writable_pipe>(*io_service, writer_fd);
}
void Connector::PerformTasks()
{
// Receive the size of the first message.
Receive();
}
void Connector::AssignHandler(const function<void(const string&)>& func)
{
// Assign the user provided function to "handler".
handler = func;
// Run "handler" on all pending message, received before the user
// provided a handler function.
PerformPending();
}
void Connector::CallHandler()
{
// If handler is provided, run it on the
// last message in the queue.
if(handler)
{
handler(data_queue.front());
data_queue.pop();
}
// Otherwise, add the message to the pending queue.
else
{
pending.push(data_queue.front());
data_queue.pop();
}
}
void Connector::PerformPending()
{
// Gets called when the user provides a handler.
// Runs the handler on all pending messages.
string pending_message;
for(int i = 0; i < pending.size(); i++)
{
pending_message = pending.front();
pending.pop();
io_service->post([this, pending_message]()
{
this->handler(pending_message);
});
}
}
```