
I have a PySpark DataFrame:

valuesCol = [('Sweden',31),('Norway',62),('Iceland',13),('Finland',24),('Denmark',52)]
df = sqlContext.createDataFrame(valuesCol,['name','id'])
+-------+---+
|   name| id|
+-------+---+
| Sweden| 31|
| Norway| 62|
|Iceland| 13|
|Finland| 24|
|Denmark| 52|
+-------+---+

I wish to add a column to this DataFrame containing the row number (serial number) of each row.

My final output should be:

+-------+---+--------+
|   name| id|row_num |
+-------+---+--------+
| Sweden| 31|       1|
| Norway| 62|       2|
|Iceland| 13|       3|
|Finland| 24|       4|
|Denmark| 52|       5|
+-------+---+--------+

My Spark version is 2.2.

I tried this code, but it doesn't work:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().orderBy()
df = df.withColumn("row_num", row_number().over(w))
df.show()

I am getting this error:

AnalysisException: 'Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;'

If I understand it correctly, I need to order by some column, but I don't want something like w = Window().orderBy('id'), because that will reorder the entire DataFrame.

Can anyone suggest how to achieve the above output using the row_number() function?


3 Answers


You need to define a column for the order clause. If you don't need to order the values, order by a dummy value. Try the below:

from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window
# ordering by a constant literal satisfies the ORDER BY requirement
# without imposing a meaningful sort order
w = Window().orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))

11 Comments

Thank you, sir. It works perfectly. Just a small question: I was missing lit('A'). Can you kindly explain what this part of the code is doing? What is 'A' here, as it doesn't appear in the final output anyway? I will accept this as the answer anyway, because it yields the expected output.
It is a dummy value. It means nothing; you can write anything instead of 'A'.
Understood, thanks :) Just one last question: I have seen that row_number() is often used along with partitionBy(), so if I load data from HDFS and add a column of row numbers like above, will there be a reshuffle of the partitions? I know that Spark only triggers execution when an action is called, and that Catalyst rearranges operations to yield an optimal plan. My query: I think there will be no repartitioning of the data by using row_number() after we load data from HDFS (and before we invoke any action), but I just wanted to get your perspective!
I think it will work. If you don't need to group the data and obtain row numbers for each group, there is no need to use a partitionBy clause.
I get this warning: WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. Is there a better way to implement this without getting the warning, given that I don't need grouping or partitioning? I just want the row number.
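For reference, when the data does have a natural grouping column, adding partitionBy lets Spark compute row numbers per group in parallel, and the single-partition warning goes away. A minimal sketch, assuming a hypothetical region column that is not part of the question's DataFrame (note the numbering restarts at 1 within each group, so this is not a global serial number):

from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window
# 'region' is an assumed grouping column, for illustration only;
# row numbers restart at 1 within each region, and each group can be
# processed on its own partition, so no single-partition warning
w = Window.partitionBy('region').orderBy(lit('A'))
df = df.withColumn('row_num_in_region', row_number().over(w))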

I had a similar problem, but in my case @Ali Yesilli's solution failed, because I was reading multiple input files separately and ultimately unioning them all into a single DataFrame. In this case, the order within a window ordered by a dummy variable proved to be unpredictable.

So to achieve more robust ordering, I used monotonically_increasing_id:

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
# capture the current physical order, number by it, then drop the helper
df = df.withColumn('original_order', monotonically_increasing_id())
df = df.withColumn('row_num', row_number().over(Window.orderBy('original_order')))
df = df.drop('original_order')
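As a usage note (a quick check anyone can run, assuming a SparkSession named spark): monotonically_increasing_id embeds the partition index in the upper bits of a 64-bit value, so the ids are unique and increasing but not consecutive across partitions, which is why the extra row_number() pass is still needed to get a clean 1..N sequence:

from pyspark.sql.functions import monotonically_increasing_id
# 4 rows spread over 2 partitions: the raw ids jump between partitions
# (e.g. 0, 1 in the first partition, then 8589934592, ... in the second)
spark.range(0, 4, 1, 2).withColumn('raw_id', monotonically_increasing_id()).show()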

3 Comments

Wouldn't this approach cause WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation?
Yes, it will give that warning, but I don't think there's a way to avoid it if you want to do this. It makes sense to me that in order to robustly order things, you need to process them in the same partition.
I had this problem due to unions and it was driving me crazy! Works for me though, and I don't get any warning about "No Partition Defined". Thank you!
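For what it's worth, if the single-partition warning is a real concern, one alternative (not from the answers above) is the RDD zipWithIndex method, which assigns consecutive indices following the existing partition order without using a window function at all. A sketch, assuming the df from the question:

# zipWithIndex numbers rows per the RDD's current order without a window,
# so nothing is shuffled to a single partition; +1 makes it 1-based
df_with_num = df.rdd.zipWithIndex() \
    .map(lambda pair: pair[0] + (pair[1] + 1,)) \
    .toDF(df.columns + ['row_num'])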
The same idea works in Spark SQL: order by a constant (here the scalar subquery (select null)) so that row_number() gets an ORDER BY clause without a real sort key.

my_data_df.createOrReplaceTempView("my_data")
my_data_indexed_df = spark.sql("select row_number() over (order by (select null)) as row_num, * from my_data")

Comments
