Skip to main content
added 148 characters in body
Source Link
Nina
  • 89
  • 1
  • 2
  • 8

SoAnd I have pyspark DF such as:need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable.

    Input DF                                           Output DF
+-------------+--+
|utc_timestamp|id|
                         +-------------+--+
|1608000000782|1 |
+-----------+--+--+
|1608000240782|2|utc_timestamp|id| |                        |utc_timestamp|id|date      |hour|
+-------------+--+

And I need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable. So the output should look like this:

                         +-------------+--+----------+----+|
|utc_timestamp|id|date|1608000000782|1 |     |hour|                    |1608000000782|1 |2020-12-14|20  |
+-------------+--+-                         +---------+----|
|1608000000782|1 |2020+-12-14|20  |
+----------+----+
|1608000240782|2 |                         |1608000240782|2 |2020-12-15|11  |
+----------+----+
|1608000240782|2 |2020-12-15|11+  |
                       +-------------+--+----------+----+

So I have pyspark DF such as:

+-------------+--+
|utc_timestamp|id|
+-------------+--+
|1608000000782|1 |
+-------------+--+
|1608000240782|2 |
+-------------+--+

And I need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable. So the output should look like this:

+-------------+--+----------+----+
|utc_timestamp|id|date      |hour|
+-------------+--+----------+----|
|1608000000782|1 |2020-12-14|20  |
+-------------+--+----------+----+
|1608000240782|2 |2020-12-15|11  |
+-------------+--+----------+----+

And I need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable.

    Input DF                                           Output DF
+-------------+--+                         +-------------+--+----------+----+
|utc_timestamp|id|                         |utc_timestamp|id|date      |hour|
+-------------+--+                         +-------------+--+----------+----|
|1608000000782|1 |                         |1608000000782|1 |2020-12-14|20  |
+-------------+--+                         +-------------+--+----------+----+
|1608000240782|2 |                         |1608000240782|2 |2020-12-15|11  |
+-------------+--+                         +-------------+--+----------+----+
edited body
Source Link
Nina
  • 89
  • 1
  • 2
  • 8

And I need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable.

1 - 'America/Chicago'
2 - 'Asia/Tokyo'

So So the output should look like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType 
import pandas as pd
import pytz

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}


class TimezoneUdfProvider(object):
    def __init__(self):
        self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
        self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
        
     def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

     def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

def extract_date(utc_timestamp: int, id: str):
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).date()

def extract_hour(utc_timestamp: int, id: str) -> int:
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).hour
    
 

def extract_from_utc(df: DataFrame) -> DataFrame:
     timezone_udf1 = TimezoneUdfProvider()
     df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col(utc_timestamp), f.col(id)))
     timezone_udf2 = TimezoneUdfProvider()
     df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col(utc_timestamp), f.col(id)))
    return df_with_hour

Is there a batterbetter way to do it? Without a need to use the same udf provider twice?

And I need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable.

1 - 'America/Chicago'
2 - 'Asia/Tokyo'

So the output should look like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType 
import pandas as pd
import pytz

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}


class TimezoneUdfProvider(object):
    def __init__(self):
        self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
        self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
        
     def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

     def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

def extract_date(utc_timestamp: int, id: str):
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).date()

def extract_hour(utc_timestamp: int, id: str) -> int:
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).hour
    
 

def extract_from_utc(df: DataFrame) -> DataFrame:
     timezone_udf1 = TimezoneUdfProvider()
     df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col(utc_timestamp), f.col(id)))
     timezone_udf2 = TimezoneUdfProvider()
     df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col(utc_timestamp), f.col(id)))
    return df_with_hour

Is there a batter way to do it? Without a need to use the same udf provider twice?

And I need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable. So the output should look like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType 
import pandas as pd
import pytz

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}


class TimezoneUdfProvider(object):
    def __init__(self):
        self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
        self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
        
     def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

     def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

def extract_date(utc_timestamp: int, id: str):
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).date()

def extract_hour(utc_timestamp: int, id: str) -> int:
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).hour
    

def extract_from_utc(df: DataFrame) -> DataFrame:
     timezone_udf1 = TimezoneUdfProvider()
     df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col(utc_timestamp), f.col(id)))
     timezone_udf2 = TimezoneUdfProvider()
     df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col(utc_timestamp), f.col(id)))
    return df_with_hour

Is there a better way to do it? Without a need to use the same udf provider twice?

Source Link
Nina
  • 89
  • 1
  • 2
  • 8

How to add multiple columns to pyspark DF using pandas_udf with multiple source columns?

So I have pyspark DF such as:

+-------------+--+
|utc_timestamp|id|
+-------------+--+
|1608000000782|1 |
+-------------+--+
|1608000240782|2 |
+-------------+--+

And I need to extract from utc_timestamp its date and its hour into two different columns depending on time zone. Time zone name is defined by id from configuration const variable.

1 - 'America/Chicago'
2 - 'Asia/Tokyo'

So the output should look like this:

+-------------+--+----------+----+
|utc_timestamp|id|date      |hour|
+-------------+--+----------+----|
|1608000000782|1 |2020-12-14|20  |
+-------------+--+----------+----+
|1608000240782|2 |2020-12-15|11  |
+-------------+--+----------+----+

I have pandas_udf that allows me to extract one column at a time and I have to create it twice:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType 
import pandas as pd
import pytz

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}


class TimezoneUdfProvider(object):
    def __init__(self):
        self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
        self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
        
     def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

     def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
         return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

def extract_date(utc_timestamp: int, id: str):
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).date()

def extract_hour(utc_timestamp: int, id: str) -> int:
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).hour
    


def extract_from_utc(df: DataFrame) -> DataFrame:
     timezone_udf1 = TimezoneUdfProvider()
     df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col(utc_timestamp), f.col(id)))
     timezone_udf2 = TimezoneUdfProvider()
     df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col(utc_timestamp), f.col(id)))
    return df_with_hour

Is there a batter way to do it? Without a need to use the same udf provider twice?