Including Null Values in an Apache Spark Join

Spark provides a null-safe equality operator, <=>, which treats two nulls as equal:

numbersDf
  .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
  .drop(lettersDf("numbers"))
+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|   null|    zzz|
|       |    hhh|
+-------+-------+

Be careful not to use <=> in joins with Spark 1.5 or earlier: prior to Spark 1.6, a null-safe join required a Cartesian product (SPARK-11111, Fast null-safe join).
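
To make the difference between ordinary equality and <=> concrete, here is a minimal plain-Python model of the two comparisons, with None standing in for null. It does not use Spark at all; the helper names sql_eq, null_safe_eq and inner_join are made up for this illustration, and the data simply mirrors the numbers/letters example above.

```python
def sql_eq(a, b):
    """Model of ordinary SQL equality: a null operand makes the result null (None)."""
    if a is None or b is None:
        return None
    return a == b


def null_safe_eq(a, b):
    """Model of <=>: two nulls compare equal and the result is never null."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def inner_join(left, right, predicate):
    """Keep only the pairs whose predicate on the first field is exactly True."""
    return [(l, r) for l in left for r in right if predicate(l[0], r[0]) is True]


numbers = [("123",), ("456",), (None,), ("",)]
letters = [("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")]

plain = inner_join(numbers, letters, sql_eq)
safe = inner_join(numbers, letters, null_safe_eq)

print(len(plain))  # 3: the row with the null key is dropped
print(len(safe))   # 4: the row with the null key is kept
```

Note that the empty string is an ordinary value and matches under both comparisons; only the null key behaves differently.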

In Spark 2.3.0 or later you can use Column.eqNullSafe in PySpark:

# Assumes an active SparkSession, with sc as its SparkContext.
numbers_df = sc.parallelize([
    ("123", ), ("456", ), (None, ), ("", )
]).toDF(["numbers"])

letters_df = sc.parallelize([
    ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
]).toDF(["numbers", "letters"])

numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers)).show()
+-------+-------+-------+
|numbers|numbers|letters|
+-------+-------+-------+
|    456|    456|    def|
|   null|   null|    zzz|
|       |       |    hhh|
|    123|    123|    abc|
+-------+-------+-------+

and %<=>% in SparkR:

numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
letters_df <- createDataFrame(data.frame(
  numbers = c("123", "456", NA, ""),
  letters = c("abc", "def", "zzz", "hhh")
))

head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
  numbers numbers letters
1     456     456     def
2    <NA>    <NA>     zzz
3                     hhh
4     123     123     abc

With SQL (Spark 2.2.0+) you can use IS NOT DISTINCT FROM:

SELECT * FROM numbers JOIN letters
  ON numbers.numbers IS NOT DISTINCT FROM letters.numbers

This can be used with the DataFrame API as well:

numbersDf.alias("numbers")
  .join(lettersDf.alias("letters"))
  .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")

Zero joins with null in the null-safe join

You need an inner join here:

df1.join(df2, df1.df1_id.eqNullSafe(df2.df2_id), 'inner').show()

Right now the 0 in the right DataFrame has no match in the left DataFrame. Because you are doing a right join, PySpark keeps that row from the right side and fills df1_id with null for it.
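
The effect can be reproduced with a small plain-Python model of a right outer join and an inner join over a null-safe comparison. The key values below are invented for illustration (the data from the question is not shown here), None stands in for null, and the helper names are hypothetical; this is a sketch of the join semantics, not Spark code.

```python
def null_safe_eq(a, b):
    """Model of eqNullSafe: two nulls are equal, null never equals a value."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def inner_join(left, right):
    """Only pairs that satisfy the join condition survive."""
    return [(l, r) for r in right for l in left if null_safe_eq(l, r)]


def right_join(left, right):
    """Every right row survives; unmatched ones get null on the left side."""
    rows = []
    for r in right:
        matches = [l for l in left if null_safe_eq(l, r)]
        if matches:
            rows.extend((l, r) for l in matches)
        else:
            rows.append((None, r))
    return rows


df1_ids = [1, None]      # hypothetical df1_id values
df2_ids = [0, 1, None]   # hypothetical df2_id values

right_rows = right_join(df1_ids, df2_ids)
inner_rows = inner_join(df1_ids, df2_ids)

print(right_rows)  # [(None, 0), (1, 1), (None, None)]
print(inner_rows)  # [(1, 1), (None, None)]
```

In the right join, the pair (None, 0) is not a match between null and 0: it is the unmatched right row padded with null. The inner join drops it.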

An Apache Spark Join including null keys

You can use the null-safe equality operator <=>.

I added an explicit schema when creating the DataFrames because, without it, schema inference did not seem to assign a type to the columns containing only nulls, and the join failed.

The resulting DataFrame is exactly the one you wanted:

// JavaConversions (implicit conversions) is deprecated; use explicit asJava instead.
import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumes an existing SparkSession named sparkSession.
val data1 = Seq(
  Row(601, null, null, "8121000868-10", "CN88"),
  Row(3925, null, null, "8121000936-50", "CN88")
)

val schema1 = StructType(List(
  StructField("id", IntegerType, false),
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true)
))

// This createDataFrame overload takes a java.util.List[Row], hence asJava.
val df1 = sparkSession.createDataFrame(data1.asJava, schema1)

val data2 = Seq(
  Row(null, null, "8121000868-10", "CN88", "popo"),
  Row(null, null, "8121000936-50", "CN88", "Smith")
)

val schema2 = StructType(Seq(
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true),
  StructField("name", StringType, false)
))

val df2 = sparkSession.createDataFrame(data2.asJava, schema2)

// Null-safe comparison on every key column, so rows with null keys still match.
val final_df =
  df1.join(df2,
      df1("tally_number") <=> df2("tally_number")
        && df1("work_order_number") <=> df2("work_order_number")
        && df1("work_order_item_number") <=> df2("work_order_item_number")
        && df1("company_code") <=> df2("company_code"),
      "inner")
    .select(
      df1("tally_number"),
      df1("work_order_number"),
      df1("work_order_item_number"),
      df1("company_code"),
      df1("id").as("tally_summary_id"),
      df2("name"))

Pyspark - Join with null values in right dataset

You can join using a when or coalesce expression, so that a null in the rules dataset matches any value on the left:

from pyspark.sql import functions as F

join_cond = (
    (F.coalesce(dataset_rules["A"], dataset_left["A"]) == dataset_left["A"])
    & (F.coalesce(dataset_rules["B"], dataset_left["B"]) == dataset_left["B"])
)

result = dataset_left.join(dataset_rules, join_cond, "left").select(
    dataset_left["*"],
    dataset_rules["result_col"]
)

result.show()
#+------------+------------+------------+----------+
#|           A|           B|           C|result_col|
#+------------+------------+------------+----------+
#|some_value_1|some_value_3|some_value_5|  result_1|
#|some_value_2|some_value_4|some_value_6|  result_2|
#+------------+------------+------------+----------+
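
To see why the coalesce trick turns a null rule value into a wildcard, here is a plain-Python model of the join condition, with None standing in for null. It is a sketch of the logic only, not PySpark; the rule rows are assumptions made up to be consistent with the output above (the original rules dataset is not shown), and the helper names are hypothetical.

```python
def coalesce(*values):
    """Return the first argument that is not None, like SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None


def sql_eq(a, b):
    """Ordinary SQL equality: null on either side gives null (None)."""
    return None if a is None or b is None else a == b


def rule_matches(row, rule):
    """coalesce(rule[c], row[c]) == row[c] for each key column, ANDed together."""
    return all(sql_eq(coalesce(rule[c], row[c]), row[c]) is True for c in ("A", "B"))


dataset_left = [
    {"A": "some_value_1", "B": "some_value_3", "C": "some_value_5"},
    {"A": "some_value_2", "B": "some_value_4", "C": "some_value_6"},
]
# Hypothetical rules: None means "any value is acceptable for this column".
dataset_rules = [
    {"A": "some_value_1", "B": None, "result_col": "result_1"},
    {"A": None, "B": "some_value_4", "result_col": "result_2"},
]

# Left join: keep every left row, with result_col = None when no rule matches.
result = []
for row in dataset_left:
    matched = [r["result_col"] for r in dataset_rules if rule_matches(row, r)]
    for value in matched or [None]:
        result.append((row["A"], row["B"], row["C"], value))

for line in result:
    print(line)
```

When the rule value is null, coalesce substitutes the left value, so the comparison becomes left == left and succeeds for any non-null left value.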

Including null values in a Spark Join [Scala]

You could build a custom join condition like this:

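import org.apache.spark.sql.functions.lit

// For every column: the values are equal, or either side is null.
val joinCondition = df1.columns.foldLeft(lit(true)) { (acc, c) =>
  acc and (df1(c) === df2(c) or df1(c).isNull or df2(c).isNull)
}

df1.join(df2, joinCondition, "inner")
  .select(df1("*"))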

But since df2 is empty in your test case, this still produces an empty result. Could you use a union or simply a left join instead?
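
The fold over the column names, and the reason an empty df2 still yields nothing, can be sketched in plain Python. This is only a model of the condition, with None standing in for null; the rows and helper names are invented for illustration.

```python
from functools import reduce


def column_matches(a, b):
    """Equal values match, and a null on either side matches anything."""
    return a is None or b is None or a == b


def join_condition(columns):
    """Start from True and AND in one per-column check, like foldLeft(lit(true))."""
    def condition(left_row, right_row):
        return reduce(
            lambda acc, c: acc and column_matches(left_row[c], right_row[c]),
            columns,
            True,
        )
    return condition


def inner_join(left, right, condition):
    """Return the left rows that have at least one matching pair, once per pair."""
    return [l for l in left for r in right if condition(l, r)]


columns = ["k", "v"]
df1 = [{"k": "x", "v": None}, {"k": "y", "v": "2"}]   # hypothetical rows
df2 = [{"k": "x", "v": "1"}]                          # hypothetical rows

cond = join_condition(columns)
matched = inner_join(df1, df2, cond)
matched_empty = inner_join(df1, [], cond)

print(matched)        # [{'k': 'x', 'v': None}]
print(matched_empty)  # []
```

An inner join needs at least one row on each side to form a pair, so no join condition, however permissive, can produce rows from an empty df2.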

Join apache spark dataframes properly with scala avoiding null values

Use the <=> operator in your join condition:

var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" <=> $"b.srcId") 

In Spark 2.1 or later you can also use the equivalent eqNullSafe method:

var joinedDF = trainingDF.join(infoDF,trainingDF("srcId").eqNullSafe(infoDF("srcId")))

spark scala conditional join replace null values

Consider using coalesce on col1 after performing the left join. To handle both nulls and empty arrays (in the case of ArrayType) as per revised requirement in the comments section, a when/otherwise clause is used, as shown below:

// Needed outside spark-shell (spark is the active SparkSession):
import org.apache.spark.sql.functions.{coalesce, lit, size, when}
import spark.implicits._

val df1 = Seq(
  ("foo", Some(Seq(0.1)), 10, "a"),
  ("bar", None, 20, "b"),
  ("hello", Some(Seq(0.1)), 30, "c"),
  ("foobar", Some(Seq()), 40, "d")
).toDF("name", "col1", "col2", "col3")

val df2 = Seq(
  ("lorem", Seq(0.1), "x"),
  ("bar", Seq(0.52), "y"),
  ("foobar", Seq(0.47), "z")
).toDF("name", "col1", "col7")

df1.
  join(df2, Seq("name"), "left_outer").
  select(
    df1("name"),
    coalesce(
      when(lit(df1.schema("col1").dataType.typeName) === "array" && size(df1("col1")) === 0, df2("col1")).otherwise(df1("col1")),
      df2("col1")
    ).as("col1")
  ).
  show
/*
+------+------+
|  name|  col1|
+------+------+
|   foo| [0.1]|
|   bar|[0.52]|
| hello| [0.1]|
|foobar|[0.47]|
+------+------+
*/
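
The selection logic in that query (take df2's col1 when df1's col1 is null or an empty array, otherwise keep df1's) can be checked with a plain-Python model using the same sample values. This is a sketch of the logic only, not Spark code: None stands in for null, a dict lookup stands in for the left outer join on name, and pick_col1 is a hypothetical helper.

```python
def coalesce(*values):
    """Return the first argument that is not None, like SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None


def pick_col1(left_col1, right_col1):
    """when(size(left) == 0, right).otherwise(left), then coalesce with right."""
    if left_col1 is not None and len(left_col1) == 0:
        chosen = right_col1
    else:
        chosen = left_col1
    return coalesce(chosen, right_col1)


df1 = [("foo", [0.1]), ("bar", None), ("hello", [0.1]), ("foobar", [])]
df2 = {"lorem": [0.1], "bar": [0.52], "foobar": [0.47]}

# Left outer join on name: names missing from df2 get None on the right side.
result = [(name, pick_col1(col1, df2.get(name))) for name, col1 in df1]

for row in result:
    print(row)
```

The coalesce handles the null case ("bar") and the when/otherwise handles the empty-array case ("foobar"); "hello" has no partner in df2 and keeps its own value.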

UPDATE:

It appears that a guard such as conditionA && conditionB does not protect conditionB the way short-circuit evaluation does in most other languages: Spark resolves and type-checks the whole expression before running the query, so conditionB is still checked even when conditionA is false, and replacing && with nested when/otherwise does not resolve the issue.

As a result, the above when/otherwise data-type check via array-specific function size() fails when col1 is non-ArrayType. Given that, I would forgo the dynamic column type check and perform different queries based on whether col1 is ArrayType or not, assuming it's known upfront:

df1.
  join(df2, Seq("name"), "left_outer").
  select(
    df1("name"),
    coalesce(
      when(size(df1("col1")) === 0, df2("col1")).otherwise(df1("col1")), // <-- if col1 is an array
      // df1("col1"), // <-- if col1 is not an array
      df2("col1")
    ).as("col1")
  ).
  show

