Applying UDFs on GroupedData in PySpark (With Functioning Python Example)

User Defined Aggregate Function in PySpark SQL

A Pandas UDF can be used here; the type-hinted definition below requires Spark 3.0+ and Python 3.6+. See the issue and the documentation for details.

Full implementation in Spark SQL:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')

@pandas_udf(DoubleType())
def avg_udf(s: pd.Series) -> float:
    return s.mean()
spark.udf.register('avg_udf', avg_udf)

rv = spark.sql('SELECT id, avg_udf(value) FROM df GROUP BY id').toPandas()

with the return value:

In [2]: rv
Out[2]:
   id  avg_udf(value)
0   1             1.5
1   2             3.5
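
The same UDF also works through the DataFrame API rather than Spark SQL; a minimal sketch, reusing the df and avg_udf defined above (the alias name is just illustrative):

# DataFrame-API equivalent of the SQL query above.
rv_df = df.groupBy('id').agg(avg_udf('value').alias('avg_value')).toPandas()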

How to use UDFs with pandas on pyspark groupby?

According to the GroupedData.agg documentation, you need to define your pandas_udf with a PandasUDFType; if you need an aggregation, that is PandasUDFType.GROUPED_AGG.

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
    return (x**2).mean()

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
    return x.mean()

spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

df.groupby('A').agg({'B':'agg_a_','C':'agg_b_'}).show()

# +---+---------+---------+
# |  A|agg_a_(B)|agg_b_(C)|
# +---+---------+---------+
# |  b|      9.0|      5.0|
# |  a|      2.5|      5.0|
# +---+---------+---------+
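
On Spark 3.0+ and Python 3.6+, the same aggregates can be written with type hints instead of PandasUDFType (which is deprecated there); a rough sketch under the same column names A, B, C:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Type-hinted equivalents of agg_a / agg_b: a pandas.Series -> scalar
# function is treated as a grouped-aggregate UDF.
@pandas_udf('float')
def agg_a(x: pd.Series) -> float:
    return float((x ** 2).mean())

@pandas_udf('float')
def agg_b(x: pd.Series) -> float:
    return float(x.mean())

df.groupby('A').agg(agg_a('B').alias('agg_a_B'),
                    agg_b('C').alias('agg_b_C')).show()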

Creating PySpark UDFs from python method with numpy array input, to calculate and return a single float value

What you want is to group by, use collect_list to get all the integer values into an array column, and then apply your UDF on that column. Also, you need to explicitly return a float from calc_rms:

import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

def calc_rms(float_array):
    return float(np.sqrt(np.mean(np.diff(float_array) ** 2)))

calc_rms_udf = F.udf(calc_rms, FloatType())

df.groupby().agg(F.collect_list("_c0").alias("_c0")) \
  .select(calc_rms_udf(F.col("_c0")).alias("rms")) \
  .show()

#+--------+
#|     rms|
#+--------+
#|67.16202|
#+--------+
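
If you need the RMS per group rather than over the whole DataFrame, the same pattern applies with a grouping column; a sketch, where group_id is a hypothetical column not present in the original data:

# Hypothetical per-group variant: `group_id` is an assumed grouping column.
(df.groupby("group_id")
   .agg(F.collect_list("_c0").alias("_c0"))
   .select("group_id", calc_rms_udf(F.col("_c0")).alias("rms"))
   .show())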

Apply a function to groupBy data with pyspark

A natural approach could be to group the words into one list, and then use Python's Counter() to generate word counts. For both steps we'll use UDFs. First, one that will flatten the nested list resulting from collect_list() of multiple arrays:

from pyspark.sql.functions import udf

unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist]
)

Second, one that generates the word count tuples, or in our case structs:

from pyspark.sql.types import *
from collections import Counter

# We need to specify the schema of the return object
schema_count = ArrayType(StructType([
    StructField("word", StringType(), False),
    StructField("count", IntegerType(), False)
]))

count_udf = udf(
    lambda s: Counter(s).most_common(),
    schema_count
)

Putting it all together:

from pyspark.sql.functions import collect_list

(df.groupBy("id")
   .agg(collect_list("message").alias("message"))
   .withColumn("message", unpack_udf("message"))
   .withColumn("message", count_udf("message"))).show(truncate=False)
+-----------------+------------------------------------------------------+
|id               |message                                               |
+-----------------+------------------------------------------------------+
|10100718890699676|[[oecd,1], [the,1], [with,1], [at,1]]                 |
|10100720363468236|[[what,3], [me,1], [sad,1], [to,1], [does,1], [the,1]]|
+-----------------+------------------------------------------------------+

Data:

df = sc.parallelize([(10100720363468236, ["what", "sad", "to", "me"]),
                     (10100720363468236, ["what", "what", "does", "the"]),
                     (10100718890699676, ["at", "the", "oecd", "with"])]).toDF(["id", "message"])
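
On Spark 3.0+, the flatten-and-count steps can also be done in a single grouped-map pandas UDF via applyInPandas; a sketch that returns one row per (id, word) pair instead of an array of structs (the function name and output schema here are illustrative):

import pandas as pd
from collections import Counter

def count_words(pdf: pd.DataFrame) -> pd.DataFrame:
    # Flatten all message arrays of one group and count the words in pandas.
    words = [w for msg in pdf["message"] for w in msg]
    counts = Counter(words).most_common()
    return pd.DataFrame({"id": pdf["id"].iloc[0],
                         "word": [w for w, _ in counts],
                         "count": [c for _, c in counts]})

(df.groupBy("id")
   .applyInPandas(count_words, schema="id long, word string, count int")
   .show(truncate=False))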

User defined function to be applied to Window in PySpark?

Spark >= 3.0:

SPARK-24561 - User-defined window functions with pandas udf (bounded window) is a work in progress. Please follow the related JIRA for details.

Spark >= 2.4:

SPARK-22239 - User-defined window functions with pandas udf (unbounded window) introduced support for Pandas-based window functions with unbounded windows. The general structure is

return_type: DataType

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
    return ...

w = (Window
     .partitionBy(grouping_column)
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))

Please see the doctests and the unit tests for detailed examples.
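
As an illustration of that structure, a minimal sketch of a grouped-aggregate mean over an unbounded window (the DataFrame df and its id and value columns are assumptions, not part of the JIRA):

from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Grouped-aggregate pandas UDF applied over an unbounded window,
# assuming a DataFrame `df` with columns `id` and `value`.
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

w = (Window
     .partitionBy('id')
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('mean_value', mean_udf('value').over(w)).show()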

Spark < 2.4:

You cannot. Window functions require a UserDefinedAggregateFunction or an equivalent object, not a UserDefinedFunction, and it is not possible to define one in PySpark.

However, in PySpark 2.3 or later you can define a vectorized pandas_udf, which can be applied on grouped data. You can find a working example in Applying UDFs on GroupedData in PySpark (with functioning python example). While Pandas doesn't provide a direct equivalent of window functions, it is expressive enough to implement any window-like logic, especially with pandas.DataFrame.rolling. Furthermore, the function used with GroupedData.apply can return an arbitrary number of rows.
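
A minimal sketch of that approach, emulating a moving average with a GROUPED_MAP pandas_udf and pandas rolling (the column names id, ts, value and the 3-row window are assumptions):

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Window-like logic via GroupedData.apply: adds a 3-row rolling mean per
# group, assuming a DataFrame `df` with columns `id` (long), `ts` (long)
# and `value` (double).
@pandas_udf('id long, ts long, value double, rolling_mean double',
            PandasUDFType.GROUPED_MAP)
def rolling_mean(pdf):
    pdf = pdf.sort_values('ts')
    pdf['rolling_mean'] = pdf['value'].rolling(window=3, min_periods=1).mean()
    return pdf

df.groupby('id').apply(rolling_mean).show()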

You can also call a Scala UDAF from PySpark; see Spark: How to map Python with Scala or Java User Defined Functions?


