
Disclaimer: I'm VERY new to Spark and Scala. I am working on a document similarity project in Scala with Spark, and I have a DataFrame that looks like this:

+--------+--------------------+------------------+
|    text|            shingles|   hashed_shingles|
+--------+--------------------+------------------+
|  qwerty|[qwe, wer, ert, rty]|  [-4, -6, -1, -9]|
|qwerasfg|[qwe, wer, era, r...|[-4, -6, 6, -2, 2]|
+--------+--------------------+------------------+

Here I split each document's text into shingles and computed a hash value for each shingle.
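
For context, the shingles above appear to be 3-character sliding windows over the text, which plain Scala produces like this:

// 3-character shingles via a sliding window, matching the table above
val shingles = "qwerty".sliding(3).toSeq // Seq("qwe", "wer", "ert", "rty")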

Imagine I have a hash_function(integer, seed) -> integer. Now I want to apply n different hash functions of this form to the hashed_shingles arrays, i.e. obtain an array of n arrays, where the i-th array is hash_function applied element-wise to hashed_shingles with seed i, for seeds 1 to n.
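
In plain Scala terms (hash_function and hashedShingles are placeholders for the hypothetical function and one row's hashed_shingles value above), the per-row result I want is:

// Desired per-row shape: one re-hashed copy of the shingle hashes per seed
val perSeed: Seq[Seq[Int]] =
  (1 to n).map(seed => hashedShingles.map(x => hash_function(x, seed)))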

I'm trying something like this, but I cannot get it to work:

val n = 3
// Make n copies of the hashed_shingles array, one per hash function
df = df.withColumn("tmp", array_repeat($"hashed_shingles", n))
// Intended: hash the i-th copy with seed i. This fails because
// hash_function is not a SQL function, so expr() cannot resolve it
// (and each x here is a whole array, not a single element)
val minhash_expr = "transform(tmp, (x, i) -> hash_function(x, i))"
df = df.withColumn("tmp", expr(minhash_expr))

I know how to do it with a UDF, but as I understand it, UDFs are opaque to the Catalyst optimizer and should be avoided where possible, so I am trying to do everything with org.apache.spark.sql.functions.

Any ideas on how to approach this without a UDF?

The UDF that achieves the same goal is this:

// Family of hash functions of the form h(x) = ((a*x + b) % p) % max_val,
// where p = 104729 is a large prime and a, b are drawn from the seed
class Hasher(seed: Int, max_val: Int, p: Int = 104729) {
  private val random_generator = new scala.util.Random(seed)
  val a = 1 + 2 * random_generator.nextInt((p - 2) / 2) // odd a in [1, p-1]
  val b = 1 + random_generator.nextInt(p - 2)           // b in [1, p-1]
  def getHash(x: Int): Int = ((a * x + b) % p) % max_val
}

// Compute a list of minhashes from a list of hashers given a set of ids
class MinHasher(hashes : List[Hasher]) {
  def getMinHash(set : Seq[Int])(hasher : Hasher) : Int = set.map(hasher.getHash).min
  def getMinHashes(set: Seq[Int]) : Seq[Int] = hashes.map(getMinHash(set))
}
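
For example, a small demo family (parameters illustrative only) can be exercised like this:

// Illustrative call with 3 demo hashers (the real code uses 100 below);
// getMinHashes returns one minimum per Hasher in the family
val demo = new MinHasher(List.tabulate(3)(s => new Hasher(s, 16)))
val demoMins = demo.getMinHashes(Seq(-4, -6, -1, -9)) // Seq of 3 minhashes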

// Minhasher: minhash_len hash functions with seeds 0..minhash_len-1
val minhash_len = 100
// shingle_bins (the number of hash buckets) is defined elsewhere in my code
val hashes = List.tabulate(minhash_len)(n => new Hasher(n, shingle_bins))
val minhasher = new MinHasher(hashes)

// Compute Minhashes
val minhasherUDF = udf[Seq[Int], Seq[Int]](minhasher.getMinHashes)
df = df.withColumn("minhashes", minhasherUDF('hashed_shingles))
4
  • It would help to understand the exact problem/requirement if you could provide the failing result/error along with the expected result. Commented Nov 5, 2020 at 20:18
  • You are right; I have added the UDF which achieves the same goal. Commented Nov 6, 2020 at 17:05
  • Higher-order functions like transform (or aggregate, as in this SO answer) are for transforming data of a complex type (e.g. Array) element-wise with a user-provided function. In your use case the entire Array is consumed as a whole by your custom function, so transform isn't suitable here. I would go with your UDF approach. Commented Nov 6, 2020 at 18:43
  • Thank you, good to know. Commented Nov 6, 2020 at 19:44
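
Following up on the comment above: since getHash is pure arithmetic, a UDF-free variant is at least possible by inlining each hasher's (a, b) coefficients into a SQL expression, using the built-in transform and array_min higher-order functions (Spark 2.4+). This is an untested sketch, with the same Int overflow behavior as the UDF:

import org.apache.spark.sql.functions.{array, expr}

// Untested sketch: inline each hasher's coefficients into a SQL string;
// transform hashes every shingle, array_min takes the per-hasher minimum
val p = 104729
val minhashCols = hashes.map { h =>
  expr(s"array_min(transform(hashed_shingles, x -> ((${h.a} * x + ${h.b}) % $p) % $shingle_bins))")
}
df = df.withColumn("minhashes", array(minhashCols: _*))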
