How to use Scala UDF accepting Map[String, String] in PySpark
I can see the problem with how you are calling the function.
You need to change the following line:
_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
Column(_f2.apply(_to_seq(sc, [lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN")], _to_java_column)))
Since the function can be called using the map method in Scala, there is an equivalent function, create_map, in PySpark. The only thing you need to do is:
from pyspark.sql.functions import create_map
_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
Column(_f2.apply(_to_seq(sc, [create_map(lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN"))], _to_java_column)))
That way you will be able to call the function and avoid the ClassCastException.
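Note that create_map expects its arguments as an alternating key, value, key, value sequence. A small sketch of that convention, with plain strings standing in for pyspark Column objects so it runs without Spark (map_args is a hypothetical helper, not a pyspark function):

```python
def map_args(pairs):
    """Flatten a {key: column} dict into the alternating argument
    list that create_map expects."""
    args = []
    for key, value in pairs.items():
        args.append(key)    # with pyspark this would be lit(key)
        args.append(value)  # a Column such as col("FIRSTCOLUMN")
    return args

print(map_args({"KEY": "FIRSTCOLUMN", "KEY2": "SECONDCOLUMN"}))
# → ['KEY', 'FIRSTCOLUMN', 'KEY2', 'SECONDCOLUMN']
```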
How to use Scala UDF in PySpark?
The question you've linked uses a Scala object. A Scala object is a singleton, so you can use its apply method directly.
Here you use a nullary function which returns an object of the UserDefinedFunction class, so you have to call the function first:
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
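The same distinction can be mirrored in plain Python, as an analogy only: a singleton exposes its method directly, while a factory function must be called first to obtain the object (all names below are illustrative stand-ins for the JVM objects):

```python
class UserDefinedFunctionLike:
    """Stand-in for the JVM UserDefinedFunction wrapper."""
    def apply(self, column):
        return f"applied to {column}"

# Analogue of a Scala `object`: a ready-made singleton.
singleton_udf = UserDefinedFunctionLike()

# Analogue of a nullary function returning a UserDefinedFunction.
def testUDFFunction1():
    return UserDefinedFunctionLike()

print(singleton_udf.apply("x"))       # singleton: use apply directly
print(testUDFFunction1().apply("x"))  # factory: note the () before .apply
```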
User defined function to be applied to Window in PySpark?
Spark >= 3.0:
SPARK-24561 - User-defined window functions with pandas udf (bounded window) is a work in progress. Please follow the related JIRA for details.
Spark >= 2.4:
SPARK-22239 - User-defined window functions with pandas udf (unbounded window) introduced support for Pandas-based window functions with unbounded windows. The general structure is:
return_type: DataType

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
    return ...

w = (Window
    .partitionBy(grouping_column)
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))
Please see the doctests and the unit tests for detailed examples.
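As a Spark-free illustration of what a grouped aggregate computes over an unbounded window, plain pandas can reproduce the result with groupby().transform: every row receives its whole group's aggregate. This is an analogy with assumed sample data, not the pandas_udf API itself:

```python
import pandas as pd

df = pd.DataFrame({
    "grouping_column": ["a", "a", "b", "b", "b"],
    "bar": [1.0, 3.0, 2.0, 4.0, 6.0],
})

# Each row gets the mean of its whole group, mirroring an aggregate
# applied over an unbounded window partitioned by grouping_column.
df["foo"] = df.groupby("grouping_column")["bar"].transform("mean")
print(df["foo"].tolist())  # [2.0, 2.0, 4.0, 4.0, 4.0]
```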
Spark < 2.4:
You cannot. Window functions require a UserDefinedAggregateFunction or equivalent object, not a UserDefinedFunction, and it is not possible to define one in PySpark.
However, in PySpark 2.3 or later, you can define a vectorized pandas_udf, which can be applied on grouped data. You can find a working example in Applying UDFs on GroupedData in PySpark (with functioning python example). While Pandas doesn't provide a direct equivalent of window functions, it is expressive enough to implement any window-like logic, especially with pandas.DataFrame.rolling. Furthermore, a function used with GroupedData.apply can return an arbitrary number of rows.
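Both points can be made concrete in plain pandas (assumed sample data; this is an analogy for the Spark behaviour, not the Spark API): rolling provides window-like logic, and a function applied to a group may return any number of rows:

```python
import pandas as pd

df = pd.DataFrame({"g": ["a", "a", "a", "b", "b"],
                   "v": [1, 2, 3, 10, 20]})

# Window-like logic: rolling mean over each pair of consecutive values.
rolled = df.groupby("g")["v"].rolling(2).mean()

# A grouped function may return an arbitrary number of rows:
# here, only the rows strictly above each group's mean survive.
def above_mean(group):
    return group[group["v"] > group["v"].mean()]

out = df.groupby("g", group_keys=False).apply(above_mean)
print(out["v"].tolist())  # [3, 20]
```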
You can also call a Scala UDAF from PySpark: Spark: How to map Python with Scala or Java User Defined Functions?
How to run a python user-defined function on the partitions of RDDs using mapPartitions?
The type of the parameter you get in your lambda inside mapPartitions is an iterator, but looking at your function's documentation, you need a numpy.ndarray there. You can convert it easily, as long as your dataset is small enough to be handled by one executor. Try this one:
import numpy as np

data.mapPartitions(
    # np.array (not np.ndarray) builds the array from the values
    lambda i: classic_sta_lta_py(np.array(list(i)), 2, 30)
)
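The distinction between np.array and np.ndarray matters here: np.ndarray(list(i)) would interpret the list as a shape argument and allocate an uninitialised array, while np.array materialises the iterator's values. A quick sketch with hypothetical partition data:

```python
import numpy as np

# mapPartitions hands the function an iterator over the partition's records.
partition_iterator = iter([1.0, 2.0, 3.0, 4.0])

# np.array(list(it)) materialises the values into a one-dimensional ndarray.
arr = np.array(list(partition_iterator))
print(arr)        # [1. 2. 3. 4.]
print(arr.shape)  # (4,)

# By contrast, np.ndarray is the low-level constructor: np.ndarray([1, 2, 3, 4])
# would allocate an uninitialised array of shape (1, 2, 3, 4).
```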