Why Using a Udf in a SQL Query Leads to Cartesian Product

Why using a UDF in a SQL query leads to cartesian product?


Why using UDFs leads to a Cartesian product instead of a full outer join?

The reason why using UDFs require Cartesian product is quite simple. Since you pass an arbitrary function with possibly infinite domain and non-deterministic behavior the only way to determine its value is to pass arguments and evaluate. It means you simply have to check all possible pairs.

Simple equality from the other hand has a predictable behavior. If you use t1.foo = t2.bar condition you can simply shuffle t1 and t2 rows by foo and bar respectively to get expected result.

And just to be precise in the relational algebra outer join is actually expressed using natural join. Anything beyond that is simply an optimization.

Any way to force an outer join over the Cartesian product

Not really, unless you want to modify Spark SQL engine.

Why was the DelimitedSplit8k udf was written with 2X (cartesian product) in SQL server?

Although I am not Jeff Moden and do not know his reasoning, I find it likely that he simply used a known pattern for number generation which he himself calls Itzik Ben Gan's cross joined CTE method in this Stack Overflow answer.

The pattern goes like this:

WITH E00(N) AS (SELECT 1 UNION ALL SELECT 1),
E02(N) AS (SELECT 1 FROM E00 a, E00 b),
E04(N) AS (SELECT 1 FROM E02 a, E02 b),
E08(N) AS (SELECT 1 FROM E04 a, E04 b),
...

In order to adapt the method for his string splitting function, he apparently found it more convenient to modify the initial CTE to be ten rows instead of two and to cut down the number of cross joining CTEs to two to just cover the 8000 rows necessary for his solution.

How do I use a UDF with a select statement?

Add schema name:

SELECT *, dbo.AgeIn15(m.Date_of_Birth) AS col_name
FROM marketing_list AS m;

SELECT *
,dbo.AgeIn15(m.Date_of_Birth) AS col_name
FROM marketing_list m
WHERE dbo.AgeIn15(m.Date_of_Birth) > 45

Optionally you can use CROSS APPLY:

Select *
from marketing_list AS m
CROSS APPLY (SELECT dbo.AgeIn15(m.Date_of_Birth)) AS c(col_name)

Demo

SELECT  *, dbo.AgeIn15(t.c) AS col_name
FROM (SELECT '2000-01-01') AS t(c);

SELECT *
FROM (SELECT '2000-01-01') AS t(c)
CROSS APPLY (SELECT dbo.AgeIn15(t.c)) AS c(col_name)

Your function should looks like:

CREATE FUNCTION dbo.AgeIn15 (@DateOfBirth DATE)
RETURNS INTEGER
AS
BEGIN
RETURN DATEDIFF(year,@DateOfBirth,GETDATE()) + 15
END

What you did doesn't make any sense:

Convert(integer,DateAdd(year,15,DateDiff(year,@DateOfBirth,GetDate())))
  1. Calculate DATEDIFF - it returns number years OK
  2. DATEADD(year, 15, number) - it returns date 15 years + of 1900-01-number FAIL
  3. CONVERT(INTEGER, DATE) - it returns number of days starting at 1900-01-01 FAIL

SQL UDF and query optimization

A subquery will have better performance, but UDF can be reused much easier in other queries as well. You can use them to encapsulate specific calculations or logics at one place. If you need to change the logic you have to change only the UDF instead of changing all queries where you integrated that subquery.
At the end you gain flexibility but loose a performance when including the function in queries with huge amount of records.

Detected cartesian product for INNER join on literal column in PySpark

The problem is, that once you persist your data, second_id is incorporated into the cached table and no longer considered constant. As a result planner can no longer infer that the query should be expressed a Cartesian product, and uses standard SortMergeJoin on hash partitioned second_id.

It would be trivial to achieve the same outcome, without persistence, using udf

from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

@pandas_udf('integer', PandasUDFType.SCALAR)
def identity(x):
return x

second_df = second_df.withColumn('second_id', identity(lit(1)))

result_df = first_df.join(second_df,
first_df.first_id == second_df.second_id,
'inner')

result_df.explain()

== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
: +- *(1) Filter isnotnull(first_id#4)
: +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(second_id#129, 200)
+- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
+- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
+- *(3) Project [some_value#6]
+- *(3) Filter isnotnull(pythonUDF0#153)
+- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
+- Scan ExistingRDD[some_value#6]

However SortMergeJoin is not what you should try to achieve here. With constant key, it would result in an extreme data skew, and likely fail, on anything but toy data.

Cartesian Product however, as expensive as it is, won't suffer from this issue, and should be preferred here. So it would recommend enabling cross joins or using explicit cross join syntax (spark.sql.crossJoin.enabled for Spark 2.x) and move on.

A pending question remains how to prevent undesired behavior when data is cached. Unfortunately I don't have an answer ready for that. I fairly sure it is possible to use custom optimizer rules, but this is not something that can be done with Python alone.

Optimise use of multiple user defined functions in join condition


I cannot find a way to rework this into a form making use of table value functions.

This is possible. Note, that this logic requires quadratically comparing all rows from A to all rows from B. This is necessarily rather slow.

So rewrite as follows:

SELECT ...
FROM A
CROSS JOIN B
CROSS APPLY dbo.MyPredicate(A.x, B.y) p
WHERE p.SomeValue = 1

The CROSS JOIN is the quadratic comparison. After the join you have the values from both tables available to execute the join condition.

SQL in databricks gives either cartesian warning or just no result - what's wrong with my query?

Got it sorted out, it was a question of proper connecting the dots. Since it turned out to be some kind of copy/pasting I'm pretty sure there would be a better way to write the query (just imagine having 100+ tables...).

%sql
SELECT
COUNT(*)
FROM
(
SELECT --DISTINCT -- 77.629 when using DISTINCT
f1.pr_id,
f1.grid_id,
f1.grid_field_value AS LotSerial,
f2.grid_field_value AS CodePart,
f3.grid_field_value AS ProductFamilyName,
f4.grid_field_value AS LotDescription,
f5.grid_field_value AS ManufacturingLocation,
f6.grid_field_value AS QuantityUnitMeasure,
f7.grid_field_value AS RecommendedLotDisposition,
f8.grid_field_value AS DispositionDate,
f9.grid_field_value AS DispositionComment,
f10.grid_field_value AS RecommendedBy
FROM
GRID_DATA_DETAIL_TRKW_GLBL f1,
GRID_DATA_DETAIL_TRKW_GLBL f2,
GRID_DATA_DETAIL_TRKW_GLBL f3,
GRID_DATA_DETAIL_TRKW_GLBL f4,
GRID_DATA_DETAIL_TRKW_GLBL f5,
GRID_DATA_DETAIL_TRKW_GLBL f6,
GRID_DATA_DETAIL_TRKW_GLBL f7,
GRID_DATA_DETAIL_TRKW_GLBL f8,
GRID_DATA_DETAIL_TRKW_GLBL f9,
GRID_DATA_DETAIL_TRKW_GLBL f10
WHERE
f1.pr_id = f2.pr_id
AND f1.pr_id = f3.pr_id
AND f1.pr_id = f4.pr_id
AND f1.pr_id = f5.pr_id
AND f1.pr_id = f6.pr_id
AND f1.pr_id = f7.pr_id
AND f1.pr_id = f8.pr_id
AND f1.pr_id = f9.pr_id
AND f1.pr_id = f10.pr_id

AND f2.pr_id = f3.pr_id
AND f2.pr_id = f4.pr_id
AND f2.pr_id = f5.pr_id
AND f2.pr_id = f6.pr_id
AND f2.pr_id = f7.pr_id
AND f2.pr_id = f8.pr_id
AND f2.pr_id = f9.pr_id
AND f2.pr_id = f10.pr_id

AND f1.seq_no = f2.seq_no
AND f1.seq_no = f3.seq_no
AND f1.seq_no = f4.seq_no
AND f1.seq_no = f5.seq_no
AND f1.seq_no = f6.seq_no
AND f1.seq_no = f7.seq_no
AND f1.seq_no = f8.seq_no
AND f1.seq_no = f9.seq_no
AND f1.seq_no = f10.seq_no

AND f2.seq_no = f3.seq_no
AND f2.seq_no = f4.seq_no
AND f2.seq_no = f5.seq_no
AND f2.seq_no = f6.seq_no
AND f2.seq_no = f7.seq_no
AND f2.seq_no = f8.seq_no
AND f2.seq_no = f9.seq_no
AND f2.seq_no = f10.seq_no

AND f1.grid_id = f2.grid_id
AND f1.grid_id = f3.grid_id
AND f1.grid_id = f4.grid_id
AND f1.grid_id = f5.grid_id
AND f1.grid_id = f6.grid_id
AND f1.grid_id = f7.grid_id
AND f1.grid_id = f8.grid_id
AND f1.grid_id = f9.grid_id
AND f1.grid_id = f10.grid_id

AND f2.grid_id = f3.grid_id
AND f2.grid_id = f4.grid_id
AND f2.grid_id = f5.grid_id
AND f2.grid_id = f6.grid_id
AND f2.grid_id = f7.grid_id
AND f2.grid_id = f8.grid_id
AND f2.grid_id = f9.grid_id
AND f2.grid_id = f10.grid_id

AND f1.grid_field_nm = 'Lot#/Serial#'
AND f2.grid_field_nm = 'Code/Part#'
AND f3.grid_field_nm = 'Product Family Name'
AND f4.grid_field_nm = 'Lot Description'
AND f5.grid_field_nm = 'Manufacturing Location'
AND f6.grid_field_nm = 'Quantity/Unit of Measure'
AND f7.grid_field_nm = 'Recommended Lot Disposition'
AND f8.grid_field_nm = 'Disposition Date'
AND f9.grid_field_nm = 'Disposition Comment'
AND f10.grid_field_nm = 'Recommended By'

AND f2.grid_nm = "Lot Info. Grid" -- 77.730 rows
--AND f3.grid_nm = "Lot Info. Grid" -- no difference in result/rows when adding f3.grid
--AND "Lot Info. Grid" IN (f2.grid_nm, f3.grid_nm)
) AS result

The given excel sheet contains 10.487 rows whereas I get 77.730 rows.
I think the 10k results comes from having grid_id = 2257.
How can I add an additional filter here?

I tried:

%sql
SELECT
*
FROM
result
CASE
grid_id = 2257 (
SELECT
COUNT(*)
FROM
(
SELECT
--DISTINCT -- 77.629 when using DISTINCT

So please let me know if there is any error in my logic!!

Pyspark Dataframe Join using UDF

Spark 2.2+

You have to use crossJoin or enable cross joins in the configuration:

df1.crossJoin(df2).where(my_join_udf(df1.col_a, df2.col_b))

Spark 2.0, 2.1

Method shown below doesn't work anymore in Spark 2.x. See SPARK-19728.

Spark 1.x

Theoretically you can join and filter:

df1.join(df2).where(my_join_udf(df1.col_a, df2.col_b))

but in general you shouldn't to it all. Any type of join which is not based on equality requires a full Cartesian product (same as the answer) which is rarely acceptable (see also Why using a UDF in a SQL query leads to cartesian product?).



Related Topics



Leave a reply



Submit