
(Edited Feb 14th)

Let's say I have a Spark (PySpark) dataframe with the following schema:

root
 |-- myarray: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- myindices: array (nullable = true)
 |    |-- element: integer (containsNull = true)

It looks like:

+--------------------+----------+
|          myarray   | myindices|
+--------------------+----------+
|                 [A]|    [0]   |
|              [B, C]|    [1]   |
|        [D, E, F, G]|   [0,2]  |
+--------------------+----------+

How can I use the second array to index the first?

My goal is to create a new dataframe which would look like:

+--------------------+----------+------+
|          myarray   | myindices|result|
+--------------------+----------+------+
|                 [A]|    [0]   |  [A] |
|              [B, C]|    [1]   |  [C] |
|        [D, E, F, G]|   [0,2]  | [D,F]|
+--------------------+----------+------+

(It is safe to assume that the contents of myindices are always guaranteed to be within the cardinality of myarray for the row in question, so there are no out-of-bounds problems.)
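Within a single row, the desired operation is ordinary Python list indexing: take `myarray[i]` for each `i` in `myindices`. The plain-Python sketch below shows that per-row logic on the sample rows, independent of Spark. The function name `pick_items` is hypothetical. Returning None for missing input is an assumption chosen to match the nullable columns.

```python
def pick_items(arr, indices):
    """Return the elements of `arr` at each 0-based position in `indices`."""
    if arr is None or indices is None:
        return None  # assumption: propagate nulls, as Spark does for nullable columns
    # Indices are guaranteed to be in bounds, so plain indexing is safe here
    return [arr[i] for i in indices]


rows = [(['A'], [0]), (['B', 'C'], [1]), (['D', 'E', 'F', 'G'], [0, 2])]
for arr, idx in rows:
    print(arr, idx, pick_items(arr, idx))
```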

It appears that .getItem() accepts only a single index, so I might need a UDF here, but I don't know how to create a UDF that takes more than one column as input. Any solutions, with or without UDFs?

  • df.withColumn('item', df['myarray'].getItem(df['myposition'])) Commented Feb 14, 2017 at 3:28
  • @zhangtong this should be an answer, not a comment. Commented Feb 14, 2017 at 5:21
  • @zhangtong: thanks; unfortunately my actual need is a little more complicated. I have edited the question to make this clearer. Can you please look at the reformulated question and see if you have any suggestions? Commented Feb 15, 2017 at 0:08
  • @xenocyon see below Commented Feb 15, 2017 at 1:01

1 Answer

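from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(['A'], [0]), (['B', 'C'], [1]), (['D', 'E', 'F'], [0, 2])],
                           ['myarray', 'myindices'])

# A UDF can take several columns: x is the row's myarray, y its myindices.
# Return a list; in Python 3, map() yields a lazy iterator Spark cannot convert.
my_UDF = f.udf(lambda x, y: None if x is None or y is None else [x[z] for z in y],
               ArrayType(StringType()))
res = df.withColumn('result', my_UDF(df['myarray'], df['myindices']))
res.show(truncate=False)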

output:
+---------+---------+------+
|myarray  |myindices|result|
+---------+---------+------+
|[A]      |[0]      |[A]   |
|[B, C]   |[1]      |[C]   |
|[D, E, F]|[0, 2]   |[D, F]|
+---------+---------+------+

1 Comment

Thanks, this works perfectly and is a neat example of a UDF taking two columns as arguments.
