Including null values in an Apache Spark Join
Spark provides a special NULL-safe equality operator, <=>:
numbersDf
.join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
.drop(lettersDf("numbers"))
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| null| zzz|
| | hhh|
+-------+-------+
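For context, the numbersDf and lettersDf used above could be built along these lines (a sketch, assuming a SparkSession named spark; the values are inferred from the output shown):
import spark.implicits._

// Hypothetical construction of the two DataFrames joined above.
val numbersDf = Seq("123", "456", null, "").toDF("numbers")
val lettersDf = Seq(
  ("123", "abc"), ("456", "def"), (null, "zzz"), ("", "hhh")
).toDF("numbers", "letters")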
Be careful not to use it with Spark 1.5 or earlier. Prior to Spark 1.6 it required a Cartesian product (SPARK-11111 - Fast null-safe join).
In Spark 2.3.0 or later you can use Column.eqNullSafe in PySpark:
numbers_df = sc.parallelize([
("123", ), ("456", ), (None, ), ("", )
]).toDF(["numbers"])
letters_df = sc.parallelize([
("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
]).toDF(["numbers", "letters"])
numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers))
+-------+-------+-------+
|numbers|numbers|letters|
+-------+-------+-------+
| 456| 456| def|
| null| null| zzz|
| | | hhh|
| 123| 123| abc|
+-------+-------+-------+
and %<=>% in SparkR:
numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
letters_df <- createDataFrame(data.frame(
numbers = c("123", "456", NA, ""),
letters = c("abc", "def", "zzz", "hhh")
))
head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
  numbers numbers letters
1     456     456     def
2    <NA>    <NA>     zzz
3                     hhh
4     123     123     abc
With SQL (Spark 2.2.0+) you can use IS NOT DISTINCT FROM:
SELECT * FROM numbers JOIN letters
ON numbers.numbers IS NOT DISTINCT FROM letters.numbers
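The SQL form assumes that numbers and letters are registered as (temporary) views. A minimal sketch of that setup, reusing the Scala DataFrames from the first example and a SparkSession named spark:
// Register the DataFrames so they can be referenced from SQL.
numbersDf.createOrReplaceTempView("numbers")
lettersDf.createOrReplaceTempView("letters")

spark.sql(
  """SELECT * FROM numbers JOIN letters
    |ON numbers.numbers IS NOT DISTINCT FROM letters.numbers""".stripMargin
).show()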
This can be used with the DataFrame API as well:
numbersDf.alias("numbers")
.join(lettersDf.alias("letters"))
.where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")
Zero joins with null in the null-safe join
You need to do an inner join here:
df1.join(df2, df1.df1_id.eqNullSafe(df2.df2_id), 'inner').show()
Right now, when there is a 0 in the right dataframe with no match in the left dataframe, and we are doing a right join, PySpark keeps that 0 from the right dataframe and df1_id becomes null. With an inner join, such unmatched rows are dropped.
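As a rough Scala illustration of that difference (the contents of df1 and df2 here are invented, not taken from the original question):
import spark.implicits._

// Hypothetical data: 0 exists only in df2, so it has no match in df1.
val df1 = Seq(Some(1), None).toDF("df1_id")
val df2 = Seq(Some(1), None, Some(0)).toDF("df2_id")

// Right join: the unmatched 0 from df2 is kept and df1_id comes back as null.
df1.join(df2, df1("df1_id") <=> df2("df2_id"), "right").show()

// Inner join: the unmatched 0 is dropped; only the null-safe matches remain.
df1.join(df2, df1("df1_id") <=> df2("df2_id"), "inner").show()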
An Apache Spark Join including null keys
You can use the <=> equality operator, which is null safe, as shown here. I added a schema to the DataFrame creation because, without it, automatic schema inference did not assign a type to the columns containing only nulls and the join failed. The resulting DataFrame is exactly the one you wanted:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.JavaConversions._
val data1 = Seq(
Row(601, null, null, "8121000868-10", "CN88"),
Row(3925, null, null, "8121000936-50", "CN88")
)
val schema1 = StructType(List(
StructField("id", IntegerType, false),
StructField("work_order_number", StringType, true),
StructField("work_order_item_number", StringType, true),
StructField("tally_number", StringType, true),
StructField("company_code", StringType, true)
))
val df1 = sparkSession.createDataFrame(data1, schema1)
val data2 = Seq(
Row(null, null, "8121000868-10", "CN88", "popo"),
Row(null, null, "8121000936-50", "CN88", "Smith")
)
val schema2 = StructType(Seq(
StructField("work_order_number", StringType, true),
StructField("work_order_item_number", StringType, true),
StructField("tally_number", StringType, true),
StructField("company_code", StringType, true),
StructField("name", StringType, false)
))
val df2 = sparkSession.createDataFrame(data2, schema2)
val final_df =
df1.join(df2, df1("tally_number") <=> df2("tally_number")
&& df1("work_order_number") <=> df2("work_order_number")
&& df1("work_order_item_number") <=> df2("work_order_item_number")
&& df1("company_code") <=> df2("company_code")
, "inner")
.select(df1("tally_number"),
df1("work_order_number"),
df1("work_order_item_number"),
df1("company_code"),
df1("id").as("tally_summary_id"),
df2("name"))
Pyspark - Join with null values in right dataset
You can build the join condition with a when or coalesce expression. Coalescing the (possibly null) rule column with the matching column from the left dataset means a null rule value falls back to the left value and therefore matches anything, like this:
from pyspark.sql import functions as F
join_cond = (
(F.coalesce(dataset_rules["A"], dataset_left["A"]) == dataset_left["A"])
& (F.coalesce(dataset_rules["B"], dataset_left["B"]) == dataset_left["B"])
)
result = dataset_left.join(dataset_rules, join_cond, "left").select(
dataset_left["*"],
dataset_rules["result_col"]
)
result.show()
#+------------+------------+------------+----------+
#| A| B| C|result_col|
#+------------+------------+------------+----------+
#|some_value_1|some_value_3|some_value_5| result_1|
#|some_value_2|some_value_4|some_value_6| result_2|
#+------------+------------+------------+----------+
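To make the wildcard behaviour of the coalesce condition concrete, here is a small self-contained Scala sketch of the same idea (the table contents are invented for illustration, not taken from the question):
import org.apache.spark.sql.functions.coalesce
import spark.implicits._

// Hypothetical rules: a null value means "match anything" for that column.
val left  = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2")).toDF("A", "B", "C")
val rules = Seq(("a1", null, "result_1"), (null, "b2", "result_2"))
  .toDF("A", "B", "result_col")

// A null rule value coalesces to the left value and therefore always matches.
val cond = (coalesce(rules("A"), left("A")) === left("A")) &&
  (coalesce(rules("B"), left("B")) === left("B"))

left.join(rules, cond, "left")
  .select(left("*"), rules("result_col"))
  .show()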
Including null values in a Spark Join [Scala]
You could build up your custom join condition like this:
val joinCondition = df1.columns.foldLeft(lit(true)) { (acc, c) =>
  acc and (df1(c) === df2(c) or df1(c).isNull or df2(c).isNull)
}
df1.join(df2, joinCondition, "inner")
.select(df1("*"))
But as your df2 is empty in your test case, this still results in an empty result. Can't you use union or simply a left join?
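A minimal sketch of the left-join alternative mentioned above, reusing the joinCondition built earlier (whether it fits depends on the semantics you need):
// Keep every row of df1, matched or not, instead of requiring a match in df2.
df1.join(df2, joinCondition, "left")
  .select(df1("*"))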
Join apache spark dataframes properly with scala avoiding null values
Use the <=> operator in your join column condition:
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" <=> $"b.srcId")
In Spark 2.1 or greater there is also the eqNullSafe function:
var joinedDF = trainingDF.join(infoDF,trainingDF("srcId").eqNullSafe(infoDF("srcId")))
spark scala conditional join replace null values
Consider using coalesce on col1 after performing the left join. To handle both nulls and empty arrays (in the case of ArrayType), as per the revised requirement in the comments section, a when/otherwise clause is used, as shown below:
val df1 = Seq(
("foo", Some(Seq(0.1)), 10, "a"),
("bar", None, 20, "b"),
("hello", Some(Seq(0.1)), 30, "c"),
("foobar", Some(Seq()), 40, "d")
).toDF("name", "col1", "col2", "col3")
val df2 = Seq(
("lorem", Seq(0.1), "x"),
("bar", Seq(0.52), "y"),
("foobar", Seq(0.47), "z")
).toDF("name", "col1", "col7")
df1.
join(df2, Seq("name"), "left_outer").
select(
df1("name"),
coalesce(
when(lit(df1.schema("col1").dataType.typeName) === "array" && size(df1("col1")) === 0, df2("col1")).otherwise(df1("col1")),
df2("col1")
).as("col1")
).
show
/*
+------+------+
| name| col1|
+------+------+
| foo| [0.1]|
| bar|[0.52]|
| hello| [0.1]|
|foobar|[0.47]|
+------+------+
*/
UPDATE:
It appears that Spark, surprisingly, does not short-circuit conditionA && conditionB the way most other languages do: even when conditionA is false, conditionB will still be evaluated, and replacing && with nested when/otherwise still would not resolve the issue. It might be due to limitations in how the internally translated case/when/else SQL is executed.
As a result, the above when/otherwise data-type check via the array-specific function size() fails when col1 is not an ArrayType. Given that, I would forgo the dynamic column type check and perform different queries based on whether col1 is ArrayType or not, assuming that's known upfront:
df1.
join(df2, Seq("name"), "left_outer").
select(
df1("name"),
coalesce(
when(size(df1("col1")) === 0, df2("col1")).otherwise(df1("col1")), // <-- if col1 is an array
// df1("col1"), // <-- if col1 is not an array
df2("col1")
).as("col1")
).
show