
I have a large, sorted PySpark DataFrame "df" that I need to iterate through, doing the following for each row (by index):

If "row['col1'] == nextrow['col1']:
    If nextrow['col3'] == 1:
        thisrow['col4'] == 1

For example given:

# +---+----+----+----+
# |id |col1|col3|col4|
# +---+----+----+----+
# |1  |33  |1   |0   |
# |2  |33  |0   |0   |
# |3  |33  |0   |0   |
# |4  |11  |1   |0   |
# |5  |11  |1   |0   |
# |6  |22  |0   |0   |
# |7  |22  |1   |0   |
# +---+----+----+----+

Would generate:

# +---+----+----+----+
# |id |col1|col3|col4|
# +---+----+----+----+
# |1  |33  |1   |0   |
# |2  |33  |0   |0   |
# |3  |33  |0   |0   |
# |4  |11  |1   |1   |
# |5  |11  |1   |0   |
# |6  |22  |0   |1   |
# |7  |22  |1   |0   |
# +---+----+----+----+

I know Spark dataframes are immutable. What is the best way to do this? I've thought about converting it to an RDD and creating a function for a map+lambda combo, but I do not know how to determine which row I am on without adding an index column.

1 Answer

Try this approach, using lead() over a window ordered by id:

from pyspark.sql import Window
from pyspark.sql.functions import col, lead, lit, when

d1 = [
    (1, 33, 1, 9),
    (2, 33, 1, 9),
    (3, 22, 1, 9),
    (4, 11, 1, 9),
    (5, 11, 1, 9),
    (6, 22, 1, 9),
    (7, 33, 1, 9),
]

df1 = spark.createDataFrame(d1, ['id', 'col1', 'col3', 'col4'])
df1.show(10, False)
# +---+----+----+----+
# |id |col1|col3|col4|
# +---+----+----+----+
# |1  |33  |1   |9   |
# |2  |33  |1   |9   |
# |3  |22  |1   |9   |
# |4  |11  |1   |9   |
# |5  |11  |1   |9   |
# |6  |22  |1   |9   |
# |7  |33  |1   |9   |
# +---+----+----+----+
w = Window.partitionBy().orderBy('id')

df1\
    .withColumn('next_col1', lead(col('col1')).over(w))\
    .withColumn('next_col3', lead(col('col3')).over(w))\
    .withColumn('new_col4', when((col('col1') == col('next_col1')) & (col('next_col3') == 1), lit(1)).otherwise(col('col4')))\
    .show(10, False)
# +---+----+----+----+---------+---------+--------+
# |id |col1|col3|col4|next_col1|next_col3|new_col4|
# +---+----+----+----+---------+---------+--------+
# |1  |33  |1   |9   |33       |1        |1       |
# |2  |33  |1   |9   |22       |1        |9       |
# |3  |22  |1   |9   |11       |1        |9       |
# |4  |11  |1   |9   |11       |1        |1       |
# |5  |11  |1   |9   |22       |1        |9       |
# |6  |22  |1   |9   |33       |1        |9       |
# |7  |33  |1   |9   |null     |null     |9       |
# +---+----+----+----+---------+---------+--------+
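
Note that Window.partitionBy() with no partition columns makes the window span the whole DataFrame ordered by id, so Spark has to move every row into a single partition to evaluate lead(), and it warns about this in the logs. That is fine for small data but can become a bottleneck at scale.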

1 Comment

Linus, I was able to get this to work just as expected with smaller data sets, but the last line is really causing our cluster issues with a large dataset. I've been trying to get it to execute over the last couple of days, and even broke out the previous two lines into separate steps. The latest job has been running for 12 hours as of this morning, which is longer than our table join took. Any ideas to speed it up?
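
As noted above, the empty partitionBy() is the likely bottleneck: the whole DataFrame is shuffled into a single partition so that lead() can see one global ordering. If the data is sorted such that each col1 value occupies one contiguous run of ids, the same result can be computed with a window partitioned by col1, which lets Spark spread the work across executors. A minimal sketch under that assumption (column names taken from the example above):

from pyspark.sql import Window
from pyspark.sql.functions import col, lead, lit, when

# Assumes every col1 value forms one contiguous block of ids, so the next row
# within a col1 partition is also the next row overall. Under that assumption
# the col1 == next_col1 check is implied by the partitioning and can be dropped.
w = Window.partitionBy('col1').orderBy('id')

result = df1.withColumn(
    'new_col4',
    when(lead(col('col3')).over(w) == 1, lit(1)).otherwise(col('col4'))
)
result.show(10, False)

If the col1 blocks are not contiguous, this changes the semantics (each row would be compared with the next row having the same col1 anywhere in the DataFrame, not the physically next row), so verify the sort order before relying on it.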
