
I have two DataFrames like this:

rdd_1 = sc.parallelize([(0,10,"A",2), (1,20,"B",1), (2,30,"A",2)])
rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"201602"), (1,20,3213,"201602"), (1,20,3003,"201601"), (1,20,9872,"201603"), (2,40,2321,"201601"), (2,30,10,"201602"), (2,61,2321,"201601")])
df_tg = sqlContext.createDataFrame(rdd_1, ["id", "type", "route_a", "route_b"])
df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])

df_tg.show()


+---+----+-------+-------+
| id|type|route_a|route_b|
+---+----+-------+-------+
|  0|  10|      A|      2|
|  1|  20|      B|      1|
|  2|  30|      A|      2|
+---+----+-------+-------+

df_data.show()

+---+----+----+------+
| id|type|cost|  date|
+---+----+----+------+
|  0|  10| 223|201601|
|  0|  10|  83|201602|
|  1|  20|3213|201602|
|  1|  20|3003|201601|
|  1|  20|9872|201603|
|  2|  40|2321|201601|
|  2|  30|  10|201602|
|  2|  61|2321|201601|
+---+----+----+------+

So I need to add one cost column per date, like this:

+---+----+-------+-------+-----------+-----------+-----------+
| id|type|route_a|route_b|cost_201603|cost_201602|cost_201601|
+---+----+-------+-------+-----------+-----------+-----------+
|  0|  10|      A|      2|       null|         83|        223|
|  1|  20|      B|      1|       9872|       3213|       3003|
|  2|  30|      A|      2|       null|         10|       null|
+---+----+-------+-------+-----------+-----------+-----------+

For this I would have to do a few joins:

df_tg = df_tg.join(df_data[df_data.date == "201603"], ["id", "type"])

and with that I would also have to rename the cost column, so that it doesn't get overwritten:

df_tg = df_tg.join(df_data[df_data.date == "201603"], ["id", "type"]).withColumnRenamed("cost","cost_201603")

I can write a function to do this, but I would have to loop over both the available dates and the feature columns, generating a ton of joins with full table scans:

def feature_add(df_target, df_feat, feat_cols, period):
    for ref_month in period:
        # keep only this month's rows, then join them in and rename the column
        df_month = df_feat[df_feat.date == ref_month]
        for feat_col in feat_cols:
            df_target = df_target.join(df_month, ["id", "type"]).select(
                    *[df_target[column] for column in df_target.columns] + [df_month[feat_col]]
                    ).withColumnRenamed(feat_col, feat_col + '_' + ref_month)
    return df_target

df_tg = feature_add(df_tg, df_data, ["cost"], ["201602", "201603", "201601"])

This works, but it's terrible. How can I add these columns cleanly, especially since I'll be calling the same function on other DataFrames? Note that the keys are not perfectly aligned between the two DataFrames, so I need an inner join.

1 Answer

I would suggest using the pivot function, as follows:

from pyspark.sql.functions import *

rdd_1 = sc.parallelize([(0,10,"A",2), (1,20,"B",1), (2,30,"A",2)])
rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"201602"), (1,20,3213,"201602"), (1,20,3003,"201601"), (1,20,9872,"201603"), (2,40,2321,"201601"), (2,30,10,"201602"), (2,61,2321,"201601")])
df_tg = sqlContext.createDataFrame(rdd_1, ["id", "type", "route_a", "route_b"])
df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])

pivot_df_data = df_data.groupBy("id","type").pivot("date").agg({"cost" : "sum"})
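
This gives one row per (id, type) pair and one column per distinct date, with null where a pair has no cost for that date; roughly (row order may vary):

# +---+----+------+------+------+
# | id|type|201601|201602|201603|
# +---+----+------+------+------+
# |  0|  10|   223|    83|  null|
# |  1|  20|  3003|  3213|  9872|
# |  2|  30|  null|    10|  null|
# |  2|  40|  2321|  null|  null|
# |  2|  61|  2321|  null|  null|
# +---+----+------+------+------+

The inner join against df_tg then drops the (2, 40) and (2, 61) rows, since those keys don't appear in df_tg: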

pivot_df_data.join(df_tg, ['id','type'], 'inner').select('id','type','route_a','route_b','201601','201602','201603').show()

# +---+----+-------+-------+------+------+------+
# | id|type|route_a|route_b|201601|201602|201603|
# +---+----+-------+-------+------+------+------+
# |  0|  10|      A|      2|   223|    83|  null|
# |  1|  20|      B|      1|  3003|  3213|  9872|
# |  2|  30|      A|      2|  null|    10|  null|
# +---+----+-------+-------+------+------+------+
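
If you want the cost_ prefix from the question's desired output, and you already know which months matter, you can pass the value list to pivot explicitly and rename the pivoted columns afterwards. A minimal sketch of that variation, assuming the three months from the sample data:

months = ["201601", "201602", "201603"]

# Passing the value list to pivot() skips the extra job Spark would
# otherwise run just to collect the distinct dates.
pivot_df_data = df_data.groupBy("id", "type").pivot("date", months).agg({"cost": "sum"})

# Prefix each pivoted month column so the names match the desired output.
for m in months:
    pivot_df_data = pivot_df_data.withColumnRenamed(m, "cost_" + m)

df_tg.join(pivot_df_data, ["id", "type"], "inner").show()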