
I'm trying to operate on a df with the following data:

+---+---------------------------------------------------+
|ka |readingsWFreq                                      |
+---+---------------------------------------------------+
|列  |[[[列,つ],220], [[列,れっ],353], [[列,れつ],47074]]   |
|制  |[[[制,せい],235579]]                                |
+---+---------------------------------------------------+

And the following structure:

root
 |-- ka: string (nullable = true)
 |-- readingsWFreq: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- furigana: struct (nullable = true)
 |    |    |    |-- _1: string (nullable = true)
 |    |    |    |-- _2: string (nullable = true)
 |    |    |-- Occ: long (nullable = true)
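
For reference, here's a minimal sketch to reproduce a df of this shape (assumed setup: a SparkSession in scope as spark; Scala tuples encode as _1/_2 fields, so the furigana/Occ names are applied with a positional cast):

import spark.implicits._

// Build the nested data from plain tuples, then cast the array of
// (tuple, count) structs to the named furigana/Occ schema above.
val df = Seq(
    ("列", Seq((("列", "つ"), 220L), (("列", "れっ"), 353L), (("列", "れつ"), 47074L))),
    ("制", Seq((("制", "せい"), 235579L)))
  ).toDF("ka", "readingsWFreq")
  .withColumn("readingsWFreq",
    $"readingsWFreq".cast("array<struct<furigana:struct<_1:string,_2:string>,Occ:long>>"))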

My goal is to split readingsWFreq's values into three different columns. For that purpose I've tried to use UDFs as follows:

val uExtractK = udf((kWFreq: Seq[((String, String), Long)]) => kWFreq.map(_._1._1))
val uExtractR = udf((kWFreq: Seq[((String, String), Long)]) => kWFreq.map(_._1._2))
val uExtractN = udf((kWFreq: Seq[((String, String), Long)]) => kWFreq.map(_._2))

val df2 = df.withColumn("K", uExtractK('readingsWFreq))
            .withColumn("R", uExtractR('readingsWFreq))
            .withColumn("N", uExtractN('readingsWFreq))
            .drop('readingsWFreq)

However, I'm getting an exception related to the input parameter of the UDFs:

[error] (run-main-0) org.apache.spark.sql.AnalysisException: cannot resolve
'UDF(readingsWFreq)' due to data type mismatch: argument 1 requires
 array<struct<_1:struct<_1:string,_2:string>,_2:bigint>> type, however, 
'`readingsWFreq`' is of
 array<struct<furigana:struct<_1:string,_2:string>,Occ:bigint>> type.;;

My question is, how can I manipulate the dataframe so that it results in the following?

+---+------------+----------------+------------------+
|ka |K           |R               |N                 |
+---+------------+----------------+------------------+
|列  |[列, 列, 列] |[つ, れっ, れつ] |[220, 353, 47074] |
|制  |[制]        |[せい]           |[235579]          |
+---+------------+----------------+------------------+

2 Answers


Dataframe API approach:

You don't need a UDF for that; just do:

df.select(
  $"ka",
  $"readingsWFreq.furigana._1".as("K"),
  $"readingsWFreq.furigana._2".as("R"),
  $"readingsWFreq.Occ".as("N")
)

The trick here is that . on columns of type array also acts as a mapping/projection operator: applied to an array of structs, it extracts the given field from every element. On columns of type struct, the same operator selects a single field.
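
As a tiny illustration (hypothetical demo data; _1 is the field name Spark assigns to tuple components):

import spark.implicits._

// arr has type array<struct<_1:int,_2:string>>; the dot projects _1
// across every element, yielding array<int> rather than a single value.
val demo = Seq((1, Seq((10, "a"), (20, "b")))).toDF("id", "arr")
demo.select($"arr._1".as("firsts")).show()
// firsts = [10, 20]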

UDF approach:

You cannot pass tuples into UDFs; instead, struct values arrive as Rows, see e.g. Using Spark UDFs with struct sequences.

In your case you have nested structs, therefore you need to decompose the Row twice:

import org.apache.spark.sql.Row

// Each array element arrives as a Row(furigana, Occ): field 0 is the
// nested (kanji, reading) struct, field 1 the occurrence count.
val uExtractK = udf((kWFreq: Seq[Row]) => kWFreq.map(r => r.getAs[Row](0).getAs[String](0)))
val uExtractR = udf((kWFreq: Seq[Row]) => kWFreq.map(r => r.getAs[Row](0).getAs[String](1)))
val uExtractN = udf((kWFreq: Seq[Row]) => kWFreq.map(r => r.getAs[Long](1)))

Or with pattern-matching on Row (the nested struct can be matched in a single pattern):

val uExtractK = udf((kWFreq: Seq[Row]) => kWFreq.map {
  case Row(Row(k: String, _: String), _: Long) => k
})
val uExtractR = udf((kWFreq: Seq[Row]) => kWFreq.map {
  case Row(Row(_: String, r: String), _: Long) => r
})
val uExtractN = udf((kWFreq: Seq[Row]) => kWFreq.map {
  case Row(_: Row, n: Long) => n
})
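
Either variant can then be applied exactly as in the question:

val df2 = df.withColumn("K", uExtractK($"readingsWFreq"))
            .withColumn("R", uExtractR($"readingsWFreq"))
            .withColumn("N", uExtractN($"readingsWFreq"))
            .drop("readingsWFreq")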


You could first explode the outer array to get one row per element, then group by ka again and collect each value back into a list with collect_list.

import org.apache.spark.sql.functions._

val df1 = df.withColumn("readingsWFreq", explode($"readingsWFreq"))

df1.select("ka", "readingsWFreq.furigana.*", "readingsWFreq.Occ")
   .groupBy("ka")
   .agg(collect_list("_1").as("K"),
        collect_list("_2").as("R"),
        collect_list("Occ").as("N"))

Hope this helps!

3 Comments

Are you sure the ordering is maintained? AFAIK you cannot rely on this in Spark in general.
Yeah, the ordering is not guaranteed, but the question doesn't mention anything about ordering.
You could use posexplode instead to have an explicit ordering.
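
For instance, a sketch of that idea (assuming Spark 2.4+ for transform; pos and col are the column names posexplode produces):

import org.apache.spark.sql.functions._

// posexplode keeps each element's index in pos, so the original
// order can be restored after the groupBy.
val exploded = df.select($"ka", posexplode($"readingsWFreq"))

val grouped = exploded.groupBy("ka").agg(
  sort_array(collect_list(struct($"pos", $"col.furigana._1".as("v")))).as("K"),
  sort_array(collect_list(struct($"pos", $"col.furigana._2".as("v")))).as("R"),
  sort_array(collect_list(struct($"pos", $"col.Occ".as("v")))).as("N")
)

// sort_array orders each list by pos (the first struct field);
// transform then strips pos, keeping only the values.
grouped.select(
  $"ka",
  expr("transform(K, x -> x.v)").as("K"),
  expr("transform(R, x -> x.v)").as("R"),
  expr("transform(N, x -> x.v)").as("N")
)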
