
I have been racking my brain over this. Essentially I have two dataframes, one from yesterday and one from today, where today's is just a delta (i.e. new rows only). I want to merge the two so that rows from the delta replace their older versions and the remaining old rows are carried forward.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

old = sc.parallelize([
    {"id": 1, "value": 10, "date": "yesterday"},
    {"id": 2, "value": 20, "date": "yesterday"},
    {"id": 3, "value": 30, "date": "yesterday"}
]).toDF()

new = sc.parallelize([
    {"id": 2, "value": 25, "date": "today"},
    {"id": 4, "value": 45, "date": "today"}
]).toDF()

expected = sc.parallelize([
    {"id": 1, "value": 10, "date": "yesterday"},
    {"id": 2, "value": 25, "date": "today"},
    {"id": 3, "value": 30, "date": "yesterday"},
    {"id": 4, "value": 45, "date": "today"},
]).toDF()

# something to merge old and new ...?

In pure python, I would just use:

old = {"a": 10, "b": 20, "c": 30 }
new = {"b": 25, "d": 45}
expected = {"a": 10, "b": 25, "c": 30, "d": 45 }
calculated = {**old, **new}

What is the 'correct' way to do this? Maybe by joining/coalescing at the same time?
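
Something like a full outer join on the key plus coalesce over every shared column is what I have in mind. A rough sketch of that idea (the upsert name and the generic column handling are mine; it assumes both frames share the same schema and id is the key):

import pyspark.sql.functions as f

def upsert(old_df, new_df, key="id"):
    # Full outer join on the key; with a string join key, Spark keeps a
    # single, already-coalesced key column in the output.
    joined = old_df.alias("o").join(new_df.alias("n"), on=key, how="full_outer")
    # For every other shared column, prefer the new side and fall back to the old.
    merged = [f.coalesce(f.col("n." + c), f.col("o." + c)).alias(c)
              for c in old_df.columns if c != key]
    return joined.select(key, *merged)

calculated = upsert(old, new)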

Edit: As pointed out, this question is a dupe of the question linked in the comments below. However, that example shows a very manually coded query against very specific column names.

I need to reuse this code on roughly 5 dataframes, each of which has 20+ columns, and I don't want to hardcode the merge step against the column names if I don't have to; the schema is still shifting.

Is there really no join/coalesce function in pyspark/spark? I have a working solution with left_anti and union, but that smells wrong for some reason.
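
For reference, a minimal sketch of that left_anti + union idea (not verbatim what I have, but the same shape; it assumes identical schemas on both sides and id as the key):

def merge_delta(old_df, new_df, key="id"):
    # Keep only the old rows whose key does not appear in the delta...
    carried_forward = old_df.join(new_df, on=key, how="left_anti")
    # ...then append every row from the delta.
    return carried_forward.unionByName(new_df)

calculated = merge_delta(old, new)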

  • Can you join them? something = old.join(new)
  • I can't. Joining will just add the columns. Is there a way to join and coalesce at the same time for all shared column names?
  • You want the matching columns from the right table, and the rest retained from the left table?
  • Yes, the columns are the same in both tables.
  • Possible duplicate of: How to update a pyspark dataframe with new values from another dataframe?

1 Answer


I think the simplest way is to union the two dataframes and then use groupBy with the first function.

old df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  1|   10|yesterday|
|  2|   20|yesterday|
|  3|   30|yesterday|
+---+-----+---------+

new df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  2|   25|    today|
|  4|   45|    today|
+---+-----+---------+

The code below unions the two dataframes and sorts by date; because "today" sorts alphabetically before "yesterday", the delta rows end up first:

import pyspark.sql.functions as f

unionDF = old.union(new).sort("date")
unionDF.show()

union df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  2|   25|    today|
|  4|   45|    today|
|  1|   10|yesterday|
|  2|   20|yesterday|
|  3|   30|yesterday|
+---+-----+---------+

and in the final step, group by id and take the first value of every other column (columns[1:] works here because id is the first column):

firstCols = [f.first(col).alias(str(col)) for col in unionDF.columns[1:]]
finalDF = unionDF.groupby("id").agg(*firstCols).sort("id")

finalDF.show()

final df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  1|   10|yesterday|
|  2|   25|    today|
|  3|   30|yesterday|
|  4|   45|    today|
+---+-----+---------+
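
If relying on the alphabetical order of the date strings feels fragile, the same "newest wins" idea can be made explicit with a precedence flag and a window function. A sketch (the is_new and rn helper columns are only for illustration):

from pyspark.sql import Window
import pyspark.sql.functions as f

# Tag each side, let the delta win within every id, then drop the helpers.
w = Window.partitionBy("id").orderBy(f.col("is_new").desc())

explicitDF = (
    old.withColumn("is_new", f.lit(0))
    .unionByName(new.withColumn("is_new", f.lit(1)))
    .withColumn("rn", f.row_number().over(w))
    .filter(f.col("rn") == 1)
    .drop("is_new", "rn")
    .sort("id")
)
explicitDF.show()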

2 Comments

  • please check out my answer and tell me if it works for you.
  • I think this is the way we will go for this case. Thanks!
