
I have the following dataframe:

id, test, date
1, A, 01/20/2020
1, B, 01/25/2020
2, A, 02/20/2020
2, B, 02/25/2020
2, C, 02/25/2020

Since the number of distinct tests is 3 (A, B, C), I want a row for test C to be inserted for id 1 with date as "NA".

The resulting dataframe should be:

id, test, date
1, A, 01/20/2020
1, B, 01/25/2020
1, C, NA
2, A, 02/20/2020
2, B, 02/25/2020
2, C, 02/25/2020

2 Answers

Example data

data = [
    ('1', 'A', '01/20/2020'),
    ('1', 'B', '01/25/2020'),
    ('2', 'A', '02/20/2020'),
    ('2', 'B', '02/25/2020'),
    ('2', 'C', '02/25/2020'),
]
df = spark.createDataFrame(data, ['id', 'test', 'date'])

First generate a skeleton with every (id, test) combination via a cross join

# Solution
uniq_ids = df.select('id').distinct().coalesce(1)
uniq_tests = df.select('test').distinct().coalesce(1)
skeleton = (
    uniq_ids.
        crossJoin(
            uniq_tests
        )
)
skeleton.show()
+---+----+
| id|test|
+---+----+
|  1|   B|
|  2|   B|
|  1|   C|
|  2|   C|
|  1|   A|
|  2|   A|
+---+----+

Then left-join to it

(
    skeleton.
        join(
            df,
            ['id', 'test'],
            'left'
        ).
        orderBy('id', 'test', 'date').
        show(truncate=False)
)
+---+----+----------+                                                           
|id |test|date      |
+---+----+----------+
|1  |A   |01/20/2020|
|1  |B   |01/25/2020|
|1  |C   |null      |
|2  |A   |02/20/2020|
|2  |B   |02/25/2020|
|2  |C   |02/25/2020|
+---+----+----------+
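
Note that the left join leaves date as null rather than the literal string "NA" asked for in the question. If the string is needed, one option (a minimal sketch, assuming date is a string column) is to fill the nulls afterwards with DataFrame.fillna:

result = (
    skeleton.
        join(
            df,
            ['id', 'test'],
            'left'
        ).
        fillna('NA', subset=['date'])   # replace null dates with the string 'NA'
)
result.orderBy('id', 'test').show(truncate=False)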

A recommendation for real data

  • If you have lots of unique ids and unique tests, you may want to adjust how many partitions you coalesce to via coalesce(N) instead of collapsing everything to a single partition; see the sketch below.
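
For example, a minimal sketch of that adjustment (the value 8 below is purely illustrative, not a recommendation):

uniq_ids = df.select('id').distinct().coalesce(8)      # keep more partitions for a large number of ids
uniq_tests = df.select('test').distinct().coalesce(1)  # distinct tests are few, so one partition is fine
skeleton = uniq_ids.crossJoin(uniq_tests)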

For Spark 2.4+, a highly scalable way (without any join) is to use groupBy, collect_list, and array functions / higher-order functions to determine the missing entries, add them, and then explode. It will work for any combination of missing A, B, or C.

# sample dataframe
#df.show()
#+---+----+----------+
#| id|test|      date|
#+---+----+----------+
#|  1|   A|01/20/2020|
#|  2|   A|02/20/2020|
#|  2|   B|02/25/2020|
#|  2|   C|02/25/2020|
#+---+----+----------+


from pyspark.sql import functions as F

(
    df.groupBy("id")
      .agg(F.collect_list("test").alias("x"), F.collect_list("date").alias("col2"))              # arrays of tests/dates per id
      .withColumn("zip", F.arrays_zip(F.col("x"), F.col("col2")))                                # pair each test with its date
      .withColumn("except", F.array_except(F.array(*(F.lit(x) for x in ['A', 'B', 'C'])), "x"))  # tests missing for this id
      .drop("x", "col2")
      .withColumn("except", F.expr("transform(except, x -> struct(x, 'NA'))"))                   # missing test -> (test, 'NA') struct
      .withColumn("zipped", F.explode(F.array_union("zip", "except")))                           # union existing + missing, explode to rows
      .select("id", F.col("zipped.x").alias("test"), F.col("zipped.col2").alias("date"))
      .show(truncate=False)
)

#+---+----+----------+
#|id |test|date      |
#+---+----+----------+
#|1  |A   |01/20/2020|
#|1  |B   |NA        |
#|1  |C   |NA        |
#|2  |A   |02/20/2020|
#|2  |B   |02/25/2020|
#|2  |C   |02/25/2020|
#+---+----+----------+

Physical Plan for the Join Solution (as shown by @cPak):

== Physical Plan ==
Sort [id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST, date#1339 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST, date#1339 ASC NULLS FIRST, 200), [id=#1110]
   +- *(6) Project [id#1206L, test#1207, date#1339]
      +- SortMergeJoin [id#1206L, test#1207], [id#1337L, test#1338], LeftOuter
         :- Sort [id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#1206L, test#1207, 200), [id=#1101]
         :     +- CartesianProduct
         :        :- Coalesce 1
         :        :  +- *(2) HashAggregate(keys=[id#1206L], functions=[])
         :        :     +- Exchange hashpartitioning(id#1206L, 200), [id=#1089]
         :        :        +- *(1) HashAggregate(keys=[id#1206L], functions=[])
         :        :           +- *(1) Project [id#1206L]
         :        :              +- *(1) Scan ExistingRDD[id#1206L,test#1207,date#1208]
         :        +- Coalesce 1
         :           +- *(4) HashAggregate(keys=[test#1207], functions=[])
         :              +- Exchange hashpartitioning(test#1207, 200), [id=#1095]
         :                 +- *(3) HashAggregate(keys=[test#1207], functions=[])
         :                    +- *(3) Project [test#1207]
         :                       +- *(3) Scan ExistingRDD[id#1206L,test#1207,date#1208]
         +- Sort [id#1337L ASC NULLS FIRST, test#1338 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#1337L, test#1338, 200), [id=#1104]
               +- *(5) Filter (isnotnull(id#1337L) && isnotnull(test#1338))
                  +- *(5) Scan ExistingRDD[id#1337L,test#1338,date#1339]

Physical Plan for the Non-Join Solution (using array functions + explode):

== Physical Plan ==
*(2) Project [id#1206L, zipped#1400.x AS test#1405, zipped#1400.col2 AS date#1406]
+- *(2) Generate explode(array_union(zip#1380, except#1394)), [id#1206L], false, [zipped#1400]
   +- *(2) Project [id#1206L, arrays_zip(x#1374, col2#1376) AS zip#1380, transform(array_except([A,B,C], x#1374), lambdafunction(named_struct(x, lambda x#1395, col2, NA), lambda x#1395, false)) AS except#1394]
      +- ObjectHashAggregate(keys=[id#1206L], functions=[collect_list(test#1207, 0, 0), collect_list(date#1208, 0, 0)])
         +- Exchange hashpartitioning(id#1206L, 200), [id=#1192]
            +- *(1) Project [id#1206L, date#1208, test#1207]
               +- *(1) Scan ExistingRDD[id#1206L,test#1207,date#1208]

As we can see, the join solution produces many shuffle exchanges and uses a Cartesian product, which is highly inefficient for big data. The resulting data movement will be extremely high and taxing on any cluster. With the array-function/explode solution, data movement is much lower and processing is much faster.
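
If you want to compare the two approaches on your own data, plans like the ones above can be printed with DataFrame.explain(). A minimal sketch (joined_df and exploded_df are placeholder names for the two result DataFrames built above):

joined_df.explain()        # prints the physical plan only
exploded_df.explain(True)  # extended output: parsed, analyzed, optimized and physical plans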
