
I am working on a parallel sorting program using MPI to sort a large CSV file (35 GB, 1 billion lines). However, I am first testing the program with a smaller file, and I am encountering an issue where only a portion of the data is written to the output file.

The program reads the CSV file, divides the data among multiple MPI processes, sorts the data locally in each process, and then collects and writes the sorted data to an output file. Despite the input CSV file containing 100 entries (for testing purposes), the output file only contains 15 entries.

My code:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stddef.h>

typedef struct {
    char key[128];
    uint64_t value;
} CSVEntry;

void create_mpi_csventry_type(MPI_Datatype *MPI_CSVEntry) {
    int lengths[2] = {128, 1};
    MPI_Aint offsets[2] = {offsetof(CSVEntry, key), offsetof(CSVEntry, value)};
    MPI_Datatype types[2] = {MPI_CHAR, MPI_UNSIGNED_LONG};
    MPI_Type_create_struct(2, lengths, offsets, types, MPI_CSVEntry);
    MPI_Type_commit(MPI_CSVEntry);
}

int read_csv_partial(const char *filename, long offset, long length, CSVEntry **entries, int rank) {
    FILE *file = fopen(filename, "r");
    if (!file) {
        perror("Erreur lors de l'ouverture du fichier");
        return -1;
    }

    fseek(file, offset, SEEK_SET);

    size_t capacity = 1000;
    size_t count = 0;
    *entries = (CSVEntry *)malloc(capacity * sizeof(CSVEntry));
    if (!*entries) {
        perror("Erreur d'allocation mémoire");
        fclose(file);
        return -1;
    }

    char line[256];
    if (offset > 0) {
        fgets(line, sizeof(line), file); // Skip a possibly incomplete line
    }

    while (ftell(file) < offset + length && fgets(line, sizeof(line), file)) {
        if (sscanf(line, "%127s %lu", (*entries)[count].key, &(*entries)[count].value) == 2) {
            count++;
            if (count >= capacity) {
                capacity *= 2;
                *entries = (CSVEntry *)realloc(*entries, capacity * sizeof(CSVEntry));
                if (!*entries) {
                    perror("Erreur lors du réajustement mémoire");
                    fclose(file);
                    return -1;
                }
            }
        }
    }

    fclose(file);
    printf("Processus %d a lu %zu entrées depuis offset %ld\n", rank, count, offset);
    return count;
}

void write_csv(const char *filename, CSVEntry *entries, int count) {
    FILE *file = fopen(filename, "w");
    if (!file) {
        perror("Erreur lors de l'écriture du fichier");
        return;
    }

    for (int i = 0; i < count; i++) {
        fprintf(file, "%s %lu\n", entries[i].key, entries[i].value);
    }

    fclose(file);
    printf("Écriture terminée dans %s (%d entrées)\n", filename, count);
}

int compare_entries(const void *a, const void *b) {
    CSVEntry *entry_a = (CSVEntry *)a;
    CSVEntry *entry_b = (CSVEntry *)b;
    if (entry_a->value != entry_b->value) {
        return (entry_a->value > entry_b->value) - (entry_a->value < entry_b->value);
    }
    return strcmp(entry_a->key, entry_b->key);
}

int main(int argc, char **argv) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s <input_csv> <output_csv>\n", argv[0]);
        return EXIT_FAILURE;
    }

    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    long file_size = 0, chunk_size = 0;
    if (rank == 0) {
        FILE *file = fopen(argv[1], "r");
        if (!file) {
            perror("Erreur lors de l'ouverture du fichier d'entrée");
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        }
        fseek(file, 0, SEEK_END);
        file_size = ftell(file);
        fclose(file);

        chunk_size = (file_size + size - 1) / size;
    }

    MPI_Bcast(&chunk_size, 1, MPI_LONG, 0, MPI_COMM_WORLD);
    MPI_Bcast(&file_size, 1, MPI_LONG, 0, MPI_COMM_WORLD);

    long offset = rank * chunk_size;
    long length = (rank == size - 1) ? (file_size - offset) : chunk_size;

    CSVEntry *entries = NULL;
    int num_entries = read_csv_partial(argv[1], offset, length, &entries, rank);

    if (num_entries < 0) {
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    uint64_t local_min = UINT64_MAX, local_max = 0;
    for (int i = 0; i < num_entries; i++) {
        if (entries[i].value < local_min) local_min = entries[i].value;
        if (entries[i].value > local_max) local_max = entries[i].value;
    }

    uint64_t global_min, global_max;
    MPI_Allreduce(&local_min, &global_min, 1, MPI_UNSIGNED_LONG, MPI_MIN, MPI_COMM_WORLD);
    MPI_Allreduce(&local_max, &global_max, 1, MPI_UNSIGNED_LONG, MPI_MAX, MPI_COMM_WORLD);

    uint64_t range = (global_min == global_max) ? 1 : (global_max - global_min + size - 1) / size;

    CSVEntry *local_bucket = (CSVEntry *)malloc(num_entries * sizeof(CSVEntry));
    if (!local_bucket) {
        perror("Erreur d'allocation mémoire pour le bucket local");
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    int local_count = 0;
    for (int i = 0; i < num_entries; i++) {
        uint64_t lower_bound = global_min + rank * range;
        uint64_t upper_bound = global_min + (rank + 1) * range;

        if ((entries[i].value >= lower_bound && entries[i].value < upper_bound) ||
            (rank == size - 1 && entries[i].value == global_max)) {
            local_bucket[local_count++] = entries[i];
        }
    }

    qsort(local_bucket, local_count, sizeof(CSVEntry), compare_entries);

    MPI_Datatype MPI_CSVEntry;
    create_mpi_csventry_type(&MPI_CSVEntry);

    int *recv_counts = (int *)malloc(size * sizeof(int));
    MPI_Gather(&local_count, 1, MPI_INT, recv_counts, 1, MPI_INT, 0, MPI_COMM_WORLD);

    int *displs = NULL;
    CSVEntry *sorted_entries = NULL;

    if (rank == 0) {
        displs = (int *)malloc(size * sizeof(int));
        int total_entries = 0;

        for (int i = 0; i < size; i++) {
            displs[i] = total_entries;
            total_entries += recv_counts[i];
        }

        sorted_entries = (CSVEntry *)malloc(total_entries * sizeof(CSVEntry));
        if (!sorted_entries) {
            perror("Erreur d'allocation mémoire pour les entrées triées");
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        }
    }

    MPI_Gatherv(local_bucket, local_count, MPI_CSVEntry,
                sorted_entries, recv_counts, displs, MPI_CSVEntry, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        int total_entries = 0;
        for (int i = 0; i < size; i++) {
            total_entries += recv_counts[i];
        }
        printf("Total des entrées collectées : %d\n", total_entries);

        write_csv(argv[2], sorted_entries, total_entries);
        free(sorted_entries);
        free(displs);
    }

    free(entries);
    free(local_bucket);
    free(recv_counts);
    MPI_Type_free(&MPI_CSVEntry);

    MPI_Finalize();
    return EXIT_SUCCESS;
}

Debug Output:

Process 0 read 17 entries from offset 0
Process 1 read 16 entries from offset 495
Process 2 read 17 entries from offset 990
Process 3 read 17 entries from offset 1485
Process 4 read 16 entries from offset 1980
Process 5 read 16 entries from offset 2475
Total entries collected: 15
Writing completed to output file (15 entries)

For context, I am eventually working on processing a much larger file (35 GB, 1 billion lines), but I am testing with a small CSV file for now. I’d appreciate any help or suggestions in resolving this issue.

Comments:
  • Going from entries to local_bucket you are filtering elements, effectively dropping them; at the end you only collect and dump those surviving values. For a parallel bucket sort you would probably need to read the whole file on each process, filter, sort, collect and dump, so that every entry ends up in exactly one bucket (see the redistribution sketch after this list). (Commented Dec 30, 2024 at 3:02)
  • On a side note, MPI_UNSIGNED_LONG is not the proper type to communicate uint64_t. You must use MPI_UINT64_T instead, or declare the variables as unsigned long (a corrected type map is sketched after this list). (Commented Dec 30, 2024 at 3:07)
  • Thank you @Joachim. After applying those changes, the program works perfectly with small test files. However, when I try to process the 35 GB CSV file, I get the following error: "Primary job terminated normally, but 1 process returned a non-zero exit code. Per user-direction, the job has been aborted. mpirun noticed that process rank 7 with PID 0 on node aragon exited on signal 9 (Killed)." (Commented Dec 30, 2024 at 9:20)
  • Your input has 1 billion lines and each CSVEntry takes roughly 136 bytes, so with the new approach every process allocates about 136 GB just for the input, which probably exceeds the memory available. The root rank allocates the same amount again for the output, and every process also allocates its bucket on top of that. How much memory do you have available per MPI process? (Commented Dec 30, 2024 at 11:41)
  • If available main memory is the limitation, you probably need to go back to distributed loading as before, with an additional redistribution step so that each process receives all the data for its bucket: load, sort, calculate bucket boundaries, redistribute the data according to the buckets, sort, dump. Considering the problem size, you probably also need to dump the local bucket per process rather than collecting everything at the root: dump at rank 0, ping rank 1, dump rank 1, ping rank 2, and so on (see the rank-ordered dump sketch after this list). (Commented Dec 30, 2024 at 11:48)
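
The type fix from the second comment, as a minimal sketch assuming the same CSVEntry layout as in the question; the two MPI_Allreduce calls on the uint64_t min/max values would need MPI_UINT64_T as well:

void create_mpi_csventry_type(MPI_Datatype *MPI_CSVEntry) {
    int lengths[2] = {128, 1};
    MPI_Aint offsets[2] = {offsetof(CSVEntry, key), offsetof(CSVEntry, value)};
    /* MPI_UINT64_T matches uint64_t exactly; MPI_UNSIGNED_LONG only matches
       unsigned long, which is 4 bytes wide on some platforms. */
    MPI_Datatype types[2] = {MPI_CHAR, MPI_UINT64_T};
    MPI_Type_create_struct(2, lengths, offsets, types, MPI_CSVEntry);
    MPI_Type_commit(MPI_CSVEntry);
}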
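
A hedged sketch of the redistribution step suggested in the first and last comments, not the poster's code: rather than dropping entries that fall outside the local bucket, each rank sends every entry to the rank that owns its value range using MPI_Alltoallv, so nothing is lost. It assumes the global_min/range bucket boundaries and the committed MPI_CSVEntry datatype from the question; the helper name redistribute_to_buckets is made up for illustration.

/* Hypothetical helper: move every locally read entry to the rank that owns
   its value range. Returns 0 on success; the received bucket and its size
   are returned via *bucket and *bucket_count (caller frees the bucket). */
int redistribute_to_buckets(CSVEntry *entries, int num_entries,
                            uint64_t global_min, uint64_t range,
                            int size, MPI_Datatype entry_type,
                            CSVEntry **bucket, int *bucket_count,
                            MPI_Comm comm) {
    int *send_counts = calloc(size, sizeof(int));
    int *send_displs = malloc(size * sizeof(int));
    int *recv_counts = malloc(size * sizeof(int));
    int *recv_displs = malloc(size * sizeof(int));
    int *cursor      = calloc(size, sizeof(int));
    CSVEntry *send_buf = malloc((num_entries > 0 ? num_entries : 1) * sizeof(CSVEntry));
    if (!send_counts || !send_displs || !recv_counts || !recv_displs || !cursor || !send_buf)
        return -1;

    /* Count how many locally read entries go to each destination rank. */
    for (int i = 0; i < num_entries; i++) {
        int dest = (int)((entries[i].value - global_min) / range);
        if (dest >= size) dest = size - 1;   /* clamp global_max into the last bucket */
        send_counts[dest]++;
    }

    /* Pack the entries contiguously in destination order. */
    send_displs[0] = 0;
    for (int r = 1; r < size; r++)
        send_displs[r] = send_displs[r - 1] + send_counts[r - 1];
    for (int i = 0; i < num_entries; i++) {
        int dest = (int)((entries[i].value - global_min) / range);
        if (dest >= size) dest = size - 1;
        send_buf[send_displs[dest] + cursor[dest]++] = entries[i];
    }

    /* Exchange the counts, then the entries themselves. */
    MPI_Alltoall(send_counts, 1, MPI_INT, recv_counts, 1, MPI_INT, comm);
    recv_displs[0] = 0;
    *bucket_count = recv_counts[0];
    for (int r = 1; r < size; r++) {
        recv_displs[r] = recv_displs[r - 1] + recv_counts[r - 1];
        *bucket_count += recv_counts[r];
    }
    *bucket = malloc((*bucket_count > 0 ? *bucket_count : 1) * sizeof(CSVEntry));
    MPI_Alltoallv(send_buf, send_counts, send_displs, entry_type,
                  *bucket, recv_counts, recv_displs, entry_type, comm);

    free(send_counts); free(send_displs); free(recv_counts);
    free(recv_displs); free(cursor); free(send_buf);
    return 0;
}

After the exchange each rank holds one complete bucket; sorting it locally with qsort and writing the buckets in rank order yields a globally sorted file, and the Gatherv to the root is no longer needed.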
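
And a hedged sketch of the "dump at rank 0, ping rank 1, ..." idea from the last comment, so no single rank ever has to hold the full 35 GB result. The function name and the one-integer token protocol are illustrative; it assumes the buckets are already range-partitioned by rank, so concatenating them in rank order produces a sorted file.

/* Each rank appends its already sorted bucket to the output file in rank
   order. A one-integer token passed from rank to rank serializes the writes. */
void write_bucket_in_rank_order(const char *filename, CSVEntry *bucket,
                                int count, int rank, int size) {
    int token = 0;
    if (rank > 0) {
        /* Block until the previous rank reports that its part is on disk. */
        MPI_Recv(&token, 1, MPI_INT, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
    FILE *file = fopen(filename, rank == 0 ? "w" : "a");  /* rank 0 truncates, the rest append */
    if (file) {
        for (int i = 0; i < count; i++) {
            fprintf(file, "%s %llu\n", bucket[i].key, (unsigned long long)bucket[i].value);
        }
        fclose(file);
    }
    if (rank < size - 1) {
        /* Ping the next rank so it can start writing. */
        MPI_Send(&token, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD);
    }
}

MPI-IO (e.g. MPI_File_write_at_all) could write in parallel instead, but for variable-length text lines it needs a prefix sum of per-rank byte counts; the token scheme avoids that bookkeeping.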
