
I am attempting to make a new column from another column in Apache Spark.

The data (heavily abbreviated) looks like

Date    Day_of_Week
2018-05-26T00:00:00.000+0000    5
2018-05-05T00:00:00.000+0000    6

and should look like

Date    Day_of_Week    Weekday
2018-05-26T00:00:00.000+0000    5    Thursday
2018-05-05T00:00:00.000+0000    6    Friday

I have tried the advice from the manual (https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#register-the-function-as-a-udf), from How to pass a constant value to Python UDF?, and from PySpark add a column to a DataFrame from a TimeStampType column,

which resulted in:

def int2day (day_int):
  if day_int == 1:
    return 'Sunday'
  elif day_int == 2:
    return 'Monday'
  elif day_int == 3:
    return 'Tuesday'
  elif day_int == 4:
    return 'Wednesday'
  elif day_int == 5:
    return 'Thursday'
  elif day_int == 6:
    return 'Friday'
  elif day_int == 7:
    return 'Saturday'
  else:
    return 'FAIL'

spark.udf.register("day", int2day, IntegerType())
df2 = df.withColumn("Day", day("Day_of_Week"))

which produces a long error:

SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 262, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 257, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 325, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 141, in dump_stream
    self._write_with_length(obj, stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 151, in _write_with_length
    serialized = self.dumps(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 556, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I don't see how to apply How to pass a constant value to Python UDF? here, since their example was much simpler (it only returned true or false).

I've also tried using map functions, as in PySpark add a column to a DataFrame from a TimeStampType column, but

df3 = df2.withColumn("weekday", map(lambda x: int2day, col("Date")))

just raises TypeError: argument 2 to map() must support iteration, though I thought col did support iteration.

I've read every example online I can find. I don't see how I can apply what other questions have asked to my case.

How can I add another column, using a function of another column?

2 Comments

  • You shouldn't use a udf for this. See this post on how to do IF-THEN-ELSE logic.
  • If you did want to use a udf, your syntax is incorrect. The return type should be StringType() instead of IntegerType(), and the udf object returned by the registration is what you apply to the column. You can refer to this post for an example of the correct syntax.
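For reference, a minimal sketch of the corrected syntax the second comment describes, assuming Day_of_Week is an integer column and int2day is defined as in the question:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Wrap int2day as a UDF that returns strings (the function yields day names),
# then apply the udf object itself inside withColumn.
day_udf = udf(int2day, StringType())
df2 = df.withColumn("Weekday", day_udf(col("Day_of_Week")))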

1 Answer

You shouldn't need a UDF here at all to accomplish what you're trying to do. You can use the built-in pyspark date_format function to extract the day-of-week name from the date in a column.

import pyspark.sql.functions as func

# "EEEE" is the date-format pattern for the full day-of-week name, e.g. "Monday"
df = df.withColumn("day_of_week", func.date_format(func.col("Date"), "EEEE"))

The result is a new column, day_of_week, added to your dataframe that displays Sunday, Monday, Tuesday, etc., based on the value in the Date column.
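A minimal usage sketch; the Weekday column name and the select simply mirror the question's desired output, and the values produced depend on the actual dates in Date:

# Hypothetical usage mirroring the question's target schema.
df.withColumn("Weekday", func.date_format(func.col("Date"), "EEEE")) \
  .select("Date", "Day_of_Week", "Weekday") \
  .show(truncate=False)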


1 Comment

It doesn't look like the "Date" column is a DateType in this example. It's likely a StringType that first needs to be converted before you can use date_format.
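A minimal sketch of that conversion, assuming the Date column holds ISO-8601 strings like those shown in the question (the format pattern and the Date_parsed column name are assumptions for illustration):

import pyspark.sql.functions as func

# Hypothetical: parse the string into a proper timestamp first, then format it.
df = df.withColumn("Date_parsed",
                   func.to_timestamp(func.col("Date"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
df = df.withColumn("day_of_week", func.date_format(func.col("Date_parsed"), "EEEE"))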
