Spark DataFrame: Computing Row-Wise Mean (Or Any Aggregate Operation)

All you need here is a standard SQL expression like this:

SELECT (US + UK + CAN) / 3 AS mean FROM df

which can be used directly with SQLContext.sql or expressed using the DSL:

df.select(((col("UK") + col("US") + col("CAN")) / lit(3)).alias("mean"))
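
If you want to run the raw SQL variant, here is a minimal sketch; it assumes a Spark 2.x+ SparkSession named spark (on older versions the equivalent is sqlContext.sql after registerTempTable):

df.createOrReplaceTempView("df")
spark.sql("SELECT (US + UK + CAN) / 3 AS mean FROM df")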

If you have a larger number of columns, you can generate the expression as follows:

from functools import reduce
from operator import add
from pyspark.sql.functions import col, lit

n = lit(len(df.columns) - 1.0)
rowMean = (reduce(add, (col(x) for x in df.columns[1:])) / n).alias("mean")

df.select(rowMean)

or

rowMean = (sum(col(x) for x in df.columns[1:]) / n).alias("mean")
df.select(rowMean)

Finally, its equivalent in Scala:

import org.apache.spark.sql.functions.col

df.select(df.columns
  .drop(1)
  .map(col)
  .reduce(_ + _)
  .divide(df.columns.size - 1)
  .alias("mean"))

In a more complex scenario, you can combine columns using the array function and use a UDF to compute statistics:

import numpy as np
from pyspark.sql.functions import array, col, udf
from pyspark.sql.types import FloatType

combined = array(*(col(x) for x in df.columns[1:]))
median_udf = udf(lambda xs: float(np.median(xs)), FloatType())

df.select(median_udf(combined).alias("median"))

The same operation expressed using the Scala API:

import org.apache.spark.sql.functions.{array, col, udf}
import org.apache.spark.sql.types.DoubleType

val combined = array(df.columns.drop(1).map(col).map(_.cast(DoubleType)): _*)
val median_udf = udf((xs: Seq[Double]) =>
  breeze.stats.DescriptiveStats.percentile(xs, 0.5))

df.select(median_udf(combined).alias("median"))

Since Spark 2.4, an alternative approach is to combine the values into an array and apply an aggregate expression. See for example Spark Scala row-wise average by handling null.
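
For reference, a minimal PySpark sketch of that Spark 2.4+ approach, assuming the value columns are everything after the first column and that nulls should be skipped:

from pyspark.sql import functions as F

value_cols = df.columns[1:]
arr = "array({})".format(", ".join(value_cols))
non_null = "filter({}, x -> x is not null)".format(arr)

# sum the non-null values with the aggregate higher-order function,
# then divide by the number of non-null values
mean_expr = "aggregate({nn}, CAST(0.0 AS double), (acc, x) -> acc + x) / size({nn})".format(nn=non_null)

df.select(F.expr(mean_expr).alias("mean"))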

PySpark: Compute row wise aggregations with None values

Here's a workable answer I found, but I would be interested in anything even more elegant. Thanks.

from pyspark.sql import functions as func
from pyspark.sql.functions import col

columns = ['US', 'UK', 'Can']
countries.withColumn('sum', sum([func.coalesce(col(x), func.lit(0)) for x in columns]))
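
If you want a row-wise mean rather than a sum, and the None entries should be ignored rather than counted as zero, one possible sketch (assuming the same countries DataFrame and column list as above) is to divide by the per-row count of non-null columns:

from pyspark.sql import functions as func
from pyspark.sql.functions import col

columns = ['US', 'UK', 'Can']
total = sum(func.coalesce(col(x), func.lit(0)) for x in columns)   # nulls contribute 0 to the sum
non_null = sum(col(x).isNotNull().cast("int") for x in columns)    # number of non-null values per row
countries.withColumn('mean', total / non_null)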

row wise calculation on Spark

Assuming your input DataFrame is as below:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|3   |7   |21  |9   |
|5   |15  |10  |2   |
+----+----+----+----+

Then you can write a udf function to get your desired output column as

from pyspark.sql import functions as f
from pyspark.sql import types as t
def sortAndIndex(values):
    return sorted([(value, index + 1) for index, value in enumerate(sorted(values))], reverse=True)

sortAndIndexUdf = f.udf(sortAndIndex, t.ArrayType(t.StructType([t.StructField('key', t.IntegerType(), True), t.StructField('value', t.IntegerType(), True)])))

df.withColumn('sortedAndIndexed', sortAndIndexUdf(f.array([x for x in df.columns])))

which should give you

+----+----+----+----+----------------------------------+
|col1|col2|col3|col4|sortedAndIndexed                  |
+----+----+----+----+----------------------------------+
|3   |7   |21  |9   |[[21, 4], [9, 3], [7, 2], [3, 1]] |
|5   |15  |10  |2   |[[15, 4], [10, 3], [5, 2], [2, 1]]|
+----+----+----+----+----------------------------------+

Update

You commented:

my calculation should be sum(value/index) so probably using your udf function I should return some kind of reduce(add,)?

for that you can do

from pyspark.sql import functions as f
from pyspark.sql import types as t
def divideAndSum(values):
    return sum([float(value) / (index + 1) for index, value in enumerate(sorted(values))])

divideAndSumUdf = f.udf(divideAndSum, t.DoubleType())

df.withColumn('divideAndSum', divideAndSumUdf(f.array([x for x in df.columns])))

which should give you

+----+----+----+----+------------------+
|col1|col2|col3|col4|divideAndSum      |
+----+----+----+----+------------------+
|3   |7   |21  |9   |14.75             |
|5   |15  |10  |2   |11.583333333333334|
+----+----+----+----+------------------+

How to calculate rowwise median in a Spark DataFrame

Define a user-defined function using udf, and then use withColumn to add the computed column to the DataFrame:

from numpy import median
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

def my_median(a, b, c):
    return int(median([int(a), int(b), int(c)]))

udf_median = udf(my_median, IntegerType())

df_t = df.withColumn('median', udf_median(df['a'], df['b'], df['c']))
df_t.show()

Pyspark calculate row-wise weighted average with null entries

The idea is to sum all weights per row where the columns are not null and then divide the individual weights by this sum.

To get some flexibility with the number of columns and their weights I store the weights in a dict, using the column name as key:

weights = {"first_value": 0.3, "second_value": 0.4, "third_value": 0.3}

Then I can iterate over the dict to

  • calculate the sum of the weights for the non-null columns
  • and then, over all non-null columns, sum up value of column * weight / sum of weights:
wf = "1 / ("
val = ""
for col in weights:
wf += f"if({col} is null,0 ,{weights[col]}) + "
val += f"if( {col} is null, 0, {col} * {weights[col]} * weight_factor) + "
wf += "0 )"
val += "0"

combined_df = combined_df.withColumn("weight_factor", F.expr(wf)) \
.withColumn("val", F.expr(val))

Output:

+------+-----------+------------+-----------+-----------------+------------------+
|person|first_value|second_value|third_value|    weight_factor|               val|
+------+-----------+------------+-----------+-----------------+------------------+
|     1|        1.0|         0.5|        0.2|1.000000000000000|              0.56|
|     2|        0.9|         0.6|       null|1.428571428571429|0.7285714285714289|
|     3|        0.8|        null|        0.9|1.666666666666667|0.8500000000000002|
|     4|        0.8|         0.7|        0.6|1.000000000000000|               0.7|
+------+-----------+------------+-----------+-----------------+------------------+
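
As a quick sanity check, for person 2 this amounts to (0.9 * 0.3 + 0.6 * 0.4) / (0.3 + 0.4) = 0.51 / 0.7 ≈ 0.7286, which matches the val column above.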

As a next step, you can proceed with the aggregation and sum over val.
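
A minimal sketch of that final step, assuming you simply want the total of val over all rows (use groupBy first if you need it per group):

from pyspark.sql import functions as F

combined_df.agg(F.sum("val").alias("sum_val")).show()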

spark - Calculating average of values in 2 or more columns and putting in new column in every row

One of the easiest and most optimized ways is to create a list of the marks columns and use it with withColumn:

pyspark

from pyspark.sql.functions import col

marksColumns = [col('marks1'), col('marks2')]

averageFunc = sum(x for x in marksColumns)/len(marksColumns)

df.withColumn('Result(Avg)', averageFunc).show(truncate=False)

and you should get

+-----+------+------+-----------+
|name |marks1|marks2|Result(Avg)|
+-----+------+------+-----------+
|Alice|10    |20    |15.0       |
|Bob  |20    |30    |25.0       |
+-----+------+------+-----------+

scala-spark

The process is almost the same in Scala as in Python above:

import org.apache.spark.sql.functions.{col, lit}

val marksColumns = Array(col("marks1"), col("marks2"))

val averageFunc = marksColumns.foldLeft(lit(0)){(x, y) => x+y}/marksColumns.length

df.withColumn("Result(Avg)", averageFunc).show(false)

which should give you the same output as in PySpark.

I hope the answer is helpful


