Creating a User Defined Function in Spark SQL

About creating a User Defined Function (UDF) in Spark with Scala

You don't need a UDF for the function in your question; the built-in column functions are enough:

import org.apache.spark.sql.functions.{lit, log}

someDF.select($"number", $"word", log(lit(25) / (lit(1) + $"number")) as "newC")

If you insist on using udf though:

import org.apache.spark.sql.functions.udf

// math.log is the natural logarithm, matching the log column function above
val caldf = udf { n: Double => math.log(25 / (n + 1)) }
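
A minimal usage sketch, assuming someDF carries the numeric "number" column from the question:

someDF.withColumn("newC", caldf($"number")).show()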

Creating a user defined function in Spark to process a nested structure column

I found a solution by deconstructing the column, since it was in array<struct<array<double>, double>> format, and by following Spark UDF for StructType/Row. However, I believe there may still be a more concise way to do this.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{broadcast, struct, udf}

def checkArray(clusterID: Double,
               colRN: Double,
               dataStruct: Row): Double = {

  // Rebuild the struct as Seq[(Seq[Double], Double)]: each inner array paired with its value
  val arrData: Seq[(Seq[Double], Double)] = dataStruct
    .getAs[Seq[Seq[Double]]](0)
    .zipWithIndex.map {
      case (arr, idx) => (arr, dataStruct.getAs[Seq[Double]](1)(idx))
    }

  // Walk the pairs until one of the inner arrays contains colRN; fall back to clusterID
  // @tailrec
  def getReturnID(clusterID: Double,
                  colRN: Double,
                  arr: Seq[(Seq[Double], Double)]): Double = arr match {
    case arr if arr.nonEmpty && arr(0)._1.contains(colRN) =>
      arr(0)._2
    case arr if arr.nonEmpty && !arr(0)._1.contains(colRN) =>
      getReturnID(clusterID, colRN, arr.drop(1))
    case _ => clusterID
  }

  getReturnID(clusterID, colRN, arrData)
}
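
For intuition, a quick local sanity check of checkArray with a hand-built Row (the values here are hypothetical):

val row = Row(Seq(Seq(1.0, 2.0), Seq(3.0)), Seq(10.0, 20.0))
checkArray(0.0, 3.0, row)   // 20.0 -- 3.0 is found in the second inner array
checkArray(0.0, 99.0, row)  // 0.0  -- no match, so the clusterID fallback is returned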

// Only run checkArray when the struct actually carries data; otherwise keep the existing id
val columnUpdate: UserDefinedFunction = udf {
  (colID: Double, colRN: Double, colStructData: Row) =>
    if (colStructData.getAs[Seq[Double]](0).nonEmpty) {
      checkArray(colID, colRN, colStructData)
    } else {
      colID
    }
}

// I believe all these withColumn calls are unnecessary, but this was the only way
// I could get a working solution
data.join(
  broadcast(identifiedData
    .withColumn("data_to_update", $"data_to_update")
    .withColumn("array", $"data_to_update._1")
    .withColumn("value", $"data_to_update._2")
    .withColumn("data_struct", struct("array", "value"))
    .drop("data_to_update", "array", "value")
  ),
  Seq("user", "cat"),
  "inner"
)
  .withColumn("id", columnUpdate($"id", $"rn", $"data_struct"))
  .show(100, false)
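
If a more concise route is acceptable, a UDF-free sketch for Spark 2.4+ could use the filter higher-order function instead; this assumes the joined frame still exposes the original data_to_update column (with the _1/_2 struct fields referenced above) and the rn column:

import org.apache.spark.sql.functions.{broadcast, col, expr, size, when}

data.join(broadcast(identifiedData), Seq("user", "cat"), "inner")
  // keep only the structs whose inner array contains this row's rn
  .withColumn("matched", expr("filter(data_to_update, x -> array_contains(x._1, rn))"))
  // take the matched struct's value, otherwise keep the existing id
  .withColumn("id", when(size(col("matched")) > 0, col("matched")(0)("_2")).otherwise(col("id")))
  .drop("matched")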

Creating User Defined (not temporary) Function in Spark-SQL for Azure Databricks

The CREATE FUNCTION statement in Databricks that you are referencing is actually a Hive command, not Spark, and it expects the UDF class to be a Hive UDF.

That is also the reason for the "No handler for UDF/UDAF/UDTF" error you are getting. The example you have linked implements a Spark UDF, while what you need is to implement a Hive UDF.

To create a Hive UDF, you need to implement a class that extends org.apache.hadoop.hive.ql.exec.UDF and defines a method called evaluate. In your case, the whole class should look like this:

// package must match the class name used in the CREATE FUNCTION statement below
package com.ing.wbaa.spark.udf

import java.time.temporal.ChronoUnit
import org.apache.hadoop.hive.ql.exec.UDF

class GetTimestampDifference extends UDF {

  def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp): java.lang.Long = {

    // https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
    // https://spark.apache.org/docs/2.4.0/sql-reference.html
    // https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

    // Note: any other interval string will throw a MatchError
    interval match {
      case "NANOSECOND"  => ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant())
      case "MICROSECOND" => ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant())
      case "MILLISECOND" => ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()) // date2.getTime() - date1.getTime()
      case "SECOND"      => ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant())
      case "MINUTE"      => ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant())
      case "HOUR"        => ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant())
      case "DAY"         => ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant())
      case "WEEK"        => ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant())
      case "MONTH"       => ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant())
      case "YEAR"        => ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant())
    }
  }
}

You then need to compile it to a JAR file, copy it somewhere into the Databricks file system (DBFS), and create the permanent function using the same command as you did before (assuming you keep the namespace of the IBAN example):

CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'

SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))
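
For these two example timestamps the call should return 2, since they differ by 2 milliseconds.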

Assuming you are still modifying the IBAN example project you started with, in order to create the JAR file you will have to add the following library dependency to the build.sbt file:

"org.apache.spark" %% "spark-hive" % "2.4.3"

Spark Dataframe - using User Defined Function to add a column

new_f = F.udf(Total, IntegerType()) 

wraps the Python function Total as a Spark UDF with an IntegerType return type and assigns it to the name new_f; you then call new_f on columns, for example inside withColumn or select.

Create a spark dataframe column from a user defined function

It works without a hitch:

import org.apache.spark.sql.functions.udf
import spark.implicits._   // for toDF and $-column syntax (already in scope in spark-shell/notebooks)

val someDF = Seq((0, "hello"), (1, "world")).toDF("id", "text")

// A zero-argument UDF: Spark evaluates it for every row
val rand = new java.util.Random(42)
val zipUdf = udf(() => 10000 + rand.nextInt(200))

someDF.withColumn("postalCode", zipUdf()).show

The output for the code above:

+---+-----+----------+
| id| text|postalCode|
+---+-----+----------+
|  0|hello|     10130|
|  1|world|     10163|
+---+-----+----------+

Using col() with withColumn was the primary reason for the failure in the original attempt.

How to register UDF to use in SQL and DataFrame?

UDFRegistration.register variants that take a scala.FunctionN return a UserDefinedFunction, so you can register a SQL function and create a DSL-friendly UDF in a single step:

val timesTwoUDF = spark.udf.register("timesTwo", (x: Int) => x * 2)
spark.sql("SELECT timesTwo(1)").show
+---------------+
|UDF:timesTwo(1)|
+---------------+
|              2|
+---------------+
spark.range(1, 2).toDF("x").select(timesTwoUDF($"x")).show
+------+
|UDF(x)|
+------+
|     2|
+------+
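
The reverse direction also works (Spark 2.2+): an existing DSL UserDefinedFunction can be registered under a SQL name. A minimal sketch with a hypothetical doubler UDF:

import org.apache.spark.sql.functions.udf

val doubler = udf((x: Int) => x * 2)
spark.udf.register("doubler", doubler)   // now callable from SQL as well
spark.sql("SELECT doubler(21)").show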

How to run user defined function over a window in spark dataframe?

If you're using Spark 3.1+, you can use percentile_approx to calculate the quantiles and do the rest of the calculations in PySpark. If your Spark version does not have that function, you can use a UDF that calls numpy.quantile for the quantile calculation. I've shown both in the code.

from pyspark.sql import functions as func

# data_ls: the list of (pressure, timestamp-string) tuples from the question
data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['pressure', 'ts']). \
    withColumn('ts', func.col('ts').cast('timestamp')). \
    withColumn('dt_hr', func.date_format('ts', 'yyyyMMddHH'))

# +--------+-------------------+----------+
# |pressure|                 ts|     dt_hr|
# +--------+-------------------+----------+
# |  358.64|2022-01-01 00:00:00|2022010100|
# |  354.98|2022-01-01 00:10:00|2022010100|
# |  350.34|2022-01-01 00:20:00|2022010100|
# |  429.69|2022-01-01 00:30:00|2022010100|
# |  420.41|2022-01-01 00:40:00|2022010100|
# |  413.82|2022-01-01 00:50:00|2022010100|
# |  409.42|2022-01-01 01:00:00|2022010101|
# |  409.67|2022-01-01 01:10:00|2022010101|
# |  413.33|2022-01-01 01:20:00|2022010101|
# |  405.03|2022-01-01 01:30:00|2022010101|
# | 1209.42|2022-01-01 01:40:00|2022010101|
# |  405.03|2022-01-01 01:50:00|2022010101|
# +--------+-------------------+----------+

Getting the quantiles (both methods shown; use whichever is available in your version):

# spark 3.1+ has percentile_approx
pressure_quantile_sdf = data_sdf. \
    groupBy('dt_hr'). \
    agg(func.percentile_approx('pressure', [0.2, 0.8]).alias('quantile_20_80'))

# +----------+----------------+
# |     dt_hr|  quantile_20_80|
# +----------+----------------+
# |2022010100|[354.98, 420.41]|
# |2022010101|[405.03, 413.33]|
# +----------+----------------+

# lower versions use UDF
from pyspark.sql.types import ArrayType, FloatType

def numpy_quantile_20_80(list_col):
    import numpy as np

    q_20 = np.quantile(list_col, 0.2)
    q_80 = np.quantile(list_col, 0.8)

    return [float(q_20), float(q_80)]

numpy_quantile_20_80_udf = func.udf(numpy_quantile_20_80, ArrayType(FloatType()))

pressure_quantile_sdf = data_sdf. \
    groupBy('dt_hr'). \
    agg(func.collect_list('pressure').alias('pressure_list')). \
    withColumn('quantile_20_80', numpy_quantile_20_80_udf(func.col('pressure_list')))

# +----------+--------------------+----------------+
# |     dt_hr|       pressure_list|  quantile_20_80|
# +----------+--------------------+----------------+
# |2022010100|[358.64, 354.98, ...|[354.98, 420.41]|
# |2022010101|[409.42, 409.67, ...|[405.03, 413.33]|
# +----------+--------------------+----------------+

The outlier calculation is then straightforward with the quantile info:

pressure_quantile_sdf = pressure_quantile_sdf. \
    withColumn('quantile_20', func.col('quantile_20_80')[0]). \
    withColumn('quantile_80', func.col('quantile_20_80')[1]). \
    withColumn('min_q_20', func.col('quantile_20') - 1.5 * (func.col('quantile_80') - func.col('quantile_20'))). \
    withColumn('max_q_80', func.col('quantile_80') + 1.5 * (func.col('quantile_80') - func.col('quantile_20'))). \
    select('dt_hr', 'min_q_20', 'max_q_80')

# +----------+------------------+------------------+
# |     dt_hr|          min_q_20|          max_q_80|
# +----------+------------------+------------------+
# |2022010100|256.83502197265625| 518.5549926757812|
# |2022010101|392.58001708984375|425.77996826171875|
# +----------+------------------+------------------+

# outlier calc -- select columns that are required
data_sdf. \
    join(pressure_quantile_sdf, 'dt_hr', 'left'). \
    withColumn('is_outlier', ((func.col('pressure') > func.col('max_q_80')) | (func.col('pressure') < func.col('min_q_20'))).cast('int')). \
    show()

# +----------+--------+-------------------+------------------+------------------+----------+
# |     dt_hr|pressure|                 ts|          min_q_20|          max_q_80|is_outlier|
# +----------+--------+-------------------+------------------+------------------+----------+
# |2022010100|  358.64|2022-01-01 00:00:00|256.83502197265625| 518.5549926757812|         0|
# |2022010100|  354.98|2022-01-01 00:10:00|256.83502197265625| 518.5549926757812|         0|
# |2022010100|  350.34|2022-01-01 00:20:00|256.83502197265625| 518.5549926757812|         0|
# |2022010100|  429.69|2022-01-01 00:30:00|256.83502197265625| 518.5549926757812|         0|
# |2022010100|  420.41|2022-01-01 00:40:00|256.83502197265625| 518.5549926757812|         0|
# |2022010100|  413.82|2022-01-01 00:50:00|256.83502197265625| 518.5549926757812|         0|
# |2022010101|  409.42|2022-01-01 01:00:00|392.58001708984375|425.77996826171875|         0|
# |2022010101|  409.67|2022-01-01 01:10:00|392.58001708984375|425.77996826171875|         0|
# |2022010101|  413.33|2022-01-01 01:20:00|392.58001708984375|425.77996826171875|         0|
# |2022010101|  405.03|2022-01-01 01:30:00|392.58001708984375|425.77996826171875|         0|
# |2022010101| 1209.42|2022-01-01 01:40:00|392.58001708984375|425.77996826171875|         1|
# |2022010101|  405.03|2022-01-01 01:50:00|392.58001708984375|425.77996826171875|         0|
# +----------+--------+-------------------+------------------+------------------+----------+

