Spark replacement for EXISTS and IN
SparkSQL doesn't currently have EXISTS & IN. "(Latest) Spark SQL / DataFrames and Datasets Guide / Supported Hive Features"
EXISTS & IN can always be rewritten using JOIN or LEFT SEMI JOIN. "Although Apache Spark SQL currently does not support IN or EXISTS subqueries, you can efficiently implement the semantics by rewriting queries to use LEFT SEMI JOIN." OR can always be rewritten using UNION. AND NOT can be rewritten using EXCEPT.
A table holds the rows that make some predicate (statement parameterized by column names) true:
- The DBA gives the predicates for each base table T with columns T.C,...: T(T.C,...)
- A JOIN holds the rows that make the AND of its arguments' predicates true; for a UNION, the OR; for an EXCEPT, the AND NOT.
- SELECT DISTINCT kept columns FROM T holds the rows where EXISTS dropped columns [predicate of T].
- T LEFT SEMI JOIN U holds the rows where EXISTS U-only columns [predicate of T AND predicate of U].
- T WHERE condition holds the rows where predicate of T AND condition.
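The OR/UNION and AND NOT/EXCEPT rules above can be checked on a toy example. The following is a minimal sketch that uses SQLite through Python's standard library as a stand-in for Spark SQL, on the assumption that these rewrite rules are plain SQL semantics; the tables a, e1 and e2 and their contents are hypothetical.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE a  (id INTEGER);
    CREATE TABLE e1 (id INTEGER);
    CREATE TABLE e2 (id INTEGER);
    INSERT INTO a  VALUES (1), (2), (3), (4);
    INSERT INTO e1 VALUES (1), (2), (2);
    INSERT INTO e2 VALUES (2), (3);
""")

def ids(sql):
    """Run a query and return its first column as a sorted list."""
    return sorted(row[0] for row in con.execute(sql))

# OR of two IN predicates ...
or_in = ids("""
    SELECT id FROM a
    WHERE id IN (SELECT id FROM e1) OR id IN (SELECT id FROM e2)
""")
# ... rewritten as a UNION of two joins.
or_union = ids("""
    SELECT a.id FROM a JOIN e1 ON a.id = e1.id
    UNION
    SELECT a.id FROM a JOIN e2 ON a.id = e2.id
""")

# AND NOT of two IN predicates ...
and_not_in = ids("""
    SELECT id FROM a
    WHERE id IN (SELECT id FROM e1) AND id NOT IN (SELECT id FROM e2)
""")
# ... rewritten as an EXCEPT of the same two joins.
and_not_except = ids("""
    SELECT a.id FROM a JOIN e1 ON a.id = e1.id
    EXCEPT
    SELECT a.id FROM a JOIN e2 ON a.id = e2.id
""")

print(or_in, or_union)              # [1, 2, 3] [1, 2, 3]
print(and_not_in, and_not_except)   # [1] [1]
```

UNION and EXCEPT remove duplicates, so the two forms agree here only because a.id is unique and contains no NULLs; with duplicate or NULL keys the rewrite needs more care.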
(Re querying generally see this answer.)
So by keeping in mind the predicate expressions that correspond to SQL, you can use straightforward logic rewrite rules to compose and/or reorganize queries. E.g., using UNION here need not be "clumsy" in terms of either readability or execution.
Your original question indicated that you understood that you could use UNION and you have edited variants into your question that excise EXISTS and IN from your original queries. Here is another variant also excising OR.
select <...>
from A, B, C, (select ID from ...) as e
where
A.FK_1 = B.PK and
A.FK_2 = C.PK and
A.ID = e.id
union
select <...>
from A, B, C, (select ID from ...) as e
where
A.FK_1 = B.PK and
A.FK_2 = C.PK and
A.ID = e.ID
Your Solution 1 does not do what you think it does. If just one of the exists_clause tables is empty, i.e. even if there are ID matches available in the other, the FROM cross product of tables is empty and no rows are returned. ("An Unintuitive Consequence of SQL Semantics": Chapter 6, The Database Language SQL, sidebar on page 264 of Database Systems: The Complete Book, 2nd Edition.) A FROM does not just introduce names for rows of tables; it CROSS JOINs and/or OUTER JOINs them, after which ON (for INNER JOINs) and WHERE filter some rows out.
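The empty cross product is easy to reproduce. Here is a minimal sketch, again using SQLite from Python's standard library as a stand-in; the tables a, e1 and e2 are hypothetical and are not the tables from the question. Table e2 is deliberately left empty.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE a  (id INTEGER);
    CREATE TABLE e1 (id INTEGER);
    CREATE TABLE e2 (id INTEGER);   -- deliberately left empty
    INSERT INTO a  VALUES (1), (2), (3);
    INSERT INTO e1 VALUES (1), (2);
""")

# Listing every table in FROM builds a x e1 x e2 first; because e2 has
# no rows, the product is empty before WHERE is ever evaluated.
cross_product = con.execute("""
    SELECT DISTINCT a.id
    FROM a, e1, e2
    WHERE a.id = e1.id OR a.id = e2.id
""").fetchall()

# The EXISTS form tests each row of a independently against e1 and e2.
with_exists = con.execute("""
    SELECT a.id
    FROM a
    WHERE EXISTS (SELECT 1 FROM e1 WHERE e1.id = a.id)
       OR EXISTS (SELECT 1 FROM e2 WHERE e2.id = a.id)
    ORDER BY a.id
""").fetchall()

print(cross_product)  # []
print(with_exists)    # [(1,), (2,)]
```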
Performance typically differs between expressions that return the same rows, depending on DBMS optimization. Many details affect the best way to evaluate a query and the best way to write it; the DBMS and/or the programmer may or may not be able to know them, may or may not actually know them, and may or may not balance them well. But executing two ORed subselects per row in a WHERE (as in your original queries and also your later Solution 2) is not necessarily better than running one UNION of two SELECTs (as in my query).
How to implement an EXISTS condition, as in SQL, in a Spark DataFrame
A LEFT SEMI JOIN is the Spark equivalent of an SQL EXISTS condition:
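// Assumes a SparkSession named `spark` is in scope (as in spark-shell).
import spark.implicits._
val cityDF = Seq(("Delhi","India"),("Kolkata","India"),("Mumbai","India"),("Nairobi","Kenya"),("Colombo","Srilanka")).toDF("City","Country")
val CodeDF = Seq(("011","Delhi"),("022","Mumbai"),("033","Kolkata"),("044","Chennai")).toDF("Code","City")
// Keep only the cityDF rows whose City has at least one match in CodeDF.
val finalDF = cityDF.join(CodeDF, cityDF("City") === CodeDF("City"), "left_semi")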
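The difference between a semi join and an ordinary inner join shows up once the right-hand side has duplicate keys. The sketch below expresses the semi join as EXISTS in SQLite (via Python's standard library) rather than in Spark, on the assumption that the two have the same row semantics. It reuses the city and code data from above, with table names cities and codes chosen here, plus one extra Delhi code that is added purely for illustration.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE cities (City TEXT, Country TEXT);
    CREATE TABLE codes  (Code TEXT, City TEXT);
    INSERT INTO cities VALUES
        ('Delhi', 'India'), ('Kolkata', 'India'), ('Mumbai', 'India'),
        ('Nairobi', 'Kenya'), ('Colombo', 'Srilanka');
    INSERT INTO codes VALUES
        ('011', 'Delhi'), ('022', 'Mumbai'), ('033', 'Kolkata'), ('044', 'Chennai'),
        ('11', 'Delhi');   -- hypothetical duplicate key, not in the original data
""")

# Semi-join semantics: each left row appears at most once, and only
# left-hand columns are returned.
semi = con.execute("""
    SELECT c.City, c.Country
    FROM cities c
    WHERE EXISTS (SELECT 1 FROM codes k WHERE k.City = c.City)
    ORDER BY c.City
""").fetchall()

# Inner join: Delhi is repeated once per matching code.
inner = con.execute("""
    SELECT c.City, c.Country
    FROM cities c JOIN codes k ON k.City = c.City
    ORDER BY c.City
""").fetchall()

print(semi)        # [('Delhi', 'India'), ('Kolkata', 'India'), ('Mumbai', 'India')]
print(len(inner))  # 4
```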
Spark IN/EXISTS predicate in SELECT statement
Here's the correct SQL query:
import sparkSession.implicits._
Seq("france").toDF("country").createOrReplaceTempView("countries")
Seq(("user1", "france"), ("user2", "italy"), ("user2", "usa"))
.toDF("user", "country").createOrReplaceTempView("users")
val query =
s"""
|SELECT
| CASE
| WHEN u.country = 'italy' THEN 'Italy'
| ELSE (
| CASE
| WHEN u.country = c.country THEN upper(u.country)
| ELSE u.country
| END
| ) END AS country
|FROM users u
|LEFT JOIN countries c
| ON u.country = c.country
""".stripMargin
sparkSession.sql(query).show()
Result:
+-------+
|country|
+-------+
| FRANCE|
|  Italy|
|    usa|
+-------+
The reason you can use the IN/EXISTS SQL operators only in predicates is that logic in projections (CASE-WHEN in our case) is evaluated for each row in the data set returned from the selection. With this in mind, it's not the best idea to run the equivalent of CASE WHEN country IN (SELECT * FROM countries) for each row from the users table. So Spark SQL prevents this at the language level (in the SQL parser engine).
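One way to gain confidence in the LEFT JOIN rewrite is to compare it against the subquery form in an engine that accepts both. The sketch below does that with SQLite from Python's standard library, which does allow IN (SELECT ...) inside CASE; this is an assumption-laden stand-in and says nothing about what a particular Spark version accepts. The column name user_name replaces user, and the SELECT DISTINCT guard on countries is an addition so that duplicate country rows could not multiply user rows.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE countries (country TEXT);
    CREATE TABLE users (user_name TEXT, country TEXT);
    INSERT INTO countries VALUES ('france');
    INSERT INTO users VALUES ('user1', 'france'), ('user2', 'italy'), ('user2', 'usa');
""")

# Subquery inside the projection (accepted by SQLite).
with_subquery = [row[0] for row in con.execute("""
    SELECT CASE
             WHEN u.country = 'italy' THEN 'Italy'
             WHEN u.country IN (SELECT country FROM countries) THEN upper(u.country)
             ELSE u.country
           END AS country
    FROM users u
    ORDER BY u.user_name, u.country
""")]

# Same logic with the membership test moved into a LEFT JOIN.
with_left_join = [row[0] for row in con.execute("""
    SELECT CASE
             WHEN u.country = 'italy' THEN 'Italy'
             WHEN u.country = c.country THEN upper(u.country)
             ELSE u.country
           END AS country
    FROM users u
    LEFT JOIN (SELECT DISTINCT country FROM countries) c
      ON u.country = c.country
    ORDER BY u.user_name, u.country
""")]

print(with_subquery)   # ['FRANCE', 'Italy', 'usa']
print(with_left_join)  # ['FRANCE', 'Italy', 'usa']
```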
Pyspark replace strings in Spark dataframe column
For Spark 1.5 or later, you can use the functions package:
from pyspark.sql.functions import regexp_replace
newDf = df.withColumn('address', regexp_replace('address', 'lane', 'ln'))
Quick explanation:
- The function withColumn is called to add (or replace, if the name exists) a column to the data frame.
- The function regexp_replace will generate a new column by replacing all substrings that match the pattern.
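Because every matching substring is replaced, a bare pattern such as lane also rewrites words that merely contain it. The sketch below illustrates this with Python's re.sub as a stand-in for regexp_replace; Spark uses Java regular expressions, and the assumption here is that \b word boundaries behave the same way for plain ASCII input like this. The sample addresses are hypothetical.

```python
import re

# Hypothetical sample values for the 'address' column.
addresses = ["12 maple lane", "7 planet lane", "3 lanesborough rd"]

# Bare pattern: replaces 'lane' wherever it occurs, even inside other words.
naive = [re.sub(r"lane", "ln", a) for a in addresses]

# Word-bounded pattern: replaces only the standalone word 'lane'.
bounded = [re.sub(r"\blane\b", "ln", a) for a in addresses]

print(naive)    # ['12 maple ln', '7 plnt ln', '3 lnsborough rd']
print(bounded)  # ['12 maple ln', '7 planet ln', '3 lanesborough rd']
```

If the same concern applies to your data, the word-bounded pattern could be passed as the second argument of regexp_replace instead of 'lane'.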
If a number exists in a string, replace the string with null - Spark
You can pattern match using a regular expression with rlike:
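import org.apache.spark.sql.functions.{col, when}
// Assumes a SparkSession named `spark` is in scope (as in spark-shell).
import spark.implicits._

val df = Seq(
  ("Steve", "10 C"),
  ("Mark", "9436"),
  ("Brian", "Lara")
).toDF("FIRSTNAME", "LASTNAME")

// Keep the original LASTNAME in the new column only if it contains no digit;
// `when` without `otherwise` yields null for the remaining rows.
val df2 = df.withColumn("LASTNAMEFIXED", when(!col("LASTNAME").rlike(".*[0-9]+.*"), col("LASTNAME")))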
+---------+--------+-------------+
|FIRSTNAME|LASTNAME|LASTNAMEFIXED|
+---------+--------+-------------+
|    Steve|    10 C|         null|
|     Mark|    9436|         null|
|    Brian|    Lara|         Lara|
+---------+--------+-------------+
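The logic of that column expression can be modelled outside Spark. The sketch below uses Python's re.search on the assumption that rlike succeeds when the pattern is found anywhere in the string, which makes the surrounding .* optional. The helper name fix_lastname is hypothetical.

```python
import re

def fix_lastname(lastname, pattern=r".*[0-9]+.*"):
    """Model of when(!col.rlike(pattern), col): None when the pattern is found."""
    if lastname is None:
        return None
    return None if re.search(pattern, lastname) else lastname

lastnames = ["10 C", "9436", "Lara"]

# The original pattern and the shorter '[0-9]' give the same answers,
# because a search-style match does not need to consume the whole string.
original = [fix_lastname(n) for n in lastnames]
shorter = [fix_lastname(n, r"[0-9]") for n in lastnames]

print(original)  # [None, None, 'Lara']
print(shorter)   # [None, None, 'Lara']
```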
Replace a column value with NULL in PySpark
Test dataset:
df = spark.createDataFrame(
[(10, '2021-08-16 00:54:43+01', 0.15, 'SMS'),
(11, '2021-08-16 00:04:29+01', 0.15, '*'),
(12, '2021-08-16 00:39:05+01', 0.15, '***')],
['_c0', 'Timestamp', 'Amount','Channel']
)
df.show(truncate=False)
# +---+----------------------+------+-------+
# |_c0|Timestamp             |Amount|Channel|
# +---+----------------------+------+-------+
# |10 |2021-08-16 00:54:43+01|0.15  |SMS    |
# |11 |2021-08-16 00:04:29+01|0.15  |*      |
# |12 |2021-08-16 00:39:05+01|0.15  |***    |
# +---+----------------------+------+-------+
Script:
from pyspark.sql import functions as F
df = df.withColumn('Channel', F.when(~F.col('Channel').rlike(r'[\*#]+'), F.col('Channel')))
df.show(truncate=False)
# +---+----------------------+------+-------+
# |_c0|Timestamp             |Amount|Channel|
# +---+----------------------+------+-------+
# |10 |2021-08-16 00:54:43+01|0.15  |SMS    |
# |11 |2021-08-16 00:04:29+01|0.15  |null   |
# |12 |2021-08-16 00:39:05+01|0.15  |null   |
# +---+----------------------+------+-------+
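Note that the pattern in the script is unanchored, so any value that merely contains * or # is nulled as well. Whether that is intended depends on the data. The sketch below models both behaviours with Python's re.search as a stand-in for rlike; the helper null_if and the extra values 'SMS*' and '#1' are hypothetical.

```python
import re

def null_if(pattern, value):
    """Model of F.when(~col.rlike(pattern), col): None when the pattern is found."""
    return None if re.search(pattern, value) else value

# 'SMS*' and '#1' are hypothetical values added to show the difference.
channels = ["SMS", "*", "***", "SMS*", "#1"]

# Unanchored, as in the script: any '*' or '#' anywhere nulls the value.
partial = [null_if(r"[\*#]+", c) for c in channels]

# Anchored: only values made up entirely of '*' or '#' are nulled.
anchored = [null_if(r"^[\*#]+$", c) for c in channels]

print(partial)   # ['SMS', None, None, None, None]
print(anchored)  # ['SMS', None, None, 'SMS*', '#1']
```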
Filter rows if value exists in array column
For Spark >= 2.4, you can use the exists function of Spark SQL.
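from pyspark.sql import functions as F
# Flag rows where at least one element of col1 has key2 == "val2", keep them, then drop the helper column.
df = df.withColumn('flag', F.expr('exists(col1, x -> get_json_object(x, "$.key2") == "val2")')) \
    .filter(F.col('flag')).drop('flag')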
df.show(truncate=False)
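In plain Python terms, the exists expression above behaves like any() over the array elements. The sketch below models that, assuming col1 is an array of JSON object strings; the sample rows and the helper name has_key2_val2 are hypothetical. One difference to keep in mind is that json.loads raises on malformed input, whereas get_json_object returns null.

```python
import json

def has_key2_val2(col1):
    """True if any JSON object string in col1 has key2 equal to 'val2'."""
    return any(json.loads(x).get("key2") == "val2" for x in col1)

# Hypothetical rows; col1 holds JSON object strings.
rows = [
    {"id": 1, "col1": ['{"key1": "val1"}', '{"key2": "val2"}']},
    {"id": 2, "col1": ['{"key2": "other"}']},
    {"id": 3, "col1": []},
]

# Equivalent of filtering on the flag: keep rows where some element matches.
kept = [r["id"] for r in rows if has_key2_val2(r["col1"])]

print(kept)  # [1]
```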