You can do this without a UDF, using Spark's built-in functions.

We can use create_map to build a map expression from the dictionary and create a new TimeZone column, then convert with from_unixtime and from_utc_timestamp, passing the newly mapped column as the timezone. Once we have the timestamp in each row's timezone, we can extract the Hour and date fields.
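
For illustration, here is a minimal sketch of an input DataFrame matching the columns the answer assumes (utc_timestamp holding epoch milliseconds and an integer id; spark is assumed to be an active SparkSession, and the sample rows simply mirror the output shown further down):

# Hypothetical sample input (not part of the original answer)
df = spark.createDataFrame(
    [(1608000000782, 1), (1608000240782, 2)],
    ["utc_timestamp", "id"],
)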

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

import pyspark.sql.functions as F
from itertools import chain

# Flatten the dict into alternating key/value literals:
# create_map(lit(1), lit('America/Chicago'), lit(2), lit('Asia/Tokyo'))
map_exp = F.create_map([F.lit(i) for i in chain(*TIMEZONE_LIST.items())])

final = (df.withColumn("TimeZone", map_exp.getItem(F.col("id")))
           .withColumn("Timestamp",
                       F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp") / 1000),
                                            F.col("TimeZone")))
           .withColumn("date", F.to_date("Timestamp"))
           .withColumn("Hour", F.hour("Timestamp"))
           .drop("Timestamp"))

final.show()

final:pyspark.sql.dataframe.DataFrame = [utc_timestamp: long, id: long ... 3 more fields]

+-------------+---+---------------+----------+----+
|utc_timestamp| id|       TimeZone|      date|Hour|
+-------------+---+---------------+----------+----+
|1608000000782|  1|America/Chicago|2020-12-14|  20|
|1608000240782|  2|     Asia/Tokyo|2020-12-15|  11|
+-------------+---+---------------+----------+----+

EDIT: replacing create_map with a UDF that looks the id up in the dictionary:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

def fun(x):
    # Return the timezone for this id, or None if the id is not in the dictionary
    return TIMEZONE_LIST.get(x, None)

map_udf = F.udf(fun, StringType())

final = (df.withColumn("TimeZone", map_udf("id"))
           .withColumn("Timestamp",
                       F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp") / 1000),
                                            F.col("TimeZone")))
           .withColumn("date", F.to_date("Timestamp"))
           .withColumn("Hour", F.hour("Timestamp"))
           .drop("Timestamp"))

final.show()
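
As a quick hypothetical check (not part of the original answer), an id with no entry in TIMEZONE_LIST ends up with a null TimeZone in both versions, so the derived date and Hour columns come out null as well:

# Hypothetical id 3 has no mapping, so getItem returns null
extra = spark.createDataFrame([(1608000000782, 3)], ["utc_timestamp", "id"])
extra.withColumn("TimeZone", map_exp.getItem(F.col("id"))).show()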
