About creating a User Defined Function (UDF) in Spark Scala
You don't need a UDF for the function in your question:
someDF.select($"number", $"word", log(lit(25) / (lit(1) + $"number")) as "newC")
If you insist on using a UDF, though:
val caldf = udf { df: Double => math.log(25/(df+1)) }
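As a quick sanity check of the formula itself, independent of Spark, here is a minimal plain-Python sketch. The helper name new_c and the sample inputs are made up for illustration; Spark's log and Python's math.log are both the natural logarithm.

```python
import math


def new_c(number: float) -> float:
    """Same formula as the column expression: ln(25 / (1 + number))."""
    return math.log(25 / (1 + number))


# Hypothetical sample inputs, not taken from the question's data
for n in (0, 4, 24):
    print(n, new_c(n))
```

For number = 24 the ratio is exactly 1, so the result is 0.0, which is a handy value to compare against the newC column.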
Creating a user defined function in Spark to process a nested structure column
I found a solution by deconstructing the column, since it was in array<struct<array<double>, double>> format, and by following the approach in "Spark UDF for StructType/Row". However, I believe there may still be a more concise way to do this.
def checkArray(clusterID: Double,
               colRN: Double,
               dataStruct: Row): Double = {
  // Array[(Array[Double], Double)]
  val arrData: Seq[(Seq[Double], Double)] = dataStruct
    .getAs[Seq[Seq[Double]]](0)
    .zipWithIndex.map {
      case (arr, idx) => (arr, dataStruct.getAs[Seq[Double]](1)(idx))
    }
  // @tailrec
  def getReturnID(clusterID: Double,
                  colRN: Double,
                  arr: Seq[(Seq[Double], Double)]): Double = arr match {
    case arr if arr.nonEmpty && arr(0)._1.contains(colRN) =>
      arr(0)._2
    case arr if arr.nonEmpty && !arr(0)._1.contains(colRN) =>
      getReturnID(clusterID, colRN, arr.drop(1))
    case _ => clusterID
  }
  getReturnID(clusterID, colRN, arrData)
}
val columnUpdate: UserDefinedFunction = udf {
  (colID: Double, colRN: Double, colStructData: Row) =>
    if (colStructData.getAs[Seq[Double]](0).nonEmpty) {
      checkArray(colID, colRN, colStructData)
    } else {
      colID
    }
}
// I believe all these withColumn calls are unnecessary, but this was the only way
// I could get a working solution
data.join(
    broadcast(identifiedData
      .withColumn("data_to_update", $"data_to_update")
      .withColumn("array", $"data_to_update._1")
      .withColumn("value", $"data_to_update._2")
      .withColumn("data_struct", struct("array", "value"))
      .drop("data_to_update", "array", "value")
    ),
    Seq("user", "cat"),
    "inner"
  )
  .withColumn("id", columnUpdate($"id", $"rn", $"data_struct"))
  .show(100, false)
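Stripped of the Row handling, the lookup the UDF performs is "return the value paired with the first array that contains rn, otherwise keep the current id". A minimal plain-Python sketch of that rule, with a hypothetical function name and made-up sample data, may make it easier to check the logic outside Spark:

```python
from typing import Sequence, Tuple


def resolve_id(cluster_id: float,
               rn: float,
               candidates: Sequence[Tuple[Sequence[float], float]]) -> float:
    """Return the value paired with the first array containing rn, else cluster_id."""
    for members, value in candidates:
        if rn in members:
            return value
    return cluster_id


# Hypothetical (array, value) pairs standing in for the data_struct column
candidates = [([1.0, 2.0], 10.0), ([3.0], 20.0)]
print(resolve_id(99.0, 3.0, candidates))  # 20.0 (3.0 is in the second array)
print(resolve_id(99.0, 7.0, candidates))  # 99.0 (no array contains 7.0)
print(resolve_id(99.0, 1.0, []))          # 99.0 (empty input keeps the id)
```

The loop plays the role of the recursive getReturnID, and the empty-list case corresponds to the nonEmpty guard in columnUpdate.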
Creating User Defined (not temporary) Function in Spark-SQL for Azure Databricks
The CREATE FUNCTION statement in Databricks that you are referencing is actually a Hive command, not Spark, and it expects the UDF class to be a Hive UDF.
That is also the reason for the "No handler for UDF/UDAF/UDTF" error you are getting. The example you have linked implements a Spark UDF, while what you need is to implement a Hive UDF.
To create a Hive UDF, you need to implement a class that extends the class org.apache.hadoop.hive.ql.exec.UDF and implements a function called evaluate. In your case, the whole class should look like this:
import java.time.temporal.ChronoUnit
import org.apache.hadoop.hive.ql.exec.UDF
class GetTimestampDifference extends UDF {
  def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp): java.lang.Long = {
    //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
    //https://spark.apache.org/docs/2.4.0/sql-reference.html
    //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement
    // Instant only supports units up to DAYS, so WEEK, MONTH and YEAR use LocalDateTime instead
    interval match {
      case "NANOSECOND" => ChronoUnit.NANOS.between(date1.toInstant, date2.toInstant)
      case "MICROSECOND" => ChronoUnit.MICROS.between(date1.toInstant, date2.toInstant)
      case "MILLISECOND" => ChronoUnit.MILLIS.between(date1.toInstant, date2.toInstant) // date2.getTime() - date1.getTime()
      case "SECOND" => ChronoUnit.SECONDS.between(date1.toInstant, date2.toInstant)
      case "MINUTE" => ChronoUnit.MINUTES.between(date1.toInstant, date2.toInstant)
      case "HOUR" => ChronoUnit.HOURS.between(date1.toInstant, date2.toInstant)
      case "DAY" => ChronoUnit.DAYS.between(date1.toInstant, date2.toInstant)
      case "WEEK" => ChronoUnit.WEEKS.between(date1.toLocalDateTime, date2.toLocalDateTime)
      case "MONTH" => ChronoUnit.MONTHS.between(date1.toLocalDateTime, date2.toLocalDateTime)
      case "YEAR" => ChronoUnit.YEARS.between(date1.toLocalDateTime, date2.toLocalDateTime)
      // Fail with a clear message instead of a MatchError for unknown intervals
      case other => throw new IllegalArgumentException(s"Unsupported interval: $other")
    }
  }
}
You then need to compile it to a JAR file, copy it somewhere into the Databricks File System (DBFS), and create the permanent function using the same command as before (assuming you keep the namespace of the IBAN example):
CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'
SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))
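To know what the SELECT above should return before deploying the JAR, the same whole-unit difference can be reproduced in plain Python. This is only a sketch: the function name timestamp_difference and the UNITS table are made up for illustration, and only fixed-length units are covered (MONTH and YEAR need calendar arithmetic and are left out).

```python
from datetime import datetime, timedelta

# Fixed-length units only; calendar units (MONTH, YEAR) are deliberately omitted
UNITS = {
    "MICROSECOND": timedelta(microseconds=1),
    "MILLISECOND": timedelta(milliseconds=1),
    "SECOND": timedelta(seconds=1),
    "MINUTE": timedelta(minutes=1),
    "HOUR": timedelta(hours=1),
    "DAY": timedelta(days=1),
    "WEEK": timedelta(weeks=1),
}


def timestamp_difference(interval: str, start: datetime, end: datetime) -> int:
    """Number of whole units between start and end, truncated toward zero."""
    if interval not in UNITS:
        raise ValueError(f"Unsupported interval: {interval}")
    delta = end - start
    whole = abs(delta) // UNITS[interval]
    return whole if delta >= timedelta(0) else -whole


# The two timestamps from the SELECT statement
start = datetime(2019, 7, 8, 16, 7, 3, 246000)
end = datetime(2019, 7, 8, 16, 7, 3, 248000)
print(timestamp_difference("MILLISECOND", start, end))  # 2
```

The two timestamps are 2 ms apart, so the Hive function should also return 2 for the MILLISECOND case.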
Assuming you are still modifying the IBAN example project that you started with, in order to create the jar file you will have to add the following package dependency to the build.sbt file:
"org.apache.spark" %% "spark-hive" % "2.4.3"
Spark Dataframe - using User Defined Function to add a column
new_f = F.udf(Total, IntegerType())
This assigns the name new_f to the user-defined function created from Total, with IntegerType() as its return type.
Create a spark dataframe column from a user defined function
It works without a hitch:
val someDF = Seq((0, "hello"), (1, "world")).toDF("id", "text")
import org.apache.spark.sql.functions.udf
val rand = new java.util.Random(42)
val zipUdf = udf(() => 10000 + rand.nextInt(200))
someDF.withColumn("postalCode", zipUdf()).show
The output for the code above:
+---+-----+----------+
| id| text|postalCode|
+---+-----+----------+
| 0|hello| 10130|
| 1|world| 10163|
+---+-----+----------+
Using col() with withColumn is the primary reason for failure.
How to register UDF to use in SQL and DataFrame?
The UDFRegistration.register variants that take a scala.FunctionN return a UserDefinedFunction, so you can register a SQL function and create a DSL-friendly UDF in a single step:
val timesTwoUDF = spark.udf.register("timesTwo", (x: Int) => x * 2)
spark.sql("SELECT timesTwo(1)").show
+---------------+
|UDF:timesTwo(1)|
+---------------+
| 2|
+---------------+
spark.range(1, 2).toDF("x").select(timesTwoUDF($"x")).show
+------+
|UDF(x)|
+------+
| 2|
+------+
How to run user defined function over a window in spark dataframe?
If you're using Spark 3.1+, you can use percentile_approx to calculate the quantiles and do the rest of the calculations in PySpark. If your Spark version does not have that function, you can use a UDF that calls numpy.quantile for the quantile calculation. Both are shown in the code.
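from pyspark.sql import functions as func
from pyspark.sql.types import ArrayType, FloatType
# data_ls is the input list of (pressure, ts) records
data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['pressure', 'ts']). \
    withColumn('ts', func.col('ts').cast('timestamp')). \
    withColumn('dt_hr', func.date_format('ts', 'yyyyMMddHH'))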
# +--------+-------------------+----------+
# |pressure| ts| dt_hr|
# +--------+-------------------+----------+
# | 358.64|2022-01-01 00:00:00|2022010100|
# | 354.98|2022-01-01 00:10:00|2022010100|
# | 350.34|2022-01-01 00:20:00|2022010100|
# | 429.69|2022-01-01 00:30:00|2022010100|
# | 420.41|2022-01-01 00:40:00|2022010100|
# | 413.82|2022-01-01 00:50:00|2022010100|
# | 409.42|2022-01-01 01:00:00|2022010101|
# | 409.67|2022-01-01 01:10:00|2022010101|
# | 413.33|2022-01-01 01:20:00|2022010101|
# | 405.03|2022-01-01 01:30:00|2022010101|
# | 1209.42|2022-01-01 01:40:00|2022010101|
# | 405.03|2022-01-01 01:50:00|2022010101|
# +--------+-------------------+----------+
Getting the quantiles (both methods are shown; use whichever is available):
# spark 3.1+ has percentile_approx
pressure_quantile_sdf = data_sdf. \
groupBy('dt_hr'). \
agg(func.percentile_approx('pressure', [0.2, 0.8]).alias('quantile_20_80'))
# +----------+----------------+
# | dt_hr| quantile_20_80|
# +----------+----------------+
# |2022010100|[354.98, 420.41]|
# |2022010101|[405.03, 413.33]|
# +----------+----------------+
# lower versions use UDF
def numpy_quantile_20_80(list_col):
    import numpy as np
    q_20 = np.quantile(list_col, 0.2)
    q_80 = np.quantile(list_col, 0.8)
    return [float(q_20), float(q_80)]
numpy_quantile_20_80_udf = func.udf(numpy_quantile_20_80, ArrayType(FloatType()))
pressure_quantile_sdf = data_sdf. \
groupBy('dt_hr'). \
agg(func.collect_list('pressure').alias('pressure_list')). \
withColumn('quantile_20_80', numpy_quantile_20_80_udf(func.col('pressure_list')))
# +----------+--------------------+----------------+
# | dt_hr| pressure_list| quantile_20_80|
# +----------+--------------------+----------------+
# |2022010100|[358.64, 354.98, ...|[354.98, 420.41]|
# |2022010101|[409.42, 409.67, ...|[405.03, 413.33]|
# +----------+--------------------+----------------+
With the quantile info, the outlier calculation is straightforward:
pressure_quantile_sdf = pressure_quantile_sdf. \
withColumn('quantile_20', func.col('quantile_20_80')[0]). \
withColumn('quantile_80', func.col('quantile_20_80')[1]). \
withColumn('min_q_20', func.col('quantile_20') - 1.5 * (func.col('quantile_80') - func.col('quantile_20'))). \
withColumn('max_q_80', func.col('quantile_80') + 1.5 * (func.col('quantile_80') - func.col('quantile_20'))). \
select('dt_hr', 'min_q_20', 'max_q_80')
# +----------+------------------+------------------+
# | dt_hr| min_q_20| max_q_80|
# +----------+------------------+------------------+
# |2022010100|256.83502197265625| 518.5549926757812|
# |2022010101|392.58001708984375|425.77996826171875|
# +----------+------------------+------------------+
# outlier calc -- select columns that are required
data_sdf. \
join(pressure_quantile_sdf, 'dt_hr', 'left'). \
withColumn('is_outlier', ((func.col('pressure') > func.col('max_q_80')) | (func.col('pressure') < func.col('min_q_20'))).cast('int')). \
show()
# +----------+--------+-------------------+------------------+------------------+----------+
# | dt_hr|pressure| ts| min_q_20| max_q_80|is_outlier|
# +----------+--------+-------------------+------------------+------------------+----------+
# |2022010100| 358.64|2022-01-01 00:00:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 354.98|2022-01-01 00:10:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 350.34|2022-01-01 00:20:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 429.69|2022-01-01 00:30:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 420.41|2022-01-01 00:40:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 413.82|2022-01-01 00:50:00|256.83502197265625| 518.5549926757812| 0|
# |2022010101| 409.42|2022-01-01 01:00:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 409.67|2022-01-01 01:10:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 413.33|2022-01-01 01:20:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 405.03|2022-01-01 01:30:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 1209.42|2022-01-01 01:40:00|392.58001708984375|425.77996826171875| 1|
# |2022010101| 405.03|2022-01-01 01:50:00|392.58001708984375|425.77996826171875| 0|
# +----------+--------+-------------------+------------------+------------------+----------+
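To double-check the bounds without a Spark session, the same arithmetic can be reproduced in plain Python. This is a minimal sketch: the helpers quantile and outlier_bounds are hypothetical names, and the quantile uses linear interpolation, which is the default method of numpy.quantile.

```python
def quantile(values, q):
    """Linear-interpolation quantile (the default method of numpy.quantile)."""
    ordered = sorted(values)
    pos = q * (len(ordered) - 1)
    lower = int(pos)
    upper = min(lower + 1, len(ordered) - 1)
    frac = pos - lower
    return ordered[lower] + frac * (ordered[upper] - ordered[lower])


def outlier_bounds(values, lo_q=0.2, hi_q=0.8, k=1.5):
    """Return (min_q_20, max_q_80) style bounds: quantiles widened by k * spread."""
    q_lo, q_hi = quantile(values, lo_q), quantile(values, hi_q)
    spread = q_hi - q_lo
    return q_lo - k * spread, q_hi + k * spread


# Pressure readings for hour 2022010101, copied from the table above
hour_01 = [409.42, 409.67, 413.33, 405.03, 1209.42, 405.03]
low, high = outlier_bounds(hour_01)
outliers = [p for p in hour_01 if p < low or p > high]
print(round(low, 2), round(high, 2), outliers)  # 392.58 425.78 [1209.42]
```

The bounds agree with min_q_20 and max_q_80 in the Spark output up to the float rounding introduced by FloatType, and 1209.42 is the only reading flagged.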