
I have a DataFrame and want to do some manipulation of the data in a function, depending on the values of the row.

def my_udf(row):
    threshold = 10
    if row.val_x > threshold:
        row.val_x = another_function(row.val_x)
        row.val_y = another_function(row.val_y)
        return row
    else:
        return row

Does anyone know how to apply my UDF to the DataFrame?

2 Answers


From my understanding, udf parameters are column names. Your example might be rewritten like this:

from pyspark.sql.functions import udf, array
from pyspark.sql.types import IntegerType

def change_val_x(val_x):
    threshold = 10
    if val_x > threshold:
        return another_function(val_x)
    else:
        return val_x

def change_val_y(arr):
    threshold = 10
    # arr[0] -> val_x, arr[1] -> val_y
    if arr[0] > threshold:
        return another_function(arr[1])
    else:
        return arr[1]

change_val_x_udf = udf(change_val_x, IntegerType())
change_val_y_udf = udf(change_val_y, IntegerType())

# apply these functions to your dataframe
df = df.withColumn('val_y', change_val_y_udf(array('val_x', 'val_y')))\
       .withColumn('val_x', change_val_x_udf('val_x'))

To modify the val_x column, a simple udf is enough, but for val_y you need the values of both the val_x and val_y columns, so the solution is to pass them as an array. Note that this code is not tested...

See this question to apply udf on multiple columns.
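For what it's worth, the row-level logic can be sanity-checked in plain Python before wrapping it in a udf. This sketch assumes a hypothetical another_function (here it just doubles its input — substitute your real one):

```python
# Plain-Python check of the UDF logic above. another_function is a
# stand-in (doubling) -- replace it with the real transformation.
def another_function(val):
    return val * 2

def change_val_x(val_x):
    threshold = 10
    return another_function(val_x) if val_x > threshold else val_x

def change_val_y(arr):
    threshold = 10
    # arr[0] -> val_x, arr[1] -> val_y
    return another_function(arr[1]) if arr[0] > threshold else arr[1]

print(change_val_x(12))       # above threshold: transformed
print(change_val_y([12, 5]))  # val_x above threshold: val_y transformed
print(change_val_y([3, 5]))   # val_x below threshold: val_y unchanged
```

Once the pure functions behave as expected, wrapping them with udf(..., IntegerType()) as above only changes how they are invoked, not what they compute.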


2 Comments

it's .withColumn('val_y', change_val_y_udf(array('val_x', 'val_y'))) not .withColumn('val_y', change_val_x_udf(array('val_x', 'val_y')))
Also, you might have changed the val_x value before using it in change_val_y_udf.

It's better not to use UDFs if you can use pyspark functions instead. If you can't translate another_function into pyspark functions, you can do this:

from pyspark.sql.types import *
import pyspark.sql.functions as psf

def another_function(val):
    ...

another_function_udf = psf.udf(another_function, outputType())

where outputType() is the pyspark type corresponding to the output of another_function (IntegerType(), StringType()...)

threshold = 10

def apply_another_function(val):
    return psf.when(df.val_x > threshold, another_function_udf(val)).otherwise(val)

df = df.withColumn('val_y', apply_another_function(df.val_y))\
       .withColumn('val_x', apply_another_function(df.val_x))
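The when(...).otherwise(...) column expression selects, row by row, between the transformed and the original value. A plain-Python sketch of that same selection logic (again with a hypothetical doubling another_function and threshold = 10, and rows modeled as dicts) is:

```python
# Row-by-row equivalent of when(df.val_x > threshold, f(col)).otherwise(col).
# another_function is a stand-in (doubling); threshold = 10 is assumed.
threshold = 10

def another_function(val):
    return val * 2

def apply_another_function(row, col):
    # row is a dict like {'val_x': ..., 'val_y': ...}
    return another_function(row[col]) if row['val_x'] > threshold else row[col]

rows = [{'val_x': 12, 'val_y': 3}, {'val_x': 4, 'val_y': 7}]
result = [{'val_x': apply_another_function(r, 'val_x'),
           'val_y': apply_another_function(r, 'val_y')} for r in rows]
```

The advantage of the when/otherwise form is that the threshold comparison stays in native Spark expressions, so the UDF is only invoked where the condition holds.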

