
I need to implement an auto-increment column in my Spark SQL table. How could I do that? Kindly guide me. I am using PySpark 2.0.

Thank you, Kalyan

  • Check this: stackoverflow.com/questions/31955309/… Commented Oct 25, 2016 at 4:42
  • @MRSrinivas Thanks for your detailed reply; I will try it. Recently I tried from pyspark.sql.functions import monotonically_increasing_id to solve the problem, and it worked: it gives IDs for every row, indexing from 0 (a minimal sketch follows below). Thank you very much. Commented Nov 15, 2016 at 8:12
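A minimal sketch of the monotonically_increasing_id approach mentioned in the comment above, assuming the pyspark 2.0 shell (the sample data is hypothetical):

from pyspark.sql.functions import monotonically_increasing_id

# Hypothetical sample DataFrame.
df = sqlContext.createDataFrame([("John",), ("Tim",)], ["name"])

# Add a generated id column; the IDs are unique and monotonically
# increasing, but only consecutive from 0 within a single partition.
df_with_id = df.withColumn("id", monotonically_increasing_id())
df_with_id.show()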

1 Answer


I would write/reuse a stateful Hive UDF and register it with PySpark, since Spark SQL has good support for Hive.

Check the line @UDFType(deterministic = false, stateful = true) in the code below; it is what marks the UDF as stateful.

package org.apache.hadoop.hive.contrib.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence: returns an incrementing row sequence number.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)  // stateful: the counter is kept between rows
public class UDFRowSequence extends UDF
{
  // Holds the counter; reused across calls to avoid allocating per row.
  private LongWritable result = new LongWritable();

  public UDFRowSequence() {
    result.set(0);  // counter starts at 0 and is incremented before each return
  }

  public LongWritable evaluate() {
    result.set(result.get() + 1);  // increment and return, so the first row gets 1
    return result;
  }
}

// End UDFRowSequence.java

Now build the jar and pass its location when pyspark is started:

$ pyspark --jars your_jar_name.jar

Then register it with sqlContext:

sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")

Now use row_seq() in your SELECT query:

sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")

Project to use Hive UDFs in pySpark


4 Comments

I have built the jar as you specified and created the temporary function as well. Then I created a table with sqlContext.sql("Create table abc(id int, name string)") and ran sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'John'") and sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'Tim'"). When I do a SELECT * statement, I get both IDs as 1 instead of 1 and 2.
Are you setting stateful = true inside the @UDFType tag in your code?
I need something like this, but the question is: will it scale to 200 million rows? I want to break a big file containing 200 million rows into smaller files of exactly 10K rows each. I thought to add an auto-increment number to each row and read in batches with a predicate like (id > 10,001 and id < 20,000). Will this work at that scale? Please suggest.
Is it possible to write this UDF in Python and register it with sqlContext as well? (A pure-PySpark alternative is sketched below.)
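In response to the last comment, a possible pure-PySpark alternative that avoids Java entirely: the row_number window function produces consecutive IDs starting from 1. This is a sketch assuming an orderable column col1 exists; note that a window with no partitionBy pulls all rows into a single partition, which limits scalability:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

df = sqlContext.table("table_name")   # hypothetical source table
w = Window.orderBy("col1")            # global ordering -> single partition
df_with_id = df.withColumn("id", row_number().over(w))
df_with_id.show()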
