How to Deal with Spark Udf Input/Output of Primitive Nullable Type

SparkSQL: How to deal with null values in user defined function?

This is where Optioncomes in handy:

val extractDateAsOptionInt = udf((d: String) => d match {
case null => None
case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})

or to make it slightly more secure in general case:

import scala.util.Try

val extractDateAsOptionInt = udf((d: String) => Try(
d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)

All credit goes to Dmitriy Selivanov who've pointed out this solution as a (missing?) edit here.

Alternative is to handle null outside the UDF:

import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType

val extractDateAsInt = udf(
(d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)

df.withColumn("y",
when($"x".isNull, lit(null))
.otherwise(extractDateAsInt($"x"))
.cast(IntegerType)
)

Passing nullable columns as parameter to Spark SQL UDF

The issue is that null is not a valid value for scala Int (which is the backing value) while it is a valid value for String. Int is equivalent to java int primitive and must have a value. This means the udf can't be called when the value is null and therefore null remains.

There are two ways to solve this:

  1. Change the function to accept java.lang.Integer (which is an object and can be null)
  2. If you can't change the function, you can use when/otherwise to do something special in case of null. For example when(col("int col").isNull, someValue).otherwise(the original call)

How do i handle the null pointer exception error in the below udf

You could consider using Option's to encapsulate nulls. Nulls littered in scala code is generally considered a code smell. Perhaps, something to this effect might help.

val getValue: (String, String, String) => Option[String] = { (info, date1, date2) =>
(info, Option(date1), Option(date2)) match {
case ("11111", Some(d1), _) => Some(d1)
case ("22222", Some(d1), _) => Some(d1)
case ("33333", _, Some(d2)) => Some(d2)
case _ => None
}
}

Then you can use it in your UDF as follows,

val udfTest = udf(getValue)
df.withColumn("optional", udfTest(df("info"), df("date1"), df("date2"))).show()

Note that now instead of a String column you have an Option[String] column in your dataframe.

Here's an interesting guide to the Optional data type in scala, and interesting use cases.

Edit:

Something like this then to address you question about translating the holiday spark sql code

private val yyyyMMddFormat = new SimpleDateFormat("yyyy-MM-dd")
private val days = List("Saturday", "Sunday")
private val holidays = List(
getPreviousDay("2018-05-22"),
getPreviousDay("2018-06-01")
)

def isPreviousDayAHoliday(date: String, days: List[String], holidays: List[java.sql.Date]): Boolean = {
val previousDay = getPreviousDay(date)
val eeeeFormat = new SimpleDateFormat("EEEE")
val dayOfPreviousDay = eeeeFormat.format(previousDay)
days.contains(dayOfPreviousDay) || holidays.contains(previousDay)
}

def getPreviousDay(date: String): java.sql.Date = {
new java.sql.Date(yyyyMMddFormat.parse(date).getTime - DAYS.toMillis(1))
}

Now you can use this as a guard.

(info, Option(date1), Option(date2)) match {
case ("12391", _, Some(d2)) if isPreviousDayAHoliday(d2, days, holidays) => Some(yyyyMMddFormat.format(getPreviousDay(d2)))
case _ => None
}

Hope this helps

What are Untyped Scala UDF and Typed Scala UDF? What are their differences?

In typed scala UDF, UDF knows the types of the columns passed as argument, whereas in untyped scala UDF, UDF doesn't know the types of the columns passed as argument

When creating typed scala UDF, the types of columns passed as argument and output of the UDF are inferred from the function arguments and output types whereas when creating untyped scala UDF, there is not type inference at all, either for arguments or output.

What can be confusing is that when creating typed UDF the types are inferred from function and not explicitly passed as argument. To be more explicit, you can write typed UDF creation as follow:

val my_typed_udf = udf[Int, Int]((x: Int) => Int)

Now, let's look at the two points you raised.

To my understanding, the first one (eg udf(AnyRef, DataType)) looks more strictly typed than the second one (eg udf(AnyRef)) where the first one has its output type explicitly defined and the second one does not, hence my confusion on why it's called UnTyped.

According to spark functions scaladoc, signatures of the udf functions that transform a function to an UDF are actually, for the first one:

def udf(f: AnyRef, dataType: DataType): UserDefinedFunction 

And for the second one:

def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction

So the second one is actually more typed than the first one, as the second one takes into account the type of the function passed as argument, whereas the first one erases the type of the function.

That's why on the first one you need to define return type, because spark needs this information but can't infer it from function passed as argument as its return type is erased, whereas in the second one the return type is inferred from function passed as argument.

Also the function got passed to udf, which is (x:Int) => x, clearly has its input type defined but Spark claiming You're using untyped Scala UDF, which does not have the input type information?

What is important here is not the function, but how Spark creates an UDF from this function.

In both cases, the function to be transformed to UDF has its input and return types defined, but those types are erased and not taken into account when creating UDF using udf(AnyRef, DataType).

Getting shortestPaths in GraphFrames with Java

This seems a bug in GraphFrames 0.8.0.

See Issue #367 in github.com



Related Topics



Leave a reply



Submit