
I am running into a problem when executing the code below:

from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

sc = SparkContext()   # or reuse an existing SparkContext
hc = HiveContext(sc)  # HiveContext requires a SparkContext
rows1 = [Row(id1='2', id2='1', id3='a'),
         Row(id1='3', id2='2', id3='a'),
         Row(id1='4', id2='3', id3='b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3") == "a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

When I run the code above, df3 is an empty DataFrame. However, if I change the code as below, it gives the correct result (a DataFrame of 2 rows):

from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

sc = SparkContext()
hc = HiveContext(sc)
rows1 = [Row(id1='2', id2='1', id3='a'),
         Row(id1='3', id2='2', id3='a'),
         Row(id1='4', id2='3', id3='b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1='2', id2='1', id3='a'),
         Row(id1='3', id2='2', id3='a'),
         Row(id1='4', id2='3', id3='b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3") == "a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

So my question is: why do I have to create a temporary DataFrame here? And if I can't get hold of the HiveContext in my part of the project, how can I make a duplicate DataFrame on top of an existing one?

5 Comments
  • What is the desired output? Commented Apr 24, 2018 at 8:56
  • What version of Spark? Commented Apr 24, 2018 at 13:17
  • I guess you should use "alias()". Commented Apr 24, 2018 at 14:06
  • The bounty will be awarded to the person who provides a reference to a related JIRA ticket. Commented Apr 27, 2018 at 20:02
  • I am using Spark 1.6. Commented May 9, 2018 at 1:05

2 Answers


I believe that the problem you've hit here is an instance of a more general issue: certain kinds of DataFrame self-joins (including joins of a DataFrame against a filtered copy of itself) can produce ambiguous or incorrect query plans.

There are several Spark JIRA tickets related to this problem, along with others dealing with different manifestations / aspects of it; those tickets are discoverable by following chains of JIRA "relates to" links between them.

This ambiguity only crops up when referencing columns through the DataFrame instance (by subscripting, as in df["mycol"], or by attribute access, as in df.mycol). It can be avoided by aliasing the DataFrames and referring to columns through those aliases. For example, the following works correctly:

>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3") == "a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  4|  3|  b|  3|  2|  a|
|  3|  2|  a|  2|  1|  a|
+---+---+---+---+---+---+
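
As for the second part of the question (duplicating a DataFrame when you can't get hold of the HiveContext directly): a minimal sketch, assuming PySpark 1.x/2.x where a DataFrame exposes its context as sql_ctx, is to rebuild the DataFrame from its own RDD and schema, which gives the copy a lineage independent of the original:

# Sketch: rebuild the DataFrame from its own RDD and schema, reusing the
# context already attached to it (sql_ctx) instead of a separate
# HiveContext handle. df1_copy is an illustrative name.
df1_copy = df1.sql_ctx.createDataFrame(df1.rdd, df1.schema)
df2 = df1_copy.filter(F.col("id3") == "a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")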

1 Comment

Thank you for the answer. I suspected this might be the case, but I still find it worrying. I can accept that a same-column join can become trivially true (even if that is not what we'd expect), but with different expressions it is disturbing. An answer from a committer is more than I expected, though, so thank you again; I'll award the bounty later today.

I see the same behavior with this dataset in Spark 2.0, but not always for the same operation; a slightly different DataFrame works fine.

df1 = spark.createDataFrame(
    [(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
)
df1.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
|  3|  4|  b|
+---+---+---+

df2 = df1.filter(df1.id3 == 'a')
df2.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
+---+---+---+


df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()

+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  2|  2|  a|  1|  2|  a|
|  2|  2|  a|  2|  2|  a|
+---+---+---+---+---+---+

This looks like a bug, although I have not tried later versions of Spark. You may want to report it.
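
For what it's worth, here is a minimal sketch of the alias() workaround suggested in the comments, applied to this dataset (assuming the same df1 and spark session as above):

from pyspark.sql import functions as F

# Alias both sides of the join and qualify the column references through
# the aliases rather than through the DataFrame objects themselves.
a = df1.alias("a")
b = df1.filter(df1.id3 == 'a').alias("b")
a.join(b, F.col("a.id2") == F.col("b.id1"), 'inner').show()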

1 Comment

I guess you should use "alias()".
