If you have a small number of deadline dates, you can:
- add one column per deadline date to the dates_df dataframe, whose value is 1 when DT_DATE is between ref_date and that deadline date (between is inclusive on both bounds) and 0 otherwise
- then sum each deadline date column
- finally transpose the resulting dataframe to obtain your desired dataframe
Let's walk through it step by step.
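For reproducibility, here is a minimal setup matching your example tables. The exact value of ref_date is an assumption (any date on or before 2021-04-02 reproduces the tables below):
import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed reference date (not given explicitly in the question)
ref_date = datetime.date(2021, 4, 1)

dates_df = spark.createDataFrame(
    [(datetime.date(2021, m, d),) for m, d in
     [(4, 2), (4, 21), (5, 1), (6, 3), (9, 7), (10, 12), (11, 2)]],
    ['DT_DATE'])

deadlines_df = spark.createDataFrame(
    [(datetime.date(y, m, d),) for y, m, d in
     [(2023, 7, 15), (2018, 8, 10), (2022, 3, 28), (2021, 6, 22),
      (2021, 12, 18), (2021, 10, 11), (2021, 11, 13)]],
    ['DEADLINES'])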
Add one column per deadline date:
from pyspark.sql import functions as F

deadline_rows = deadlines_df.collect()

dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
            F.col('DT_DATE').between(ref_date, row.DEADLINES), F.lit(1)
        ).otherwise(F.lit(0))
    )
And you get, with your example, the following dates_with_deadlines dataframe:
+----------+----------+----------+----------+----------+----------+----------+----------+
|DT_DATE |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+----------+
|2021-04-02|1 |0 |1 |1 |1 |1 |1 |
|2021-04-21|1 |0 |1 |1 |1 |1 |1 |
|2021-05-01|1 |0 |1 |1 |1 |1 |1 |
|2021-06-03|1 |0 |1 |1 |1 |1 |1 |
|2021-09-07|1 |0 |1 |0 |1 |1 |1 |
|2021-10-12|1 |0 |1 |0 |1 |0 |1 |
|2021-11-02|1 |0 |1 |0 |1 |0 |1 |
+----------+----------+----------+----------+----------+----------+----------+----------+
Sum deadlines
aggregated_df = dates_with_deadlines.agg(
    *[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows]
)
After this step, you get the following aggregated_df dataframe:
+----------+----------+----------+----------+----------+----------+----------+
|2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+
|7 |0 |7 |4 |7 |5 |7 |
+----------+----------+----------+----------+----------+----------+----------+
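To make the pattern concrete, with the example deadlines that list comprehension expands to an explicit call like this (middle columns elided):
# Equivalent explicit form of the agg call above; the column names are
# the stringified deadline dates
aggregated_df = dates_with_deadlines.agg(
    F.sum('2023-07-15').alias('2023-07-15'),
    F.sum('2018-08-10').alias('2018-08-10'),
    # ... one F.sum(...).alias(...) per remaining deadline date ...
    F.sum('2021-11-13').alias('2021-11-13'))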
Transpose dataframe
result_df = aggregated_df \
    .withColumn('merged', F.array(*[
        F.struct(
            F.lit(x.DEADLINES).alias('DEADLINES'),
            F.col(str(x.DEADLINES)).alias('dt_count'))
        for x in deadline_rows])) \
    .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
    .withColumn('data', F.explode('merged')) \
    .drop('merged') \
    .withColumn('DEADLINES', F.col('data.DEADLINES')) \
    .withColumn('dt_count', F.col('data.dt_count')) \
    .drop('data')
And you have your expected result_df dataframe:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2023-07-15|7 |
|2018-08-10|0 |
|2022-03-28|7 |
|2021-06-22|4 |
|2021-12-18|7 |
|2021-10-11|5 |
|2021-11-13|7 |
+----------+--------+
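As a side note, the same transpose can be written more compactly with Spark's SQL stack function. This sketch is equivalent to the array/explode chain above, except that DEADLINES comes back as a string unless you add a cast:
# Alternative transpose with the SQL stack generator; backticks are needed
# because the column names contain dashes
stack_args = ', '.join(f"'{x.DEADLINES}', `{x.DEADLINES}`" for x in deadline_rows)
result_df = aggregated_df.selectExpr(
    f'stack({len(deadline_rows)}, {stack_args}) as (DEADLINES, dt_count)')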
Complete Code
from pyspark.sql import functions as F

deadline_rows = deadlines_df.collect()

# Step 1: one 0/1 indicator column per deadline date
dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
            F.col('DT_DATE').between(ref_date, row.DEADLINES), F.lit(1)
        ).otherwise(F.lit(0))
    )

# Step 2: sum each indicator column
aggregated_df = dates_with_deadlines.agg(
    *[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows]
)

# Step 3: transpose the single aggregated row into (DEADLINES, dt_count) rows
result_df = aggregated_df \
    .withColumn('merged', F.array(*[
        F.struct(
            F.lit(x.DEADLINES).alias('DEADLINES'),
            F.col(str(x.DEADLINES)).alias('dt_count'))
        for x in deadline_rows])) \
    .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
    .withColumn('data', F.explode('merged')) \
    .drop('merged') \
    .withColumn('DEADLINES', F.col('data.DEADLINES')) \
    .withColumn('dt_count', F.col('data.dt_count')) \
    .drop('data')
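You can check the final output with a simple show:
result_df.show(truncate=False)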
Advantages and limits of this solution
With this solution, the only step that is not executed in a distributed fashion is the transpose step, and by that point aggregated_df has been reduced to a single row, so its cost is negligible.
Moreover, unlike your current solution, the aggregations for all deadline columns are computed in parallel rather than sequentially.
However, this solution only works if there are few deadline dates (hundreds, maybe a few thousand): first because all the deadline dates are retrieved in the Spark driver with .collect(); second because the first step creates one column per deadline date, producing very wide rows; and finally because the last step is executed on a single executor.
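If you ever do have many deadline dates, a fully distributed alternative (a sketch, not part of the solution above) is a range join followed by a groupBy; it avoids both the collect() and the wide intermediate rows:
# Sketch of a fully distributed alternative: non-equi (range) join + groupBy.
# Spark plans this as a broadcast nested loop join, which is fine as long as
# one side stays reasonably small.
result_df = (
    deadlines_df
    .join(
        dates_df,
        dates_df.DT_DATE.between(F.lit(ref_date), deadlines_df.DEADLINES),
        'left')
    .groupBy('DEADLINES')
    # count() ignores nulls, so unmatched deadlines get dt_count = 0
    .agg(F.count('DT_DATE').alias('dt_count'))
)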