0

I am trying to create a new dataframe (in SparkSQL 1.6.2) after applying a mapPartition function as follow:

FlatMapFunction<Iterator<Row>,Row> mapPartitonstoTTF=rows->
{
    List<Row> mappedRows=new ArrayList<Row>();      
    while(rows.hasNext())
    {
        Row row=rows.next();            
        Row mappedRow= RowFactory.create(row.getDouble(0),row.getString(1),row.getLong(2),row.getDouble(3),row.getInt(4),row.getString(5),
                row.getString(6),row.getInt(7),row.getInt(8),row.getString(9),0L);      
        mappedRows.add(mappedRow);

    }
    return mappedRows;

};


JavaRDD<Row> sensorDataDoubleRDD=oldsensorDataDoubleDF.toJavaRDD().mapPartitions(mapPartitonstoTTF);

StructType oldSchema=oldsensorDataDoubleDF.schema();
StructType newSchema =oldSchema.add("TTF",DataTypes.LongType,false);

System.out.println("The new schema is: ");
newSchema.printTreeString();

System.out.println("The old schema is: ");
oldSchema.printTreeString();

DataFrame sensorDataDoubleDF=hc.createDataFrame(sensorDataDoubleRDD, newSchema);
sensorDataDoubleDF.show();

As seen from above I am adding a new LongType column with values of 0 to RDDs using RowFactory.create() function

However, I get exception at line running sensorDataDoubleDF.show(); as follow:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 117 in stage 26.0 failed 4 times, most recent failure: Lost task 117.3 in stage 26.0 (TID 3249, AUPER01-01-20-08-0.prod.vroc.com.au): scala.MatchError: 1435766400001 (of class java.lang.Long)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The old schema is

root
 |-- data_quality: double (nullable = false)
 |-- data_sensor: string (nullable = true)
 |-- data_timestamp: long (nullable = false)
 |-- data_valueDouble: double (nullable = false)
 |-- day: integer (nullable = false)
 |-- dpnode: string (nullable = true)
 |-- dsnode: string (nullable = true)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- nodeid: string (nullable = true)
 |-- nodename: string (nullable = true)

The new schema is like above with addition of a TTF column as LongType

root
 |-- data_quality: double (nullable = false)
 |-- data_sensor: string (nullable = true)
 |-- data_timestamp: long (nullable = false)
 |-- data_valueDouble: double (nullable = false)
 |-- day: integer (nullable = false)
 |-- dpnode: string (nullable = true)
 |-- dsnode: string (nullable = true)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- nodeid: string (nullable = true)
 |-- nodename: string (nullable = true)
 |-- TTF: long (nullable = false)

I appreciate any help to figure it our where I am making mistake.

1 Answer 1

1

You have 11 columns in old schema but you are mapping only 10. Add row.getString(10) in RowFactory.create function.

Row mappedRow= RowFactory.create(row.getDouble(0),row.getString(1),row.getLong(2),row.getDouble(3),row.getInt(4),row.getString(5),
               row.getString(6),row.getInt(7),row.getInt(8),row.getString(9),row.getString(10),0L); 
Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.