Suppose I have the following data frame:

+----------+-----+----+-------+
|display_id|ad_id|prob|clicked|
+----------+-----+----+-------+
|       123|  989| 0.9|      0|
|       123|  990| 0.8|      1|
|       123|  999| 0.7|      0|
|       234|  789| 0.9|      0|
|       234|  777| 0.7|      0|
|       234|  769| 0.6|      1|
|       234|  798| 0.5|      0|
+----------+-----+----+-------+

I then perform the following operations to get a final data set (shown below the code):

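from operator import itemgetter
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, when
from pyspark.sql.types import ArrayType, IntegerType

# Add a new column with the clicked ad_id if clicked == 1, 0 otherwise
df_adClicked = df.withColumn("ad_id_clicked", when(df.clicked==1, df.ad_id).otherwise(0))

# DF -> RDD with tuple : (display_id, (ad_id, prob), ad_id_clicked)
df_blah = df_adClicked.rdd.map(lambda x : (x[0],(x[1],x[2]),x[4])).toDF(["display_id", "ad_id","clicked_ad_id"])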

# Group by display_id and create column with clicked ad_id and list of tuples : (ad_id, prob)
df_blah2 = df_blah.groupby('display_id').agg(F.collect_list('ad_id'), F.max('clicked_ad_id'))

# Define function to sort list of tuples by prob and create list of only ad_ids
def sortByRank(ad_id_list):
    sortedVersion = sorted(ad_id_list, key=itemgetter(1), reverse=True)
    sortedIds = [i[0] for i in sortedVersion]
    return sortedIds

# Sort the (ad_id, prob) tuples by using udf/function and create new column ad_id_sorted
sort_ad_id = udf(lambda x : sortByRank(x), ArrayType(IntegerType()))
df_blah3 = df_blah2.withColumn('ad_id_sorted', sort_ad_id('collect_list(ad_id)'))

# Function to change clickedAdId into an array of size 1
def createClickedSet(clickedAdId):
    setOfDocs = [clickedAdId]
    return setOfDocs

clicked_set = udf(lambda y : createClickedSet(y), ArrayType(IntegerType()))
df_blah4 = df_blah3.withColumn('ad_id_set', clicked_set('max(clicked_ad_id)'))

# Select the necessary columns
finalDF = df_blah4.select('display_id', 'ad_id_sorted','ad_id_set')

+----------+--------------------+---------+
|display_id|ad_id_sorted        |ad_id_set|
+----------+--------------------+---------+
|234       |[789, 777, 769, 798]|[769]    |
|123       |[989, 990, 999]     |[990]    |
+----------+--------------------+---------+
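To make the intended result explicit, the same transformation can be written as a plain-Python reference, which may be handy for checking a faster rewrite against a small sample. This is only a sketch: the function name `rank_ads_reference` and the tuple layout of the input rows are assumptions made for illustration, not part of the Spark job.

```python
from collections import defaultdict
from operator import itemgetter


def rank_ads_reference(rows):
    """Reference result for rows of (display_id, ad_id, prob, clicked) tuples.

    Returns {display_id: (ad_ids sorted by prob descending, [clicked ad_id])}.
    The clicked ad_id defaults to 0 when no row in the group was clicked,
    mirroring the otherwise(0) in the Spark code.
    """
    ads = defaultdict(list)        # display_id -> [(ad_id, prob), ...]
    clicked_ad = defaultdict(int)  # display_id -> clicked ad_id (0 if none)
    for display_id, ad_id, prob, clicked in rows:
        ads[display_id].append((ad_id, prob))
        if clicked == 1:
            clicked_ad[display_id] = max(clicked_ad[display_id], ad_id)
    return {
        display_id: (
            [ad for ad, _ in sorted(pairs, key=itemgetter(1), reverse=True)],
            [clicked_ad[display_id]],
        )
        for display_id, pairs in ads.items()
    }
```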

Is there a more efficient way of doing this? This set of transformations seems to be the bottleneck in my code. I would greatly appreciate any feedback.

1 Answer

I haven't done any timing comparisons, but I would expect that avoiding UDFs lets Spark optimize the whole query itself: Python UDFs are opaque to the optimizer and force rows to be serialized between the JVM and the Python workers.

#scala:  val dfad = sc.parallelize(Seq((123,989,0.9,0),(123,990,0.8,1),(123,999,0.7,0),(234,789,0.9,0),(234,777,0.7,0),(234,769,0.6,1),(234,798,0.5,0))).toDF("display_id","ad_id","prob","clicked")
# ^^^ That line is the only difference (besides putting val in front of variables) between this Python answer and a Scala one.

dfad = sc.parallelize(((123,989,0.9,0),(123,990,0.8,1),(123,999,0.7,0),(234,789,0.9,0),(234,777,0.7,0),(234,769,0.6,1),(234,798,0.5,0))).toDF(["display_id","ad_id","prob","clicked"])
dfad.registerTempTable("df_ad")



df1 = sqlContext.sql("SELECT display_id,collect_list(ad_id) ad_id_sorted FROM (SELECT * FROM df_ad SORT BY display_id,prob DESC) x GROUP BY display_id")
+----------+--------------------+
|display_id|        ad_id_sorted|
+----------+--------------------+
|       234|[789, 777, 769, 798]|
|       123|     [989, 990, 999]|
+----------+--------------------+

df2 = sqlContext.sql("SELECT display_id, max(ad_id) as ad_id_set from df_ad where clicked=1 group by display_id")
+----------+---------+
|display_id|ad_id_set|
+----------+---------+
|       234|      769|
|       123|      990|
+----------+---------+


final_df = df1.join(df2,"display_id")
+----------+--------------------+---------+
|display_id|        ad_id_sorted|ad_id_set|
+----------+--------------------+---------+
|       234|[789, 777, 769, 798]|      769|
|       123|     [989, 990, 999]|      990|
+----------+--------------------+---------+

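I didn't put ad_id_set into an array because you were calculating the max, and max returns only one value per group. If you really need it in an array, you can wrap the column with the array function. Note that df2 only contains display_ids with a clicked row, so the (inner) join drops any display_id without a click, whereas your version returned [0] for those; use a left join if you need to keep them.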

I included the subtle Scala difference in case a future reader using Scala has a similar problem.

1 Comment

Thanks for providing this solution. I timed both solutions. Your solution executed in 1.38 ms and the original solution executed in 2.01 ms. :)
