Spark DataFrame: Computing row-wise mean (or any aggregate operation)
All you need here is a standard SQL like this:
SELECT (US + UK + CAN) / 3 AS mean FROM df
which can be used directly with SqlContext.sql
or expressed using the DataFrame DSL:
df.select(((col("UK") + col("US") + col("CAN")) / lit(3)).alias("mean"))
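To actually run the SQL form you first register the DataFrame as a table. A minimal sketch, assuming the newer SparkSession API with a session named spark (rather than SqlContext):

df.createOrReplaceTempView("df")  # expose the DataFrame under the table name used in the query
spark.sql("SELECT (US + UK + CAN) / 3 AS mean FROM df").show()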
If you have a larger number of columns you can generate the expression as follows:
from functools import reduce
from operator import add
from pyspark.sql.functions import col, lit
n = lit(len(df.columns) - 1.0)
rowMean = (reduce(add, (col(x) for x in df.columns[1:])) / n).alias("mean")
df.select(rowMean)
or:
rowMean = (sum(col(x) for x in df.columns[1:]) / n).alias("mean")
df.select(rowMean)
Finally, its equivalent in Scala:
df.select(df.columns
.drop(1)
.map(col)
.reduce(_ + _)
.divide(df.columns.size - 1)
.alias("mean"))
In a more complex scenario you can combine columns using the array function and use a UDF to compute statistics:
import numpy as np
from pyspark.sql.functions import array, udf
from pyspark.sql.types import FloatType
combined = array(*(col(x) for x in df.columns[1:]))
median_udf = udf(lambda xs: float(np.median(xs)), FloatType())
df.select(median_udf(combined).alias("median"))
The same operation expressed using the Scala API:
import org.apache.spark.sql.functions.{array, col, udf}
import org.apache.spark.sql.types.DoubleType

val combined = array(df.columns.drop(1).map(col).map(_.cast(DoubleType)): _*)
val median_udf = udf((xs: Seq[Double]) =>
breeze.stats.DescriptiveStats.percentile(xs, 0.5))
df.select(median_udf(combined).alias("median"))
Since Spark 2.4 an alternative approach is to combine the values into an array and apply the aggregate expression; see for example Spark Scala row-wise average by handling null.
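A rough sketch of that approach (assuming Spark 2.4+ and the same US/UK/CAN columns; nulls are ignored in both the sum and the count):

from pyspark.sql import functions as F

value_cols = ["US", "UK", "CAN"]  # hypothetical column names

df.withColumn("vals", F.array(*[F.col(c).cast("double") for c in value_cols])) \
  .withColumn("mean", F.expr(
      "aggregate(vals, cast(0.0 as double), (acc, x) -> acc + coalesce(x, 0.0))"
      " / size(filter(vals, x -> x is not null))")) \
  .drop("vals")  # note: a row where every value is null would divide by zero here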
PySpark: Compute row wise aggregations with None values
Here's a workable answer I found, but I would be interested in anything even more elegant:
from pyspark.sql import functions as func
from pyspark.sql.functions import col

columns = ['US', 'UK', 'Can']
countries.withColumn('sum', sum([func.coalesce(col(x), func.lit(0)) for x in columns]))
row wise calculation on Spark
Assuming your input dataframe is as below
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|3 |7 |21 |9 |
|5 |15 |10 |2 |
+----+----+----+----+
Then you can write a udf function to get your desired output column as:
from pyspark.sql import functions as f
from pyspark.sql import types as t

def sortAndIndex(values):
    # pair each value with its 1-based rank (ascending), then sort descending
    return sorted([(value, index + 1) for index, value in enumerate(sorted(values))], reverse=True)

sortAndIndexUdf = f.udf(sortAndIndex, t.ArrayType(t.StructType([t.StructField('key', t.IntegerType(), True), t.StructField('value', t.IntegerType(), True)])))

df.withColumn('sortedAndIndexed', sortAndIndexUdf(f.array([x for x in df.columns])))
which should give you
+----+----+----+----+----------------------------------+
|col1|col2|col3|col4|sortedAndIndexed |
+----+----+----+----+----------------------------------+
|3 |7 |21 |9 |[[21, 4], [9, 3], [7, 2], [3, 1]] |
|5 |15 |10 |2 |[[15, 4], [10, 3], [5, 2], [2, 1]]|
+----+----+----+----+----------------------------------+
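For the first row, for instance, the values sorted in ascending order are [3, 7, 9, 21]; pairing each with its 1-based position and reversing the sort gives [[21, 4], [9, 3], [7, 2], [3, 1]].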
Update
You commented:
my calculation should be sum(value/index), so probably using your udf function I should return some kind of reduce(add, )?
For that you can do:
from pyspark.sql import functions as f
from pyspark.sql import types as t
def divideAndSum(values):
    # sort ascending, divide each value by its 1-based rank, and sum
    return sum([float(value) / (index + 1) for index, value in enumerate(sorted(values))])

divideAndSumUdf = f.udf(divideAndSum, t.DoubleType())

df.withColumn('divideAndSum', divideAndSumUdf(f.array([x for x in df.columns])))
which should give you
+----+----+----+----+------------------+
|col1|col2|col3|col4|divideAndSum |
+----+----+----+----+------------------+
|3 |7 |21 |9 |14.75 |
|5 |15 |10 |2 |11.583333333333334|
+----+----+----+----+------------------+
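For the first row, the sorted values are [3, 7, 9, 21], so the result is 3/1 + 7/2 + 9/3 + 21/4 = 14.75.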
How to calculate rowwise median in a Spark DataFrame
Define a user-defined function using udf, and then use withColumn to add the specified column to the data frame:
from numpy import median
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
def my_median(a, b, c):
    return int(median([int(a), int(b), int(c)]))
udf_median = udf(my_median, IntegerType())
df_t = df.withColumn('median', udf_median(df['a'], df['b'], df['c']))
df_t.show()
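For a row with, say, a=1, b=3, c=2 (hypothetical values), the new median column would hold 2.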
Pyspark calculate row-wise weighted average with null entries
The idea is to sum all weights per row where the columns are not null and then divide the individual weights by this sum.
To get some flexibility with the number of columns and their weights I store the weights in a dict, using the column name as key:
weights = {"first_value": 0.3, "second_value": 0.4, "third_value": 0.3}
Then I can iterate over the dict to
- calculate the sum of the weights for the non-null columns
- and then calculate, over all non-null columns, the sum of (value of column * weight) / (sum of weights)
wf = "1 / ("
val = ""
for col in weights:
wf += f"if({col} is null,0 ,{weights[col]}) + "
val += f"if( {col} is null, 0, {col} * {weights[col]} * weight_factor) + "
wf += "0 )"
val += "0"
combined_df = combined_df.withColumn("weight_factor", F.expr(wf)) \
.withColumn("val", F.expr(val))
Output:
+------+-----------+------------+-----------+-----------------+------------------+
|person|first_value|second_value|third_value| weight_factor| val|
+------+-----------+------------+-----------+-----------------+------------------+
| 1| 1.0| 0.5| 0.2|1.000000000000000| 0.56|
| 2| 0.9| 0.6| null|1.428571428571429|0.7285714285714289|
| 3| 0.8| null| 0.9|1.666666666666667|0.8500000000000002|
| 4| 0.8| 0.7| 0.6|1.000000000000000| 0.7|
+------+-----------+------------+-----------+-----------------+------------------+
As a next step you can proceed with the aggregation and sum over val.
spark - Calculating average of values in 2 or more columns and putting in new column in every row
One of the easiest and most optimized ways is to create a list of the marks columns and use it with withColumn as
pyspark
from pyspark.sql.functions import col
marksColumns = [col('marks1'), col('marks2')]
averageFunc = sum(x for x in marksColumns)/len(marksColumns)
df.withColumn('Result(Avg)', averageFunc).show(truncate=False)
and you should get
+-----+------+------+-----------+
|name |marks1|marks2|Result(Avg)|
+-----+------+------+-----------+
|Alice|10 |20 |15.0 |
|Bob |20 |30 |25.0 |
+-----+------+------+-----------+
scala-spark
The process is almost the same in Scala as done in Python above:
import org.apache.spark.sql.functions.{col, lit}
val marksColumns = Array(col("marks1"), col("marks2"))
val averageFunc = marksColumns.foldLeft(lit(0)){(x, y) => x+y}/marksColumns.length
df.withColumn("Result(Avg)", averageFunc).show(false)
which should give you the same output as in PySpark. I hope the answer is helpful.