
Generating large DataFrame in a distributed way in pyspark efficiently (without pyspark.sql.Row)

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(generate_data) # flatten one list of 10M rows per seed into a single RDD of rows
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

There are at least a dozen articles with examples of how to use pyarrow + pandas to efficiently convert a pandas dataframe that is local to the driver into a Spark dataframe, but that is not an option for me because the data needs to be generated in a distributed way on the executors, rather than generating one pandas dataframe on the driver and sending it to the executors.
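
For reference, the driver-local Arrow path those articles describe looks roughly like this (a minimal sketch, assuming a SparkSession named spark, the my_schema defined above, and Arrow-based conversion enabled); it is fast, but the whole dataframe is materialized on the driver:

import numpy as np
import pandas as pd

# everything below runs on the driver - exactly what I want to avoid
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

M = 10000000
seed = 0
np.random.seed(seed)
pandas_df = pd.DataFrame({
    "seed": np.full(M, seed, dtype=np.int32),
    "n": np.arange(M, dtype=np.int32),
    "x": np.random.random_sample(M).astype(np.float32),
})
# Arrow-based conversion of the local pandas dataframe, ~0.4-0.5s for 10M rows
df = spark.createDataFrame(pandas_df, schema=my_schema)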

UPD. I've found one way to avoid creating Row objects - using an RDD of python tuples. As expected it is still way too slow, but a bit faster than using Row objects. Still, this is not really what I'm looking for, which is a truly efficient way of passing columnar data to Spark from python.
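
The tuple-based variant looks roughly like this (a sketch; generate_data_tuples is just the name I use here for the modified generator):

def generate_data_tuples(one_integer):
  import numpy as np
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M)
  # plain python tuples instead of pyspark.sql.Row objects
  return [(one_integer, i, float(np_array[i])) for i in range(M)]

tuple_rdd = list_of_integers_rdd.flatMap(generate_data_tuples)
df = spark.createDataFrame(tuple_rdd, schema=my_schema)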

I also measured the time for certain operations on my machine (crude measurements with quite a bit of variation, but still representative in my opinion; rough sketches of the timing setups follow each list). The dataset in question is 10M rows and 3 columns (one column is a constant integer, another is an integer range from 0 to 10M-1, and the third is a floating point value generated using np.random.random_sample):

  • Locally generate pandas dataframe (10M rows): ~440-450ms
  • Locally generate python list of pyspark.sql.Row objects (10M rows): ~12-15s
  • Locally generate python list of tuples representing rows (10M rows): ~3.4-3.5s
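
A rough sketch of how the local numbers above were taken (assuming plain time.perf_counter on the driver; the exact harness does not matter much):

import time
import numpy as np
import pandas as pd
from pyspark.sql import Row

M = 10000000
np.random.seed(0)
np_array = np.random.random_sample(M)

t0 = time.perf_counter()
pandas_df = pd.DataFrame({"seed": 0, "n": np.arange(M), "x": np_array})
print("pandas dataframe:", time.perf_counter() - t0)    # ~0.44-0.45s

t0 = time.perf_counter()
row_type = Row("seed", "n", "x")
rows = [row_type(0, i, float(np_array[i])) for i in range(M)]
print("list of Row objects:", time.perf_counter() - t0) # ~12-15s

t0 = time.perf_counter()
tuples = [(0, i, float(np_array[i])) for i in range(M)]
print("list of tuples:", time.perf_counter() - t0)       # ~3.4-3.5s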

Generating a Spark dataframe using just 1 executor and 1 initial seed value (setup sketched after this list):

  • using spark.createDataFrame(row_rdd, schema=my_schema): ~70-80s
  • using spark.createDataFrame(tuple_rdd, schema=my_schema): ~40-45s
  • (non-distributed creation) using spark.createDataFrame(pandas_df, schema=my_schema): ~0.4-0.5s (not counting the pandas df generation itself, which takes roughly the same time) - with spark.sql.execution.arrow.enabled set to true.
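
The setup behind the distributed numbers, roughly (a sketch, assuming the generate_data / generate_data_tuples / my_schema definitions above; the timings include an action that materializes the dataframe):

# one seed in a single partition, so a single executor does all the work
single_seed_rdd = spark.sparkContext.parallelize([0], numSlices=1)

spark.createDataFrame(single_seed_rdd.flatMap(generate_data),
                      schema=my_schema).count()         # Row objects: ~70-80s
spark.createDataFrame(single_seed_rdd.flatMap(generate_data_tuples),
                      schema=my_schema).count()         # plain tuples: ~40-45s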

The example where a local-to-driver pandas dataframe is converted to a Spark dataframe in ~1s for 10M rows gives me reason to believe the same should be possible with dataframes generated in the executors. However, the fastest I can achieve now is ~40s for 10M rows using an RDD of python tuples.
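
To make concrete what I mean: each executor can already build its chunk as a columnar pandas dataframe very cheaply; the missing piece is a way to hand those chunks to Spark without going through per-row python objects. A hypothetical sketch of the shape of the problem (the per-partition pandas generation is cheap; the per-row conversion at the end is exactly the slow path measured above):

import numpy as np
import pandas as pd

def generate_chunk(one_integer):
  # cheap, columnar, executor-side generation (comparable to the ~0.45s driver number)
  M = 10000000
  np.random.seed(one_integer)
  return pd.DataFrame({
      "seed": np.full(M, one_integer, dtype=np.int32),
      "n": np.arange(M, dtype=np.int32),
      "x": np.random.random_sample(M).astype(np.float32),
  })

# the only way I currently see to feed this into a Spark dataframe is to fall
# back to per-row python objects, which defeats the purpose:
row_rdd_slow = list_of_integers_rdd.flatMap(
    lambda s: ((int(seed), int(n), float(x))
               for seed, n, x in generate_chunk(s).itertuples(index=False, name=None)))
df_slow = spark.createDataFrame(row_rdd_slow, schema=my_schema)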

So the question still stands - is there a way to efficiently generate a large Spark dataframe in a distributed way in pyspark?
