How do I define a schema for the data below using StructType in Spark Java?

sam|mars|1234567|"report": {"Details": [{"subject": "science","grade": "A","remark": "good"},{"subject": "maths","grade": "E","remark": "excellent"},{"subject": "geography","grade": "E","remark": "excellent"}]}
harry|venus|987654|"report": {"Details": [{"subject": "science","grade": "O","remark": "outstanding"},{"subject": "history","grade": "A","remark": "good"}]}

The fields are: NAME, ADDRESS, ID, REPORTCARD

I have the following code:

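        JavaRDD<Row> row = javaRDD.map(new Function<String, Row>(){
            @Override
            public Row call(String line) throws Exception {
                // String.split takes a regex, so the pipe must be escaped;
                // the cast makes each field a separate Row value.
                return RowFactory.create((Object[]) line.split("\\|"));
            }
        });

where javaRDD is created on top of the above input data.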
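
A side note on the split call above: String.split treats its argument as a regular expression, so an unescaped "|" is an empty alternation that matches between every character, whereas "\\|" matches the literal pipe. A minimal plain-Java sketch of the difference follows. The class and method names (PipeSplitDemo, splitRecord) are hypothetical, and the limit of 4 is an assumption based on the four fields listed above.

```java
import java.util.Arrays;

class PipeSplitDemo {

    // Splits one input record on the literal '|' character into at most
    // four fields, so the JSON text stays in a single trailing field.
    static String[] splitRecord(String line) {
        return line.split("\\|", 4);
    }

    public static void main(String[] args) {
        String line = "sam|mars|1234567|\"report\": {\"Details\": []}";

        // Unescaped pipe: the regex matches the empty string, so the
        // record is broken up character by character.
        System.out.println(line.split("|").length);

        // Escaped pipe: NAME, ADDRESS, ID and REPORTCARD as four fields.
        System.out.println(Arrays.toString(splitRecord(line)));
    }
}
```

With the escaped pattern, the fourth element holds the whole report text as a single String.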

Now I need to convert the JavaRDD to a DataFrame (Dataset<Row> df) using the line below:

            Dataset<Row> df = spark.createDataFrame(row, <STRUCT TYPE SCHEMA>);

I need to create a StructType schema for this. How do I define it in Spark Java?

I created the following StructType schema:

            List<StructField> reportFields = new ArrayList<StructField>();
            reportFields.add(DataTypes.createStructField("subject", DataTypes.StringType, true));
            reportFields.add(DataTypes.createStructField("grade", DataTypes.StringType, true));
            reportFields.add(DataTypes.createStructField("remark", DataTypes.StringType, true));

            List<StructField> schemaFields = new ArrayList<StructField>();
            schemaFields.add(DataTypes.createStructField("NAME", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("ADDRESS", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("ID", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("REPORTCARD", DataTypes.createStructType(reportFields), true));
            StructType schema = DataTypes.createStructType(schemaFields);

            Dataset<Row> df = spark.createDataFrame(row, schema);

But I am getting the exception below:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of struct<subject:string,grade:string,remark:string>
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, NAME), StringType), true, false) AS NAME#0
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, ADDRESS), StringType), true, false) AS ADDRESS#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, ID), StringType), true, false) AS ID#2
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(subject, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 0, subject), StringType), true, false), grade, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 1, grade), StringType), true, false), remark, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 
2, remark), StringType), true, false)) AS REPORTCARD#3

2 Answers

This StructType should be enough:

    StructType details = new StructType(new StructField[]{
        new StructField("subject", DataTypes.StringType, false, Metadata.empty()),
        new StructField("grade", DataTypes.StringType, false, Metadata.empty()),
        new StructField("remark", DataTypes.StringType, false, Metadata.empty())
    });

    StructType recordType = new StructType();
    recordType = recordType.add("details", details, false);

    StructType structType = new StructType();
    structType = structType.add("name", DataTypes.StringType, false);
    structType = structType.add("planet", DataTypes.StringType, false);
    structType = structType.add("number", DataTypes.StringType, false);
    structType = structType.add("record", recordType, false);

1 Comment

The above schema also throws an error: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of struct<Details:struct<subject:string,grade:string,remark:string>>
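
The message indicates that the value at the REPORTCARD position of each Row is a plain java.lang.String while the schema declares a struct; declaring a nested type does not by itself parse the text. One possible way around this, sketched below under assumptions, is to load all four fields with StringType first and then parse the REPORTCARD column with from_json. The field is a JSON fragment ("report": {...}) rather than a complete object, so the sketch wraps it in braces before parsing, and Details is modelled as an array of structs to match the sample data. The class and method names (ReportCardParsing, reportCardSchema, parseReportCard) are hypothetical, and the input Dataset is assumed to have four string columns named NAME, ADDRESS, ID and REPORTCARD.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.lit;

class ReportCardParsing {

    // Schema of the JSON once the fragment is wrapped in braces:
    // {"report": {"Details": [{"subject": ..., "grade": ..., "remark": ...}]}}
    static StructType reportCardSchema() {
        StructType detail = new StructType()
            .add("subject", DataTypes.StringType)
            .add("grade", DataTypes.StringType)
            .add("remark", DataTypes.StringType);
        StructType report = new StructType()
            .add("Details", DataTypes.createArrayType(detail));
        return new StructType().add("report", report);
    }

    // Assumption: flat has four string columns NAME, ADDRESS, ID, REPORTCARD,
    // i.e. it was created with a schema in which every field is StringType.
    static Dataset<Row> parseReportCard(Dataset<Row> flat) {
        return flat.withColumn(
            "REPORTCARD",
            // Wrap the fragment in braces so it is a complete JSON object,
            // then parse it into the nested struct described above.
            from_json(concat(lit("{"), col("REPORTCARD"), lit("}")), reportCardSchema()));
    }
}
```

After this step, REPORTCARD.report.Details can be selected like any other array column.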

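You can try this, assuming row is a Dataset<Row> that already has subject, grade and remark columns:

// requires: import static org.apache.spark.sql.functions.*;
Dataset<Row> temp = row.withColumn("Details", struct("subject", "grade", "remark"))
                       .agg(collect_list("Details").as("Details"));

This works for the "Details" part.

You can apply the same approach to "report" as well.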
