I am trying to run a job on an HPC system using multiprocessing. Each process has a peak memory usage of ~44GB. The job class I can use allows 1-16 nodes, each with 32 CPUs and 124GB of memory. Therefore, if I want to run the code as quickly as possible (and within the max walltime limit), I should be able to run 2 processes on each node, up to a maximum of 32 across all 16 nodes. However, when I specify mp.Pool(32), the job quickly exceeds the memory limit, I assume because more than two CPUs were used on a node.

My natural instinct was to specify 2 CPUs as the maximum in the PBS script I run my Python script from; however, this configuration is not permitted on the system. I would really appreciate any insight. I have been scratching my head on this one for most of today, and I have faced and worked around similar problems in the past without addressing the fundamentals at play here.

Simplified versions of both scripts below:

#!/bin/sh 

#PBS -l select=16:ncpus=32:mem=124gb
#PBS -l walltime=24:00:00

module load anaconda3/personal
source activate py_env

python directory/script.py

#!/usr/bin/env python
# -*- coding: utf-8 -*- 

import numpy as np
import pandas as pd
import multiprocessing as mp

def df_function(df, arr1, arr2):
    df['col3'] = some_algorithm(df, arr1, arr2)
    return df

def parallelize_dataframe(df, func, num_cores):
    df_split = np.array_split(df, num_cores)
    with mp.Pool(num_cores, maxtasksperchild = 10 ** 3) as pool:
        df = pd.concat(pool.map(func, df_split))
    return df

def main():
    # Loading input data    
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = pd.read_csv(direc + file)
    a = np.load(direc + a_file)
    b = np.load(direc + b_file)

    # Globally defining function with keyword defaults
    global f
    def f(df):
        return df_function(df, arr1 = a, arr2 = b)

    num_cores = 32 # i.e. 2 per node if evenly distributed.
    # Running the function as a multiprocess:
    df = parallelize_dataframe(df, f, num_cores)
    # Saving:
    df.to_csv(direc + 'outfile.csv', index = False)

if __name__ == '__main__':
    main()
3 Comments
  • To clarify, is select=16:ncpus=2:mem=124gb somehow not allowed by policy? Commented Mar 8, 2022 at 0:07
  • Yeah, that's the case. If you use multiple nodes, the policy specifies that you must use all the CPUs on each. Commented Mar 8, 2022 at 12:06
  • It turns out I can increase the walltime and work around this problem by specifying 2 processes and one node only, but I am still curious about whether there is a way of doing this. Commented Mar 8, 2022 at 14:50

1 Answer

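To run your job as-is, you could simply request ncpus=32 and then in your Python script set num_cores = 2. Note that multiprocessing.Pool only starts worker processes on the node where the script itself runs, so this uses a single node. Obviously this has you paying for 32 cores and then leaving 30 of them idle, which is wasteful.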
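
As a rough check of the numbers in the question, the sketch below computes how many workers fit on one node for a given per-process peak memory. The helper name max_workers_per_node is hypothetical, and the figures (124 GB per node, ~44 GB per process, 32 CPUs) are taken from the question; treat it as an illustration, not a sizing tool.

```python
def max_workers_per_node(node_mem_gb: float, peak_mem_per_worker_gb: float, ncpus: int) -> int:
    """Return the largest pool size that respects both the memory and CPU limits of one node."""
    # How many workers fit if each one reaches its peak memory at the same time.
    by_memory = int(node_mem_gb // peak_mem_per_worker_gb)
    # Never start more workers than there are CPUs.
    return max(0, min(ncpus, by_memory))


# Figures from the question: 124 GB per node, ~44 GB peak per process, 32 CPUs.
num_cores = max_workers_per_node(124, 44, 32)
print(num_cores)
```

With these inputs the memory limit, not the CPU count, is what caps the pool size.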

The real problem here is that your current algorithm is memory-bound, not CPU-bound. You should go to great lengths to read only chunks of your files into memory, operate on those chunks, and then write the result chunks to disk to be organized later.
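
To make the chunking idea concrete, here is a minimal sketch using plain pandas, assuming the algorithm can be applied to each block of rows independently. The body of some_algorithm, the column names col1 and col2, the helper process_in_chunks, and the synthetic data are all placeholders invented for illustration; they are not from the question.

```python
import os
import tempfile

import numpy as np
import pandas as pd


def some_algorithm(chunk: pd.DataFrame, arr1: np.ndarray, arr2: np.ndarray) -> pd.Series:
    # Hypothetical stand-in for the real algorithm: any per-row computation fits here.
    return chunk["col1"] * arr1.sum() + chunk["col2"] * arr2.sum()


def process_in_chunks(in_path, out_path, arr1, arr2, chunksize=100_000):
    """Stream the CSV through memory one chunk at a time, appending results to out_path."""
    first = True
    for chunk in pd.read_csv(in_path, chunksize=chunksize):
        chunk["col3"] = some_algorithm(chunk, arr1, arr2)
        # Write the header only once, then append the remaining chunks.
        chunk.to_csv(out_path, mode="w" if first else "a", header=first, index=False)
        first = False


# Tiny demonstration on synthetic data written to a temporary directory.
tmp_dir = tempfile.mkdtemp()
in_path = os.path.join(tmp_dir, "input_data.csv")
out_path = os.path.join(tmp_dir, "outfile.csv")
pd.DataFrame({"col1": range(10), "col2": range(10, 20)}).to_csv(in_path, index=False)

a = np.ones(3)
b = np.full(2, 0.5)
process_in_chunks(in_path, out_path, a, b, chunksize=4)
result = pd.read_csv(out_path)
```

Only one chunk of the CSV is held in memory at a time, so peak usage is governed by chunksize rather than by the size of the input file.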

Fortunately Dask is built to do exactly this kind of thing. As a first step, you can take out the parallelize_dataframe function and directly load and map your some_algorithm with a dask.dataframe and dask.array:

#!/usr/bin/env python
# -*- coding: utf-8 -*- 

import dask.dataframe as dd
import dask.array as da


def main():
    # Loading input data    
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = dd.read_csv(direc + file, blocksize=25e6)
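    # from_npy_stack expects a directory written by da.to_npy_stack, not loose .npy files
    a_and_b = da.from_npy_stack(direc)

    # dask's DataFrame.apply only supports row-wise application (axis=1)
    df['col3'] = df.apply(some_algorithm, axis=1, args=(a_and_b,))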
    
    # dask is lazy, this is the only line that does any work    
    # Saving:
    df.to_csv(
        direc + 'outfile.csv',
        index = False,
        compute_kwargs={"scheduler": "threads"},  # also "processes", but try threads first
    )

if __name__ == '__main__':
    main()

That will require some tweaks to some_algorithm, and to_csv and from_npy_stack work a bit differently from their pandas and NumPy counterparts, but you will be able to reasonably run this on your own laptop, and it will scale to your cluster hardware. You can level up from here by using the distributed scheduler, or even deploy it directly to your cluster with dask-jobqueue.

2 Comments

Thanks a lot. Don't have the rep to upvote as am new to stackoverflow, but will when I have it. Was wondering how you chose the blocksize?
There is not one simple rule but start by reading blog.dask.org/2021/11/02/choosing-dask-chunk-sizes
