
I have a JSON with the schema below:

 |-- Pool: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |-- 2: struct (nullable = true)
 |    |    |-- Alias: string (nullable = true)
 |    |    |-- Chaddr: string (nullable = true)
 |    |    |-- ChaddrMask: string (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
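For reference, a minimal, hypothetical JSON instance matching this schema might look like this (the Alias/Chaddr values are made up):

```json
{
  "Pool": {
    "1": {
      "Client": {
        "1": { "Active": true,  "Alias": "client-a", "Chaddr": "00:11:22:33:44:55" },
        "2": { "Active": false, "Alias": "client-b", "Chaddr": "00:11:22:33:44:66" }
      }
    },
    "2": {
      "Alias": "pool-b",
      "Chaddr": "00:11:22:33:44:77",
      "ChaddrMask": "ff:ff:ff:ff:ff:00",
      "Client": {
        "1": { "Active": true, "Alias": "client-c", "Chaddr": "00:11:22:33:44:88" }
      }
    }
  }
}
```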

And the output I am trying to achieve is:

 PoolId ClientID Client_Active
 1      1        true
 1      2        false
 2      1        true

This schema keeps changing from one JSON to the next. For example, this JSON has 2 Pool IDs, but another JSON may have 5 Pool IDs, and the same goes for Client IDs.

The problems are:

  1. We can't use explode on a struct.
  2. Pool can't be converted to a Map, because each Pool has different Client IDs, which leads to a different schema for each row.

Any thoughts on how to achieve this?

I have tried this link for converting a struct to a Map and then exploding, but it doesn't work when there are different numbers of Client IDs in different Pools.

  • Why are you using Spark for this? This is just a basic JSON handling, most higher-level languages can deal with it just fine. If you are going to have a large json with millions of Pool IDs, Spark is also not a good choice for this, since Spark does not scale up well horizontally. Commented Jan 6, 2020 at 6:56
  • I need to do this using spark as this is part of my pipeline. Commented Jan 6, 2020 at 7:02

1 Answer


From my perspective, you only need to define a UDF.

Here's an example :

  1. Define a projection case class (what you want as the resulting structure):
case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)
  2. Define a UDF like the one below, allowing you to work with both your structure (fields) and your data:
val myUdf = udf { r: Row =>
  // Iterate over the Pool ids (the field names of the top-level struct)
  r.schema.fields.flatMap { rf =>
    val poolId = rf.name
    val pool = r.getAs[Row](poolId)
    val clientRow = pool.getAs[Row]("Client")
    // Iterate over the Client ids (the field names of the Client struct)
    clientRow.schema.fields.map { cr =>
      val clientId = cr.name
      val isActive = clientRow.getAs[Row](clientId).getAs[Boolean]("Active")
      Projection(poolId, clientId, isActive)
    }
  }
}
  3. Use your UDF:
val newDF = df.select(explode(myUdf($"Pool")).as("projection"))
    .select("projection.*")
    .cache

newDF.show(false)

The output is the expected one:

+------+--------+-------------+
|PoolId|ClientID|Client_Active|
+------+--------+-------------+
|1     |1       |true         |
|1     |2       |false        |
|2     |1       |true         |
+------+--------+-------------+
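To see why iterating over the schema's field names works even though the set of Pool and Client IDs changes per document, the same flattening can be sketched in plain Scala over nested Maps — a minimal, Spark-free illustration (`flattenPools` and the sample `pool` map are hypothetical, standing in for the parsed JSON):

```scala
type Obj = Map[String, Any]

// Walk Pool -> Client and emit one (PoolId, ClientID, Active) row per client,
// regardless of how many pools or clients the document contains.
def flattenPools(pool: Obj): Seq[(String, String, Boolean)] =
  pool.toSeq.flatMap { case (poolId, poolBody) =>
    val clients = poolBody.asInstanceOf[Obj]("Client").asInstanceOf[Obj]
    clients.toSeq.map { case (clientId, client) =>
      (poolId, clientId, client.asInstanceOf[Obj]("Active").asInstanceOf[Boolean])
    }
  }

// Hypothetical data mirroring the schema in the question.
val pool: Obj = Map(
  "1" -> Map("Client" -> Map(
    "1" -> Map("Active" -> true,  "Alias" -> "a", "Chaddr" -> "x"),
    "2" -> Map("Active" -> false, "Alias" -> "b", "Chaddr" -> "y"))),
  "2" -> Map("Client" -> Map(
    "1" -> Map("Active" -> true,  "Alias" -> "c", "Chaddr" -> "z"))))

flattenPools(pool).foreach(println)
```

The UDF above does the same walk, except the "keys" come from `schema.fields` on each `Row` instead of Map keys, which is what makes it robust to a varying number of Pool and Client IDs.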