I need to extract the date and the hour from utc_timestamp into two separate columns, both expressed in a specific time zone. The time zone name is determined by id, which is looked up in a configuration constant.

    Input DF                                           Output DF
+-------------+--+                         +-------------+--+----------+----+
|utc_timestamp|id|                         |utc_timestamp|id|date      |hour|
+-------------+--+                         +-------------+--+----------+----+
|1608000000782|1 |                         |1608000000782|1 |2020-12-14|20  |
|1608000240782|2 |                         |1608000240782|2 |2020-12-15|11  |
+-------------+--+                         +-------------+--+----------+----+

I have a pandas_udf that can extract only one column at a time, so I have to create it twice:

from pyspark.sql import DataFrame
import pyspark.sql.functions as f
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType
import pandas as pd
import pytz

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}


class TimezoneUdfProvider(object):
    def __init__(self):
        self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
        self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
        
    def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
        return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

    def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
        return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

def extract_date(utc_timestamp: int, id: int):
    # Look up the time zone for this id, then convert epoch milliseconds to local time.
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.Timestamp.fromtimestamp(utc_timestamp / 1000, tz=timezone_nw).date()

def extract_hour(utc_timestamp: int, id: int) -> int:
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.Timestamp.fromtimestamp(utc_timestamp / 1000, tz=timezone_nw).hour


def extract_from_utc(df: DataFrame) -> DataFrame:
    timezone_udf1 = TimezoneUdfProvider()
    df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col('utc_timestamp'), f.col('id')))
    timezone_udf2 = TimezoneUdfProvider()
    df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col('utc_timestamp'), f.col('id')))
    return df_with_hour

Is there a better way to do this, without having to use the same UDF provider twice?

1 Answer

You can do this without a UDF, using Spark's built-in functions.

Use create_map to turn the dictionary into a map column and derive a new TimeZone column from id. Then convert utc_timestamp with from_unixtime and from_utc_timestamp, passing the TimeZone column as the target time zone. Once the timestamp is expressed in the right time zone, extract the date and hour fields from it.
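The create_map call takes its arguments as alternating keys and values, which is why the dictionary is flattened with itertools.chain first. A minimal sketch of that flattening step on its own, with no Spark session required (the variable name flat is only for illustration):

```python
from itertools import chain

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

# dict.items() yields (key, value) pairs; chain(*pairs) flattens them into
# key1, value1, key2, value2, ... which is the order create_map expects.
flat = list(chain(*TIMEZONE_LIST.items()))
print(flat)  # [1, 'America/Chicago', 2, 'Asia/Tokyo']
```

Each element of this flat list is then wrapped in a literal column before being passed to create_map.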

import pyspark.sql.functions as F
from itertools import chain

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

# Build a map column: id -> time zone name
map_exp = F.create_map([F.lit(i) for i in chain(*TIMEZONE_LIST.items())])

# Note: from_unixtime renders the timestamp in the Spark session time zone,
# so this assumes spark.sql.session.timeZone is UTC.
final = (df.withColumn("TimeZone", map_exp.getItem(F.col("id")))
           .withColumn("Timestamp",
                       F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp") / 1000), F.col("TimeZone")))
           .withColumn("date", F.to_date("Timestamp"))
           .withColumn("Hour", F.hour("Timestamp"))
           .drop("Timestamp"))

final.show()

(3) Spark Jobs
final:pyspark.sql.dataframe.DataFrame = [utc_timestamp: long, id: long ... 3 more fields]

+-------------+---+---------------+----------+----+
|utc_timestamp| id|       TimeZone|      date|Hour|
+-------------+---+---------------+----------+----+
|1608000000782|  1|America/Chicago|2020-12-14|  20|
|1608000240782|  2|     Asia/Tokyo|2020-12-15|  11|
+-------------+---+---------------+----------+----+
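The values in this output can be cross-checked in plain Python, without Spark. The following is only a sketch: it assumes Python 3.9+ (for the standard-library zoneinfo module) with a time zone database available on the system, and local_date_and_hour is a hypothetical helper name, not part of any library.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library from Python 3.9

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

def local_date_and_hour(utc_ms: int, tz_id: int):
    """Hypothetical helper: (ISO date, hour) of an epoch-millisecond value
    in the time zone that TIMEZONE_LIST maps tz_id to."""
    utc_dt = datetime.fromtimestamp(utc_ms / 1000, tz=timezone.utc)
    local_dt = utc_dt.astimezone(ZoneInfo(TIMEZONE_LIST[tz_id]))
    return local_dt.date().isoformat(), local_dt.hour

# The two sample rows from the question.
rows = [(1608000000782, 1), (1608000240782, 2)]
results = [local_date_and_hour(ts, i) for ts, i in rows]
print(results)  # [('2020-12-14', 20), ('2020-12-15', 11)]
```

Both rows fall on 2020-12-15 in UTC (02:40 and 02:44), so the Chicago row moves back to the previous calendar day while the Tokyo row stays on the same day.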

EDIT: replacing create_map with a udf:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

def fun(x):
    # Return the time zone name for an id, or None if the id is unknown
    return TIMEZONE_LIST.get(x, None)

map_udf = F.udf(fun, StringType())

final = (df.withColumn("TimeZone", map_udf("id"))
           .withColumn("Timestamp",
                       F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp") / 1000), F.col("TimeZone")))
           .withColumn("date", F.to_date("Timestamp"))
           .withColumn("Hour", F.hour("Timestamp"))
           .drop("Timestamp"))

final.show()
9 Comments

create_map does not work for me, I receive an error: Traceback (most recent call last): File "<input>", line 1, in <module> File "<input>", line 1, in <listcomp> File "/home/ninav/venvs/acutecare-rdb-nlp-processor/lib/python3.6/site-packages/pyspark/sql/functions.py", line 44, in _ jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) AttributeError: 'NoneType' object has no attribute '_jvm'
I tried on the example TIMEZONE_LIST and received an error
@NinaVolfenzon I updated my answer to use a udf instead of create_map. Can you try the second method on the example dataframe you shared?
I did the import the same way you showed in your first example. I'm using python 3.6, could this be the reason for my error?
@NinaVolfenzon Are you still getting the error when you use the second method shown that uses a udf?