In Scala + Spark, I have a DataFrame with two columns of Array[String]:

scala> val test = spark.sqlContext.read.json(sc.parallelize(Seq("""{"v1":["A", "B", "C"],"v2":["ok", "false", "ok"]}""", """{"v1":["D", "E"],"v2":["false", "ok"]}""")))
test: org.apache.spark.sql.DataFrame = [v1: array<string>, v2: array<string>]

scala> test.show
+---------+---------------+
|       v1|             v2|
+---------+---------------+
|[A, B, C]|[ok, false, ok]|
|   [D, E]|    [false, ok]|
+---------+---------------+

scala> test.printSchema
root
 |-- v1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- v2: array (nullable = true)
 |    |-- element: string (containsNull = true)

I would like to filter the elements in v1 based on the value at the corresponding index in v2.

I zipped the two columns to try filtering, but I don't see how to apply .filter() to a Row of Array[String]:

scala> val result = test.withColumn("result", arrays_zip($"v1", $"v2")).select("result")
result: org.apache.spark.sql.DataFrame = [result: array<struct<v1:string,v2:string>>]

scala> result.printSchema
root
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- v1: string (nullable = true)
 |    |    |-- v2: string (nullable = true)


scala> result.show
+--------------------+
|              result|
+--------------------+
|[[A, ok], [B, fal...|
|[[D, false], [E, ...|
+--------------------+

Ideally, at the end, I want a single column of values like the one below, on which I can count, sort, and take distinct values:

+------+
|result|
+------+
|     A|
|     C|
|     E|
+------+

I have 19 million rows, and each array has a length of around a thousand, so for performance reasons I want to mostly use built-in Spark functions and avoid UDFs if possible.

1 Answer

I think you are almost there. You can apply explode() to the zipped column and then filter on the required condition. The code below gives you the rows where v2 = "ok", on which you can then run count, sort, distinct, etc.

scala> val result = test.withColumn("result", explode(arrays_zip($"v1", $"v2"))).select("result")
result: org.apache.spark.sql.DataFrame = [result: struct<v1: string, v2: string>]

scala> result.show(false)
+----------+
|result    |
+----------+
|[A, ok]   |
|[B, false]|
|[C, ok]   |
|[D, false]|
|[E, ok]   |
+----------+


scala> val data = result.filter(col("result.v2").equalTo("ok")).select(col("result.v1"))
data: org.apache.spark.sql.DataFrame = [v1: string]

scala> data.show
+---+
| v1|
+---+
|  A|
|  C|
|  E|
+---+
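From there, the count / sort / distinct step the question asks for is a one-liner. A minimal sketch (not part of the original answer; the name finalResult is mine), assuming the data DataFrame from above:

import org.apache.spark.sql.functions.col

// Distinct values of v1 whose v2 was "ok", sorted alphabetically.
val finalResult = data.distinct().sort(col("v1"))
finalResult.show()
// +---+
// | v1|
// +---+
// |  A|
// |  C|
// |  E|
// +---+
finalResult.count() // 3 distinct values

As a side note, since arrays_zip already requires Spark 2.4, the SQL higher-order functions filter and transform are available too, and they let you drop the unwanted elements at the array level instead of exploding ~19 million rows into roughly a thousand times as many. A sketch of that alternative (not from the original answer; the name kept is mine):

import org.apache.spark.sql.functions.{expr, explode, col}

// Keep only the zipped pairs whose v2 is "ok", then project v1 back out.
val kept = test.select(
  expr("transform(filter(arrays_zip(v1, v2), x -> x.v2 = 'ok'), x -> x.v1)").as("result")
)
kept.show(false)
// +------+
// |result|
// +------+
// |[A, C]|
// |[E]   |
// +------+

// One value per row again, if needed:
kept.select(explode(col("result")).as("result")).show()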