I am trying to compare two columns, each coming from a different DataFrame. I have these two DataFrames:

df1
+----+-------+-------+
|Game|rev_1_t|rev_2_t|
+----+-------+-------+
|  CA|    AA |    AA |
|  FT|    B  |    C  |
+----+-------+-------+

df_prev
+----+-------+-------+
|Game|rev_1_t|rev_2_t|
+----+-------+-------+
|  CA|    C  |   AA  |  
|  FT|    B  |   C   |
+----+-------+-------+

I want to compare rev_1_t from df1 with rev_1_t from df_prev, and add a new column called change that is "Y" if the value changed and "N" if it did not. At the same time, I want to add a new column called prev_value that stores the previous value of rev_1_t taken from df_prev. The same goes for rev_2_t. The output would be:

Output:
+----+-------+--------+------------+---------+----------+--------------+
|Game|rev_1_t| change | prev_value | rev_2_t | change_2 | prev_value_2 | 
+----+-------+--------+------------+---------+----------+--------------+
|  CA|    C  |   Y    |      C     |     AA  |   Y      |      C       |
|  FT|    B  |   Y    |      B     |     C   |   Y      |      B       |
+----+-------+--------+------------+---------+----------+--------------+

I am trying to do it as you can see here, but I am getting different errors:

val change = df1.withColumn(
   "change", when(df1("rev_1_t") === df_prev("rev_1_t"), df1("rev_1_t")).otherwise(df_prev("rev_1_t")))
  .withColumn(
   "prev_value", when(df1("rev_1_t") === df_prev("rev_1_t"), "N").otherwise("Y"))

1 Answer

You can do a join and then compare the relevant columns:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val result = df1.join(df_prev, Seq("Game"), "left")
    .select(col("Game"), 
            df1("rev_1_t"), 
            when(df1("rev_1_t") === df_prev("rev_1_t"), "N").otherwise("Y").as("change"), 
            df_prev("rev_1_t").as("prev_value"), 
            df1("rev_2_t"), 
            when(df1("rev_2_t") === df_prev("rev_2_t"), "N").otherwise("Y").as("change_2"), 
            df_prev("rev_2_t").as("prev_value_2")
    )
    .withColumn("change", max("change").over(Window.orderBy(lit(1))))
    .withColumn("change_2", max("change_2").over(Window.orderBy(lit(1))))

result.show
+----+-------+------+----------+-------+--------+------------+
|Game|rev_1_t|change|prev_value|rev_2_t|change_2|prev_value_2|
+----+-------+------+----------+-------+--------+------------+
|  CA|     AA|     Y|         C|     AA|       N|          AA|
|  FT|      B|     Y|         B|      C|       N|           C|
+----+-------+------+----------+-------+--------+------------+
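
A note on the window step at the end: ordering by the constant lit(1) makes every row a peer of every other row, so the default RANGE frame covers the whole DataFrame and max("change") sees all rows. Since "Y" sorts after "N", a single change anywhere in the column propagates "Y" to every row, which is what the first comment below asks for.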

2 Comments

Wow, you are always so fast! My bad: in the change column, if there is at least one change it should always be "Y", otherwise "N"; so in your output example, the change column for Game FT would be "Y" too.
Hi mck! I'm getting the following error with this code: AnalysisException: resolved attribute(s) in Spark. What could be the problem? I am trying to clone the DF into another one, but the error persists ...
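
Regarding the "resolved attribute(s)" AnalysisException mentioned in the comment above: that error typically shows up when expressions reference columns through two lineages of the same plan (for example df1(...) and df_prev(...) after the join). A minimal sketch that sidesteps it, assuming the same df1 and df_prev as above, is to rename the previous-period columns before joining, so every reference resolves against the joined DataFrame itself:

import org.apache.spark.sql.functions._

// Sketch (column names assumed as in the question): rename df_prev's columns up
// front so the joined DataFrame has unambiguous names and no expression needs to
// reference the original df_prev after the join.
val prevRenamed = df_prev
  .withColumnRenamed("rev_1_t", "prev_value")
  .withColumnRenamed("rev_2_t", "prev_value_2")

val joined = df1.join(prevRenamed, Seq("Game"), "left")
  .withColumn("change",   when(col("rev_1_t") === col("prev_value"),   "N").otherwise("Y"))
  .withColumn("change_2", when(col("rev_2_t") === col("prev_value_2"), "N").otherwise("Y"))

The same max-over-a-window step from the answer can then be applied to joined to propagate "Y" across all rows.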
