
I have a dataframe df1:

+-------------------+-----+
|         start_date|value|
+-------------------+-----+
|2019-03-17 00:00:00|   35|
|2019-05-20 00:00:00|   40|
|2019-06-03 00:00:00|   10|
|2019-07-01 00:00:00|   12|
+-------------------+-----+

and another dataframe df_date:

+-------------------+
|               date|
+-------------------+
|2019-02-01 00:00:00|
|2019-04-10 00:00:00|
|2019-06-14 00:00:00|
+-------------------+

I did the join, and now I have a dataframe df with date, start_date and value, but the value should be like this:

+-------------------+-------------------+-----+
|         start_date|               date|value|
+-------------------+-------------------+-----+
|2019-03-17 00:00:00|2019-02-01 00:00:00|    0|
|2019-05-20 00:00:00|2019-04-10 00:00:00|   35|
|2019-06-03 00:00:00|2019-06-14 00:00:00|   85|
+-------------------+-------------------+-----+

Every time, I should compare start_date with date: if they are different, I should add the previous value to my value; otherwise I should keep the previous value. I already have the joined dataframe in PySpark and I am trying to compute the new value.

I used this code to try to get the results:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

win = Window.partitionBy().orderBy("date")
df = df.withColumn("prev_date", F.lag(F.col("start_date")).over(win))
df = df.fillna({'prev_date': 0})
df = df.withColumn("value",
                   F.when(F.isnull(F.lag(F.col("value"), 1).over(win)), df.value)
                    .when(df.start_date != df.prev_date, df.value + F.lag(F.col("value"), 1).over(win))
                    .otherwise(F.lag(F.col("value"), 1).over(win)))
df.show(df.count(), False)

The problem is that all the rows are modified at the same time, whereas each row needs the previously updated value.
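For reference, a minimal sketch of why this happens, assuming the joined dataframe df from above (value_new is a hypothetical column name): lag() is evaluated against the original input column in a single pass, so a row never sees a value that was updated earlier in the same expression.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# lag() reads the *input* "value" column, so row i only ever sees the
# original value of row i-1, never an updated one. An iterative/recursive
# update therefore cannot be expressed with lag() alone.
win = Window.orderBy("date")
df = df.withColumn("value_new",  # hypothetical helper column
                   F.col("value") + F.lag("value", 1, 0).over(win))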

Thank you

  • What does the value column in df mean? Days before the start date? I suggest replacing df.value in the final withColumn of your code with F.lag(F.col('value'), 1) and seeing if it helps. Commented Oct 6, 2019 at 5:04
  • @QuantStats the date comes from another dataframe and I did the join, which gives me what I need, ...but the value I need has to change every time the start_date changes ...(so if start_date changes, the value is the current value in the table + the previous one; otherwise it's the previous one). Commented Oct 6, 2019 at 9:30
  • I think I am starting to understand. Can you also show the other dataframe that you used to perform the join? Commented Oct 6, 2019 at 11:30
  • @QuantStats this dataframe: the df_date table shown above, with the dates 2019-02-01, 2019-04-10 and 2019-06-14. Commented Oct 6, 2019 at 12:14
  • @QuantStats and the first dataframe with start_date and value Commented Oct 6, 2019 at 12:14

1 Answer


Here is some code that does what you want.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# step 1: init dataframes (assumes an active SparkSession named `spark`)

cols = ["start_date", "value"]
data = [["2019-03-17 00:00:00", 35],
        ["2019-05-20 00:00:00", 40],
        ["2019-06-03 00:00:00", 10],
        ["2019-07-01 00:00:00", 12],
        ]
df = spark.createDataFrame(data, cols)

additional_dates = spark.createDataFrame(
    [["2019-02-01 00:00:00"], ["2019-04-10 00:00:00"], ["2019-06-14 00:00:00"]],
    ["date"])

# step 2: calculate the correct values.
# This is done by joining df to the additional dates and summing all values per 'date'
additional_dates = additional_dates.join(df, F.col("date") > F.col("start_date"), "left_outer").fillna(0, subset="value")
additional_dates = additional_dates.groupBy("date").agg(F.sum("value").alias("value"))
# at this point you already have 'date' + the correct value. you only need to join back the original date column
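# (derived from the sample data and the expected table above, additional_dates
#  should now hold: 2019-02-01 -> 0, 2019-04-10 -> 35, 2019-06-14 -> 85)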

# step 3: join the original start_date column back on
# we do this by joining on the row_number
# note that spark does not have an easy operation for adding a column from another dataframe
window_df = Window.orderBy("start_date")
window_add = Window.orderBy("date")

df = df.withColumn("row_number", F.row_number().over(window_df))
additional_dates = additional_dates.withColumn("row_number", F.row_number().over(window_add))

df = df.drop("value").join(additional_dates, "row_number").drop("row_number")
df.show()
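On the sample data, this should reproduce the expected table from the question. Note that the fourth start_date (2019-07-01) drops out, since the inner join on row_number has only three dates to match:

+-------------------+-------------------+-----+
|         start_date|               date|value|
+-------------------+-------------------+-----+
|2019-03-17 00:00:00|2019-02-01 00:00:00|    0|
|2019-05-20 00:00:00|2019-04-10 00:00:00|   35|
|2019-06-03 00:00:00|2019-06-14 00:00:00|   85|
+-------------------+-------------------+-----+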

