I came across the concurrency-zip project in OSTEP by Remzi Arpaci-Dusseau and built a simple compression program in C that uses run-length encoding (RLE) and multi-threading. I couldn't find a reference solution to compare against, so I'm posting it here for code review.
Essentially, the design is based on the producer-consumer problem:

- One producer thread loads the files and splits them into equal-sized pages. The page size is whatever the kernel reports via sysconf(_SC_PAGE_SIZE), and each file is mapped into memory with mmap for performance. Every page becomes an item in a work queue; the queue is an unbounded buffer, unlike the bounded buffer of the classic problem.
- Several consumer threads take items off the work queue and compress them with RLE. The number of consumers is whatever get_nprocs reports. After compressing a page, each consumer appends its result to the result queue, waiting until the results for all earlier pages have been appended.
- The main thread takes all results from the result queue and writes them to stdout (the output record format is sketched below). Use shell redirection if you want the output in a file.

More detail can be found in the project owner's README.
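For reference, each run in the compressed output is a 4-byte count followed by the character itself, matching the fwrite calls in main. A minimal sketch of that record layout (write_run is a hypothetical helper, not part of the program):

#include <stdio.h>

// Hypothetical helper: emit one RLE run as a 4-byte count followed by
// the run character, i.e. the same record layout main() writes.
static void write_run(FILE *out, int count, char c)
{
    fwrite(&count, sizeof(int), 1, out);
    fwrite(&c, sizeof(char), 1, out);
}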
Code:
#include <stdio.h> // stderr, stdout, perror, fprintf, fwrite
#include <stdlib.h> // exit, EXIT_FAILURE, EXIT_SUCCESS, malloc
#include <pthread.h> // pthread_t, pthread_create, pthread_exit, pthread_join
#include <fcntl.h> // open, O_*
#include <unistd.h> // sysconf, _SC_*, close
#include <semaphore.h> // sem_t, sem_init, sem_wait, sem_post
#include <sys/sysinfo.h> // get_nprocs
#include <sys/stat.h> // stat, fstat
#include <sys/mman.h> // mmap
#include <string.h> // memset
#define handle_error(msg) \
do \
{ \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
/* Global object */
// Argument object for producer thread
typedef struct _arg
{
int argc;
char **argv;
} arg_t;
// Page object for munmap
typedef struct _page
{
char *addr;
long size;
} page_t;
// Work produced by producer
typedef struct _work
{
char *addr;
long pagesz;
long pagenm;
long filenm;
struct _work *next;
} work_t;
// Result after a work consumed by consumer
typedef struct _rle
{
char c;
int count;
struct _rle *next;
} result_t;
/* Global variables */
long nprocs; // Number of consumer threads (= number of processors)
long nfiles; // Number of files
long pagesz; // Page size
long pagenm; // Page number #
long filenm; // File number #
static int done = 0;
static int curr_page = 0;
static int *npage_onfile;
static work_t *works, *work_head, *work_tail;
static result_t *results, *result_tail;
static sem_t mutex, filled, page;
static sem_t *order;
/* Global functions */
void *producer(void *args);
void *consumer(void *args);
void wenqueue(work_t work);
work_t *wdequeue();
result_t *compress(work_t work);
void renqueue(result_t *result);
int main(int argc, char **argv)
{
if (argc < 2)
{
fprintf(stderr, "pzip: file1 [file2 ...]\n");
exit(EXIT_FAILURE);
}
arg_t *args = malloc(sizeof(arg_t));
if (args == NULL)
handle_error("Malloc args");
args->argc = argc - 1;
args->argv = argv + 1; // Remove the first argument
nprocs = get_nprocs(); // Let the system decide how many thread will be used
nfiles = argc - 1; // Remove the first argument
pagesz = sysconf(_SC_PAGE_SIZE); // Let the system decide how big a page is
order = malloc(sizeof(sem_t) * nprocs);
npage_onfile = malloc(sizeof(int) * nfiles);
memset(npage_onfile, 0, sizeof(int) * nfiles);
sem_init(&mutex, 0, 1);
sem_init(&filled, 0, 0);
sem_init(&page, 0, 0);
pthread_t pid, cid[nprocs];
pthread_create(&pid, NULL, producer, (void *)args);
for (int i = 0; i < nprocs; i++)
{
pthread_create(&cid[i], NULL, consumer, (void *)args);
sem_init(&order[i], 0, i ? 0 : 1);
}
for (int i = 0; i < nprocs; i++)
{
pthread_join(cid[i], NULL);
}
pthread_join(pid, NULL);
for (result_t *curr = results; curr != NULL; curr = curr->next)
{
fwrite((char *)&(curr->count), sizeof(int), 1, stdout);
fwrite((char *)&(curr->c), sizeof(char), 1, stdout);
}
sem_destroy(&filled);
sem_destroy(&mutex);
sem_destroy(&page);
for (int i = 0; i < nprocs; i++)
{
sem_destroy(&order[i]);
}
for (result_t *curr = results; curr != NULL; curr = results)
{
results = results->next;
free(curr);
curr = NULL;
}
for (work_t *curr = works; curr != NULL; curr = works)
{
munmap(curr->addr, curr->pagesz);
works = works->next;
free(curr);
curr = NULL;
}
free(order);
free(npage_onfile);
return 0;
}
void wenqueue(work_t work)
{
if (works == NULL)
{
works = malloc(sizeof(work_t));
if (works == NULL)
{
handle_error("malloc work");
sem_post(&mutex);
}
works->addr = work.addr;
works->filenm = work.filenm;
works->pagenm = work.pagenm;
works->pagesz = work.pagesz;
works->next = NULL;
work_head = works;
work_tail = works;
}
else
{
work_tail->next = malloc(sizeof(work_t));
if (work_tail->next == NULL)
{
handle_error("malloc work");
sem_post(&mutex);
}
work_tail->next->addr = work.addr;
work_tail->next->filenm = work.filenm;
work_tail->next->pagenm = work.pagenm;
work_tail->next->pagesz = work.pagesz;
work_tail->next->next = NULL;
work_tail = work_tail->next;
}
}
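// Producer: mmap each input file, split the mapping into page-sized
// chunks and enqueue one work item per chunk, then set `done` and
// post `filled` once per consumer so idle consumers can wake up and exit.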
void *producer(void *args)
{
arg_t *arg = (arg_t *)args;
char **fnames = arg->argv;
for (int i = 0; i < arg->argc; i++)
{
int fd = open(fnames[i], O_RDONLY);
if (fd == -1)
handle_error("open error");
struct stat sb;
if (fstat(fd, &sb) == -1)
handle_error("fstat error");
if (sb.st_size == 0)
continue;
int p4f = sb.st_size / pagesz;
if ((double)sb.st_size / pagesz > p4f)
p4f++;
int offset = 0;
npage_onfile[i] = p4f;
char *addr = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
for (int j = 0; j < p4f; j++)
{
// it should be less than or equal to the default page size
int curr_pagesz = (j < p4f - 1) ? pagesz : sb.st_size - ((p4f - 1) * pagesz);
offset += curr_pagesz;
work_t work;
work.addr = addr;
work.filenm = i;
work.pagenm = j;
work.pagesz = curr_pagesz;
work.next = NULL;
sem_wait(&mutex);
wenqueue(work);
sem_post(&mutex);
sem_post(&filled);
addr += curr_pagesz;
}
close(fd);
}
done = 1;
for (int i = 0; i < nprocs; i++)
{
sem_post(&filled);
}
pthread_exit(NULL);
}
work_t *wdequeue()
{
if (work_head == NULL)
return NULL;
work_t *tmp = work_head;
work_head = work_head->next;
return tmp;
}
result_t *compress(work_t work)
{
result_t *result = malloc(sizeof(result_t));
if (result == NULL)
handle_error("malloc result");
result_t *tmp = result;
int count = 0;
char c, last;
for (int i = 0; i < work.pagesz; i++)
{
c = work.addr[i];
if (count && last != c)
{
tmp->c = last;
tmp->count = count;
tmp->next = malloc(sizeof(result_t));
tmp = tmp->next;
count = 0;
}
last = c;
count++;
}
if (count)
{
tmp->c = last;
tmp->count = count;
tmp->next = NULL;
}
return result;
}
void renqueue(result_t *result)
{
if (results == NULL)
{
results = result;
}
else
{
if (result_tail->c == result->c)
{
result_tail->count += result->count;
result = result->next;
}
result_tail->next = result;
}
result_t *curr = result;
for (; curr->next != NULL; curr = curr->next)
{
}
result_tail = curr;
}
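// Consumer: dequeue a page, compress it, then append the result in
// (file, page) order. The order[] semaphores pass the turn from page j
// to page j+1 (mod nprocs) within the current file, while curr_page
// tracks which file is currently being written out.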
void *consumer(void *args)
{
work_t *work;
while (!done || work_head != NULL)
{
sem_wait(&filled);
sem_wait(&mutex);
if (work_head == work_tail && !done)
{
sem_post(&mutex);
continue;
}
else if (work_head == NULL)
{
sem_post(&mutex);
return NULL;
}
else
{
work = work_head;
work_head = work_head->next;
sem_post(&mutex);
}
result_t *result = compress(*work);
if (work->filenm == 0 && work->pagenm == 0)
{
sem_wait(&order[0]);
renqueue(result);
if (work->pagenm == npage_onfile[work->filenm] - 1)
{
sem_post(&order[0]);
curr_page++;
}
else
sem_post(&order[1]);
sem_post(&page);
}
else
{
while (1)
{
sem_wait(&page);
if (curr_page != work->filenm)
{
sem_post(&page);
continue;
}
if (curr_page == nfiles)
{
sem_post(&page);
return NULL;
}
sem_post(&page);
sem_wait(&order[work->pagenm % nprocs]);
sem_wait(&page);
renqueue(result);
if (work->filenm == curr_page && work->pagenm < npage_onfile[work->filenm] - 1)
{
sem_post(&order[(work->pagenm + 1) % nprocs]);
}
else if (work->filenm == curr_page && work->pagenm == npage_onfile[work->filenm] - 1)
{
sem_post(&order[0]);
curr_page++;
}
sem_post(&page);
break;
}
}
}
return NULL;
}
What I want to check:
- Correctness
- Performance
- Memory leak
- Syntax and code organization
I've already tested with small files. The parallel zip is supposed to perform better than the sequential zip (the non-multi-threaded version), but my program isn't significantly faster, and sometimes it even runs slower. The only cause I can think of is the freeing of all the lists at the end slowing things down.
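For comparison, the sequential zip I benchmark against is essentially the following single-threaded loop (a rough sketch of the OSTEP wzip idea, not its exact code; wzip_file is a hypothetical name, and it writes the same 4-byte count plus character records):

#include <stdio.h>

// Rough single-threaded RLE baseline (sketch): read one stream with
// stdio and emit (count, char) records to stdout.
static void wzip_file(FILE *in)
{
    int count = 0;
    int c;
    char last = 0;
    while ((c = fgetc(in)) != EOF)
    {
        if (count > 0 && (char)c != last)
        {
            fwrite(&count, sizeof(int), 1, stdout);
            fwrite(&last, sizeof(char), 1, stdout);
            count = 0;
        }
        last = (char)c;
        count++;
    }
    if (count > 0)
    {
        fwrite(&count, sizeof(int), 1, stdout);
        fwrite(&last, sizeof(char), 1, stdout);
    }
}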
Thanks in advance!