I have data that looks like this

+--------------+---------+-------+---------+
|       dataOne|OtherData|dataTwo|dataThree|
+--------------+---------+-------+---------+
|          Best|     tree|      5|      533|
|            OK|     bush|      e|     3535|
|           MEH|      cow|      -|     3353|
|           MEH|      oak|   none|       12|
+--------------+---------+-------+---------+

and I'm trying to get it into the output of

+--------------+---------+
|       dataOne|    Count|
+--------------+---------+
|          Best|        1|
|            OK|        1|
|           MEH|        2|
+--------------+---------+

I have no problem getting dataOne into a dataframe by itself and showing its contents to make sure I'm grabbing just the dataOne column. However, I can't seem to find the correct syntax for turning that SQL query into the data I need. I tried creating the following dataframe from the temp view created from the entire data set:

Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from dataFrame group by dataOne");
dataOneCount.show();

But Spark throws an exception when this runs. The documentation I was able to find only showed how to do this type of aggregation in Spark 1.6 and earlier, so any help would be appreciated.

Here's the error message I get. However, I've checked the data and there is no indexing error in it.

 java.lang.ArrayIndexOutOfBoundsException: 11

I've also tried applying the countDistinct method from functions:

Column countNum = countDistinct(dataFrame.col("dataOne"));
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum);
result.show();

where dataOneDataFrame is a dataFrame created from running

select dataOne from dataFrame

But it throws an AnalysisException. I'm still new to Spark, so I'm not sure whether there's an error in how or when I'm evaluating the countDistinct method.

edit: To clarify, the first table shown is the result of the dataFrame I've created from reading the text file and applying a custom schema to it (they are still all strings)

Dataset<Row> dataFrame 

Here is my full code

public static void main(String[] args) {


    SparkSession spark = SparkSession
            .builder()
            .appName("Log File Reader")
            .getOrCreate();

    //args[0] is the textfile location
    JavaRDD<String> logsRDD = spark.sparkContext()
            .textFile(args[0],1)
            .toJavaRDD();

    String schemaString = "dataOne OtherData dataTwo dataThree";

    List<StructField> fields = new ArrayList<>();
    String[] fieldName = schemaString.split(" ");


    for (String field : fieldName){
        fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
    }
    StructType schema = DataTypes.createStructType(fields);

    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
       String[] attributes = record.split(" ");
       return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]);
    });


    Dataset<Row> dF = spark.createDataFrame(rowRDD, schema);

    //first attempt
    dF.groupBy(col("dataOne")).count().show();

    //Trying with a sql statement
    dF.createOrReplaceTempView("view");
    dF.sparkSession().sql("select command, count(*) from view group by command").show();

The most likely culprit that comes to mind is the lambda function that returns the row using RowFactory. The idea seems sound, but I'm not sure how well it really holds up or whether there's another way I could do it. Other than that, I'm quite puzzled.

sample data

best tree 5 533
OK bush e 3535
MEH cow - 3353
MEH oak none 12

1 Answer

Using Scala syntax for convenience. It's very similar to the Java syntax:

// Input data
val df = {
  import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  import scala.collection.JavaConverters._

  val simpleSchema = StructType(
    StructField("dataOne", StringType) ::
    StructField("OtherData", StringType) ::
    StructField("dataTwo", StringType) ::
    StructField("dataThree", IntegerType) :: Nil)

  val data = List(
    Row("Best", "tree", "5", 533),
    Row("OK", "bush", "e", 3535),
    Row("MEH", "cow", "-", 3353),
    Row("MEH", "oak", "none", 12)
  )

  spark.createDataFrame(data.asJava, simpleSchema)
}

df.show
+-------+---------+-------+---------+
|dataOne|OtherData|dataTwo|dataThree|
+-------+---------+-------+---------+
|   Best|     tree|      5|      533|
|     OK|     bush|      e|     3535|
|    MEH|      cow|      -|     3353|
|    MEH|      oak|   none|       12|
+-------+---------+-------+---------+
df.groupBy(col("dataOne")).count().show()
+-------+-----+
|dataOne|count|
+-------+-----+
|    MEH|    2|
|   Best|    1|
|     OK|    1|
+-------+-----+

I can submit the Java code given above as follows, with the four-row data file on S3, and it works fine:

$SPARK_HOME/bin/spark-submit \
  --class sparktest.FromStackOverflow \
  --packages "org.apache.hadoop:hadoop-aws:2.7.3" \
  target/scala-2.11/sparktest_2.11-1.0.0-SNAPSHOT.jar "s3a://my-bucket-name/sample.txt"
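
The four-row sample parses cleanly, so if the failure only shows up with the real file, one possibility (not confirmed in this thread) is that some lines split into fewer tokens than the lambda indexes. An index of 11 in the exception would fit a real schema wider than the four columns shown, though that is a guess. Below is a minimal plain-Java sketch of a guarded parse, with no Spark dependency; SafeLineParser and EXPECTED_FIELDS are hypothetical names chosen for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SafeLineParser {
    // Hypothetical constant: the number of columns in the schema string.
    static final int EXPECTED_FIELDS = 4;

    // Returns the fields of a record, or empty if the line has too few tokens.
    static Optional<String[]> parse(String record) {
        // split("\\s+") treats runs of spaces or tabs as one separator;
        // split(" ") would produce empty tokens and shift the columns.
        String[] attributes = record.trim().split("\\s+");
        if (attributes.length < EXPECTED_FIELDS) {
            return Optional.empty();
        }
        return Optional.of(Arrays.copyOf(attributes, EXPECTED_FIELDS));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "best tree 5 533",
                "MEH cow -",   // short line: reading attributes[3] would throw
                "");           // blank line, e.g. a trailing newline in the file
        for (String line : lines) {
            System.out.println(
                    parse(line).map(Arrays::toString).orElse("skipped: [" + line + "]"));
        }
    }
}
```

In the Spark job, the same length check could run in a filter on logsRDD before the map that calls RowFactory.create, so that malformed lines are dropped or logged instead of failing the task.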

8 Comments

I've already tried this approach in my Java program and it returns java.lang.ArrayIndexOutOfBoundsException: 11
Can you reproduce it with a small isolated example like what I showed? What version of Spark are you using? Have you tried others?
Added the full code. I'm using Spark 2.1, and I'm not looking to regress to previous versions of Spark, though.
I used your exact Java code. I had to change "command" to "dataOne". There is no column named "command" in your dataframe. But other than that it worked perfectly with a local Spark 2.1.1 install.
I should have been more explicit, sorry. I'm trying to deploy this code to a local cluster, pulling the file stored in a Hadoop file system. Perhaps an environment problem is causing this issue, although it puzzles me how, since it runs simple SQL commands like select * from some tempview.
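
On why a simple select * followed by show() could succeed while the group-by fails: show() displays only the first 20 rows by default, and the row-building lambda runs lazily, so a preview may never reach a malformed line further down the file, whereas an aggregation has to parse every line. This is only a hypothesis for the case in this thread. The sketch below illustrates the effect with plain Java streams as an analogy, not with Spark itself; LazyParseDemo and the malformed third line are made up for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LazyParseDemo {
    // Same indexing as the lambda in the question: assumes four tokens per line.
    static String[] parse(String record) {
        String[] attributes = record.split(" ");
        return new String[] {attributes[0], attributes[1], attributes[2], attributes[3]};
    }

    // Reads only the first n rows, like previewing a table.
    static List<String> preview(List<String> lines, int n) {
        return lines.stream()
                .map(LazyParseDemo::parse)
                .limit(n)               // short-circuits: later lines are never parsed
                .map(row -> row[0])
                .collect(Collectors.toList());
    }

    // Counts rows per first column, which forces every line to be parsed.
    static Map<String, Long> countByFirstColumn(List<String> lines) {
        return lines.stream()
                .map(LazyParseDemo::parse)
                .collect(Collectors.groupingBy(row -> row[0], TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "best tree 5 533",
                "OK bush e 3535",
                "MEH cow -");   // hypothetical malformed line late in the file

        System.out.println(preview(lines, 2));   // prints [best, OK]
        try {
            System.out.println(countByFirstColumn(lines));
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("aggregation failed: " + e.getMessage());
        }
    }
}
```

If the real file behaves this way, running the job on progressively larger slices of it would be one way to find the offending line.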