Skip to main content
1 of 3
Bula
  • 369
  • 2
  • 8

Correct paradigm for packet generation-consumption in cpp

Not sure if this belongs on Stack overflow more than it belongs here. Happy to move if the case.

I have the following simplification of a program which consists of 2 threads. One thread pushes packets to the back of a deque while another waits for user input before performing a "heavy" operation on the current deque (Here, we just look for the maximum id).

#include <iostream>
#include <thread>
#include <random>
#include <chrono>
#include <deque>
#include <string>
#include <mutex>

using namespace std;

struct Packet{
  int id;
  int size;
};

mutex deque_mutex;
deque<shared_ptr<Packet> > packet_deque;

void gen_packet(Packet* packet){
  // generate packets
  // packet size is 1-100 bytes
  // packet id is 0-1000
  random_device rd;
  mt19937 gen(rd());
  uniform_int_distribution<> size_dist(1, 100);
  uniform_int_distribution<> id_dist(0, 1000);
  int size = size_dist(gen);
  int id = id_dist(gen);
  packet->size = size;
  packet->id = id;
}

void packet_monitor(){
  // Gets a packet every 100th of a second
  // Add it to the deque
  // If the deque is full, remove the oldest packet
  // A deque is full if it has 1000 packets

  while(true){
    shared_ptr<Packet> packet = make_shared<Packet>();
    gen_packet(packet.get());
    unique_lock<mutex> lock(deque_mutex);
    if(packet_deque.size() == 1000){
      packet_deque.pop_front();
    }
    packet_deque.push_back(packet);
    lock.unlock();
    this_thread::sleep_for(chrono::milliseconds(10));
  }
}

void operate(){
  // Listens for a key stroke
  // It then looks for the packet with the biggest id
  while(true){
    string input;
    getline(std::cin, input);
    if(!input.empty()){
      int max_id = 0;
      int max_index_id = 0;
      unique_lock<mutex> lock(deque_mutex);
      deque<shared_ptr<Packet> > packet_deque_copy(packet_deque);
      lock.unlock();
      for(int i = 0; i < packet_deque_copy.size(); i++){
        if(packet_deque_copy[i]->id > max_id){
          max_id = packet_deque_copy[i]->id;
          max_index_id = i;
        }
      }
      cout << "Max id: " << max_id << " at " << max_index_id << "\n";
    }
  }
}

int main(){
  thread packet_monitor_thread(packet_monitor);
  thread operate_thread(operate);

  packet_monitor_thread.join();
  operate_thread.join();

  return 0;
}

I am not very familiar with cpp nor with concurrency, so I have some questions about the best way to solve this

  1. Do I have any memory leaks here? I am assuming the shared_ptr takes care of freeing the memories for packets that are deleted from the deque. Is that correct?
  2. I use a unique_lock here to try to tackle the potential deadlock that might arise from an exception being thrown inbetween the lock and unlock. Is there a more cpp way of doing this?
  3. I think that the locking and unlocking for every packet is a bit overkill. I can buffer 10 at a time and then push those. What are some of the implications of locking/unlocking so quickly?
  4. Here, I don't need to wait for a packet to come, but in the actual implementation I block until there is another packet available. Hence, I have both a blocking operation while waiting for input from the user as well as blocking for a packet to arrive. Am I introducing any potential deadlocks, or is there a better way to think about this?
Bula
  • 369
  • 2
  • 8