The target table has a fixed column order, given by this Spark DataFrame:

Target Spark DataFrame(col1 string, col2 int, col3 string, col4 double)

Now suppose the source data arrives with its columns in a different order:

Source Spark DataFrame(col3 string, col2 int, col4 double, col1 string)

How can I rearrange the source DataFrame to match the column order of the target DataFrame using PySpark?

The source DataFrame should be reordered as shown below to match the target DataFrame:

Output:

Updated Source Spark DataFrame(col1 string, col2 int, col3 string, col4 double)

Scenario 2:

Source DataFrame = [a, c, d, e]

Target DataFrame = [a, b, c, d]

In this scenario, the source DataFrame should be rearranged to [a, b, c, d, e]:

  • Keep the order of the target columns
  • Change the datatypes of the source columns to match the target DataFrame
  • Add any new source-only columns at the end
  • If a target column is not present in the source, it should still be added, filled with null values

In the above example, after the source DataFrame is rearranged, it would have a b column added with null values.

This will ensure that when we use saveAsTable, the source DataFrame can easily be pushed into the table without breaking the existing table.

1 Answer

Suppose you had the following two DataFrames:

source.show()
#+---+---+---+---+
#|  a|  c|  d|  e|
#+---+---+---+---+
#|  A|  C|  0|  E|
#+---+---+---+---+

target.show()
#+---+---+---+---+
#|  a|  b|  c|  d|
#+---+---+---+---+
#|  A|  B|  C|  1|
#+---+---+---+---+

With the following data types:

print(source.dtypes)
#[('a', 'string'), ('c', 'string'), ('d', 'string'), ('e', 'string')]

print(target.dtypes)
#[('a', 'string'), ('b', 'string'), ('c', 'string'), ('d', 'int')]

If I understand your logic correctly, the following list comprehension should work for you:

from pyspark.sql.functions import col, lit

new_source = source.select(
    *(
        [
            col(t).cast(d) if t in source.columns else lit(None).alias(t) 
            for t, d in target.dtypes
        ] +
        [s for s in source.columns if s not in target.columns]
    )
)

new_source.show()
#+---+----+---+---+---+
#|  a|   b|  c|  d|  e|
#+---+----+---+---+---+
#|  A|null|  C|  0|  E|
#+---+----+---+---+---+

And the resulting output will have the following schema:

new_source.printSchema()
#root
# |-- a: string (nullable = true)
# |-- b: null (nullable = true)
# |-- c: string (nullable = true)
# |-- d: integer (nullable = true)
# |-- e: string (nullable = true)

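As you can see, column d's datatype changed from string to integer to match the target table's schema. Note that column b, which was created with lit(None), has type null rather than the target's string type.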

The logic is to first loop over the columns in target: each one is selected (and cast to the target's type) if it exists in source.columns, or created as a column of nulls if it does not. The columns from source that don't exist in target are then appended at the end.
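If the null-filled columns also need to carry the target's type (for example before a union or saveAsTable), one possible variant is to cast the lit(None) as well. The sketch below is only an illustration under that assumption: plan_columns and align_to_target are hypothetical helper names, not part of PySpark, and the planning step is kept in plain Python so it can be checked without a Spark session.

```python
def plan_columns(source_dtypes, target_dtypes):
    """Plan the aligned column list from two `DataFrame.dtypes`-style lists.

    Returns (name, dtype, in_source) tuples: target columns first, in target
    order and with the target's type, then source-only columns unchanged.
    (Hypothetical helper, not a PySpark API.)
    """
    source_names = {name for name, _ in source_dtypes}
    target_names = {name for name, _ in target_dtypes}
    plan = [(name, dtype, name in source_names) for name, dtype in target_dtypes]
    plan += [
        (name, dtype, True)
        for name, dtype in source_dtypes
        if name not in target_names
    ]
    return plan


def align_to_target(source, target):
    """Apply the plan to a Spark DataFrame, casting null-filled columns too."""
    # Imported lazily so plan_columns can be used without a Spark install.
    from pyspark.sql.functions import col, lit

    return source.select(
        *[
            # Missing columns become typed nulls instead of NullType.
            (col(name) if in_source else lit(None)).cast(dtype).alias(name)
            for name, dtype, in_source in plan_columns(source.dtypes, target.dtypes)
        ]
    )


# Planning step for the example DataFrames above (no Spark needed):
plan = plan_columns(
    [("a", "string"), ("c", "string"), ("d", "string"), ("e", "string")],
    [("a", "string"), ("b", "string"), ("c", "string"), ("d", "int")],
)
print(plan)
```

With a Spark session available, new_source = align_to_target(source, target) should then give column b the target's string type rather than null.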

6 Comments

Is there a way to cast the newly created columns to match the schema of the target DataFrame? Otherwise, a union of the source and target DataFrames may run into issues. I can create a new question if required. Please suggest.
Source schema: StructType(List(StructField(name,StringType,true),StructField(field1,LongType,true),StructField(seq_num,IntegerType,true)))
Target schema: StructType(List(StructField(name,StringType,true),StructField(age,StringType,true),StructField(field1,StringType,true),StructField(hiveserde,StringType,true),StructField(seq_num,IntegerType,true)))
New source schema: StructType(List(StructField(name,StringType,true),StructField(age,NullType,true),StructField(field1,StringType,true),StructField(hiveserde,NullType,true),StructField(seq_num,IntegerType,true)))
As you can see, the new source schema assigns NullType to age even though the target type is String. That should not be the case.
The types would need to match so that the source and target can be combined with a union.
Probably best if you edited this into the question, formatted as code. Or create a new question. Make sure you include a minimal reproducible example, ideally something small we can cut and paste to recreate your issue.