from pyspark.sql.types import IntegerType

num_months = 5

df_final = spark.read.csv(input_path, header='true')
df_final.createOrReplaceTempView("df_final")

for i in range(num_months):
    # Keep only the row with the latest fh_effdt per ColA
    df = spark.sql("""
        SELECT *
        FROM df_final mrd
        INNER JOIN
            (SELECT ColA AS ColA_tmp, MAX(fh_effdt) AS max_fh_effdt
             FROM df_final
             GROUP BY ColA) grouped_mrd
        ON mrd.ColA = grouped_mrd.ColA_tmp
        AND mrd.fh_effdt = grouped_mrd.max_fh_effdt
    """)
    df = df.drop(df.ColA_tmp).drop(df.max_fh_effdt).drop(df.ColB_lag2)
    # Shift the lag columns forward one month and recompute ColB
    df_tmp = df.withColumn("ColNum", (df.wala + 1).cast(IntegerType())) \
               .withColumn("ColB_lag2", df.ColB_lag1) \
               .withColumn("ColB_lag1", df.ColB) \
               .withColumn("ColB", someFunc())
    df_final = df_final.union(df_tmp)
    df_final.persist()
    df_final.createOrReplaceTempView("df_final")  # refresh the view for the next iteration

df_final.coalesce(1).write.csv(output_path + scenario_name + "_df_final", mode='overwrite', header='true')
Solution: The issue was with the union. Since I was dropping columns and recalculating them, Spark appends the recalculated columns to the end of the schema, and union combines DataFrames by column position, not by name. This is what was corrupting the data in subsequent loop iterations: the new rows were shifted over by a few columns. The fix was to explicitly select all the columns in the same order before doing the union. The snippet above is simplified to a version where I could get away without dropping ColB_lag2; the actual code has another step in between, where I refresh some values via a join against another dataframe, and those columns need to be dropped before being brought back in from the new dataframe.
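For reference, here is a minimal sketch of the fix, using the column names from the simplified snippet above:

# Align df_tmp's columns to df_final's order before the union,
# because DataFrame.union matches columns by position, not by name.
df_tmp = df_tmp.select(df_final.columns)
df_final = df_final.union(df_tmp)

On Spark 2.3+ you can skip the manual reordering entirely, since unionByName resolves columns by name:

df_final = df_final.unionByName(df_tmp)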