df_hrrchy
|lefId  |Lineage                               |
|-------|--------------------------------------|
|36326  |["36326","36465","36976","36091","82"]|
|36121  |["36121","36908","36976","36091","82"]|
|36380  |["36380","36465","36976","36091","82"]|
|36448  |["36448","36465","36976","36091","82"]|
|36683  |["36683","36465","36976","36091","82"]|
|36949  |["36949","36908","36976","36091","82"]|
|37349  |["37349","36908","36976","36091","82"]|
|37026  |["37026","36908","36976","36091","82"]|
|36879  |["36879","36465","36976","36091","82"]|
df_trans
|tranID     |   T_Id                                                                  |
|-----------|-------------------------------------------------------------------------|
|1000540    |["36121","36326","37349","36949","36380","37026","36448","36683","36879"]|
df_creds
|T_Id   |T_val  |T_Goal |Parent_T_Id    |Parent_Val      |parent_Goal|
|-------|-------|-------|---------------|----------------|-----------|
|36448  |100    |1      |36465          |200             |1          |
|36465  |200    |1      |36976          |300             |2          |
|36326  |90     |1      |36465          |200             |1          |
|36091  |500    |19     |82             |600             |4          |
|36121  |90     |1      |36908          |200             |1          |
|36683  |90     |1      |36465          |200             |1          |
|36908  |200    |1      |36976          |300             |2          |
|36949  |90     |1      |36908          |200             |1          |
|36976  |300    |2      |36091          |500             |19         |
|37026  |90     |1      |36908          |200             |1          |
|37349  |100    |1      |36908          |200             |1          |
|36879  |90     |1      |36465          |200             |1          |
|36380  |90     |1      |36465          |200             |1          |
Desired Result
| T_id | children | T_Val | T_Goal | parent_T_id | parent_Goal | trans_id | 
|---|---|---|---|---|---|---|
| 36091 | ["36976"] | 500 | 19 | 82 | 4 | 1000540 | 
| 36465 | ["36448","36326","36683","36879","36380"] | 200 | 1 | 36976 | 2 | 1000540 | 
| 36908 | ["36121","36949","37026","37349"] | 200 | 1 | 36976 | 2 | 1000540 | 
| 36976 | ["36465","36908"] | 300 | 2 | 36091 | 19 | 1000540 | 
| 36683 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 37026 | null | 90 | 1 | 36908 | 1 | 1000540 | 
| 36448 | null | 100 | 1 | 36465 | 1 | 1000540 | 
| 36949 | null | 90 | 1 | 36908 | 1 | 1000540 | 
| 36326 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 36380 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 36879 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 36121 | null | 90 | 1 | 36908 | 1 | 1000540 | 
| 37349 | null | 100 | 1 | 36908 | 1 | 1000540 | 
Code Tried
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import explode, collect_set, expr, col, collect_list,array_contains, lit
from functools import reduce
for row in df_transactions.rdd.toLocalIterator():
# def find_nodemap(row):
  dfs = [] 
  df_hy_set = (df_hrrchy.filter(df_hrrchy. lefId.isin(row["T_ds"]))
                      .select(explode("Lineage").alias("Terrs"))
                      .agg(collect_set(col("Terrs")).alias("hierarchy_list"))
                      .select(F.lit(row["trans_id"]).alias("trans_id "),"hierarchy_list")
                     )
  
  df_childrens = (df_creds.join(df_ hy _set, expr("array_contains(hierarchy_list, T_id)"))
        .select("T_id", "T_Val","T_Goal","parent_T_id", "parent_Goal", "trans _id" )
        .groupBy("parent_T_id").agg(collect_list("T_id").alias("children"))
       )
  df_filter_creds = (df_creds.join(df_ hy _set, expr("array_contains(hierarchy_list, T_id)"))
        .select ("T_id", "T_val","T_Goal","parent_T_id", "parent_Goal”, "trans_id")
       )
  df_nodemap = (df_filter_ creds.alias("A").join(df_childrens.alias("B"), col("A.T_id") == col("B.parent_T_id"), "left")
        .select("A.T_id","B.children", "A.T_val","A.terr_Goal","A.parent_T_id", "A.parent_Goal", "A.trans_ id")
       )
  display(df_nodemap)
#   dfs.append(df_nodemap)
  
# df = reduce(DataFrame.union, dfs)
# display(df)
# # display(df)
My problem - Its a bad design. df_trans is having millions of data and looping through dataframe , its taking forever. Without looping can I do it. I tried couple of other methods, not able to get the desired result.
