I have recently been working on a shared-memory-based IPC message queue in C on Linux. The design choices I've made so far include:
- The queue will have exactly 1 producer and 1 or more consumers. (My current implementation supports only 1 producer and 1 consumer.)
- The queue will be bounded in size and wrap around in a circular fashion
- When the consumers are lagging behind, the producer will wait until the queue is drained
I have a basic prototype working and have wrapped it in a reusable API. I am seeking code-review advice on both:
- How to improve the code's readability and reusability
- How to make the code more performant
Here is the full code, including a benchmark harness:
- header file: spmc_queue.h
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdalign.h>
// Layout version stamped into the shared region by the writer.
#define SPMC_QUEUE_VERSION 0
// Alignment (in bytes) applied to the indices and the data area of the shared region.
#define L1_DCACHE_LINESIZE 64               // found via /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
// Atomic 64-bit signed integer used for the producer/consumer indices.
typedef _Atomic int64_t atomic_int64_t;
// Role of a queue handle: a reader attaches to an existing queue, a writer creates it.
enum spmc_mode { spmc_mode_reader = 0, spmc_mode_writer };
// Process-local bookkeeping for one queue handle (never placed in shared memory).
typedef struct spmc_header {
  int fd;               // descriptor returned by shm_open for the shared object
  char *path;           // heap-allocated copy of the shared object's name (freed on destroy)
  enum spmc_mode mode;  // whether this handle was opened as reader or writer
  size_t shared_size;   // number of bytes mapped; passed back to munmap on destroy
} spmc_header_t;
// Layout of the memory region shared between the writer and the reader(s).
// The writer zero-fills the region and fills in the geometry; readers only
// validate it. Every field that is written after creation by one process and
// read by another is atomic, because a plain load/store there is a data race.
typedef struct spmc_shared {
  uint8_t version;                // SPMC_QUEUE_VERSION of the writer that created the region
  size_t element_capacity;        // number of slots in data[]
  size_t element_size;            // size of one slot in bytes
  atomic_bool initialized;        // set by the writer once the region is set up; checked by readers
  atomic_bool client_connected;   // set by a reader when it attaches; polled by the writer on enqueue
  // Each index lives on its own cache line so that the producer and the
  // consumer do not false-share when they publish progress.
  alignas(L1_DCACHE_LINESIZE) atomic_int64_t writer_idx;  // total elements enqueued so far
  alignas(L1_DCACHE_LINESIZE) atomic_int64_t reader_idx;  // total elements dequeued so far
  alignas(L1_DCACHE_LINESIZE) uint8_t data[];             // element_capacity slots of element_size bytes
} spmc_shared_t;
// Atomics that fall back to a lock table are only valid within one process;
// refuse to build on targets where the shared fields would not be lock-free.
_Static_assert(ATOMIC_BOOL_LOCK_FREE == 2 && ATOMIC_LLONG_LOCK_FREE == 2,
               "spmc_shared_t requires lock-free atomics to be usable across processes");
// Queue handle returned by spmc_queue_create and released by spmc_queue_destroy.
typedef struct spmc_queue {
  spmc_header_t header;   // process-local state (fd, name, mode, mapping size)
  spmc_shared_t *shared;  // mmap'ed view of the shared region
} spmc_queue_t;
// Creates (writer mode) or attaches to (reader mode) the shared-memory queue
// named `path`. Returns a heap-allocated handle, or NULL on invalid arguments
// or on any failure while opening, sizing, mapping or validating the region.
spmc_queue_t *spmc_queue_create(const char *path, size_t element_size, size_t element_capacity, enum spmc_mode mode);
// Unmaps the region, closes the descriptor and frees the handle. A writer
// handle also unlinks the shared object's name.
void spmc_queue_destroy(spmc_queue_t *queue);
// Copies one element (the queue's element_size bytes) from `src_data` into the
// queue. Returns false without copying when no reader has attached yet or when
// the queue is full; the caller decides whether to retry.
bool spmc_queue_enqueue(spmc_queue_t *queue, uint8_t *src_data);
// Copies the oldest element (the queue's element_size bytes) into `dst_data`.
// Returns false without copying when the queue is empty.
bool spmc_queue_dequeue(spmc_queue_t *queue, uint8_t *dst_data);
- source file: spmc_queue.c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "spmc_queue.h"
spmc_queue_t *spmc_queue_create(const char *path, size_t element_size, size_t element_capacity, enum spmc_mode mode) {
  if (!path || element_size == 0 || element_capacity == 0 || (mode != spmc_mode_reader && mode != spmc_mode_writer)) {
    fprintf(stderr, "spmc_queue_create: invalid arguments (path:%s element_size:%zu element_capacity:%zu mode:%d\n",
            path, element_size, element_capacity, mode);
    return NULL;
  }
  printf("creating spmc_queue %s of size %zu and capacity %zu with mode %s\n", path, element_size, element_capacity,
         (mode == spmc_mode_reader) ? "reader" : "writer");
  spmc_queue_t *queue = NULL;
  spmc_shared_t *shared = NULL;
  int fd = -1;
  size_t shared_size = sizeof(spmc_shared_t) + element_size * element_capacity;
  int oflag = (mode == spmc_mode_reader) ? O_RDWR : O_RDWR | O_CREAT | O_EXCL;
  fd = shm_open(path, oflag, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
  if (fd == -1) {
    perror("shm_open");
    goto cleanup;
  }
  if (mode == spmc_mode_writer) {
    int rt = ftruncate(fd, shared_size);
    if (rt == -1) {
      perror("ftruncate");
      goto cleanup;
    }
  }
  queue = calloc(1, sizeof(spmc_queue_t));
  if (!queue) {
    perror("calloc(1, sizeof(spmc_queue_t))");
    goto cleanup;
  }
  shared = mmap(NULL, shared_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (shared == MAP_FAILED) {
    perror("mmap");
    goto cleanup;
  }
  queue->header.fd = fd;
  queue->header.mode = mode;
  queue->header.path = strdup(path);
  if (!queue->header.path) {
    perror("strdup");
    goto cleanup;
  }
  queue->header.shared_size = shared_size;
  queue->shared = shared;
  if (mode == spmc_mode_writer) {
    memset(shared, 0, shared_size);
    queue->shared->version = SPMC_QUEUE_VERSION;
    queue->shared->initialized = true;
    queue->shared->client_connected = false;
    queue->shared->element_size = element_size;
    queue->shared->element_capacity = element_capacity;
    queue->shared->writer_idx = 0;
  }
  if (mode == spmc_mode_reader) {
    if (!queue->shared->initialized) {
      fprintf(stderr, "spmc_queue: reader mode encounters shared memory not initialized yet by writer\n");
      goto cleanup;
    }
    if (queue->shared->element_capacity != element_capacity) {
      fprintf(stderr, "spmc_queue: element_capacity %zu != queue's element_capacity %zu\n",
              queue->shared->element_capacity, element_capacity);
      goto cleanup;
    }
    if (queue->shared->element_size < element_size) {
      fprintf(stderr, "spmc_queue: element size %zu > queue's element size %zu\n", element_size,
              queue->shared->element_size);
      goto cleanup;
    }
    queue->shared->client_connected = true;
    queue->shared->reader_idx = 0;
  }
  printf("spmc_queue created\n");
  return queue;
cleanup:
  if (fd != -1)
    close(fd);
  if (shared && shared != MAP_FAILED)
    munmap(shared, shared_size);
  if (mode == spmc_mode_writer && path)
    shm_unlink(path);
  if (queue && queue->header.path)
    free(queue->header.path);
  if (queue)
    free(queue);
  return NULL;
}
/**
 * Release a queue handle obtained from spmc_queue_create.
 *
 * Unmaps the shared region, closes the descriptor and frees the handle. A
 * writer handle additionally unlinks the shared-memory name, since the writer
 * owns the lifecycle of the queue. Passing NULL is a no-op, like free(NULL).
 *
 * @param queue handle to release; must not be used afterwards
 */
void spmc_queue_destroy(spmc_queue_t *queue) {
  if (!queue) {
    return;
  }
  const bool is_writer = (queue->header.mode == spmc_mode_writer);
  printf("destroying spmc_queue %s of mode %s\n", queue->header.path, is_writer ? "writer" : "reader");

  // Teardown is best effort: report failures but keep releasing the rest.
  if (munmap(queue->shared, queue->header.shared_size) == -1) {
    perror("munmap");
  }
  if (close(queue->header.fd) == -1) {
    perror("close");
  }
  if (is_writer && shm_unlink(queue->header.path) == -1) {
    perror("shm_unlink");
  }
  free(queue->header.path);
  free(queue);
  printf("spmc_queue destroyed\n");
}
/**
 * Try to append one element to the queue without blocking.
 *
 * @param queue     writer-side handle
 * @param src_data  source buffer; the queue's element_size bytes are read from it
 * @return true if the element was copied and published; false if no reader
 *         has attached yet or every slot is still occupied
 */
bool spmc_queue_enqueue(spmc_queue_t *queue, uint8_t *src_data) {
  spmc_shared_t *const shm = queue->shared;

  if (!shm->client_connected) {
    return false;
  }

  // Acquire on the consumer's index: the slot about to be overwritten must
  // have been fully read before it is reused.
  const int64_t consumed = atomic_load_explicit(&shm->reader_idx, memory_order_acquire);
  // This side is the only one that stores writer_idx, so relaxed is enough.
  const int64_t produced = atomic_load_explicit(&shm->writer_idx, memory_order_relaxed);

  const size_t capacity = shm->element_capacity;
  if (produced >= consumed + capacity) {
    // queue full
    return false;
  }

  const size_t width = shm->element_size;
  const int64_t slot = produced % capacity;
  memcpy(&shm->data[slot * width], src_data, width);

  // Release: the copied bytes become visible before the new index does.
  atomic_store_explicit(&shm->writer_idx, produced + 1, memory_order_release);
  return true;
}
/**
 * Try to remove the oldest element from the queue without blocking.
 *
 * @param queue     reader-side handle
 * @param dst_data  destination buffer; the queue's element_size bytes are written to it
 * @return true if an element was copied out; false if the queue is empty
 */
bool spmc_queue_dequeue(spmc_queue_t *queue, uint8_t *dst_data) {
  spmc_shared_t *const shm = queue->shared;

  // Relaxed load of reader_idx; no ordering is needed on this load itself.
  const int64_t consumed = atomic_load_explicit(&shm->reader_idx, memory_order_relaxed);
  // Acquire pairs with the producer's release store, making the slot's bytes visible.
  const int64_t produced = atomic_load_explicit(&shm->writer_idx, memory_order_acquire);

  if (consumed >= produced) {
    // queue empty
    return false;
  }

  const size_t width = shm->element_size;
  const int64_t slot = consumed % shm->element_capacity;
  memcpy(dst_data, &shm->data[slot * width], width);

  // Release: the slot is handed back to the producer only after the copy is done.
  atomic_store_explicit(&shm->reader_idx, consumed + 1, memory_order_release);
  return true;
}
- benchmark harness: benchmark.c
#define _POSIX_C_SOURCE 200809L
#define _GNU_SOURCE
#include "spmc_queue.h"
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
// Silences "unused parameter" warnings; the argument is parenthesized so any
// expression can be passed safely.
#define UNUSED(arg) ((void)(arg))
// Number of messages pushed through the queue per run.
// Parenthesized so the macro behaves as a single value inside larger
// expressions (e.g. x / TEST_MESSAGE_COUNT).
#define TEST_MESSAGE_COUNT (1024 * 1024 * 64) // 64 Mi messages * 64 bytes = 4 GiB of data
// Number of slots in the benchmark queue.
#define QUEUE_CAPACITY 1024
// One benchmark element: a payload number plus filler bytes.
struct message {
  int64_t num;                                         // value summed on both sides to verify delivery
  char padding[L1_DCACHE_LINESIZE - sizeof(int64_t)];  // fills the rest of L1_DCACHE_LINESIZE bytes
};
// Writer-side and reader-side handles onto the same shared-memory queue.
static spmc_queue_t *producer_queue = NULL;
static spmc_queue_t *consumer_queue = NULL;
static pthread_t producer_thread;
static pthread_t consumer_thread;
// Pre-generated messages the producer enqueues in order.
static struct message *test_messages;
// Start-up handshake flags shared between main and the worker threads.
// These are atomics rather than `volatile`: volatile gives no inter-thread
// ordering or visibility guarantees, and unsynchronized access is a data race.
static atomic_bool producer_thread_ready = false;
static atomic_bool consumer_thread_ready = false;
static atomic_bool test_may_start = false;
// Checksums: filled in before the threads start (producer side) and by the
// consumer thread (consumer side); compared by main after both threads are joined.
static int64_t test_producer_sum = 0;
static int64_t test_consumer_sum = 0;
// Resets the benchmark state, creates the writer and reader handles on the
// shared queue, and pre-generates the messages together with their checksum.
// Any set-up failure is reported on stderr and terminates the process with
// EXIT_FAILURE, after releasing what was already created so that no stale
// shared-memory object is left behind for the next run.
static void initialize_benchmark(void) {
  printf("Initializing the performance benchmark...\n");
  producer_thread_ready = false;
  consumer_thread_ready = false;
  test_may_start = false;
  test_producer_sum = 0;
  test_consumer_sum = 0;

  // Explicit checks instead of assert(): asserts disappear under NDEBUG and
  // the benchmark would then dereference NULL handles.
  producer_queue = spmc_queue_create("/spmc_benchmark_queue", sizeof(struct message), QUEUE_CAPACITY, spmc_mode_writer);
  if (producer_queue == NULL) {
    fprintf(stderr, "initialize_benchmark: failed to create the producer queue\n");
    exit(EXIT_FAILURE);
  }
  consumer_queue = spmc_queue_create("/spmc_benchmark_queue", sizeof(struct message), QUEUE_CAPACITY, spmc_mode_reader);
  if (consumer_queue == NULL) {
    fprintf(stderr, "initialize_benchmark: failed to create the consumer queue\n");
    spmc_queue_destroy(producer_queue);
    producer_queue = NULL;
    exit(EXIT_FAILURE);
  }

  // This is a multi-gigabyte allocation and can realistically fail.
  test_messages = calloc(TEST_MESSAGE_COUNT, sizeof *test_messages);
  if (test_messages == NULL) {
    perror("calloc(test_messages)");
    spmc_queue_destroy(consumer_queue);
    spmc_queue_destroy(producer_queue);
    consumer_queue = NULL;
    producer_queue = NULL;
    exit(EXIT_FAILURE);
  }
  for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
    int random_number = rand() % 5;
    test_messages[i].num = random_number;
    test_producer_sum += random_number;
  }
  printf("Initialized performance benchmark\n");
}
// Restricts the calling thread to CPU `core_num`.
// Pinning is best effort: if the core does not exist or the call is refused,
// a warning is printed and the thread keeps running unpinned. This replaces
// an assert() that aborted on machines without that core and did nothing
// under NDEBUG.
static void pin_to_core(int core_num) {
  if (core_num < 0 || core_num >= CPU_SETSIZE) {
    fprintf(stderr, "pin_to_core(%d): core number out of range; continuing unpinned\n", core_num);
    return;
  }
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);
  CPU_SET(core_num, &cpuset);
  // pthread_setaffinity_np returns an error number rather than setting errno.
  const int rc = pthread_setaffinity_np(pthread_self(), sizeof cpuset, &cpuset);
  if (rc != 0) {
    fprintf(stderr, "pin_to_core(%d): pthread_setaffinity_np failed: %s; continuing unpinned\n", core_num,
            strerror(rc));
  }
}
// Releases everything initialize_benchmark set up and clears the globals so
// that none of them is left dangling. Safe to call when some of the resources
// were never created.
static void destroy_benchmark(void) {
  printf("Destroying the performance benchmark...\n");
  // The writer handle goes first, as before; destroying it also unlinks the
  // shared-memory name.
  if (producer_queue != NULL) {
    spmc_queue_destroy(producer_queue);
    producer_queue = NULL;
  }
  if (consumer_queue != NULL) {
    spmc_queue_destroy(consumer_queue);
    consumer_queue = NULL;
  }
  free(test_messages);
  test_messages = NULL;
  printf("Destroyed performance benchmark\n");
}
static void *consumer_main(void *arg) {
  UNUSED(arg);
  pin_to_core(5);
  printf("consumer thread spawns\n");
  consumer_thread_ready = true;
  static struct message message_buf;
  while (!test_may_start) {
  }
  int idx = 0;
  while (idx < TEST_MESSAGE_COUNT) {
    bool dequeued = spmc_queue_dequeue(consumer_queue, (unsigned char *)&message_buf);
    if (dequeued) {
      idx++;
      test_consumer_sum += message_buf.num;
    }
  }
  return NULL;
}
static void *producer_main(void *arg) {
  UNUSED(arg);
  pin_to_core(7);
  printf("producer thread spawns\n");
  producer_thread_ready = true;
  while (!test_may_start) {
  }
  int idx = 0;
  while (idx < TEST_MESSAGE_COUNT) {
    idx += (int)spmc_queue_enqueue(consumer_queue, (unsigned char *)&test_messages[idx]);
  }
  return NULL;
}
// Benchmark driver: sets up the queue and messages, starts one producer and
// one consumer thread, times the transfer of all messages and reports the
// throughput. Returns 0 when the consumer-side checksum matches the
// producer-side one, EXIT_FAILURE otherwise or when a thread cannot be started.
int main(void) {
  struct timespec start;
  struct timespec end;
  double elapsed_sec;
  double total_bytes = (double)TEST_MESSAGE_COUNT * sizeof(struct message);
  double throughput_mb;

  initialize_benchmark();

  // pthread_create returns an error number instead of setting errno.
  int rc = pthread_create(&producer_thread, NULL, producer_main, NULL);
  if (rc != 0) {
    fprintf(stderr, "pthread_create(producer): %s\n", strerror(rc));
    destroy_benchmark();
    return EXIT_FAILURE;
  }
  rc = pthread_create(&consumer_thread, NULL, consumer_main, NULL);
  if (rc != 0) {
    fprintf(stderr, "pthread_create(consumer): %s\n", strerror(rc));
    // The producer is still parked on the start flag and has not touched the
    // queue; returning from main ends the process and the thread with it.
    destroy_benchmark();
    return EXIT_FAILURE;
  }

  // Spin until both workers have pinned themselves and reported ready,
  // printing the prompt at most once.
  bool prompted = false;
  while (!producer_thread_ready || !consumer_thread_ready) {
    if (!prompted) {
      printf("waiting for the producer & consumer thread to be ready...\n");
      prompted = true;
    }
  }

  clock_gettime(CLOCK_MONOTONIC, &start);
  test_may_start = true;
  printf("performance benchmark starts\n");
  pthread_join(producer_thread, NULL);
  pthread_join(consumer_thread, NULL);
  clock_gettime(CLOCK_MONOTONIC, &end);
  printf("performance benchmark ends\n");

  elapsed_sec = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
  throughput_mb = total_bytes / elapsed_sec / (1024 * 1024);
  printf("Elapsed time: %.3f seconds\n", elapsed_sec);
  printf("Throughput: %.3f MB/s\n", throughput_mb);
  // int64_t is not `long` on every platform; convert explicitly so the
  // format specifier always matches the argument type.
  printf("test_producer_sum = %lld and test_consumer_sum = %lld\n", (long long)test_producer_sum,
         (long long)test_consumer_sum);

  // Checked explicitly rather than with assert(): the check survives NDEBUG,
  // and the shared-memory object is still unlinked on a mismatch, so the next
  // run's exclusive create does not fail.
  const bool sums_match = (test_producer_sum == test_consumer_sum);
  if (!sums_match) {
    fprintf(stderr, "checksum mismatch between producer and consumer\n");
  }
  destroy_benchmark();
  return sums_match ? 0 : EXIT_FAILURE;
}
Currently, on my host (Ubuntu 24.04.2), the benchmark harness reports a throughput of 1241.186 MB/s.
