
With Java and Spark, I have a Dataset<Row> to which I want to add a column filled with values following the sequence 1 2 2 3 3 3...

I made a little Iterator<Integer> implementation, but how can I tell Spark to create the column and call my Iterator.next() method for each row? I was hoping I could pass a function to withColumn, but there is no such overload.

public class ExampleIterator implements Iterator<Integer> {

    private int n = 0;
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
      return ((1 + (int)Math.sqrt(1+(8*n++))) /2);
    }
}
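As a quick sanity check outside Spark (hypothetical driver code, not part of the question), the iterator does produce the intended run-length pattern:

```java
import java.util.Iterator;

public class ExampleIteratorCheck {
    // Same logic as the ExampleIterator from the question
    static class ExampleIterator implements Iterator<Integer> {
        private int n = 0;

        @Override
        public boolean hasNext() {
            return true; // infinite sequence
        }

        @Override
        public Integer next() {
            return (1 + (int) Math.sqrt(1 + (8 * n++))) / 2;
        }
    }

    public static void main(String[] args) {
        ExampleIterator it = new ExampleIterator();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            sb.append(it.next()).append(' ');
        }
        System.out.println(sb.toString().trim()); // 1 2 2 3 3 3 4 4 4 4
    }
}
```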
  • You want to add this pattern to a column in your dataset: 1 2 2 3 3 3 4 4 4 4 5 5 5 5 5 ...? If so, you don't need an iterator for that. Commented Nov 30, 2022 at 8:58
  • @AbdennacerLachiheb Yes, do you have an implementation without an iterator? Commented Nov 30, 2022 at 9:04
  • I added that java code also Commented Nov 30, 2022 at 10:06
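As the first comment suggests, no iterator state is needed: the value depends only on the 0-based row position, so a stateless closed-form function works. In Spark you could feed it a row index (for instance `row_number() - 1` over a window; that pairing is my assumption, not from the thread). A plain-Java sketch of the closed form:

```java
public class SeqFromIndex {
    // Closed-form value at 0-based position n of the sequence 1 2 2 3 3 3 ...
    static long seqValue(long n) {
        return (1 + (long) Math.sqrt(1 + 8 * n)) / 2;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (long n = 0; n < 10; n++) {
            sb.append(seqValue(n)).append(' ');
        }
        System.out.println(sb.toString().trim()); // 1 2 2 3 3 3 4 4 4 4
    }
}
```

Being stateless, this version is safe to evaluate in any order and on any partition.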

1 Answer


Here's a solution in Scala using a udf; I can translate it to Java if you want, but the idea is the same. I used your function to generate the new column:

  val columns = Seq("c1", "c2")
  val data = Seq((4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7))
  val df = spark.sparkContext.parallelize(data).toDF(columns: _*)
  var n = -1;
  val createSeq = () => {
    n = n + 1
    (1 + Math.sqrt(1 + (8 * n)).toInt) / 2
  }
  val createSeqUDF = udf(createSeq)
  df.withColumn("c3", createSeqUDF()).show(false)

Make sure to define the variable n outside of your udf function. Here are the results:

+---+---+---+
|c1 |c2 |c3 |
+---+---+---+
|4  |7  |1  |
|4  |7  |2  |
|4  |7  |2  |
|4  |7  |3  |
|4  |7  |3  |
|4  |7  |3  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |6  |
+---+---+---+

Here's the Java code:

    long n = 0;

    void test(SparkSession session) {
        session.sqlContext().udf().register("seqUdf", o -> (1 + (long) Math.sqrt(1 + (8 * n++))) / 2, DataTypes.LongType);
        List<String> data = Arrays.asList("abc", "klm", "xyz", "abc", "klm", "xyz", "abc", "klm", "xyz", "abc", "klm", "xyz");
        Dataset<String> dataDs = session.createDataset(data, Encoders.STRING());
        Dataset<Row> results = dataDs.withColumn("newColumn",
                functions.callUDF("seqUdf", dataDs.col("value")));
        results.show(false);
    }
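Outside Spark, the UDF body can be exercised directly to confirm it emits the same sequence. This is a sketch that assumes single-threaded, in-order calls; note that Spark does not guarantee that ordering once the data spans multiple partitions, which is a limitation of the mutable-counter approach:

```java
import java.util.Arrays;
import java.util.List;

public class UdfLogicCheck {
    private long n = 0;

    // Mirrors the body of the registered "seqUdf"; the argument is ignored
    long apply(String ignored) {
        return (1 + (long) Math.sqrt(1 + (8 * n++))) / 2;
    }

    public static void main(String[] args) {
        UdfLogicCheck udf = new UdfLogicCheck();
        List<String> data = Arrays.asList("abc", "klm", "xyz", "abc", "klm", "xyz");
        StringBuilder sb = new StringBuilder();
        for (String s : data) {
            sb.append(udf.apply(s)).append(' ');
        }
        System.out.println(sb.toString().trim()); // 1 2 2 3 3 3
    }
}
```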

1 Comment

I was trying to make my own Java implementation but I got distracted by issues with winutils not being found when executing this little example in a unit test. Thank you for the implementation. I don't like how n is declared, but it's a Java problem, not a Spark one :)
