User Defined Aggregate Function in PySpark SQL
A Pandas UDF can be used; this style of definition is supported from Spark 3.0 with Python 3.6+. See the related issue and documentation for details.
Full implementation in Spark SQL:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
@pandas_udf(DoubleType())
def avg_udf(s: pd.Series) -> float:
return s.mean()
spark.udf.register('avg_udf', avg_udf)
rv = spark.sql('SELECT id, avg_udf(value) FROM df GROUP BY id').toPandas()
with the return value:
In [2]: rv
Out[2]:
id avg_udf(value)
0 1 1.5
1 2 3.5
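Since the body of avg_udf is plain pandas, the per-group logic can be verified locally without a SparkSession. A minimal sketch (avg_logic is a hypothetical undecorated copy of avg_udf):

```python
import pandas as pd

def avg_logic(s: pd.Series) -> float:
    # Same body as avg_udf above, without the @pandas_udf decorator
    return s.mean()

# Emulate what Spark does: apply the function to each group's Series
pdf = pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]})
result = {gid: avg_logic(g['value']) for gid, g in pdf.groupby('id')}
print(result)  # {1: 1.5, 2: 3.5}
```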
How to use UDFs with pandas on pyspark groupby?
According to the GroupedData.agg documentation, you need to define your pandas_udf with a PandasUDFType. For an aggregation, that is PandasUDFType.GROUPED_AGG.
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
return (x**2).mean()
@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
return x.mean()
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)
df.groupby('A').agg({'B':'agg_a_','C':'agg_b_'}).show()
# +---+---------+---------+
# | A|agg_a_(B)|agg_b_(C)|
# +---+---------+---------+
# | b| 9.0| 5.0|
# | a| 2.5| 5.0|
# +---+---------+---------+
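Because the two aggregation bodies are ordinary pandas expressions, they can be checked locally before registering them. A sketch using small hypothetical rows chosen to reproduce the output above (agg_a_logic and agg_b_logic mirror agg_a and agg_b without the decorator):

```python
import pandas as pd

def agg_a_logic(x):
    # Mean of squares, as in agg_a above
    return (x ** 2).mean()

def agg_b_logic(x):
    # Plain mean, as in agg_b above
    return x.mean()

# Hypothetical rows consistent with the result shown above
pdf = pd.DataFrame({'A': ['a', 'a', 'b'],
                    'B': [1, 2, 3],
                    'C': [4, 6, 5]})
out = pdf.groupby('A').agg(B=('B', agg_a_logic), C=('C', agg_b_logic))
# Group 'a': B = (1 + 4) / 2 = 2.5, C = 5.0; group 'b': B = 9.0, C = 5.0
```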
Creating PySpark UDFs from python method with numpy array input, to calculate and return a single float value
What you want is to groupby and use collect_list to get all the integer values into an array column, then apply your UDF on that column. Also, you need to explicitly return a float from calc_rms:
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
def calc_rms(float_array):
return float(np.sqrt(np.mean(np.diff(float_array) ** 2)))
calc_rms_udf = F.udf(calc_rms, FloatType())
df.groupby().agg(F.collect_list("_c0").alias("_c0")) \
.select(calc_rms_udf(F.col("_c0")).alias("rms")) \
.show()
#+--------+
#| rms|
#+--------+
#|67.16202|
#+--------+
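The UDF body itself is pure NumPy, so it can be sanity-checked without Spark. A minimal sketch:

```python
import numpy as np

def calc_rms(float_array):
    # Root mean square of the successive differences
    return float(np.sqrt(np.mean(np.diff(float_array) ** 2)))

# diffs of [0, 3, -1] are [3, -4]; mean of squares is 12.5
print(calc_rms([0.0, 3.0, -1.0]))  # sqrt(12.5) = 3.5355...
```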
Apply a function to groupBy data with pyspark
A natural approach could be to group the words into one list and then use the Python function Counter() to generate word counts. For both steps we'll use udfs. First, the one that flattens the nested list resulting from collect_list() of multiple arrays:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist],
    ArrayType(StringType())
)
Second, one that generates the word count tuples, or in our case struct
's:
from pyspark.sql.types import *
from collections import Counter
# We need to specify the schema of the return object
schema_count = ArrayType(StructType([
StructField("word", StringType(), False),
StructField("count", IntegerType(), False)
]))
count_udf = udf(
lambda s: Counter(s).most_common(),
schema_count
)
Putting it all together:
from pyspark.sql.functions import collect_list
(df.groupBy("id")
.agg(collect_list("message").alias("message"))
.withColumn("message", unpack_udf("message"))
.withColumn("message", count_udf("message"))).show(truncate = False)
+-----------------+------------------------------------------------------+
|id |message |
+-----------------+------------------------------------------------------+
|10100718890699676|[[oecd,1], [the,1], [with,1], [at,1]] |
|10100720363468236|[[what,3], [me,1], [sad,1], [to,1], [does,1], [the,1]]|
+-----------------+------------------------------------------------------+
Data:
df = sc.parallelize([(10100720363468236,["what", "sad", "to", "me"]),
(10100720363468236,["what", "what", "does", "the"]),
(10100718890699676,["at", "the", "oecd", "with"])]).toDF(["id", "message"])
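The two lambda bodies are plain Python, so the flatten-then-count pipeline can be traced locally on one group of the sample data. A minimal sketch:

```python
from collections import Counter

# The two message arrays collected for id 10100720363468236
collected = [["what", "sad", "to", "me"], ["what", "what", "does", "the"]]
flat = [item for sublist in collected for item in sublist]  # unpack_udf logic
counts = Counter(flat).most_common()                        # count_udf logic
print(counts[0])  # ('what', 3)
```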
User defined function to be applied to Window in PySpark?
Spark >= 3.0:
SPARK-24561 - User-defined window functions with pandas udf (bounded window) is a work in progress. Please follow the related JIRA for details.
Spark >= 2.4:
SPARK-22239 - User-defined window functions with pandas udf (unbounded window) introduced support for pandas-based window functions with unbounded windows. The general structure is:
return_type: DataType
@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
return ...
w = (Window
.partitionBy(grouping_column)
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn('foo', f('bar').over(w))
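For an unbounded window, every row in a partition receives the same aggregate, so locally the semantics match pandas groupby().transform. A sketch of what f('bar').over(w) computes when f is a mean (hypothetical data):

```python
import pandas as pd

pdf = pd.DataFrame({'grouping_column': ['a', 'a', 'b'],
                    'bar': [1.0, 3.0, 5.0]})
# Broadcast each partition's aggregate back to its rows,
# as an unbounded window would
pdf['foo'] = pdf.groupby('grouping_column')['bar'].transform('mean')
print(pdf['foo'].tolist())  # [2.0, 2.0, 5.0]
```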
Please see the doctests and the unit tests for detailed examples.
Spark < 2.4:
You cannot. Window functions require UserDefinedAggregateFunction
or equivalent object, not UserDefinedFunction
, and it is not possible to define one in PySpark.
However, in PySpark 2.3 or later you can define a vectorized pandas_udf, which can be applied on grouped data. You can find a working example in Applying UDFs on GroupedData in PySpark (with functioning python example). While pandas doesn't provide a direct equivalent of window functions, it is expressive enough to implement any window-like logic, especially with pandas.DataFrame.rolling. Furthermore, a function used with GroupedData.apply can return an arbitrary number of rows.
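For instance, a 2-row moving average, one of the window-like operations a grouped pandas_udf could apply per group, looks like this in plain pandas (a local sketch):

```python
import pandas as pd

pdf = pd.DataFrame({'v': [1.0, 2.0, 4.0]})
# Rolling mean over a 2-row window; the first row has no full window (NaN)
moving_avg = pdf['v'].rolling(2).mean().tolist()
print(moving_avg)  # [nan, 1.5, 3.0]
```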
You can also call a Scala UDAF from PySpark; see Spark: How to map Python with Scala or Java User Defined Functions?