
I have a dataframe df_original like the one below. [image not available]

I want to convert it into a nested JSON format like the one below. [image not available]

So far I have done this:

// Collect the (acct_no, ciskey) pairs per unique_id, then join them back onto the original rows
val data1 = df_original.groupBy($"unique_id").agg(collect_set(struct($"acct_no",$"ciskey")).as("accounts"))
val resultDf = df_original.join(data1, Seq("unique_id")).dropDuplicates()

which generates the JSON below:

{
  "unique_id": "12345678",
  "transaction_status": "posted",
  "amount": "116.26",
  "category": "Family",
  "email_id": "[email protected]",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": [
    {
      "acct_no": "51663",
      "ciskey": "47626220"
    },
    {
      "acct_no": "51663",
      "ciskey": "47626221"
    }, 
    {
      "acct_no": "51663",
      "ciskey": "47626222"
    }

  ]
}

Please help me to move forward
  • First of all, please don't attach images of data. It's very difficult for readers to recreate the test data from them. Commented Jun 13, 2020 at 4:14

2 Answers

1

Another alternative:

Load the test data

    val data =
      """
        |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey
        |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626220
        |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626221
        |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626222
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()
    /**
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626220|
      * |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626221|
      * |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626222|
      * +------------------+------+--------+--------------+---------+-------+--------+
      *
      * root
      * |-- transaction_status: string (nullable = true)
      * |-- amount: double (nullable = true)
      * |-- category: string (nullable = true)
      * |-- email_id: string (nullable = true)
      * |-- unique_id: integer (nullable = true)
      * |-- acct_no: integer (nullable = true)
      * |-- ciskey: integer (nullable = true)
      */

Create the required JSON

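    import org.apache.spark.sql.functions._

    // Group by every column except ciskey and collect the ciskey values of each group
    val groupBy = df.columns.filter(_!="ciskey")
    df.groupBy(groupBy.map(col): _*).agg(collect_list($"ciskey").as("accounts"))
      // Keep the first collected ciskey as a top-level field
      .withColumn("ciskey", element_at($"accounts", 1) )
      // TRANSFORM is a higher-order SQL function (Spark 2.4+); it wraps each ciskey in a struct
      .withColumn("customers", expr("TRANSFORM(accounts, " +
        "x -> named_struct('ciskey_no', x, 'ciskey_val', 'IND'))"))
      // Replace the plain array with a struct holding acct_no and the customers array
      .withColumn("accounts",
        struct($"acct_no", $"customers"))
      .drop("customers")
      .toJSON
      .show(false)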

    /**
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |value                                                                                                                                                                                                                                                                                                                          |
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"[email protected]","unique_id":12345678,"acct_no":51663,"accounts":{"acct_no":51663,"customers":[{"ciskey_no":47626220,"ciskey_val":"IND"},{"ciskey_no":47626221,"ciskey_val":"IND"},{"ciskey_no":47626222,"ciskey_val":"IND"}]},"ciskey":47626220}|
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      */

JSON:

{
    "transaction_status": "posted",
    "amount": 116.26,
    "category": "Family",
    "email_id": "[email protected]",
    "unique_id": 12345678,
    "acct_no": 51663,
    "accounts": {
        "acct_no": 51663,
        "customers": [{
            "ciskey_no": 47626220,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626221,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626222,
            "ciskey_val": "IND"
        }]
    },
    "ciskey": 47626220
}

3 Comments

Thanks for the response. I was looking for something like this. The part I was not able to figure out is "withColumn("customers", expr("TRANSFORM(accounts, " + "x -> named_struct('ciskey_no', x, 'ciskey_val', 'IND'))"))".
Is there a way to do this without the transform function? I am using Spark 2.3.
Then you need to create a UDF. Meanwhile, please check whether the other answers help you get the same result with Spark 2.3 :)
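
For Spark 2.3, the UDF suggested in the comment above could look roughly like this. It is a minimal sketch, not code from the answer: `CustomerUdf`, `Customer`, `toCustomers` and `toCustomersUdf` are hypothetical names, and it assumes `ciskey` was inferred as an integer, as in the schema printed in the answer.

```scala
import org.apache.spark.sql.functions.udf

object CustomerUdf {
  // Hypothetical case class: one element of the "customers" array.
  // Spark encodes a case class returned from a UDF as a struct.
  case class Customer(ciskey_no: Int, ciskey_val: String)

  // Plain Scala function: pair every ciskey with the constant "IND",
  // mirroring named_struct('ciskey_no', x, 'ciskey_val', 'IND').
  def toCustomers(keys: Seq[Int]): Seq[Customer] =
    keys.map(k => Customer(k, "IND"))

  // Wrap the function so it can be applied to an array<int> column.
  val toCustomersUdf = udf(toCustomers _)
}
```

In the answer's pipeline this would stand in for the `expr("TRANSFORM(...)")` call, for example `.withColumn("customers", CustomerUdf.toCustomersUdf($"accounts"))`. Note that `element_at` was also introduced in Spark 2.4, so on 2.3 the first element would have to be read another way, for instance with `$"accounts".getItem(0)`.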
1

Check the code below.

scala> df.show(false)
+------------------+------+--------+--------------+---------+-------+--------+
|transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
+------------------+------+--------+--------------+---------+-------+--------+
|posted            |116.26|Family  |[email protected]|12345678 |51663  |47626220|
|posted            |116.26|Family  |[email protected]|12345678 |51663  |47626221|
|posted            |116.26|Family  |[email protected]|12345678 |51663  |47626222|
+------------------+------+--------+--------------+---------+-------+--------+
scala> 

df
.groupBy($"unique_id")
.agg(
    collect_set(
        struct(
            $"transaction_status",
            $"amount",
            $"category",
            $"email_id",
            $"unique_id",
            $"acct_no"
        )).as("json_data"),
    first($"ciskey").as("ciskey"),
    first("acct_no").as("acct_no"),
    collect_list(struct($"ciskey")).as("customers")
)
.withColumn("json_data",explode($"json_data"))
.withColumn("accounts",struct($"acct_no",$"customers"))
.select($"json_data.*",$"ciskey",$"accounts")
.toJSON
.show(false)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"[email protected]","unique_id":"12345678","acct_no":"51663","ciskey":"47626220","accounts":{"acct_no":"51663","customers":[{"ciskey":"47626220"},{"ciskey":"47626221"},{"ciskey":"47626222"}]}}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The code above generates data like the following; you may be able to add your own logic on top of it.

{
  "transaction_status": "posted",
  "amount": 116.26,
  "category": "Family",
  "email_id": "[email protected]",
  "unique_id": "12345678",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": {
    "acct_no": "51663",
    "customers": [
      {
        "ciskey": "47626220"
      },
      {
        "ciskey": "47626221"
      },
      {
        "ciskey": "47626222"
      }
    ]
  }
}

1 Comment

Thanks for the response. Though this is a very good solution, I was looking for something generic where I do not have to pass all the columns (I have a lot of columns to process). There is another answer which worked for me.
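
One way to avoid listing every column, while staying with functions that are available in Spark 2.3, might be to derive the grouping columns from `df.columns` and build the nested structs during aggregation. The following is only a sketch under those assumptions: `NestCustomers`, `groupingColumns` and `nest` are hypothetical names, and the constant `'IND'` value is copied from the other answer.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, first, lit, struct}

object NestCustomers {
  // Every column except the one being nested becomes a grouping key.
  def groupingColumns(all: Seq[String], nested: String): Seq[String] =
    all.filter(_ != nested)

  def nest(df: DataFrame): DataFrame = {
    val keys = groupingColumns(df.columns.toSeq, "ciskey")
    df.groupBy(keys.map(col): _*)
      .agg(
        // first() returns an arbitrary row's ciskey unless the input is ordered
        first(col("ciskey")).as("ciskey"),
        // Build each customer struct directly, so no TRANSFORM or UDF is needed
        collect_list(
          struct(col("ciskey").as("ciskey_no"), lit("IND").as("ciskey_val"))
        ).as("customers"))
      // Nest acct_no and the customers array under "accounts"
      .withColumn("accounts", struct(col("acct_no"), col("customers")))
      .drop("customers")
  }
}
```

Calling `NestCustomers.nest(df).toJSON.show(false)` should then print one JSON document per group, with the top-level fields followed by `ciskey` and the nested `accounts` object.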
