Spark Replacement for Exists and In

Spark replacement for EXISTS and IN

Spark SQL does not currently have EXISTS and IN. (See "Supported Hive Features" in the latest Spark SQL / DataFrames and Datasets Guide.)

EXISTS and IN can always be rewritten using JOIN or LEFT SEMI JOIN: "Although Apache Spark SQL currently does not support IN or EXISTS subqueries, you can efficiently implement the semantics by rewriting queries to use LEFT SEMI JOIN." Similarly, OR can always be rewritten using UNION, and AND NOT can be rewritten using EXCEPT.
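
For instance, here is a minimal PySpark sketch of the IN-to-LEFT SEMI JOIN rewrite; the table and column names (orders, shipped) are made up for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical tables
orders = spark.createDataFrame([(1, 10.0), (2, 20.0), (3, 30.0)], ["id", "total"])
shipped = spark.createDataFrame([(1,), (3,)], ["order_id"])

# Intended semantics: SELECT * FROM orders WHERE id IN (SELECT order_id FROM shipped)
shipped_orders = orders.join(shipped, orders["id"] == shipped["order_id"], "left_semi")
shipped_orders.show()  # keeps orders 1 and 3, and only orders' columns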

A table holds the rows that make some predicate (statement parameterized by column names) true:

  • The DBA gives the predicates for each base table T with columns T.C, ...: T(T.C, ...)
  • A JOIN holds the rows that make the AND of its arguments' predicates true; for a UNION, the OR; for an EXCEPT, the AND NOT.
  • SELECT DISTINCT kept-columns FROM T holds the rows where EXISTS dropped-columns [predicate of T].
  • T LEFT SEMI JOIN U holds the rows where EXISTS U-only-columns [predicate of T AND predicate of U].
  • T WHERE condition holds the rows where predicate of T AND condition.

(Regarding querying in general, see this answer.)

So, by keeping in mind the predicate expressions corresponding to SQL constructs, you can use straightforward logic rewrite rules to compose and/or reorganize queries. E.g., using UNION here need not be "clumsy", either in terms of readability or execution.
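
And for the AND NOT / NOT EXISTS rule, before turning to the UNION variant below, here is a minimal PySpark sketch with hypothetical tables (customers, blacklist):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

customers = spark.createDataFrame([(1,), (2,), (3,)], ["id"])
blacklist = spark.createDataFrame([(2,)], ["id"])

# SELECT id FROM customers EXCEPT SELECT id FROM blacklist
allowed = customers.select("id").subtract(blacklist.select("id"))

# For non-null keys, a LEFT ANTI JOIN expresses the same NOT EXISTS / NOT IN intent
allowed_too = customers.join(blacklist, "id", "left_anti")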

Your original question indicated that you understood you could use UNION, and you have since edited variants into your question that excise EXISTS and IN from your original queries. Here is another variant that also excises OR.

select <...>
from A, B, C, (select ID from ...) as e
where
    A.FK_1 = B.PK and
    A.FK_2 = C.PK and
    A.ID = e.ID
union
select <...>
from A, B, C, (select ID from ...) as e
where
    A.FK_1 = B.PK and
    A.FK_2 = C.PK and
    A.ID = e.ID

Your Solution 1 does not do what you think it does. If just one of the exists_clause tables is empty, i.e. even if there are ID matches available in the other, the FROM cross product of tables is empty and no rows are returned. (See "An Unintuitive Consequence of SQL Semantics", a sidebar on page 264 of Chapter 6, "The Database Language SQL", in Database Systems: The Complete Book, 2nd Edition.) A FROM does not just introduce names for rows of tables; it CROSS JOINs and/or OUTER JOINs them, after which ON (for INNER JOINs) and WHERE filter some rows out.
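
A hypothetical PySpark illustration of that pitfall (any comma-separated FROM behaves the same way):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

a = spark.createDataFrame([(1,)], ["id"])
e1 = spark.createDataFrame([(1,)], ["id"])    # contains a matching ID
e2 = spark.createDataFrame([], "id int")      # empty table
a.createOrReplaceTempView("a")
e1.createOrReplaceTempView("e1")
e2.createOrReplaceTempView("e2")

# The cross product FROM a, e1, e2 is empty because e2 is empty,
# so no rows come out even though a.id = e1.id would match.
spark.sql("SELECT a.id FROM a, e1, e2 WHERE a.id = e1.id OR a.id = e2.id").show()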

Performance is typically different for different expressions that return the same rows; it depends on DBMS optimization. Many details affect the best way to evaluate a query and the best way to write it; the DBMS and/or the programmer may or may not know those details, and may or may not balance them well. But executing two ORed subselects per row in a WHERE (as in your original queries and also your later Solution 2) is not necessarily better than running one UNION of two SELECTs (as in my query).

How to implement an SQL-like EXISTS condition in a Spark DataFrame

LEFT SEMI JOIN is the equivalent of the EXISTS predicate in Spark.

val cityDF = Seq(("Delhi","India"),("Kolkata","India"),("Mumbai","India"),("Nairobi","Kenya"),("Colombo","Srilanka")).toDF("City","Country")

val CodeDF = Seq(("011","Delhi"),("022","Mumbai"),("033","Kolkata"),("044","Chennai")).toDF("Code","City")

val finalDF = cityDF.join(CodeDF, cityDF("City") === CodeDF("City"), "left_semi")

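finalDF keeps only the cityDF rows whose City also appears in CodeDF, and only cityDF's columns. finalDF.show() should print something like (row order may differ):

+-------+-------+
|   City|Country|
+-------+-------+
|  Delhi|  India|
|Kolkata|  India|
| Mumbai|  India|
+-------+-------+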

Spark IN/EXISTS predicate in SELECT statement

Here's the correct SQL query:

import sparkSession.implicits._

Seq("france").toDF("country").createOrReplaceTempView("countries")
Seq(("user1", "france"), ("user2", "italy"), ("user2", "usa"))
  .toDF("user", "country").createOrReplaceTempView("users")

val query =
  s"""
     |SELECT
     |  CASE
     |    WHEN u.country = 'italy' THEN 'Italy'
     |    ELSE (
     |      CASE
     |        WHEN u.country = c.country THEN upper(u.country)
     |        ELSE u.country
     |      END
     |    ) END AS country
     |FROM users u
     |LEFT JOIN countries c
     |  ON u.country = c.country
   """.stripMargin
sparkSession.sql(query).show()

Result:

+-------+
|country|
+-------+
| FRANCE|
| Italy|
| usa|
+-------+

The reason you can use the IN/EXISTS SQL operators only in predicates is that logic in projections (CASE-WHEN in our case) is evaluated for each row of the data set returned by the selection.
With this in mind, it is not a good idea to run the equivalent of CASE WHEN country IN (SELECT * FROM countries) for each row of the users table, so SQL prevents this at the language level (in the SQL parser engine).
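
For contrast, a minimal PySpark sketch of IN used where it is allowed, in the WHERE predicate (assuming a Spark version that accepts IN subqueries in predicates):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.createDataFrame([("france",)], ["country"]).createOrReplaceTempView("countries")
spark.createDataFrame(
    [("user1", "france"), ("user2", "italy"), ("user2", "usa")],
    ["user", "country"]
).createOrReplaceTempView("users")

# Only user1/france survives, because only 'france' appears in countries
spark.sql("SELECT user, country FROM users WHERE country IN (SELECT country FROM countries)").show()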

Pyspark replace strings in Spark dataframe column

For Spark 1.5 or later, you can use the functions package:

from pyspark.sql.functions import *
newDf = df.withColumn('address', regexp_replace('address', 'lane', 'ln'))

Quick explanation:

  • The function withColumn is called to add (or replace, if the name exists) a column to the data frame.
  • The function regexp_replace will generate a new column by replacing all substrings that match the pattern (see the sketch below).
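
To make this concrete, here is the sketch referred to above, with made-up sample addresses:

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data
df = spark.createDataFrame(
    [(1, "5000 Forbes lane"), (2, "100 Penny lane")],
    ["id", "address"]
)

newDf = df.withColumn("address", regexp_replace("address", "lane", "ln"))
newDf.show(truncate=False)
# The address column now reads "5000 Forbes ln" and "100 Penny ln"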

If a number exists in a string, replace the string with null - Spark

You can pattern match using a regular expression with rlike:

val df = Seq(
  ("Steve", "10 C"),
  ("Mark", "9436"),
  ("Brian", "Lara")
).toDF("FIRSTNAME", "LASTNAME")

// Keep the original LASTNAME in the new column only if it doesn't contain any digit;
// when() without otherwise() leaves non-matching rows as null
val df2 = df.withColumn("LASTNAMEFIXED", when(!col("LASTNAME").rlike(".*[0-9]+.*"), col("LASTNAME")))

df2.show()

+---------+--------+-------------+
|FIRSTNAME|LASTNAME|LASTNAMEFIXED|
+---------+--------+-------------+
| Steve| 10 C| null|
| Mark| 9436| null|
| Brian| Lara| Lara|
+---------+--------+-------------+

Replace a column value with NULL in PySpark

Test dataset:

df = spark.createDataFrame(
    [(10, '2021-08-16 00:54:43+01', 0.15, 'SMS'),
     (11, '2021-08-16 00:04:29+01', 0.15, '*'),
     (12, '2021-08-16 00:39:05+01', 0.15, '***')],
    ['_c0', 'Timestamp', 'Amount', 'Channel']
)
df.show(truncate=False)
# +---+----------------------+------+-------+
# |_c0|Timestamp |Amount|Channel|
# +---+----------------------+------+-------+
# |10 |2021-08-16 00:54:43+01|0.15 |SMS |
# |11 |2021-08-16 00:04:29+01|0.15 |* |
# |12 |2021-08-16 00:39:05+01|0.15 |*** |
# +---+----------------------+------+-------+

Script:

from pyspark.sql import functions as F

# Rows whose Channel matches the pattern get null, because there is no otherwise() branch
df = df.withColumn('Channel', F.when(~F.col('Channel').rlike(r'[\*#]+'), F.col('Channel')))

df.show(truncate=False)
# +---+----------------------+------+-------+
# |_c0|Timestamp |Amount|Channel|
# +---+----------------------+------+-------+
# |10 |2021-08-16 00:54:43+01|0.15 |SMS |
# |11 |2021-08-16 00:04:29+01|0.15 |null |
# |12 |2021-08-16 00:39:05+01|0.15 |null |
# +---+----------------------+------+-------+

Filter rows if value exists in array column

For Spark >= 2.4, you can use the exists higher-order function of Spark SQL.

from pyspark.sql import functions as F

# Keep only rows where some element of the array column col1 is a JSON string containing key2 = "val2"
df = df.withColumn('flag', F.expr('exists(col1, x -> get_json_object(x, "$.key2") == "val2")')) \
       .filter(F.col('flag')).drop('flag')
df.show(truncate=False)
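
A self-contained sketch of the same idea, assuming col1 is an array of JSON strings (the data below is made up):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, ['{"key1": "val1"}', '{"key2": "val2"}']),
     (2, ['{"key1": "val1"}'])],
    ['id', 'col1']
)

# Keep only rows where some array element is a JSON object containing key2 = "val2"
matched = df.filter(F.expr('exists(col1, x -> get_json_object(x, "$.key2") == "val2")'))
matched.show(truncate=False)  # only id 1 remains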

