I'm trying to union thousands of DataFrames held in a Python list. I have found two approaches: the first unions them in a for loop, and the second uses functools.reduce. Both work well for toy examples, but with thousands of DataFrames I'm experiencing severe overhead, probably caused by code outside the JVM sequentially appending one DataFrame at a time (this happens with both approaches).

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

# The reduce approach
def unionAll(dfs):
    return reduce(DataFrame.unionAll, dfs)

df_list = [td2, td3, td4, td5, td6, td7, td8, td9, td10]
df = unionAll(df_list)

# The loop approach
df = df_list[0].union(df_list[1])
for d in df_list[2:]:
    df = df.union(d)

The question is how to perform this union over many DataFrames efficiently, ideally circumventing the overhead caused by merging them one by one.

Thank you very much

  • OP, please let me know how useful pairwise_reduction is for your actual dataframes. Commented Jan 17, 2019 at 18:35
  • Thank you very much @coldspeed, your suggestion turned out to be much faster; however, the memory overhead persists. For a large number of DataFrames, it runs out of memory. Can you help me with that? Commented Jan 19, 2019 at 5:39
  • I would suggest setting a checkpoint dir. See jaceklaskowski.gitbooks.io/mastering-apache-spark/… Commented Jan 19, 2019 at 5:54

1 Answer


You are currently combining your DataFrames like this:

(((td1 + td2) + td3) + td4)

At each stage, you are concatenating an ever-growing DataFrame with a small one, resulting in a copy at each step and a lot of wasted memory. I would suggest combining them like this:

(td1 + td2) + (td3 + td4)

The idea is to iteratively combine pairs of roughly the same size until you are left with a single result. Here is a prototype:

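def pairwise_reduce(op, x):
    # Combine neighbours pairwise until a single result remains.
    while len(x) > 1:
        v = [op(i, j) for i, j in zip(x[::2], x[1::2])]
        # With an odd count, zip drops the last element; fold it into the last pair.
        if len(x) % 2 == 1:
            v[-1] = op(v[-1], x[-1])
        x = v
    return x[0]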

result = pairwise_reduce(DataFrame.unionAll, df_list)
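To make the grouping concrete, here is a minimal sketch that swaps the union for a hypothetical `group` function, which only records how the operands were parenthesised. The `group` and `depth` helpers and the `td1`..`td8` names are illustrative assumptions, not part of PySpark; the point is just to compare the nesting produced by `reduce` with that of the prototype above.

```python
from functools import reduce


def pairwise_reduce(op, x):
    # Same pairing scheme as the prototype above.
    while len(x) > 1:
        v = [op(i, j) for i, j in zip(x[::2], x[1::2])]
        if len(x) % 2 == 1:
            v[-1] = op(v[-1], x[-1])
        x = v
    return x[0]


def group(a, b):
    # Hypothetical stand-in for a union: records the grouping as a string.
    return "(" + a + " + " + b + ")"


def depth(expr):
    # Maximum nesting of parentheses, i.e. the height of the combination tree.
    level = deepest = 0
    for ch in expr:
        if ch == "(":
            level += 1
            deepest = max(deepest, level)
        elif ch == ")":
            level -= 1
    return deepest


names = ["td%d" % i for i in range(1, 9)]
left_fold = reduce(group, names)
balanced = pairwise_reduce(group, names)

print(left_fold)
print(balanced)
print(depth(left_fold), depth(balanced))
```

With eight inputs the left fold nests seven levels deep, while the pairwise version nests three levels deep. Whether that translates into a proportional saving for real DataFrames depends on the workload, so it is worth measuring on your own data.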

You can see how much of a difference this makes with plain Python lists, where each concatenation copies both operands:

from functools import reduce 
from operator import add

x = [[1, 2, 3], [4, 5, 6], [7, 8], [9, 10, 11, 12]] * 1000

%timeit sum(x, [])
%timeit reduce(add, x)
%timeit pairwise_reduce(add, x)

64.2 ms ± 606 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
66.3 ms ± 679 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
970 µs ± 9.02 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

sum(x, []) == reduce(add, x) == pairwise_reduce(add, x)
# True

1 Comment

Is there a df.union syntax version of this that can work with methods?
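On the method-syntax question: the prototype only needs a callable that takes two arguments, so wrapping the method call in a lambda should be enough. The sketch below uses Python sets as a stand-in, purely because they also expose a `.union` method; it has not been run against Spark here. For DataFrames the same lambda, `lambda a, b: a.union(b)`, would presumably apply unchanged.

```python
def pairwise_reduce(op, x):
    # Same pairing scheme as the prototype in the answer.
    while len(x) > 1:
        v = [op(i, j) for i, j in zip(x[::2], x[1::2])]
        if len(x) % 2 == 1:
            v[-1] = op(v[-1], x[-1])
        x = v
    return x[0]


# Stand-in data (assumption): sets instead of DataFrames.
parts = [{1, 2}, {3}, {4, 5}, {6}, {7}]

# Method syntax: the lambda calls .union on the left operand.
merged = pairwise_reduce(lambda a, b: a.union(b), parts)
print(merged)
```

Any other two-operand method can be wrapped in the same way, including ones that need extra keyword arguments inside the lambda.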
