
I am creating a DataFrame (and registering it as a temp view) using

    val snDump = table_raw
      .applyMapping(mappings = Seq(
        ("event_id", "string", "eventid", "string"),
        ("lot-number", "string", "lotnumber", "string"),
        ("serial-number", "string", "serialnumber", "string"),
        ("event-time", "bigint", "eventtime", "bigint"),
        ("companyid", "string", "companyid", "string")),
        caseSensitive = false, transformationContext = "sn")
      .toDF()
      .groupBy(col("eventid"), col("lotnumber"), col("companyid"))
      .agg(collect_list(struct("serialnumber", "eventtime")).alias("snetlist"))

    snDump.createOrReplaceTempView("sn")

I have data like this in the df

    eventid | lotnumber | companyid | snetlist
    123     | 4q22      | tu56ff    | [[12345,67438]]
    456     | 4q22      | tu56ff    | [[12346,67434]]
    258     | 4q22      | tu56ff    | [[12347,67455], [12333,67455]]
    999     | 4q22      | tu56ff    | [[12348,67459]]

I want to explode it and put the data into two columns of my table. To do that, I am doing

    val serialNumberEvents = snDump.select(col("eventid"), col("lotnumber"), explode(col("snetlist")).alias("serialN"), explode(col("snetlist")).alias("eventT"), col("companyid"))

I also tried

    val serialNumberEvents = snDump.select(col("eventid"), col("lotnumber"), col($"snetlist.serialnumber").alias("serialN"), col($"snetlist.eventtime").alias("eventT"), col("companyid"))

but it turns out that explode can only be used once per select, and I get an error on that select. So how do I use explode (or something else) to achieve what I am trying to get:

    eventid | lotnumber | companyid | serialN  | eventT |
    123     | 4q22      | tu56ff    | 12345    | 67438  |
    456     | 4q22      | tu56ff    | 12346    | 67434  |
    258     | 4q22      | tu56ff    | 12347    | 67455  |
    258     | 4q22      | tu56ff    | 12333    | 67455  |
    999     | 4q22      | tu56ff    | 12348    | 67459  |

I have looked at a lot of Stack Overflow threads but none of them helped me. It is possible that such a question has already been answered, but my understanding of Scala is quite limited, which might have kept me from understanding the answer. If this is a duplicate, someone could point me to the correct answer. Any help is appreciated.

  • You can explode twice. Commented Aug 26, 2019 at 18:53
  • "Only one generator allowed per select" is the error I get on exploding twice. Commented Aug 26, 2019 at 18:58

3 Answers


First, explode the array in a temporary struct-column, then unpack it:

val serialNumberEvents = snDump
  .withColumn("tmp", explode(col("snetlist")))
  .select(
    col("eventid"),
    col("lotnumber"),
    col("companyid"),
    // unpack the struct fields by name
    col("tmp.serialnumber").as("serialN"),
    col("tmp.eventtime").as("eventT")
  )
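
If you would rather not list the struct fields one by one, star expansion on the exploded struct column should also work. A sketch of an equivalent select, assuming the snDump DataFrame from the question (the renames are only needed to get the serialN/eventT names):

import org.apache.spark.sql.functions._

val serialNumberEvents = snDump
  .withColumn("tmp", explode(col("snetlist")))
  .select(col("eventid"), col("lotnumber"), col("companyid"), col("tmp.*"))
  // tmp.* expands to serialnumber and eventtime; rename them to match the desired output
  .withColumnRenamed("serialnumber", "serialN")
  .withColumnRenamed("eventtime", "eventT")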

2 Comments

I noticed the point about unpacking. Can you elaborate? Do you mean getItem(0), etc.?
@thebluephantom Yes, but I think getItem(0) etc. is only for arrays; in a struct you can access the item by name.
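
To make the distinction in this comment thread concrete, here is a minimal sketch (the DataFrame and column names are made up for illustration, and a SparkSession named spark is assumed): getItem(i) indexes into an array column, while a struct field is accessed by name.

import spark.implicits._

// arrCol is an array column, structCol is a struct column (built from a tuple)
val demo = Seq((Seq(1, 2, 3), ("abc", 42L))).toDF("arrCol", "structCol")

demo.select(
  $"arrCol".getItem(0).as("firstElem"),      // positional access into the array
  $"structCol".getField("_1").as("byName"),  // struct field access by name
  $"structCol._2".as("alsoByName")           // equivalent dotted syntax
).show()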

The trick is to pack the columns you want to explode into an array (or struct), use explode on the array, and then unpack them.

import org.apache.spark.sql.functions._
import spark.implicits._   // needed for the $"..." column syntax

val col_names = Seq("eventid", "lotnumber", "companyid", "snetlist")
val data = Seq(
    (123, "4q22", "tu56ff", Seq(Seq(12345, 67438))),
    (456, "4q22", "tu56ff", Seq(Seq(12346, 67434))),
    (258, "4q22", "tu56ff", Seq(Seq(12347, 67455), Seq(12333, 67455))),
    (999, "4q22", "tu56ff", Seq(Seq(12348, 67459)))
    )

val snDump = spark.createDataFrame(data).toDF(col_names: _*)

val serialNumberEvents = snDump.select(col("eventid"), col("lotnumber"), explode(col("snetlist")).alias("snetlist"), col("companyid"))

val exploded = serialNumberEvents.select($"eventid", $"lotnumber", $"snetlist".getItem(0).alias("serialN"), $"snetlist".getItem(1).alias("eventT"), $"companyid")
exploded.show()

Note that my snetlist has the schema Array(Array) rather than Array(Struct). You can get this simply by creating an array instead of a struct out of your columns.
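
Applied to the aggregation in the question, that would mean collecting an array instead of a struct. A minimal sketch, assuming eventsDf is the already-mapped DataFrame from the question (the name is hypothetical); since array elements must share one type, both columns are cast to string here:

import org.apache.spark.sql.functions._

// Hypothetical variant of the question's groupBy: collect an array of
// [serialnumber, eventtime] pairs instead of a struct.
val snDumpArr = eventsDf
  .groupBy(col("eventid"), col("lotnumber"), col("companyid"))
  .agg(collect_list(array(col("serialnumber").cast("string"),
                          col("eventtime").cast("string"))).alias("snetlist"))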



Another approach, if you need to explode twice, is as follows. It uses a different example, but it demonstrates the point:

val flattened2 = df.select($"director", explode($"films.actors").as("actors_flat"))
val flattened3 = flattened2.select($"director", explode($"actors_flat").as("actors_flattened"))
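
For context, the df above comes from the linked question. A minimal, self-contained setup that reproduces the same shape might look like this (the names director, films, and actors are placeholders from that example, not from the question here; a SparkSession named spark is assumed):

import org.apache.spark.sql.functions._
import spark.implicits._

// films is an array of structs, each holding an actors array,
// so $"films.actors" resolves to an array of arrays and needs two explodes.
case class Film(title: String, actors: Seq[String])

val df = Seq(
  ("Nolan", Seq(Film("Inception", Seq("DiCaprio", "Hardy")),
                Film("Dunkirk",   Seq("Hardy", "Rylance"))))
).toDF("director", "films")

Running the two selects above on this df yields one row per (director, actor) pair.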

See Is there an efficient way to join two large Datasets with (deeper) nested array field? for a slightly different context; the same approach applies.

This answer is in response to your assertion that you can only explode once.

