I'm working with large 2D numpy arrays and a function that I need to apply to each row of these arrays.
To speed things up, I've implemented parallel processing using Python's multiprocessing module. Each worker in the pool receives an array index, reads the corresponding row from a shared array, runs the function on it, and then overwrites that row in the shared array with the result.
The function returns the modified values as well as a processing parameter, which is stored at the same index in a separate 1D shared array.
So my questions are:
I've read many times that it's not particularly safe to have multiple processes reading from and writing to a shared array (the lock-guarded pattern I mean is sketched below). Until now, things have just worked fine. Should I change my approach? Are there real risks that this could fail or silently produce wrong results?
If the above approach isn't safe and/or practical, what would be a good alternative for this kind of processing, where I need both read and write access to shared arrays?
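To make the first question concrete: the "safe" pattern I keep reading about looks roughly like the sketch below. This is only illustrative (the names and shapes are made up, and get_lock()/get_obj() only exist on the synchronized wrapper that mp.Array returns with the default lock=True); my actual code further down uses lock=False and relies on each worker touching a different row.

import ctypes
import multiprocessing as mp
import numpy as np

shared = mp.Array(ctypes.c_float, 100 * 10)   # lock=True by default -> synchronized wrapper

def locked_update(ix, factor):
    '''Sketch only: serialize access to the shared buffer with its built-in lock.'''
    with shared.get_lock():
        arr = np.frombuffer(shared.get_obj(), dtype=ctypes.c_float).reshape(100, 10)
        arr[ix, :] *= factor   # writes go through the view into shared memory

My understanding is that this serializes every access, which is the overhead I was hoping to avoid, hence the lock=False version below.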
Of course, I'm also grateful for any other constructive comment on my code!
Here's a working example. In this case the function merely generates a random integer, multiplies the row's raw data by it, and overwrites that row in the shared array. The generated integer stands in for the processing parameter that I need to store in the second shared array.
import numpy as np
import ctypes
import array
import multiprocessing as mp
import random
from contextlib import contextmanager, closing
def init_shared(ncell):
    '''Create shared value array for processing.'''
    shared_array_base = mp.Array(ctypes.c_float, ncell, lock=False)
    return shared_array_base
def tonumpyarray(shared_array):
    '''Create numpy array from shared memory.'''
    nparray = np.frombuffer(shared_array, dtype=ctypes.c_float)
    assert nparray.base is shared_array
    return nparray
def init_parameters(**kwargs):
    '''Initialize parameters for processing in workers.'''
    params = dict()
    for key, value in kwargs.items():
        params[key] = value
    return params
def init_worker(shared_array_, parameters_):
    '''Initialize worker for processing.

    Args:
        shared_array_: Object returned by init_shared
        parameters_: Dictionary returned by init_parameters
    '''
    global shared_array
    global shared_parr
    global dim
    shared_array = tonumpyarray(shared_array_)
    shared_parr = tonumpyarray(parameters_['shared_parr'])
    dim = parameters_['dimensions']
def worker_fun(ix):
    '''Function to be run inside each worker.'''
    arr = tonumpyarray(shared_array)
    parr = tonumpyarray(shared_parr)
    arr.shape = dim  # reshape the flat shared buffer to 2D

    # the "processing": draw a factor, store it, scale the row in place
    random.seed(ix)
    rint = random.randint(1, 10)

    parr[ix] = rint
    arr[ix, ...] = arr[ix, ...] * rint
##----------------------------------------------------------------------
def main():
    nrows = 100
    ncols = 10

    shared_array = init_shared(nrows * ncols)
    shared_parr = init_shared(nrows)

    params = init_parameters(shared_parr=shared_parr, dimensions=(nrows, ncols))

    arr = tonumpyarray(shared_array)
    parr = tonumpyarray(params['shared_parr'])

    arr.shape = (nrows, ncols)
    arr[...] = np.random.randint(1, 100, size=(100, 10), dtype='int16')

    with closing(mp.Pool(processes=8, initializer=init_worker, initargs=(shared_array, params))) as pool:
        res = pool.map(worker_fun, range(arr.shape[0]))

    pool.close()
    pool.join()

    # check PARR output
    print(parr)

if __name__ == '__main__':
    main()
And the output:
array([ 7.,  3.,  1.,  4.,  4., 10., 10.,  6.,  4.,  8., 10.,  8.,  8.,  5.,  2.,  4.,  6.,  9.,  3.,  1.,
        3.,  3.,  3.,  5.,  7.,  7.,  4.,  8.,  2.,  9.,  9.,  1.,  2., 10.,  9.,  9.,  6., 10.,  7.,  4.,
        8.,  7.,  2.,  1.,  7.,  5.,  2.,  6.,  9.,  2.,  8.,  4.,  5., 10.,  3.,  2.,  9.,  1., 10.,  4.,
        5.,  8., 10.,  8.,  8.,  7.,  2.,  2.,  8.,  1.,  2.,  6.,  2.,  5., 10.,  8.,  6.,  5.,  4.,  3.,
        5.,  9.,  3.,  8.,  5.,  4.,  1.,  3.,  7.,  2.,  4.,  2.,  7.,  8.,  9.,  9.,  6.,  4.,  6.,  7.], dtype=float32)
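If I wanted to verify that the workers' writes actually land in the parent's view of the shared array, I could add something like the helper below. It is a hypothetical addition, not part of the code above; arr_before would be a copy of arr taken right before the pool is created.

def writes_propagated(arr_before, arr_after, parr):
    '''Hypothetical check: every row of arr_after should equal the original
    row multiplied by the parameter stored for that row.'''
    return np.allclose(arr_after, arr_before * parr[:, np.newaxis])

In main() I would call writes_propagated(arr_before, arr, parr) after pool.join(); if the workers had been handed copies instead of views of the shared memory, this would come back False.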