I have two Spark DataFrames. The first contains event information, as below:

Id  User_id  Date
1   1        2020-12-01
2   2        2021-10-10

The second DataFrame contains purchase information, as follows:

Id  User_id  Date        Value
1   1        2020-11-10  50
2   1        2020-10-10  25
3   2        2020-09-15  100

I want to join both DataFrames and create three columns: the last Value, the difference between the last two values, and the difference in days between the last two purchases, as below:

Id  User_id  Date        Last_Value  Diff_Value  Diff_Date
1   1        2020-12-01  50          25          31
2   2        2021-10-10  100         null        null
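
To pin down the expected result independently of Spark, here is a minimal plain-Python sketch of the logic described above, run on the sample rows. The function name `summarize` and the dict layout are assumptions made for illustration only. Note that 2020-10-10 to 2020-11-10 is 31 days.

```python
from datetime import date

events = [
    {"Id": 1, "User_id": 1, "Date": date(2020, 12, 1)},
    {"Id": 2, "User_id": 2, "Date": date(2021, 10, 10)},
]
purchases = [
    {"Id": 1, "User_id": 1, "Date": date(2020, 11, 10), "Value": 50},
    {"Id": 2, "User_id": 1, "Date": date(2020, 10, 10), "Value": 25},
    {"Id": 3, "User_id": 2, "Date": date(2020, 9, 15), "Value": 100},
]

def summarize(event, purchases):
    """Return (Last_Value, Diff_Value, Diff_Date) for one event (hypothetical helper)."""
    # Keep this user's purchases made on or before the event, newest first.
    prior = sorted(
        (p for p in purchases
         if p["User_id"] == event["User_id"] and p["Date"] <= event["Date"]),
        key=lambda p: p["Date"],
        reverse=True,
    )
    if not prior:
        return (None, None, None)
    last = prior[0]
    if len(prior) == 1:
        # Only one prior purchase: there is nothing to diff against.
        return (last["Value"], None, None)
    previous = prior[1]
    return (
        last["Value"],
        last["Value"] - previous["Value"],
        (last["Date"] - previous["Date"]).days,
    )

result = {e["Id"]: summarize(e, purchases) for e in events}
print(result)  # {1: (50, 25, 31), 2: (100, None, None)}
```

A reference like this is handy for checking a Spark query against a small sample before running it on the full data.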

To join the dataframes I'm using the following code:

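from pyspark.sql import functions as F, Window as W

(Events.join(Purchase,
         on = [Events.User_id == Purchase.User_id,
               Events.Date >= Purchase.Date],
         how = "left")
   # Rank each event's prior purchases, most recent first
   .withColumn('rank_date', F.rank().over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc())))
)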

With this code I can see which purchases precede each event, ordered by date. But how can I access the values of those rows and create columns based on them?

  • Are the dates shown in Events unrelated to those in Purchase? I would have read it as "the last purchase event of user_id 1 happened on 2020-12-01 for a value of 50". Commented Sep 28, 2021 at 15:26
  • The joined DataFrame carries the Events date. The join must contain only purchases that happened before the Events date: for the event of user 1 on 2020-12-01, the last purchase was on 2020-11-10 with a value of 50. I'm omitting the Purchase date when joining both DataFrames. Commented Sep 28, 2021 at 15:30

2 Answers

2

I think it's easier to proceed this way:

  • work with the purchase dataframe first
  • then join / filter with the events dataframe

It can then be done as:

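from pyspark.sql import functions as F, Window

# One window per user; each expression below orders it by purchase date
window_user_id = Window.partitionBy('user_id')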

(
    purchase
    .withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
    .withColumn('previous_value',   F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
    .withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
    .withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
    .drop("previous_value")
    .show()
)

+---+-------+----------+-----+-------------+----------+---------+
| id|user_id|      date|value|purchase_rank|diff_value|diff_days|
+---+-------+----------+-----+-------------+----------+---------+
|  2|      1|2020-10-10|   25|            1|         0|     null|
|  1|      1|2020-11-10|   50|            2|        25|       31|
|  3|      2|2020-09-15|  100|            1|         0|     null|
+---+-------+----------+-----+-------------+----------+---------+
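
For readers unfamiliar with `F.lag`, the following plain-Python sketch mimics what the windowed expressions above compute: within each user, ordered by date, every row is compared with the one before it. The function name `with_lag` and the tuple layout are assumptions for illustration. Unlike the query above, which replaces a missing `diff_value` with 0, this sketch keeps `None` for a user's first purchase, which is what the question's expected output shows.

```python
from datetime import date
from itertools import groupby

# (id, user_id, date, value), same sample rows as the purchase dataframe
purchases = [
    (1, 1, date(2020, 11, 10), 50),
    (2, 1, date(2020, 10, 10), 25),
    (3, 2, date(2020, 9, 15), 100),
]

def with_lag(rows):
    """Append (diff_value, diff_days) relative to the user's previous purchase."""
    out = []
    # Sort by user_id, then date, so each group is in window order.
    ordered = sorted(rows, key=lambda r: (r[1], r[2]))
    for _, group in groupby(ordered, key=lambda r: r[1]):
        previous = None  # plays the role of lag(..., 1): no previous row yet
        for row in group:
            if previous is None:
                diff_value, diff_days = None, None
            else:
                diff_value = row[3] - previous[3]
                diff_days = (row[2] - previous[2]).days
            out.append(row + (diff_value, diff_days))
            previous = row
    return out

for row in with_lag(purchases):
    print(row)
```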

From here, it's easier to join / filter with any other data.

To keep only the last purchase of each user_id:

(
    purchase
    .withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
    .withColumn('previous_value',   F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
    .withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
    .withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
    # The highest rank is the most recent purchase; F.last on an unordered window is not deterministic
    .withColumn("last_purchase", F.max("purchase_rank").over(window_user_id))
    .filter(F.col("purchase_rank") == F.col("last_purchase"))
    .drop("previous_value", "purchase_rank", "last_purchase")
    .show()
)

+---+-------+----------+-----+----------+---------+
| id|user_id|      date|value|diff_value|diff_days|
+---+-------+----------+-----+----------+---------+
|  1|      1|2020-11-10|   50|        25|       31|
|  3|      2|2020-09-15|  100|         0|     null|
+---+-------+----------+-----+----------+---------+

0

This is how I solved it starting from the Events DataFrame rather than the Purchase DataFrame:

from pyspark.sql import functions as F, Window as W
from pyspark.sql.functions import when

(Events.join(Purchase,
             on = [Events.User_id == Purchase.User_id,
                   Events.Date >= Purchase.Date],
             how = "left")
   .withColumn('rank_date', F.row_number().over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc())))
   .withColumn('LastValue', when(F.col('rank_date') == 1, F.col('Value')).otherwise(F.lag('Value', 1).over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc()))))
   .withColumn('LastDate', when(F.col('rank_date') == 1, Purchase['Date']).otherwise(F.lag(Purchase['Date'], 1).over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc()))))
   .withColumn('totalPurchase', F.max('rank_date').over(W.partitionBy(Events['Id'])))
   .withColumn('DiffValue', when(F.col('totalPurchase') == 1, 0).otherwise(F.col('LastValue') - F.col('Value')))
   .withColumn('DiffDays', when(F.col('totalPurchase') == 1, None).otherwise(F.datediff(F.col('LastDate'), Purchase['Date'])))
   .withColumn('keepRow', when(F.col('totalPurchase') == 1, 1).otherwise(when(F.col('rank_date') == 2, 1).otherwise(0)))
   .filter(F.col('keepRow') == 1)
   .drop('rank_date', 'LastDate', 'totalPurchase', 'keepRow'))
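
The code above uses `row_number` where the question used `rank`. The answer does not say why, but one plausible reason is how the two treat ties: the `rank_date == 2` filter relies on a row with rank 2 existing. The plain-Python sketch below imitates both numbering schemes on hypothetical dates where a user made two purchases on the same day (the tied dates are made up for illustration and are not in the sample data).

```python
def rank(values):
    """SQL-style RANK, descending: ties share a rank and leave a gap after them."""
    ordered = sorted(values, reverse=True)
    # index() finds the first occurrence, so tied values get the same rank.
    return [(v, ordered.index(v) + 1) for v in ordered]

def row_number(values):
    """SQL-style ROW_NUMBER, descending: always 1, 2, 3, ... even on ties."""
    ordered = sorted(values, reverse=True)
    return [(v, i) for i, v in enumerate(ordered, start=1)]

# Hypothetical purchase dates with a tie on the most recent day
dates = ["2020-11-10", "2020-11-10", "2020-10-10"]

print(rank(dates))        # [('2020-11-10', 1), ('2020-11-10', 1), ('2020-10-10', 3)]
print(row_number(dates))  # [('2020-11-10', 1), ('2020-11-10', 2), ('2020-10-10', 3)]
```

With `rank`, no row gets the number 2 here, so a filter on `rank_date == 2` would drop the event entirely. With `row_number` one of the tied rows is picked, although which one is arbitrary unless the ordering includes a tie-breaker.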
