
I would like to include null values in an Apache Spark join. By default Spark doesn't include rows with null join keys: in SQL semantics null = null does not evaluate to true, so an ordinary equi-join drops those rows.

Here is the default Spark behavior.

val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))

Here is the output of joinedDf.show():

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+

This is the output I would like:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+

8 Answers

Spark provides a special NULL safe equality operator:

numbersDf
  .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
  .drop(lettersDf("numbers"))
+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|   null|    zzz|
|       |    hhh|
+-------+-------+

Be careful not to use it with Spark 1.5 or earlier. Prior to Spark 1.6 it required a Cartesian product (SPARK-11111 - Fast null-safe join).
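
If you want to confirm that your Spark version doesn't plan a Cartesian product for this join, you can inspect the physical plan; a quick sanity check, not part of the original answer:

// On Spark 1.6+ this should show a SortMergeJoin or BroadcastHashJoin,
// not a CartesianProduct.
numbersDf
  .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
  .explain()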

In Spark 2.3.0 or later you can use Column.eqNullSafe in PySpark:

numbers_df = sc.parallelize([
    ("123", ), ("456", ), (None, ), ("", )
]).toDF(["numbers"])

letters_df = sc.parallelize([
    ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
]).toDF(["numbers", "letters"])

numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers))
+-------+-------+-------+
|numbers|numbers|letters|
+-------+-------+-------+
|    456|    456|    def|
|   null|   null|    zzz|
|       |       |    hhh|
|    123|    123|    abc|
+-------+-------+-------+

and %<=>% in SparkR:

numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
letters_df <- createDataFrame(data.frame(
  numbers = c("123", "456", NA, ""),
  letters = c("abc", "def", "zzz", "hhh")
))

head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
  numbers numbers letters
1     456     456     def
2    <NA>    <NA>     zzz
3                     hhh
4     123     123     abc

With SQL (Spark 2.2.0+) you can use IS NOT DISTINCT FROM:

SELECT * FROM numbers JOIN letters 
ON numbers.numbers IS NOT DISTINCT FROM letters.numbers
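
Note that the DataFrames must be registered as temp views before the SQL can reference them; a minimal sketch, assuming Spark 2.x and the view names numbers and letters:

// Register the DataFrames as temp views for the SQL query above.
numbersDf.createOrReplaceTempView("numbers")
lettersDf.createOrReplaceTempView("letters")

spark.sql(
  "SELECT * FROM numbers JOIN letters " +
  "ON numbers.numbers IS NOT DISTINCT FROM letters.numbers"
).show()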

This can be used with the DataFrame API as well:

numbersDf.alias("numbers")
  .join(lettersDf.alias("letters"))
  .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")

4 Comments

Thanks. This is another good answer that uses the <=> operator. If you're doing a multiple column join, the conditions can be chained with the && operator (see the sketch after these comments).
In my experience (Spark 2.2.1 on Amazon Glue), the SQL syntax is the same as the Scala: SELECT * FROM numbers JOIN letters ON numbers.numbers <=> letters.numbers
Is there a way to use eqNullSafe if I am passing a list of columns to join's on parameter?
@zero323 I have a similar question, but I want to do it with a Seq. Can you help? Link: stackoverflow.com/questions/61128618/…
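
To make the multi-column chaining from the first comment concrete, here is a minimal sketch (leftDf, rightDf, and the key1/key2 column names are hypothetical):

// Chain one <=> condition per key column with &&.
val joined = leftDf.join(
  rightDf,
  leftDf("key1") <=> rightDf("key1") && leftDf("key2") <=> rightDf("key2")
)
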
Another option is to rename the join columns so they can be disambiguated, and spell the null check out explicitly in the join condition:
val numbers2 = numbersDf.withColumnRenamed("numbers", "num1") // rename columns so that we can disambiguate them in the join
val letters2 = lettersDf.withColumnRenamed("numbers", "num2")
val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull && $"num2".isNull), "outer")
joinedDf.select("num1", "letters").withColumnRenamed("num1", "numbers").show() // rename the columns back to the original names


Based on K L's idea, you can use foldLeft to generate the join column expression:

import org.apache.spark.sql.{Column, DataFrame}

def nullSafeJoin(rightDF: DataFrame, columns: Seq[String], joinType: String)(leftDF: DataFrame): DataFrame = {
  // Start from a null-safe comparison on the first column,
  // then AND in one <=> condition per remaining column.
  val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
  val fullExpr = columns.tail.foldLeft(colExpr) {
    (expr, p) => expr && leftDF(p) <=> rightDF(p)
  }
  leftDF.join(rightDF, fullExpr, joinType)
}

Then you can call this function like this:

aDF.transform(nullSafeJoin(bDF, columns, joinType))

1 Comment

This is cleaner: val fullExpr = columns.map(n => leftDF(n) <=> rightDF(n)).reduceLeft(_ && _)
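
Folding that suggestion back into the answer's function, a sketch of how it might look:

import org.apache.spark.sql.{Column, DataFrame}

def nullSafeJoin(rightDF: DataFrame, columns: Seq[String], joinType: String)(leftDF: DataFrame): DataFrame = {
  // One <=> condition per column, ANDed together.
  val fullExpr: Column = columns.map(n => leftDF(n) <=> rightDF(n)).reduceLeft(_ && _)
  leftDF.join(rightDF, fullExpr, joinType)
}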

Complementing the other answers: in PySpark < 2.3.0 you have neither Column.eqNullSafe nor IS NOT DISTINCT FROM.

You can still build the <=> condition with a SQL expression and include it in the join, as long as you define aliases for the joined DataFrames:

from pyspark.sql.types import StringType
import pyspark.sql.functions as F

numbers_df = spark.createDataFrame(["123", "456", None, ""], StringType()).toDF("numbers")
letters_df = spark.createDataFrame(
    [("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")]
).toDF("numbers", "letters")

joined_df = (
    numbers_df.alias("numbers")
    .join(letters_df.alias("letters"), F.expr("numbers.numbers <=> letters.numbers"))
    .select("letters.*")
)
joined_df.show()

+-------+-------+
|numbers|letters|
+-------+-------+
|    456|    def|
|   null|    zzz|
|       |    hhh|
|    123|    abc|
+-------+-------+


This is a late answer but there is an elegant way to create eqNullSafe joins in PySpark:

from pyspark.sql.dataframe import DataFrame

def null_safe_join(self, other: DataFrame, cols: list, mode: str):
    """
    Function for null safe joins. In normal joins, null values will be disregarded.
    In a null safe join, null values will be treated as equals.
    """
    join_cond = [self[col].eqNullSafe(other[col]) for col in cols]
    return (
        self.join(other, join_cond, mode)
        .drop(*[other[col] for col in cols])
    )

DataFrame.null_safe_join = null_safe_join

This will add the null_safe_join method to the DataFrame class, allowing it to be used as a method on any DataFrame object like so:

joinedDf = numbersDf.null_safe_join(lettersDf, ["numbers"], "inner")

This performs an inner eqNullSafe join between numbersDf and lettersDf on the column numbers.


Since I struggled a little to get a generic function out of this, here is my solution based on the answers and comments above. It also drops the superfluous columns of the right DataFrame.

import org.apache.spark.sql.DataFrame

def failSafeEqJoin(
  left: DataFrame,
  right: DataFrame,
  columns: Seq[String],
  joinType: String = "inner"
): DataFrame = {
  val failSafeEq = columns.map(n => left(n) <=> right(n)).reduceLeft(_ && _)
  val joined     = left.join(right, failSafeEq, joinType)
  def dropRight(df: DataFrame): DataFrame = {
    columns.foldLeft(df) { case (acc, c) =>
      acc.drop(right(c))
    }
  }
  dropRight(joined)
}
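
A hypothetical call with the question's DataFrames:

// Uses the default "inner" join type.
failSafeEqJoin(numbersDf, lettersDf, Seq("numbers")).show()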

see also: How to avoid duplicate columns after join


Based on timothyzhang's idea, one can further improve it by removing duplicate columns:

import org.apache.spark.sql.{Column, DataFrame}

def dropDuplicateColumns(df: DataFrame, rightDf: DataFrame, cols: Seq[String]): DataFrame =
  cols.foldLeft(df)((df, c) => df.drop(rightDf(c)))

def joinTablesWithSafeNulls(rightDF: DataFrame, leftDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = {
  // Build one null-safe condition per join column and AND them together.
  val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
  val fullExpr = columns.tail.foldLeft(colExpr) {
    (expr, p) => expr && leftDF(p) <=> rightDF(p)
  }
  val finalDF = leftDF.join(rightDF, fullExpr, joinType)
  // Drop the duplicate join columns coming from the right DataFrame.
  dropDuplicateColumns(finalDF, rightDF, columns)
}


Try the following method to include the null rows in the result of the join:

def nullSafeJoin(leftDF: DataFrame, rightDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = {

    var columnsExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)

    columns.drop(1).foreach(column => {
        columnsExpr = columnsExpr && (leftDF(column) <=> rightDF(column))
    })

    var joinedDF: DataFrame = leftDF.join(rightDF, columnsExpr, joinType)

    columns.foreach(column => {
        joinedDF = joinedDF.drop(leftDF(column))
    })

    joinedDF
}

3 Comments

This method has a problem: it drops the leftDF columns at the end, which is wrong for right joins. I proposed an edit with a TODO; I think it will work as it is (I'm using it now). But just in case someone else copies it, they should verify that too.
The edit was rejected... god knows why. The fix goes in the last foreach: drop leftDF's columns for right joins and rightDF's columns otherwise (written out as a sketch below).
Very true -- or you could swap the order of the arguments in the call... so left and right are switched.
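
For reference, here is the comment's fix written out: a sketch of a drop-in replacement for the final foreach in the answer above (untested, per the comment's own caveat):

// Drop the left join columns only for right joins; otherwise drop the
// right ones, so the surviving columns are those that carry values.
columns.foreach(column => {
    if (joinType.contains("right")) {
        joinedDF = joinedDF.drop(leftDF(column))
    } else {
        joinedDF = joinedDF.drop(rightDF(column))
    }
})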
