Why does using a UDF in a SQL query lead to a Cartesian product?
Why does using UDFs lead to a Cartesian product instead of a full outer join?
The reason why using UDFs requires a Cartesian product is quite simple. Since you pass an arbitrary function, with a possibly infinite domain and non-deterministic behavior, the only way to determine its value is to pass arguments and evaluate it. That means you simply have to check all possible pairs.
Simple equality, on the other hand, has predictable behavior. If you use a t1.foo = t2.bar condition, you can simply shuffle the rows of t1 and t2 by foo and bar respectively to get the expected result.
And just to be precise: in relational algebra, an outer join is actually expressed using a natural join. Anything beyond that is simply an optimization.
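To make that concrete, here is a pure-Python sketch (a hypothetical helper, not Spark or relational-algebra code) of how a full outer join decomposes into the matched pairs plus null-padded leftovers from either side:

```python
# Hypothetical sketch: a full outer join is the matched pairs (the
# natural-join core) plus unmatched rows from each side padded with None.
def full_outer_join(left, right, key_l, key_r):
    matched = [(l, r) for l in left for r in right if l[key_l] == r[key_r]]
    left_matched = {id(l) for l, _ in matched}
    right_matched = {id(r) for _, r in matched}
    result = matched[:]
    result += [(l, None) for l in left if id(l) not in left_matched]
    result += [(None, r) for r in right if id(r) not in right_matched]
    return result

t1 = [{"foo": 1}, {"foo": 2}]
t2 = [{"bar": 2}, {"bar": 3}]
rows = full_outer_join(t1, t2, "foo", "bar")
# One matched pair plus one unmatched row from each side.
```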
Any way to force an outer join over the Cartesian product?
Not really, unless you want to modify the Spark SQL engine.
Why was the DelimitedSplit8K UDF written with 2X (Cartesian product) in SQL Server?
Although I am not Jeff Moden and do not know his reasoning, I find it likely that he simply used a well-known pattern for number generation, which he himself calls Itzik Ben-Gan's cross joined CTE method in this Stack Overflow answer.
The pattern goes like this:
WITH E00(N) AS (SELECT 1 UNION ALL SELECT 1),
E02(N) AS (SELECT 1 FROM E00 a, E00 b),
E04(N) AS (SELECT 1 FROM E02 a, E02 b),
E08(N) AS (SELECT 1 FROM E04 a, E04 b),
...
In order to adapt the method for his string-splitting function, he apparently found it more convenient to make the initial CTE ten rows instead of two and to cut the number of cross-joining CTEs down to two, just enough to cover the 8,000 rows necessary for his solution.
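A sketch of that adaptation, written against Python's sqlite3 only so it is runnable outside SQL Server (the SQL Server version is the same in spirit): E1 yields 10 rows, E2 cross joins it to 100, and E4 to 10,000, more than enough to cover the 8,000 characters a VARCHAR(8000) can hold:

```python
import sqlite3

# Hedged sketch of the ten-row tally pattern, run in SQLite.
con = sqlite3.connect(":memory:")
sql = """
WITH E1(N) AS (
    SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1 UNION ALL
    SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1 UNION ALL
    SELECT 1 UNION ALL SELECT 1
),                                    -- 10 rows
E2(N) AS (SELECT 1 FROM E1 a, E1 b),  -- 10 x 10   = 100 rows
E4(N) AS (SELECT 1 FROM E2 a, E2 b)   -- 100 x 100 = 10,000 rows
SELECT COUNT(*) FROM E4
"""
count = con.execute(sql).fetchone()[0]
# count is 10000: two cross-joining CTEs over a ten-row base suffice.
```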
How do I use a UDF with a select statement?
Add schema name:
SELECT *, dbo.AgeIn15(m.Date_of_Birth) AS col_name
FROM marketing_list AS m;
SELECT *
,dbo.AgeIn15(m.Date_of_Birth) AS col_name
FROM marketing_list m
WHERE dbo.AgeIn15(m.Date_of_Birth) > 45
Optionally, you can use CROSS APPLY:
SELECT *
FROM marketing_list AS m
CROSS APPLY (SELECT dbo.AgeIn15(m.Date_of_Birth)) AS c(col_name)
Demo
SELECT *, dbo.AgeIn15(t.c) AS col_name
FROM (SELECT '2000-01-01') AS t(c);
SELECT *
FROM (SELECT '2000-01-01') AS t(c)
CROSS APPLY (SELECT dbo.AgeIn15(t.c)) AS c(col_name)
Your function should look like this:
CREATE FUNCTION dbo.AgeIn15 (@DateOfBirth DATE)
RETURNS INTEGER
AS
BEGIN
RETURN DATEDIFF(year,@DateOfBirth,GETDATE()) + 15
END
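As a rough analogue outside SQL Server, the same scalar-UDF-in-a-SELECT idea can be sketched with Python's sqlite3, where create_function plays the role of CREATE FUNCTION (the AgeIn15 name and year arithmetic mirror the T-SQL above):

```python
import sqlite3
from datetime import date

# Sketch: register AgeIn15 as a scalar UDF in SQLite, mirroring
# T-SQL's DATEDIFF(year, @DateOfBirth, GETDATE()) + 15.
def age_in_15(dob_text):
    dob = date.fromisoformat(dob_text)
    # DATEDIFF(year, ...) counts year-boundary crossings, not full years.
    return (date.today().year - dob.year) + 15

con = sqlite3.connect(":memory:")
con.create_function("AgeIn15", 1, age_in_15)
con.execute("CREATE TABLE marketing_list (Date_of_Birth TEXT)")
con.execute("INSERT INTO marketing_list VALUES ('2000-01-01')")
row = con.execute(
    "SELECT AgeIn15(Date_of_Birth) FROM marketing_list"
).fetchone()
```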
What you did doesn't make any sense:
Convert(integer, DateAdd(year, 15, DateDiff(year, @DateOfBirth, GetDate())))
- DATEDIFF(year, @DateOfBirth, GETDATE()) returns the number of years - OK
- DATEADD(year, 15, number) - the integer is implicitly converted to a date (that many days after 1900-01-01), so this returns a date roughly 15 years after 1900-01-01 - FAIL
- CONVERT(INTEGER, date) returns the number of days since 1900-01-01 - FAIL
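The failure mode can be sketched in Python by replaying SQL Server's implicit conversions step by step (EPOCH and the date arithmetic below are approximations of datetime's day-zero behavior):

```python
from datetime import date, timedelta

# Sketch of the implicit conversions in
# CONVERT(INTEGER, DATEADD(year, 15, DATEDIFF(year, @dob, GETDATE()))):
EPOCH = date(1900, 1, 1)          # SQL Server's datetime day zero

years = date.today().year - 2000  # DATEDIFF(year, '2000-01-01', GETDATE())
as_date = EPOCH + timedelta(days=years)            # int becomes a date
shifted = as_date.replace(year=as_date.year + 15)  # DATEADD(year, 15, ...)
result = (shifted - EPOCH).days   # CONVERT(INTEGER, ...) -> days since 1900
# result is a day count in the thousands, not anyone's age.
```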
SQL UDF and query optimization
A subquery will have better performance, but a UDF can be reused much more easily in other queries. You can use UDFs to encapsulate specific calculations or logic in one place. If you need to change the logic, you only have to change the UDF instead of every query where you inlined that subquery.
In the end you gain flexibility but lose performance when including the function in queries over a huge number of records.
Detected cartesian product for INNER join on literal column in PySpark
The problem is that once you persist your data, second_id is incorporated into the cached table and is no longer considered constant. As a result, the planner can no longer infer that the query should be expressed as a Cartesian product, and it uses a standard SortMergeJoin on hash-partitioned second_id.
It would be trivial to achieve the same outcome, without persistence, using a udf:
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

@pandas_udf('integer', PandasUDFType.SCALAR)
def identity(x):
    return x

second_df = second_df.withColumn('second_id', identity(lit(1)))

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
result_df.explain()
== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
: +- *(1) Filter isnotnull(first_id#4)
: +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(second_id#129, 200)
+- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
+- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
+- *(3) Project [some_value#6]
+- *(3) Filter isnotnull(pythonUDF0#153)
+- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
+- Scan ExistingRDD[some_value#6]
However, SortMergeJoin is not what you should try to achieve here. With a constant key, it would result in extreme data skew and would likely fail on anything but toy data.
A Cartesian product, however, as expensive as it is, won't suffer from this issue and should be preferred here. So I would recommend enabling cross joins or using explicit cross join syntax (spark.sql.crossJoin.enabled for Spark 2.x) and moving on.
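As a rough pure-Python illustration (200 is Spark's default shuffle partition count), hash-partitioning rows by a constant key sends every row to the same bucket, so a single task ends up doing all the work:

```python
# Sketch: hash-partitioning by a constant key yields extreme skew.
NUM_PARTITIONS = 200

def partition_of(key):
    return hash(key) % NUM_PARTITIONS

constant_keys = [1] * 10_000  # every row has second_id = 1
buckets = set(partition_of(k) for k in constant_keys)
# All 10,000 rows land in a single partition out of 200.
```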
A pending question remains how to prevent this undesired behavior when data is cached. Unfortunately I don't have an answer ready for that. I am fairly sure it is possible to use custom optimizer rules, but this is not something that can be done with Python alone.
Optimise use of multiple user defined functions in join condition
I cannot find a way to rework this into a form making use of table value functions.
This is possible. Note that this logic requires comparing all rows from A to all rows from B quadratically, which is necessarily rather slow.
So rewrite as follows:
SELECT ...
FROM A
CROSS JOIN B
CROSS APPLY dbo.MyPredicate(A.x, B.y) p
WHERE p.SomeValue = 1
The CROSS JOIN is the quadratic comparison. After the join, you have the values from both tables available to evaluate the join condition.
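Conceptually, the CROSS JOIN plus filter is just a nested loop over all pairs; a pure-Python sketch, with a hypothetical predicate standing in for dbo.MyPredicate:

```python
from itertools import product

# Hypothetical predicate standing in for dbo.MyPredicate(A.x, B.y),
# returning 1 (match) or 0 like the TVF's SomeValue column.
def my_predicate(x, y):
    return 1 if abs(x - y) <= 1 else 0

A = [1, 2, 3]
B = [2, 5]
# CROSS JOIN A, B ... WHERE p.SomeValue = 1: evaluate every (x, y) pair.
matches = [(x, y) for x, y in product(A, B) if my_predicate(x, y) == 1]
# matches -> [(1, 2), (2, 2), (3, 2)]
```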
SQL in databricks gives either cartesian warning or just no result - what's wrong with my query?
Got it sorted out; it was a question of properly connecting the dots. Since it turned out to be some kind of copy/pasting, I'm pretty sure there is a better way to write the query (just imagine having 100+ tables...).
%sql
SELECT
COUNT(*)
FROM
(
SELECT --DISTINCT -- 77.629 when using DISTINCT
f1.pr_id,
f1.grid_id,
f1.grid_field_value AS LotSerial,
f2.grid_field_value AS CodePart,
f3.grid_field_value AS ProductFamilyName,
f4.grid_field_value AS LotDescription,
f5.grid_field_value AS ManufacturingLocation,
f6.grid_field_value AS QuantityUnitMeasure,
f7.grid_field_value AS RecommendedLotDisposition,
f8.grid_field_value AS DispositionDate,
f9.grid_field_value AS DispositionComment,
f10.grid_field_value AS RecommendedBy
FROM
GRID_DATA_DETAIL_TRKW_GLBL f1,
GRID_DATA_DETAIL_TRKW_GLBL f2,
GRID_DATA_DETAIL_TRKW_GLBL f3,
GRID_DATA_DETAIL_TRKW_GLBL f4,
GRID_DATA_DETAIL_TRKW_GLBL f5,
GRID_DATA_DETAIL_TRKW_GLBL f6,
GRID_DATA_DETAIL_TRKW_GLBL f7,
GRID_DATA_DETAIL_TRKW_GLBL f8,
GRID_DATA_DETAIL_TRKW_GLBL f9,
GRID_DATA_DETAIL_TRKW_GLBL f10
WHERE
f1.pr_id = f2.pr_id
AND f1.pr_id = f3.pr_id
AND f1.pr_id = f4.pr_id
AND f1.pr_id = f5.pr_id
AND f1.pr_id = f6.pr_id
AND f1.pr_id = f7.pr_id
AND f1.pr_id = f8.pr_id
AND f1.pr_id = f9.pr_id
AND f1.pr_id = f10.pr_id
AND f2.pr_id = f3.pr_id
AND f2.pr_id = f4.pr_id
AND f2.pr_id = f5.pr_id
AND f2.pr_id = f6.pr_id
AND f2.pr_id = f7.pr_id
AND f2.pr_id = f8.pr_id
AND f2.pr_id = f9.pr_id
AND f2.pr_id = f10.pr_id
AND f1.seq_no = f2.seq_no
AND f1.seq_no = f3.seq_no
AND f1.seq_no = f4.seq_no
AND f1.seq_no = f5.seq_no
AND f1.seq_no = f6.seq_no
AND f1.seq_no = f7.seq_no
AND f1.seq_no = f8.seq_no
AND f1.seq_no = f9.seq_no
AND f1.seq_no = f10.seq_no
AND f2.seq_no = f3.seq_no
AND f2.seq_no = f4.seq_no
AND f2.seq_no = f5.seq_no
AND f2.seq_no = f6.seq_no
AND f2.seq_no = f7.seq_no
AND f2.seq_no = f8.seq_no
AND f2.seq_no = f9.seq_no
AND f2.seq_no = f10.seq_no
AND f1.grid_id = f2.grid_id
AND f1.grid_id = f3.grid_id
AND f1.grid_id = f4.grid_id
AND f1.grid_id = f5.grid_id
AND f1.grid_id = f6.grid_id
AND f1.grid_id = f7.grid_id
AND f1.grid_id = f8.grid_id
AND f1.grid_id = f9.grid_id
AND f1.grid_id = f10.grid_id
AND f2.grid_id = f3.grid_id
AND f2.grid_id = f4.grid_id
AND f2.grid_id = f5.grid_id
AND f2.grid_id = f6.grid_id
AND f2.grid_id = f7.grid_id
AND f2.grid_id = f8.grid_id
AND f2.grid_id = f9.grid_id
AND f2.grid_id = f10.grid_id
AND f1.grid_field_nm = 'Lot#/Serial#'
AND f2.grid_field_nm = 'Code/Part#'
AND f3.grid_field_nm = 'Product Family Name'
AND f4.grid_field_nm = 'Lot Description'
AND f5.grid_field_nm = 'Manufacturing Location'
AND f6.grid_field_nm = 'Quantity/Unit of Measure'
AND f7.grid_field_nm = 'Recommended Lot Disposition'
AND f8.grid_field_nm = 'Disposition Date'
AND f9.grid_field_nm = 'Disposition Comment'
AND f10.grid_field_nm = 'Recommended By'
AND f2.grid_nm = "Lot Info. Grid" -- 77.730 rows
--AND f3.grid_nm = "Lot Info. Grid" -- no difference in result/rows when adding f3.grid
--AND "Lot Info. Grid" IN (f2.grid_nm, f3.grid_nm)
) AS result
The given Excel sheet contains 10.487 rows, whereas I get 77.730 rows.
I think the 10k result comes from having grid_id = 2257.
How can I add an additional filter here?
I tried:
%sql
SELECT
*
FROM
result
CASE
grid_id = 2257 (
SELECT
COUNT(*)
FROM
(
SELECT
--DISTINCT -- 77.629 when using DISTINCT
So please let me know if there is any error in my logic!!
Pyspark Dataframe Join using UDF
Spark 2.2+
You have to use crossJoin or enable cross joins in the configuration:
df1.crossJoin(df2).where(my_join_udf(df1.col_a, df2.col_b))
Spark 2.0, 2.1
Method shown below doesn't work anymore in Spark 2.x. See SPARK-19728.
Spark 1.x
Theoretically you can join and filter:
df1.join(df2).where(my_join_udf(df1.col_a, df2.col_b))
but in general you shouldn't do it at all. Any type of join which is not based on equality requires a full Cartesian product (the same as this answer), which is rarely acceptable (see also Why using a UDF in a SQL query leads to cartesian product?).
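To illustrate why, here is a pure-Python sketch (with a hypothetical my_join_udf) of what a UDF join condition forces the engine to do: the predicate is opaque, so there are no equality columns to shuffle on, and materializing every pair and filtering afterwards is all that remains:

```python
# Hypothetical opaque predicate: the planner cannot look inside it,
# so it cannot extract shuffle keys the way it can for col_a == col_b.
def my_join_udf(a, b):
    return a + b == 4

df1 = [1, 2, 3]
df2 = [1, 2, 3]
# crossJoin(...).where(my_join_udf(...)) boils down to this nested loop.
joined = [(a, b) for a in df1 for b in df2 if my_join_udf(a, b)]
# joined -> [(1, 3), (2, 2), (3, 1)]
```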