
Hello,

I've built a recommender using MLlib ALS in Apache Spark, with this output:

user | product | rating
    1 | 20 | 0.002
    1 | 30 | 0.001
    1 | 10 | 0.003
    2 | 20 | 0.002
    2 | 30 | 0.001
    2 | 10 | 0.003

but I need to change the data structure so each user's rows are sorted by rating, like this:

user | product | rating | number_rangking
    1 | 10 | 0.003 | 1
    1 | 20 | 0.002 | 2 
    1 | 30 | 0.001 | 3
    2 | 10 | 0.003 | 1
    2 | 20 | 0.002 | 2
    2 | 30 | 0.001 | 3

How can I do that? Maybe someone can give me a clue...

Thanks.


1 Answer


All you need is a window function; depending on the details, you can choose either rank or rowNumber:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank
import spark.implicits._ // assumes a SparkSession named spark; enables the $"..." column syntax

val w = Window.partitionBy($"user").orderBy($"rating".desc)

df.select($"*", rank.over(w).alias("number_rangking")).show
// +----+-------+------+---------------+
// |user|product|rating|number_rangking|
// +----+-------+------+---------------+
// |   1|     10| 0.003|              1|
// |   1|     20| 0.002|              2|
// |   1|     30| 0.001|              3|
// |   2|     10| 0.003|              1|
// |   2|     20| 0.002|              2|
// |   2|     30| 0.001|              3|
// +----+-------+------+---------------+
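The choice between rank and rowNumber only matters when ratings tie within a user: rank gives tied rows the same number and skips ahead afterwards, while rowNumber assigns strictly consecutive positions. A minimal sketch of the difference using plain Scala collections (no Spark needed; the sample ratings are made up):

```scala
// One user's ratings with a tie, sorted descending as the window would order them.
val ratings = Seq(0.003, 0.002, 0.002, 0.001).sorted(Ordering[Double].reverse)

// row_number semantics: strictly consecutive positions 1, 2, 3, 4
val rowNumbers = ratings.zipWithIndex.map { case (r, i) => (r, i + 1) }

// rank semantics: tied values share the rank of their first occurrence,
// and the next distinct value's rank skips ahead.
val ranks = ratings.map(r => (r, ratings.indexOf(r) + 1))

// rowNumbers: (0.003,1), (0.002,2), (0.002,3), (0.001,4)
// ranks:      (0.003,1), (0.002,2), (0.002,2), (0.001,4)
```

For the desired output above (a unique number_rangking per row), rowNumber is the safer choice when ties are possible.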

Using a plain RDD you can groupByKey, process each group locally, and flatMap:

rdd
  // Convert to PairRDD
  .map{case (user, product, rating) => (user, (product, rating))}
  .groupByKey 
  .flatMap{case (user, vals) => vals.toArray
    .sortBy(-_._2) // Sort by rating
    .zipWithIndex // Add index
    // Yield final values
    .map{case ((product, rating), idx) => (user, product, rating, idx + 1)}}
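The per-group step inside that flatMap (sort by rating, attach a 1-based rank) can be checked on plain Scala data, without Spark; the sample values here are hypothetical:

```scala
// One user's (product, rating) pairs, in arbitrary order.
val vals = Seq((20, 0.002), (30, 0.001), (10, 0.003))

val ranked = vals.toArray
  .sortBy(-_._2)  // highest rating first
  .zipWithIndex   // attach 0-based position
  .map { case ((product, rating), idx) => (product, rating, idx + 1) }

// ranked: (10,0.003,1), (20,0.002,2), (30,0.001,3)
```

Note that groupByKey materializes each user's values on one executor, so this works best when no single user has a huge number of products.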