I am trying to use a for loop to add new rows to a dataframe. So the input is:
ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz       25   123        234        345
Abc       40   456        567        678
And the output I want is this:
ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz       25   123        234        345
Xyz       26   789        123        234
Abc       40   456        567        678
Abc       41   890        456        567
So, the code I have is this:
df = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
       .withColumn("ColB_lag2", df.ColB_lag1) \
       .withColumn("ColB_lag1", df.ColB) \
       .withColumn("ColB", someFunc())
The code works fine when I only have to add one row, but it breaks when I have to add multiple rows in a loop. So I used a for loop: at the top of each iteration I filter for the latest row, run the logic above to calculate the values for the new columns, and then append the new row to the dataset, which is used again at the top of the next iteration (a simplified sketch of one iteration follows the output below). The output ends up looking something like this:
ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz       25   123        234        345
Xyz       25   789        123
Xyz       26   789        123
Abc       40   456        567        678
Abc       40   890        456
Abc       41   890        456
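For reference, here is a self-contained sketch of what one iteration of that logic looks like, using the sample data above. It is a simplification of my actual loop (shown in Edit 1 below): F.lit(789) stands in for someFunc(), and I run only a single iteration.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# sample input from the question
df = spark.createDataFrame(
    [("Xyz", 25, 123, 234, 345), ("Abc", 40, 456, 567, 678)],
    ["ColA", "ColNum", "ColB", "ColB_lag1", "ColB_lag2"],
)

for _ in range(1):  # one new row per ColA; increase for more iterations
    # keep only the latest row per ColA (highest ColNum)
    latest = df.groupBy("ColA").agg(F.max("ColNum").alias("ColNum"))
    newest = df.join(latest, on=["ColA", "ColNum"])

    # bump ColNum and shift the lag columns
    new_rows = (
        newest.withColumn("ColNum", F.col("ColNum") + 1)
              .withColumn("ColB_lag2", F.col("ColB_lag1"))
              .withColumn("ColB_lag1", F.col("ColB"))
              .withColumn("ColB", F.lit(789).cast("long"))  # placeholder for someFunc()
    )

    # withColumn replaces existing columns in place, so the positions still match here
    df = df.union(new_rows)

df.orderBy("ColA", "ColNum").show()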
The question is: do for loops in PySpark break down because of parallelization, or is it the number of functions I am chaining inside the loop (or the order of those functions) that is causing this erratic behavior?
Happy to share more details if I have missed out on any key point here.
Edit 1: The for loop is as below:
num_months = 5
df_final = sc.read.csv(input_path, header='true')  # sc is the SparkSession here
df_final.createOrReplaceTempView("df_final")
for i in range(num_months):
    df = sc.sql("""
        SELECT *
        FROM df_final mrd
        INNER JOIN
            (SELECT ColA AS ColA_tmp, MAX(fh_effdt) AS max_fh_effdt
            FROM df_final
            GROUP BY ColA) grouped_mrd
        ON mrd.ColA = grouped_mrd.ColA_tmp
        AND mrd.fh_effdt = grouped_mrd.max_fh_effdt
        """)
    df = df.drop(df.ColA_tmp).drop(df.max_fh_effdt).drop(df.ColB_lag2)
    df_tmp = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
               .withColumn("ColB_lag2", df.ColB_lag1) \
               .withColumn("ColB_lag1", df.ColB) \
               .withColumn("ColB", someFunc())
    df_final = df_final.union(df_tmp)
    df_final.createOrReplaceTempView("df_final")  # refresh the view so the next iteration sees the new rows
df_final.persist()
df_final.coalesce(1).write.csv(output_path + scenario_name+"_df_final", mode='overwrite', header='true')
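Before getting to the fix, a quick diagnostic that makes the problem visible (not part of my original code, added here for clarity): compare the column order of the two frames right before the union.

# inside the loop, just before the union
print(df_final.columns)
print(df_tmp.columns)   # the dropped-and-recalculated columns end up at the end of this list
assert df_final.columns == df_tmp.columns, "union is positional, so the order must match"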
Solution: The issue was with the union. Since I was dropping the columns and recalculating them, Spark adds those columns at the end, and union matches columns by position, not by name. That is what was creating issues in the subsequent loop iterations: the data for the new rows was shifted over by a few columns. The solution was to explicitly select all the columns and re-order them to match before doing the union (see the sketch below). The snippet above is simplified to the point where I could get away without dropping ColB_lag2; the actual code has another step in between, where I refresh some values from a join with another dataframe, and those columns need to be dropped before bringing them in from the new dataframe.
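Concretely, the fix is one change at the line where the union happens (a minimal sketch; df_final and df_tmp are the frames from the loop above):

# re-order df_tmp's columns to match df_final before the positional union
df_final = df_final.union(df_tmp.select(df_final.columns))

# on Spark 2.3+, unionByName is an alternative that matches columns by name instead of position
# df_final = df_final.unionByName(df_tmp)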

Comment: Can you share the for loop code? Also, there's no code adding rows in this post.