
I am new to Scala and need some help.

I have an M*N Spark SQL DataFrame like the one below. I need to compare each row's column values with the next row's column values.

Something like A1 to A2, A1 to A3, and so on up to N; likewise B1 to B2, B1 to B3.

Could someone please guide me on how to compare rows in Spark SQL?

ID  COLUMN1  COLUMN2
1   A1       B1
2   A2       B2
3   A3       B3

Thank you in advance, Santhosh

  • I don't understand if you have to compare values in different columns with each other on a per-row basis or if you want to compare two subsequent rows with each other. Commented Jan 25, 2017 at 12:19
  • What do you mean by comparing? Do you want to add a boolean column indicating whether the current value is the same as the previous one? Also, what are you ordering by: the ID? Are the IDs consecutive? Do you need to go over everything (i.e., is there some other column you are grouping by first)? Commented Jan 25, 2017 at 13:05
  • Please add at least the expected output; even better, explain the use case and add the code you've tried so far. Commented Jan 25, 2017 at 13:08

1 Answer

If I understand the question correctly, you want to compare (using some function) each value to the value of the same column in the previous record. You can do that using the lag window function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._

// some data...
val df = Seq(
  (1, "A1", "B1"),
  (2, "A2", "B2"),
  (3, "A3", "B3")
).toDF("ID","COL1", "COL2")

// some made-up comparisons - fill in whatever you want...
def compareCol1(curr: Column, prev: Column): Column = curr > prev
def compareCol2(curr: Column, prev: Column): Column = concat(curr, prev)

// creating window - ordered by ID
val window = Window.orderBy("ID")

// using the window with lag function to compare to previous value in each column
df.withColumn("COL1-comparison", compareCol1($"COL1", lag("COL1", 1).over(window)))
  .withColumn("COL2-comparison", compareCol2($"COL2", lag("COL2", 1).over(window)))
  .show()

// +---+----+----+---------------+---------------+
// | ID|COL1|COL2|COL1-comparison|COL2-comparison|
// +---+----+----+---------------+---------------+
// |  1|  A1|  B1|           null|           null|
// |  2|  A2|  B2|           true|           B2B1|
// |  3|  A3|  B3|           true|           B3B2|
// +---+----+----+---------------+---------------+
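
If the question instead means comparing every row against the first row (A1 to A2, A1 to A3, and so on), one possible sketch swaps lag for the first window function. This assumes Spark 2.x, a local SparkSession, and plain equality as the comparison; the output column names COL1-vs-first and COL2-vs-first are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local Spark 2.x session; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("compare-to-first")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "A1", "B1"),
  (2, "A2", "B2"),
  (3, "A3", "B3")
).toDF("ID", "COL1", "COL2")

// Ordered window without partitioning. With an ORDER BY, the default frame
// runs from the start of the window to the current row, so first(...)
// yields the value from the row with the smallest ID.
val byId = Window.orderBy("ID")

// Equality is only a placeholder comparison - substitute whatever you need.
val result = df
  .withColumn("COL1-vs-first", $"COL1" === first("COL1").over(byId))
  .withColumn("COL2-vs-first", $"COL2" === first("COL2").over(byId))

result.show()
```

As with the lag version, a window with no partitionBy pulls all rows into a single partition, so this is only reasonable for data that fits comfortably on one executor.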

3 Comments

Thank you for your quick response. I am getting this error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
Which Spark version are you using? I think it should work out-of-the-box without enabling Hive support in Spark 2.0.x
I am using Scala version 2.11.8 and spark-mllib_2.11 version 1.6.2.
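
On Spark 1.6.x the error message points at the likely fix: build the DataFrame from a HiveContext rather than a plain SQLContext. The following is a minimal sketch, assuming the spark-hive_2.11 artifact (version 1.6.2) is on the classpath; that dependency, the app name, the local master, and the COL1_comparison column name are assumptions and not something stated in this thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.hive.HiveContext

// Assumption: running locally; adjust master and app name for your cluster.
val conf = new SparkConf().setAppName("lag-on-spark-1.6").setMaster("local[*]")
val sc = new SparkContext(conf)

// HiveContext extends SQLContext and enables window functions in Spark 1.6.
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

val df = Seq(
  (1, "A1", "B1"),
  (2, "A2", "B2"),
  (3, "A3", "B3")
).toDF("ID", "COL1", "COL2")

val window = Window.orderBy("ID")

// Same lag-based comparison as in the answer, built from the HiveContext.
val compared = df.withColumn("COL1_comparison", $"COL1" > lag("COL1", 1).over(window))
compared.show()
```

No Hive installation should be needed for this; a HiveContext created this way typically sets up a local embedded metastore in the working directory.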
