
I have a collection of 300 000 points and I would like to compute the pairwise distances between them.

    id      x    y
0   0       1    0
1   1       28   76
…

Thus I do a Cartesian product between those points and filter so that I keep only one combination per pair. Indeed, for my purpose, the distance between points (0, 1) is the same as between (1, 0).

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math


@udf(returnType=DoubleType())
def compute_distance(x1, y1, x2, y2):
    # Euclidean distance between (x1, y1) and (x2, y2)
    return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)


columns = ['id','x', 'y']
data = [(0, 1, 0), (1, 28, 76), (2, 33, 42)]
spark = SparkSession\
            .builder \
            .appName('distance computation') \
            .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
            .config('spark.executor.memory', '2g') \
            .master('local[20]') \
            .getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
result = df.alias('a')\
               .join(df.alias('b'),
                     F.col('a.id') < F.col('b.id'))\
               .withColumn('distance', compute_distance(F.col('a.x'), F.col('a.y'), F.col('b.x'), F.col('b.y')))

result.write.parquet('distance-between-points')

While that seems to work, the CPU usage for my latest task (parquet at NativeMethodAccessorImpl.java:0) did not go above 100%. It also took about a day to complete.

I would like to know whether the withColumn operation is performed on multiple executors in order to achieve parallelism.

Is there a way to split the data in order to compute the distances in batches and to store the results in one or more Parquet files?

Thanks for your insight.

  • UDF is not necessary - just use the hypot function. Should be faster than a UDF (see the sketch after these comments). Commented Feb 4, 2021 at 13:18
  • thanks @mck I will take a look at the hypot function. It is true that, to express my problem, I gave a simplified view; the real UDF calculation is more complex Commented Feb 4, 2021 at 13:22
  • to be honest, this sort of calculation seems more suitable for something like Numba than for Spark Commented Feb 4, 2021 at 13:22
  • I will try with Numba Commented Feb 4, 2021 at 13:23
  • The dataframe will be partitioned across your cluster and the UDF will run in parallel on each executor, but you've got a large result set (~4.5×10^10 pairs). Perhaps your cluster is under-specced? The level of parallelism is limited by the number of cores on your cluster available to Spark. Commented Feb 8, 2021 at 15:07
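
As a rough sketch of the built-in approach suggested in the first comment (assuming the same df with id, x and y columns as in the question), the UDF can be replaced with pyspark.sql.functions.hypot so the distance is evaluated natively by Spark rather than in Python:

import pyspark.sql.functions as F

# Keep each unordered pair of points once, then compute the Euclidean
# distance with the built-in hypot(dx, dy) = sqrt(dx**2 + dy**2).
pairs = df.alias('a').join(df.alias('b'), F.col('a.id') < F.col('b.id'))
result = pairs.withColumn(
    'distance',
    F.hypot(F.col('a.x') - F.col('b.x'), F.col('a.y') - F.col('b.y'))
)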

1 Answer


I would like to know whether the withColumn operation is performed on multiple executors in order to achieve parallelism.

Yes, assuming a correctly configured cluster, the dataframe will be partitioned across your cluster and the executors will work through the partitions in parallel running your UDF.
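
As a quick way to check this (a minimal sketch against the session from the question; the partition count 200 is only an illustrative value), you can look at how many partitions the input dataframe has, since the number of tasks for the nested-loop/cartesian-style join that Spark chooses for a non-equi condition is driven by the input partitioning:

# With a small dataset built via parallelize(), Spark may create only a few
# partitions, which caps how many tasks can run at once.
print(df.rdd.getNumPartitions())

# Spreading the input over more partitions before the join gives Spark more
# tasks to schedule across the available cores (200 is illustrative).
df = df.repartition(200)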

Is there a way to split the data in order to compute the distances in batches in parallel and to store the results in one or more Parquet files?

By default, the resulting dataframe will be partitioned across the cluster and written out as one Parquet file per partition. You can change that by repartitioning if required, but that will cause a shuffle and take longer.
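
For example, a minimal sketch of controlling the output layout (the partition count 50 is only a placeholder; the path is the one from the question):

# Default: one Parquet part-file per partition of `result`
result.write.parquet('distance-between-points')

# Explicitly control the number of output files; the repartition adds a shuffle
result.repartition(50).write.mode('overwrite').parquet('distance-between-points')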

I recommend the 'Level of Parallelism' section in the Learning Spark book for further reading.
