
I want to compare two columns in a Spark DataFrame: if the value of one column (attr_value) is found among the values of another (attr_valuelist), only that matching value should be kept in attr_valuelist. Otherwise, attr_valuelist should be set to null.

For example, given the following input

id1 id2   attrname  attr_value   attr_valuelist
1   2     test      Yes          Yes, No
2   1     test1     No           Yes, No
3   2     test2     value1       val1, Value1, value2

I would expect the following output

id1 id2   attrname  attr_value   attr_valuelist
1   2     test      Yes          Yes
2   1     test1     No           No
3   2     test2     value1       Value1
  • And if the value of attr_value is not in attr_valuelist, should the row stay unchanged? Commented Jul 20, 2018 at 7:27
  • A Spark custom transformation may help (see the sketch after these comments). Commented Jul 20, 2018 at 7:49
  • No: attr_valuelist should be set to null, while attr_value remains the same. Commented Jul 20, 2018 at 7:50
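A custom transformation, as suggested in the comment above, is just a function from DataFrame to DataFrame that you chain with .transform. A minimal sketch, assuming attr_valuelist is an array column and using the hypothetical name keepMatchedValue:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array_contains, col, when}

// Hypothetical transformation: keep attr_value only if it appears in
// attr_valuelist (exact, case-sensitive match); otherwise null.
def keepMatchedValue(df: DataFrame): DataFrame =
  df.withColumn(
    "attr_valuelist",
    when(array_contains(col("attr_valuelist"), col("attr_value")), col("attr_value"))
  )

// Usage: df.transform(keepMatchedValue)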

2 Answers


Given your sample input, I assume the column holding the search item (attr_value) is a string, while the search target (attr_valuelist) is a sequence of strings. I also assume you want a case-insensitive search.

This is going to be the input (I added a row that yields null, to test the behavior of the UDF I wrote):

+---+---+--------+----------+----------------------+
|id1|id2|attrname|attr_value|attr_valuelist        |
+---+---+--------+----------+----------------------+
|1  |2  |test    |Yes       |[Yes, No]             |
|2  |1  |test1   |No        |[Yes, No]             |
|3  |2  |test2   |value1    |[val1, Value1, value2]|
|3  |2  |test2   |value1    |[val1, value2]        |
+---+---+--------+----------+----------------------+

You can solve your problem with a very simple UDF.

import org.apache.spark.sql.functions.udf

// Find the first list element matching `item` case-insensitively; the
// Option[String] result becomes a nullable column (None -> null).
val find = udf {
  (item: String, collection: Seq[String]) =>
    collection.find(_.toLowerCase == item.toLowerCase)
}

val df = spark.createDataFrame(Seq(
  (1, 2, "test", "Yes", Seq("Yes", "No")),
  (2, 1, "test1", "No", Seq("Yes", "No")),
  (3, 2, "test2", "value1", Seq("val1", "Value1", "value2")),
  (3, 2, "test2", "value1", Seq("val1", "value2"))
)).toDF("id1", "id2", "attrname", "attr_value", "attr_valuelist")

df.select(
  $"id1", $"id2", $"attrname", $"attr_value",
  find($"attr_value", $"attr_valuelist") as "attr_valuelist"
).show()

This yields the following output:

+---+---+--------+----------+--------------+
|id1|id2|attrname|attr_value|attr_valuelist|
+---+---+--------+----------+--------------+
|  1|  2|    test|       Yes|           Yes|
|  2|  1|   test1|        No|            No|
|  3|  2|   test2|    value1|        Value1|
|  3|  2|   test2|    value1|          null|
+---+---+--------+----------+--------------+

You can run this code in any spark-shell. If you use it in a job submitted to a cluster, remember to import spark.implicits._ (needed for the $ column syntax).
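On Spark 2.4 or later you can also avoid the UDF entirely with the filter higher-order function. A minimal sketch, assuming the same array-typed attr_valuelist as above:

import org.apache.spark.sql.functions.expr

// filter keeps the case-insensitive matches; [0] takes the first one,
// and indexing into an empty array yields null.
val result = df.withColumn(
  "attr_valuelist",
  expr("filter(attr_valuelist, x -> lower(x) = lower(attr_value))[0]")
)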




You can try this code. It should work using a SQL CASE WHEN expression:

your_dataframe.createOrReplaceTempView("tbl")

// Assumes attr_valuelist is stored as a comma-separated string, as in the question.
val result = spark.sql("""
  SELECT id1, id2, attrname, attr_value,
         CASE WHEN attr_valuelist LIKE concat('%', attr_value, '%')
              THEN attr_value
              ELSE NULL
         END AS attr_valuelist
  FROM tbl
""")

result.show()
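Two caveats: LIKE is case-sensitive, so the value1/Value1 row from the question would yield null rather than Value1, and substring matching can give false positives (for example, attr_value No would match a list containing None). For an exact, case-insensitive match against the string-typed list, a sketch for Spark 2.4+ splits the list first:

import org.apache.spark.sql.functions.expr

// Split the comma-separated list, trim whitespace, then keep the first
// case-insensitive match; indexing an empty array with [0] yields null.
val exact = your_dataframe.withColumn(
  "attr_valuelist",
  expr("filter(transform(split(attr_valuelist, ','), x -> trim(x)), x -> lower(x) = lower(attr_value))[0]")
)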

