0

I have a python code that calculates a function F(X), where both F and X are arrays of the same shape. F(X) uses another function called from a package that only accepts a scalar as an argument, but I need to calculate function for all values in X. Using nested for-loops is very time expensive. Is there a way i could optimize this code, or else other alternatives to looping over the dimensions of X?

import numpy as np

def F(X):
    """
    Calculates a matrix F of the same shape as X, using nested for-loops
    Args:
        X (np.ndarray): Input array, likely representing angles or similar.
    Returns:

        F (np.ndarray): An array of values with the same shape as X.
    """
    matrix = np.zeros(np.shape(X))

    for i in range(np.shape(X)[0]):
        for j in range(np.shape(X)[1]):

            x_ij = X[i,j]

            P = external_function(x_ij) # This is my external function that accepts a scalar and returns a 4x4 matrix
            f = - P[0, 1] / P[0, 0]
            matrix[i,j] = f

    return matrix

def external_function(x):
    """ 
    This here is just an example. I have no control of the behaviour of this 
    function, since it can be one of several functions that I need to be 
    able to interchange and they depend on a lot of other variables. """
    P  = np.zeros((4,4))
    for i in range(4):
        for j in range(4):
          for k in range(0,10):
            P[i,j] = np.cos((x + i + j +k))
    return P

X = np.random.rand(100, 100)
F(X)

I tried an using np.vectorize:

def F_opt(X):
    """
    Calculates a matrix F of the same shape as X, by vectorizing the
    scalar-only 'function'.

    Args:
        X (np.ndarray): Input array, m x n

    Returns:
        F (np.ndarray): An array of values with the same shape as X.
    """

    # Define the scalar operation that needs to be applied to each element
    def scalar_operation(x_ij):

        P = external_function(x_ij) # This is my external function that accepts a scalar and returns a 4x4 matrix

        f = - P[0, 1] / P[0, 0]
        return f

    vectorized_scalar_operation = np.vectorize(scalar_operation, otypes=[float])

    F = vectorized_scalar_operation(X)

    return F

but i arrived at a very similar performance:

import time

X_large = X = np.random.rand(500, 500)

start_time = time.time()
F_large_optimized = F_opt(X_large)
end_time = time.time()
print(f"Optimized F_optimized took: {end_time - start_time:.4f} seconds")


start_time = time.time()
F_large_original = F(X_large)
end_time = time.time()
print(f"Original F_original took: {end_time - start_time:.4f} seconds")

yielding : Optimized F_optimized took: 57.4816 seconds Original F_original took: 64.0786 seconds

10
  • 2
    Is the external function any other function - or exactly this function? Commented Jul 19 at 8:01
  • you are calculating 4x4 matrix and only using 2 elements in this matrix .... maybe only calculate those two, should bring down the time by a factor of 8 ... also this can be done with less loops. Commented Jul 19 at 8:05
  • your loop over k is just overwriting the same slot 10 times in a row ... remove this loop, should bring down the time by a factor of 10 ... so in short this code can be 80 times faster if you just remove the redundant work in external_function .... but if you don't control this function then there's nothing you can do. Commented Jul 19 at 8:07
  • 1
    If external_function is a given that can't be modified then you can't vectorize your work. np.vectorize is just a wrapper for a loop. There's no magic or free lunch. You can turn your double loop into a single one with itertools but that won't help you in any significant way. Commented Jul 19 at 8:12
  • 1
    i wrote an explict form of external_function just to show a slow process. In reality I have no control of the behaviour of this function, since it can be one of several functions that I need to be able to interchange and they depend on a lot of other variables. so the problem is optimizing my definition of F(X). Commented Jul 19 at 11:08

2 Answers 2

0

Use Python’s multiprocessing.Pool to parallelize computation across CPU cores

import numpy as np
from multiprocessing import Pool, cpu_count


def scalar_operation(x):
   P = external_function(x)
   return -P[0, 1] / P[0, 0]


def F_parallel(X):
    with Pool(cpu_count()) as pool:
        results = pool.map(scalar_operation, X.ravel())
        return np.array(results).reshape(X.shape)
Sign up to request clarification or add additional context in comments.

2 Comments

As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center.
you should use ``` for block of code instead of putting every line in `. And you have to use correct indentations in code.
-1

Vectorization (Broadcasting)

The bottleneck is actually the triple nested for-loop in the external_function. You could use numpy broadcasting to make this a magnitude of order faster:

def external_function_vec(x):
    i = np.arange(4).reshape(-1, 1, 1)  # shape: (4,1,1)
    j = np.arange(4).reshape(1, -1, 1)  # shape: (1,4,1)
    k = np.arange(10).reshape(1, 1, -1) # shape: (1,1,10)

    P = np.sum(np.cos(x + i + j + k), axis=2)  # shape: (4,4)
    return P

You can then apply it like this:

def F_vectorized_internal(X):
    result = np.empty_like(X)
    it = np.nditer(X, flags=['multi_index'])
    while not it.finished:
        x = it[0]
        P = external_function_vec(x)
        result[it.multi_index] = -P[0,1] / P[0,0]
        it.iternext()
    return result

Use numba

The easiest way with nearly no code modification is to use the numba JIT-compiler. This can accelerate your code a magnitude of order or even two magnitude of orders.

# after `pip install numba` simply apply the @njit decorator:

from numba import njit, prange

@njit
def external_function_numba(x):
    P = np.zeros((4, 4))
    for i in range(4):
        for j in range(4):
            for k in range(10):
                P[i, j] += np.cos(x + i + j + k)
    return P

@njit(parallel=True)
def F_numba(X):
    m, n = X.shape
    result = np.empty((m, n))
    for i in prange(m):
        for j in range(n):
            P = external_function_numba(X[i, j])
            result[i, j] = -P[0, 1] / P[0, 0]
    return result

Benchmark

import numpy as np
import time
from numba import njit, prange

# ORIGINAL SLOW
def external_function(x):
    P = np.zeros((4, 4))
    for i in range(4):
        for j in range(4):
            for k in range(10):
                P[i, j] += np.cos(x + i + j + k)
    return P

def F(X):
    m, n = X.shape
    result = np.empty((m, n))
    for i in range(m):
        for j in range(n):
            P = external_function(X[i, j])
            result[i, j] = -P[0, 1] / P[0, 0]
    return result

# VECTORIZED

def external_function_vec(x):
    i = np.arange(4).reshape(-1, 1, 1)
    j = np.arange(4).reshape(1, -1, 1)
    k = np.arange(10).reshape(1, 1, -1)
    return np.sum(np.cos(x + i + j + k), axis=2)

def F_vectorized(X):
    result = np.empty_like(X)
    it = np.nditer(X, flags=['multi_index'])
    while not it.finished:
        x = it[0]
        P = external_function_vec(x)
        result[it.multi_index] = -P[0, 1] / P[0, 0]
        it.iternext()
    return result

# NUMBA version

@njit
def external_function_numba(x):
    P = np.zeros((4, 4))
    for i in range(4):
        for j in range(4):
            for k in range(10):
                P[i, j] += np.cos(x + i + j + k)
    return P

@njit(parallel=True)
def F_numba(X):
    m, n = X.shape
    result = np.empty((m, n))
    for i in prange(m):
        for j in range(n):
            P = external_function_numba(X[i, j])
            result[i, j] = -P[0, 1] / P[0, 0]
    return result

# BENCHMARK HELPER

def benchmark(fn, name, runs=3):
    print(f"\nBenchmarking: {name}")
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        fn(X)
        end = time.perf_counter()
        times.append(end - start)
    print(f"Best of {runs}: {min(times):.4f} s | Avg: {sum(times)/runs:.4f} s")

# Test & Benchmark
np.random.seed(42)
X = np.random.rand(500, 500)

# Warm up JIT
F_numba(X)

# Benchmark
benchmark(F, "Original (Nested Loops)")
benchmark(F_vectorized, "Vectorized external_function")
benchmark(F_numba, "Numba Optimized")

# Validate output consistency
print("\nChecking correctness:  ========")
print("Original vs Vectorized:", np.allclose(F(X), F_vectorized(X)))
print("Original vs Numba:", np.allclose(F(X), F_numba(X)))

Output:

Benchmarking: Original (Nested Loops)
Best of 3: 19.7497 s | Avg: 19.8582 s

Benchmarking: Vectorized external_function
Best of 3: 1.4444 s | Avg: 1.4584 s

Benchmarking: Numba Optimized
Best of 3: 0.0129 s | Avg: 0.0132 s

Checking correctness:  ========
Original vs Vectorized: True
Original vs Numba: True

It is impressive how fast Numba makes your code. With minimal modification.

But I see - probably the external_function is really external.

You can use introspection to peak in its code:

import inspect

print(inspect.getsource(external_function))

The applicability of @njit depends on that your external function is pure python and doesn't use C (e.g. via numpy).

2 Comments

Almost certainly that function is an external dependency. The point is to optimise the code outside.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.