
I am trying to compute the average of the ratings across all JSON objects in a file. I loaded the file and converted it to a data frame, but I am getting an error while computing the average. Sample request:

{
        "country": "France",
        "customerId": "France001",
        "visited": [
            {
                "placeName": "US",
                "rating": "2.3",
                "famousRest": "N/A",
                "placeId": "AVBS34"
            },
            {
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"
            },
            {
                "placeName": "Canada",
                "rating": "4.3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"
            }
    ]
}

so for this JSON, the US avg rating will be (2.3 + 3.3)/2 = 2.8

{
        "country": "Egypt",
        "customerId": "Egypt009",
        "visited": [
            {
                "placeName": "US",
                "rating": "1.3",
                "famousRest": "McDonald",
                "placeId": "Dedcf3"
            },
            {
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "EagleNest",
                "placeId": "CDfet3"
            }
    ]
}

{
        "country": "Canada",
        "customerId": "Canada012",
        "visited": [
            {
                "placeName": "UK",
                "rating": "3.3",
                "famousRest": "N/A",
                "placeId": "XSdce2"
            }
    ]
}

for the Egypt JSON, the US avg rating will be (1.3 + 3.3)/2 = 2.3 (the Canada JSON has no US entry)

so overall, the final average rating will be: (2.8 + 2.3)/2 = 2.55 (only two requests have 'US' in their visited list)

My schema :

root
 |-- country: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- visited: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- placeId: string (nullable = true)
 |    |    |-- placeName: string (nullable = true)
 |    |    |-- famousRest: string (nullable = true)
 |    |    |-- rating: string (nullable = true)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
df.show() 

So basically I need the average of ratings where placeName = 'US'. That is, for each JSON object, AVG_RATING = (sum of ratings of visited entries where placeName is 'US') / (count of such entries), and FINAL_VALUE = (sum of AVG_RATING over all JSON objects that have a 'US' entry) / (count of such JSON objects).
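For concreteness, that two-level computation can be sketched in plain Scala collections (no Spark involved; `Visit` and `Request` are hypothetical names used only for illustration):

```scala
case class Visit(placeName: String, rating: Double)
case class Request(customerId: String, visited: Seq[Visit])

val requests = Seq(
  Request("France001", Seq(Visit("US", 2.3), Visit("US", 3.3), Visit("Canada", 4.3))),
  Request("Egypt009",  Seq(Visit("US", 1.3), Visit("US", 3.3))),
  Request("Canada012", Seq(Visit("UK", 3.3)))
)

// AVG_RATING: per-request average over visits with placeName == "US";
// requests with no US visit contribute nothing
val perRequest = requests.flatMap { r =>
  val us = r.visited.filter(_.placeName == "US").map(_.rating)
  if (us.isEmpty) None else Some(us.sum / us.size)
}

// FINAL_VALUE: average of the per-request averages
val finalValue = perRequest.sum / perRequest.size  // ~2.55
```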

So far I tried :

df.registerTempTable("people")
sqlContext.sql("select avg(expResults.rating) from people LATERAL VIEW explode(visited) people AS expResults where expResults.placeName = 'US'").collect().foreach(println)

    val result = df.select("*").where(array_contains(df("visited.placeName"), "US"))

This gives the rows whose visited array contains US, but I am not sure how to parse through the list of structs.

Can someone tell me how to do this?

  • could you paste just one sample json for trying? Commented Jun 30, 2016 at 4:27
  • @WoodChopper updated with a sample json object Commented Jun 30, 2016 at 4:40
  • Did you find the solution for this? Commented Jun 30, 2016 at 9:36
  • @user3407267 No, trying many different options but no solution yet. Still stuck. Commented Jun 30, 2016 at 9:58
  • I don't get it. 2.3 + 3.3 = 5.6, 5.6 / 2 = 2.8 != 4.25 Commented Jun 30, 2016 at 13:28

2 Answers


It looks like you want something like this:

import org.apache.spark.sql.functions.{avg, explode}
import sqlContext.implicits._  // for the $"..." column syntax

val result = df
  .withColumn("visit", explode($"visited"))    // Explode visits
  .groupBy($"customerId", $"visit.placeName")  // Group by using dot syntax
  .agg(avg($"visit.rating".cast("double")).alias("tmp"))
  .groupBy($"placeName").agg(avg($"tmp").alias("value"))

After that you can filter this for a country of your choice.

result.where($"placeName" === "US").show
// +---------+-----+
// |placeName|value|
// +---------+-----+
// |       US| 2.55|
// +---------+-----+

A less elegant approach is to use a UDF:

import scala.util.Try
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

def userAverage(country: String) = udf((visits: Seq[Row]) => Try {
   val filtered = visits
     .filter(_.getAs[String]("placeName") == country)
     .map(_.getAs[String]("rating").toDouble)
   filtered.sum / filtered.size
}.toOption)

df.select(userAverage("US")($"visited").as("tmp")).na.drop.agg(avg("tmp"))
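As an aside, the `Try` here only catches parse failures (a non-numeric rating); for a request with no matching visits the division is `0.0 / 0`, which yields `NaN` rather than throwing, and it is the subsequent `na.drop` that discards those rows. A plain-Scala sketch of the UDF body, with `(placeName, rating)` tuples standing in for Spark `Row`s:

```scala
import scala.util.Try

// Tuples stand in for Spark Rows in this sketch
def userAvg(country: String)(visits: Seq[(String, String)]): Option[Double] = Try {
  val filtered = visits.filter(_._1 == country).map(_._2.toDouble)
  filtered.sum / filtered.size
}.toOption

userAvg("US")(Seq(("US", "2.3"), ("US", "3.3")))  // ~Some(2.8)
userAvg("US")(Seq(("UK", "3.3")))                 // Some(NaN), not None: 0.0 / 0
userAvg("US")(Seq(("US", "oops")))                // None: toDouble throws
```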

Note: This follows the description provided in the question by computing an average of averages, which is different from the accepted answer. For a simple average:

val result = df
  .select(explode($"visited").alias("visit"))
  .groupBy($"visit.placeName")
  .agg(avg($"visit.rating".cast("double")))
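Incidentally, on the sample data in the question the two definitions happen to agree, because both US-visiting requests contain exactly two US entries; with unequal group sizes they diverge. A quick plain-Scala check (values taken from the question's sample):

```scala
// US ratings grouped per request, as in the question's sample
val groups = Seq(Seq(2.3, 3.3), Seq(1.3, 3.3))

// Average of per-request averages vs. one flat average over all US visits
val avgOfAvgs = groups.map(g => g.sum / g.size).sum / groups.size  // ~2.55
val all       = groups.flatten
val simpleAvg = all.sum / all.size                                 // ~2.55 too

// Append an uneven group, e.g. Seq(5.0), and the two values differ.
```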

4 Comments

Well, you can try a UDF to avoid explode, but in general Spark support for nested structures is fairly limited, so I wouldn't expect impressive performance, especially with JSON.
Thank you for your explanation. Can I add a filter here? Like saying use the rating for US in the avg calc only if it is above 3 (visited.rating)?
val result = df.withColumn("visit", explode($"visited")).filter($"visited.rating" > 3.0).groupBy($"customerId", $"visit.placeName").agg(avg($"visit.rating".cast("double")).alias("tmp")).groupBy($"placeName").agg(avg($"tmp").alias("value"))
I used the above to filter the ratings (include only those > 3). Is this the correct approach?

Here is my solution to your problem.

val DF = sqlContext.jsonFile("sample.json")

DF.registerTempTable("temp")

sqlContext.sql("""
  select place_and_rating.placeName as placeName,
         avg(place_and_rating.rating) as avg_rating
  from temp
  lateral view explode(visited) exploded_table as place_and_rating
  where place_and_rating.placeName = 'US'
  group by place_and_rating.placeName
""").show()

11 Comments

I am getting failure: ``union'' expected but identifier view found for your solution. Am I missing anything?
Do I need to import anything?
You don't need to import anything for this solution. jsonFile & registerTempTable are built-in methods available on sqlContext, and the rest is just HQL.
What version of Spark are you using? And can you please post your code in the comments section?
Spark 1.5.2. I created a JSON file with the given structure and used: sqlContext.sql("select place_and_rating.placeName as placeName, avg(place_and_rating.rating) as avg_rating from temp lateral view explode(visited) exploded_table as place_and_rating where place_and_rating.placeName='US' group by place_and_rating.placeName").show()