
I'm a Spark newbie and am trying to use pyspark (Spark 2.2) to perform filter and aggregation operations on a very wide feature set (~13 million rows, 15,000 columns). The feature set is stored as Parquet files in S3. I am running a test script to load the feature set into a dataframe, select a few thousand records, group by a particular region code and average each of the 15k feature columns. The problem is that the job either errors out or takes far too long (approx. 8 hours for a 5% sample of the records).

Are there any ways of speeding up these kinds of operations on a wide dataframe in Pyspark? I'm using Jupyter notebooks and would like these queries to complete in minutes instead of hours.

Here's my code:

    import re

    # spark, logger, PATH_FEATURE_STORE and PATH_DATA_SOURCE are set up earlier in the notebook
    df_feature_store = spark.read.parquet(PATH_FEATURE_STORE).sample(False, 0.05, seed=0).cache()
    logger.info("Initial data set loaded and sampled")

    df_selected_rors = spark.read.csv(PATH_DATA_SOURCE + "ROR Sample.csv", header=True)

    # feature columns are the ones named G followed by two digits
    agg_cols = [x for x in df_feature_store.columns if re.search(r"^G\d{2}", x)]
    agg_cols = agg_cols[:10]  # just testing with fewer columns
    expr = {x: "mean" for x in agg_cols}

    # keep only the sampled regions, then average each feature column per region
    joineddf = df_feature_store.join(df_selected_rors, df_feature_store.ROLLOUT_REGION_IDENTIFIER == df_selected_rors.ROR, "inner")
    aggdf = joineddf.groupby("ROLLOUT_REGION_IDENTIFIER").agg(expr)
    # replace groupby
    # loop for a 1000 column aggregations
    # transpose columns into rows as arrays
    aggdf.write.mode("overwrite").csv(PATH_FEATURE_STORE + "aggregated", header=True)
    logger.info("Done")

1 Answer


I'd try splitting this up to see where the problems lie:

  • Some versions of Spark have issues with many, many columns in DFs; I can't remember the specifics.
  • Read from CSV and save as Parquet locally, before any queries, filtering down columns if you can.
  • Run the queries local Parquet to local Parquet (rough sketch below).
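
Roughly what I mean by the last two points, as a sketch only: the local paths below are made up, I'm reusing PATH_FEATURE_STORE, the spark session and the region column from your snippet, and I've left the join against the ROR sample out for brevity.

    import re

    LOCAL_FEATURES = "file:///data/feature_store_parquet"   # placeholder local staging path
    LOCAL_OUTPUT = "file:///data/aggregated"                 # placeholder local output path

    # 1) Stage once: pull the wide table down from S3 as local Parquet,
    #    keeping only the columns you actually need.
    src = spark.read.parquet(PATH_FEATURE_STORE)
    keep = ["ROLLOUT_REGION_IDENTIFIER"] + [c for c in src.columns if re.search(r"^G\d{2}", c)]
    src.select(*keep).write.mode("overwrite").parquet(LOCAL_FEATURES)

    # 2) Query: from here on everything reads and writes local Parquet, never S3.
    features = spark.read.parquet(LOCAL_FEATURES).sample(False, 0.05, seed=0)
    aggdf = features.groupby("ROLLOUT_REGION_IDENTIFIER").agg({c: "mean" for c in keep[1:]})
    aggdf.write.mode("overwrite").parquet(LOCAL_OUTPUT)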

S3 as a destination of work is (a) slow to commit and (b) runs the risk of losing data on account of S3's eventual consistency. Unless you are using S3mper/S3Guard/EMR's consistent EMRFS, you shouldn't be using it as a direct destination of work.
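
In practice that means letting Spark commit its output somewhere with real rename semantics and only copying the finished files up to S3 afterwards. A minimal sketch, reusing aggdf from your snippet; the staging path and bucket name are placeholders:

    # commit Spark output to a local/HDFS staging path first, not straight to S3
    STAGING_DIR = "file:///data/aggregated_staging"   # placeholder path

    aggdf.write.mode("overwrite").csv(STAGING_DIR, header=True)

    # then push the completed files to S3 in one step, e.g. from a shell:
    #   aws s3 cp /data/aggregated_staging s3://your-bucket/aggregated/ --recursive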


2 Comments

Some versions - more like all, to different extents.
Thanks, I'll try option 3 as it looks most promising. The csv file that I'm reading has only 100 records and is used to filter the larger dataframe, so I don't think converting that file to parquet will help. Unfortunately, this is a POC and the purpose is to find ways to utilise the 15k or so features, so I do not want to filter down the columns.
