15

How can I sum multiple columns in Spark? For example, in SparkR the following code works to get the sum of one column, but if I try to get the sum of both columns in df, I get an error.

# Create SparkDataFrame
df <- createDataFrame(faithful)

# Use agg to sum total waiting times
head(agg(df, totalWaiting = sum(df$waiting)))
##This works

# Use agg to sum total of waiting and eruptions
head(agg(df, total = sum(df$waiting, df$eruptions)))
##This doesn't work

Either SparkR or PySpark code will work.

7 Answers 7

61

For PySpark, if you don't want to explicitly type out the columns:

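from operator import add
from functools import reduce
from pyspark.sql import functions as F

# numeric_col_list is a list of the column names to add together row by row
new_df = df.withColumn('total', reduce(add, [F.col(x) for x in numeric_col_list]))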
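To see what the fold actually builds, here is a minimal sketch in plain Python. `Expr` is a hypothetical stand-in for a Spark column (it is not part of PySpark) that only records the expression as text, so the snippet runs without a Spark session. It also shows why the built-in `sum()` works on such objects: it seeds the fold with the integer 0 and relies on reversed addition.

```python
from functools import reduce
from operator import add


class Expr:
    """Hypothetical stand-in for a Spark Column: records the expression as text."""

    def __init__(self, text):
        self.text = text

    def __add__(self, other):
        # Adding two expressions yields a new, larger expression.
        return Expr(f"({self.text} + {_text(other)})")

    def __radd__(self, other):
        # Called for `0 + Expr`, which is how the built-in sum() starts.
        return Expr(f"({_text(other)} + {self.text})")


def _text(value):
    # Render either an Expr or a plain Python literal.
    return value.text if isinstance(value, Expr) else repr(value)


cols = [Expr(name) for name in ["col_a", "col_b", "col_c"]]

folded = reduce(add, cols)  # pairwise fold, left to right
summed = sum(cols)          # same idea, but seeded with the integer 0

print(folded.text)  # ((col_a + col_b) + col_c)
print(summed.text)  # (((0 + col_a) + col_b) + col_c)
```

Either way the result is a single expression evaluated once per row, which is why it belongs in `withColumn` and not in `agg`.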

3 Comments

Why this tool is not in spark api?
That is a useful technique, and surely will help many people who google this question, but not what the original question asked about :) (it asked about an aggregation, not a row operation)
The original question was confusing aggregation (summing rows) with calculated fields (in this case summing columns).
9

You can do something like the following in PySpark:

>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([("a",1,10), ("b",2,20), ("c",3,30), ("d",4,40)], ["col1", "col2", "col3"])
>>> df.groupBy("col1").agg(F.sum(df.col2+df.col3)).show()
+----+------------------+
|col1|sum((col2 + col3))|
+----+------------------+
|   d|                44|
|   c|                33|
|   b|                22|
|   a|                11|
+----+------------------+

1 Comment

Yes but... 1 + NULL = NULL.
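The NULL caveat can be sketched in plain Python, modelling SQL NULL as `None`. The helpers `sql_add` and `coalesce` below are hypothetical illustrations of SQL semantics, not Spark functions. In PySpark itself, the usual way to get the null-safe behaviour is to wrap each column as `F.coalesce(F.col(c), F.lit(0))` before adding.

```python
def sql_add(a, b):
    """Model SQL addition: any NULL (None) operand makes the result NULL."""
    if a is None or b is None:
        return None
    return a + b


def coalesce(value, default):
    """Model two-argument SQL COALESCE: return the first non-NULL value."""
    return default if value is None else value


# One row where col3 is missing.
row = {"col2": 1, "col3": None}

plain = sql_add(row["col2"], row["col3"])
null_safe = sql_add(coalesce(row["col2"], 0), coalesce(row["col3"], 0))

print(plain)      # None
print(null_safe)  # 1
```

Because the `sum` aggregate skips NULL inputs, a row whose `col2 + col3` is NULL contributes nothing to `sum(col2 + col3)`, so the non-null 1 in this row would be silently lost without the coalesce step.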
5
org.apache.spark.sql.functions.sum(Column e)

Aggregate function: returns the sum of all values in the expression.

As you can see, sum takes just one column as input, so sum(df$waiting, df$eruptions) won't work. Since you want to sum the numeric fields, you can do sum(df("waiting") + df("eruptions")). If you want to sum the values of each column individually, you can do df.agg(sum(df$waiting), sum(df$eruptions)).show

3 Comments

For me, this one worked: df.withColumn("newCol", col("col1") + col("col2"))
@Ali yes that is also an alternative.
The original question as I understood it is about aggregation: summing columns "vertically" (for each column, sum all the rows), not a row operation: summing rows "horizontally" (for each row, sum the values in columns on that row).
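The "vertical" versus "horizontal" distinction can be made concrete with a small plain-Python sketch. It uses the col2/col3 values from the groupBy example earlier in the thread and ordinary lists in place of a DataFrame, so the comments naming Spark calls are only an analogy.

```python
# Toy rows taken from the col2/col3 example earlier in the thread.
rows = [
    {"col2": 1, "col3": 10},
    {"col2": 2, "col3": 20},
    {"col2": 3, "col3": 30},
    {"col2": 4, "col3": 40},
]
columns = ["col2", "col3"]

# "Horizontal": one value per row (analogous to withColumn with col2 + col3).
row_totals = [sum(row[c] for c in columns) for row in rows]

# "Vertical": one value per column (analogous to agg(sum(col2), sum(col3))).
column_totals = {c: sum(row[c] for row in rows) for c in columns}

# A single grand total (analogous to agg(sum(col2 + col3))).
grand_total = sum(row_totals)

print(row_totals)     # [11, 22, 33, 44]
print(column_totals)  # {'col2': 10, 'col3': 100}
print(grand_total)    # 110
```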
4

You can use expr():

import pyspark.sql.functions as f

numeric_cols = ['col_a','col_b','col_c']
df = df.withColumn('total', f.expr('+'.join(numeric_cols)))

PySpark expr() is a SQL function to execute SQL-like expressions.

1 Comment

it should be df = df.withColumn('total', f.expr('+'.join(numeric_cols)))
3

SparkR code:

library(SparkR)
df <- createDataFrame(sqlContext,faithful)
# Collect the two single-column aggregates in a list so each can be indexed with [[ ]]
w <- list(agg(df, sum(df$waiting)), agg(df, sum(df$eruptions)))
head(w[[1]])
head(w[[2]])


1

The accepted answer was helpful for me, but I found that the approach below is simpler and needs nothing beyond pyspark.sql.functions.

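from pyspark.sql.functions import col, lit

# Start from a zero column, then add each column in col_list to it in turn
sum_df = df.withColumn('total', lit(0))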
for c in col_list:
    sum_df = sum_df.withColumn('total', col('total') + col(c))


0

You can do

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, FloatType

schema = StructType([
    StructField("col1", FloatType(), True),
    StructField("col2", FloatType(), True),
    StructField("col3", FloatType(), True),
    StructField("col4", FloatType(), True),
    StructField("col5", FloatType(), True)
])

data = [
    (1.1, 2.2, 3.3, 4.4, 5.5),
    (6.6, 7.7, 8.8, 9.9, 10.0)
]
df = spark.createDataFrame(data, schema)
numeric_col_list = df.columns

df = df.withColumn(
    'total',
    sum(
        [F.col(x) for x in numeric_col_list],
        start=F.lit(0)
    )
)
df.show()

or simply

sum(F.col(x) for x in numeric_col_list)

