1

I have two dataframes, say dfA and dfB.
I want to take their intersection and then count the number of unique user_ids in that intersection.

I've tried the following, which is very slow and crashes often:

dfA.join(broadcast(dfB), ['user_id'], how='inner').select('user_id').dropDuplicates().count()

I need to run many such lines in order to produce a plot.

How can I perform such a query efficiently?

6
  • Remark: Doing the dropDuplicates() before the join might already reduce the workload, since then no user_id has to be mapped to multiple records in the second dataframe. So dropDuplicates for dfA and dfB before the join can help. Commented Dec 17, 2018 at 12:37
  • But can I tell dropDuplicates to act only on the user_id column? I may have one user_id multiple times, with the other columns differing between those rows. Commented Dec 17, 2018 at 12:38
  • I think you can give the column name as an argument so dropDuplicates("user_id") Commented Dec 17, 2018 at 12:39
  • You can also select the required columns before the join. If it is parquet, I think only the required columns are loaded, which could also speed up your code Commented Dec 17, 2018 at 12:41
  • @gaw py4j.Py4JException: Method toSeq([class java.lang.String]) does not exist, so you cannot pass a column name to dropDuplicates that way. I am trying a select before it. TESTED: Works perfectly for now. Want to turn that into an answer, so I can accept it? Commented Dec 17, 2018 at 12:45

2 Answers

3

As described in the question, the only relevant part of each dataframe is the user_id column (you join on user_id and afterwards use only the user_id field).

The source of the performance problem is joining two big dataframes when you need only the distinct values of one column in each dataframe.

In order to improve the performance I'd do the following:

  1. Create two small DFs that hold only the user_id column of each dataframe
    This dramatically reduces the size of each dataframe, since it keeps only the one relevant column.

    dfAuserid = dfA.select("user_id")
    dfBuserid = dfB.select("user_id")
    
  2. Get the distinct values of each dataframe (note: distinct() is equivalent to dropDuplicates())
    This further reduces the size of each dataframe, since each new dataframe holds only the distinct values of the user_id column.

    dfAuseridDist = dfAuserid.distinct()
    dfBuseridDist = dfBuserid.distinct()
    
  3. Perform the join on the above two minimalist dataframes in order to get the unique values in the intersection
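
Step 3 is described above without code. As a minimal sketch, assuming dfA and dfB are PySpark DataFrames that both contain a user_id column, the three steps could be combined as follows (the function name count_common_user_ids is hypothetical, not part of any API):

```python
def count_common_user_ids(df_a, df_b):
    """Count the distinct user_id values that appear in both DataFrames."""
    # Steps 1 and 2: keep only user_id, then drop duplicate ids on each side.
    a_ids = df_a.select("user_id").distinct()
    b_ids = df_b.select("user_id").distinct()
    # Step 3: both sides now hold unique ids, so an inner join yields each
    # shared id exactly once and no dropDuplicates() is needed afterwards.
    return a_ids.join(b_ids, ["user_id"], how="inner").count()
```

It would be called as count_common_user_ids(dfA, dfB). Because each side is deduplicated before the join, the join output cannot contain repeated ids.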


1

I think you can select the necessary columns first and perform the join afterwards. It should also help to move the dropDuplicates before the join, since that removes user_ids that appear multiple times in either dataframe.

The resulting query could look like:

from pyspark.sql.functions import broadcast

dfA.select("user_id").join(broadcast(dfB.select("user_id")), ['user_id'], how='inner')\
    .select('user_id').dropDuplicates().count()

OR:

dfA.select("user_id").dropDuplicates(["user_id"])\
    .join(broadcast(dfB.select("user_id").dropDuplicates(["user_id"])), ['user_id'], how='inner')\
    .select('user_id').count()

OR the version with distinct, which should work as well:

dfA.select("user_id").distinct().join(broadcast(dfB.select("user_id").distinct()),\
    ['user_id'], how='inner').select('user_id').count()
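
Since the question mentions running many such counts for a plot, one possible arrangement is sketched below. It assumes that a single dataframe is compared against several others, which the question does not state; the function name overlap_counts and its parameters are hypothetical. The deduplicated ids of the shared dataframe are cached so they are computed only once.

```python
def overlap_counts(df_base, others):
    """For each DataFrame in `others`, count the user_ids it shares with df_base."""
    # Deduplicate the shared side once and cache it; cache() is lazy, so the
    # ids are materialised by the first count() and reused by later ones.
    base_ids = df_base.select("user_id").distinct().cache()
    counts = []
    for df in others:
        ids = df.select("user_id").distinct()
        counts.append(base_ids.join(ids, ["user_id"], how="inner").count())
    # Release the cached ids once all counts have been collected.
    base_ids.unpersist()
    return counts
```

For example, overlap_counts(dfA, [dfB, dfC]) would return one count per comparison, in order, ready to be plotted (dfC is a hypothetical third dataframe). Whether caching helps depends on how many comparisons reuse the same dataframe and on available memory.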
