Does Sparksql Support Subquery

Does SparkSQL support subquery?

Planned features:

  • SPARK-23945 (Column.isin() should accept a single-column DataFrame as input).
  • SPARK-18455 (General support for correlated subquery processing).

Spark 2.0+

Spark SQL should support both correlated and uncorrelated subqueries. See SubquerySuite for details. Some examples include:

select * from l where exists (select * from r where l.a = r.c)
select * from l where not exists (select * from r where l.a = r.c)

select * from l where l.a in (select c from r)
select * from l where a not in (select c from r)

Unfortunately, as of now (Spark 2.0), it is impossible to express the same logic using the DataFrame DSL.

Spark < 2.0

Spark supports subqueries in the FROM clause (same as Hive <= 0.12).

SELECT col FROM (SELECT *  FROM t1 WHERE bar) t2

It simply doesn't support subqueries in the WHERE clause. Generally speaking, arbitrary subqueries (in particular correlated subqueries) couldn't be expressed in Spark without promoting them to a Cartesian join.

Since subquery performance is usually a significant issue in a typical relational system, and every subquery can be expressed using JOIN, there is no loss of function here.
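As a rough illustration of the claim that a WHERE-clause subquery can be expressed as a JOIN, here is a minimal sketch. It uses Python's built-in sqlite3 module rather than Spark, purely so that it runs without a cluster; the tables l and r reuse the names from the examples in this answer, while the columns b and d and all row values are made up for the illustration.

```python
import sqlite3

# In-memory database; tables l and r mirror the names used in the SQL examples.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE l (a INTEGER, b TEXT);
    CREATE TABLE r (c INTEGER, d TEXT);
    INSERT INTO l VALUES (1, 'x'), (2, 'y'), (3, 'z');
    INSERT INTO r VALUES (1, 'p'), (1, 'q'), (3, 's');
""")

# WHERE-clause subquery form.
with_subquery = conn.execute(
    "SELECT a, b FROM l WHERE a IN (SELECT c FROM r) ORDER BY a"
).fetchall()

# JOIN form: de-duplicate r.c in a FROM-clause subquery first,
# so that repeated keys in r do not multiply the rows of l.
with_join = conn.execute("""
    SELECT l.a, l.b
    FROM l JOIN (SELECT DISTINCT c FROM r) r2 ON l.a = r2.c
    ORDER BY l.a
""").fetchall()

print(with_subquery)
print(with_join)
```

The DISTINCT step matters: a plain join against r would return the row with a = 1 twice, which the IN form never does.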

Spark SQL Scala DSL subquery support

Your comment is correct. Your question is a little vague, but I take your point, and I think the concepts are fine and a fair subject for this sort of question, so here goes.

So, this is now possible for the DataFrame API via SQL, not for the Dataset API or the DSL as you state. For example:

SELECT A.dep_id,
       A.employee_id,
       A.age,
       (SELECT MAX(age)
        FROM employee B
        WHERE A.dep_id = B.dep_id) max_age
FROM employee A
ORDER BY 1, 2

An example borrowed from the Internet clearly shows the distinction between DS and DF. By deduction, it implies that a Spark SQL correlated subquery (not shown here, of course) does not run against a Dataset either:

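// Assumes an existing SparkSession named `spark` and a table `src`.
import org.apache.spark.sql.Row
import spark.implicits._
import spark.sql
sql("SELECT COUNT(*) FROM src").show()
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" }
stringsDS.show()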

The SQL runs against some source like Hive or Parquet, or against Spark temp views, not against a DS. From a DF you can go to a DS and then enjoy the more type-safe approach, but only with the limited interface on select. I searched thoroughly for something that disproves this and found nothing. DS and DF are more or less interchangeable anyway, as I think I have said to you before. But I see you are very thorough!

Moreover, there are at least two techniques for converting nested correlated subqueries into "normal" JOINs, which is what Spark and indeed other optimizers do in the background, e.g. RewriteCorrelatedScalarSubquery and PullupCorrelatedPredicates.

But for a DSL, which you allude to, you can rewrite your query by hand to achieve the same result using JOIN, LEFT JOIN, OUTER JOIN, whatever the case may be. Oddly enough, that is not so obvious to everyone.
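A hand rewrite of the correlated scalar subquery from this answer might look like the following sketch. It runs on Python's built-in sqlite3 instead of Spark so that it is self-contained, and it is not meant to reproduce what Catalyst does internally; the employee rows are invented for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employee (dep_id INTEGER, employee_id INTEGER, age INTEGER);
    INSERT INTO employee VALUES
        (10, 1, 30), (10, 2, 45),
        (20, 3, 28), (20, 4, 51), (20, 5, 39);
""")

# Original form: correlated scalar subquery in the SELECT list.
correlated = conn.execute("""
    SELECT A.dep_id, A.employee_id, A.age,
           (SELECT MAX(age) FROM employee B WHERE A.dep_id = B.dep_id) AS max_age
    FROM employee A
    ORDER BY 1, 2
""").fetchall()

# Hand rewrite: aggregate once per department, then LEFT JOIN the result back.
joined = conn.execute("""
    SELECT A.dep_id, A.employee_id, A.age, M.max_age
    FROM employee A
    LEFT JOIN (SELECT dep_id, MAX(age) AS max_age
               FROM employee
               GROUP BY dep_id) M
      ON A.dep_id = M.dep_id
    ORDER BY 1, 2
""").fetchall()

for row in joined:
    print(row)
```

The LEFT JOIN (rather than an inner join) keeps rows whose key finds no match, which is what the scalar subquery does by returning NULL for them.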

Does Spark support subqueries?

Spark 2.0.0+:

Since 2.0.0, Spark supports a full range of subqueries. See "Does SparkSQL support subquery?" for details.

Spark < 2.0.0

Does Spark support subqueries?

Generally speaking, it does. Constructs like SELECT * FROM (SELECT * FROM foo WHERE bar = 1) as tmp are perfectly valid queries in Spark SQL.

As far as I can tell from the Catalyst parser source it doesn't support inner queries in a NOT IN clause:

| termExpression ~ (NOT ~ IN ~ "(" ~> rep1sep(termExpression, ",")) <~ ")" ^^ {
case e1 ~ e2 => Not(In(e1, e2))
}

It is still possible to use an outer join followed by a filter to obtain the same effect.
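The outer-join-plus-filter workaround can be sketched as follows. The example uses Python's built-in sqlite3 rather than Spark so that it is runnable on its own; the tables l and r and their values are hypothetical, and the sketch assumes r.c contains no NULLs (with a NULL in the subquery, NOT IN returns no rows at all, whereas the join form still does).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE l (a INTEGER);
    CREATE TABLE r (c INTEGER);
    INSERT INTO l VALUES (1), (2), (3), (4);
    INSERT INTO r VALUES (1), (3), (3);
""")

# Subquery form that the old parser could not handle.
not_in = conn.execute(
    "SELECT a FROM l WHERE a NOT IN (SELECT c FROM r) ORDER BY a"
).fetchall()

# Workaround: LEFT OUTER JOIN, then keep only the rows of l that found no match in r.
outer_join = conn.execute("""
    SELECT l.a
    FROM l LEFT OUTER JOIN r ON l.a = r.c
    WHERE r.c IS NULL
    ORDER BY l.a
""").fetchall()

print(not_in)
print(outer_join)
```

Duplicate keys in r are harmless here, because matched rows are discarded by the IS NULL filter anyway.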

SparkSQL subquery and performance

I think that

CACHE TABLE tbl  (as in sql("CACHE TABLE tbl"))

is what you need to execute after your

...createOrReplaceTempView....

but before the larger queries, of course.

In Spark, the above "caching" statement is now eager by default, not lazy. As the manual states, you don't need to trigger cache materialization manually anymore. That is to say, there is no longer any need to execute a df.show or df.count.

Once the data is in memory, and until you refresh it explicitly, it need not be fetched again every time. Here it looks like there is no filtering; rather, the whole limited data set is simply loaded once.

I do not know your design, but from looking at it, the subquery should be fine. Try this approach and look at the Physical Plan. In traditional RDBMSs, this type of limited subquery is, from what I can see, not a deal breaker either.

You can also see that the Physical Plan shows the Catalyst Optimizer has optimized / converted your IN sub-query already to a JOIN, a typical performance improvement for larger data sets.

As a result, the smaller tables are "broadcast" to the executors on the worker nodes, which improves performance as well. You probably need not set any limit for broadcasting. You could set it explicitly, but my take, based on what I observe, is that this is not required.

Running subqueries in pyspark using where or filter statement

You need to collect the max time into a numerical variable in Python before putting it in the filter:

import pyspark.sql.functions as F
tst.where(F.col('time') > tst_sub.select(F.max('time')).head()[0]).show()
+------+----+
|sample|time|
+------+----+
|     1|   5|
|     1|   6|
+------+----+

Correlated sub query column in SPARK SQL is not allowed as part of a non-equality predicate

I did this in Scala, so you will need to convert it, but I think this way is far easier. I added a key and worked at key level; you can adapt that and aggregate the key out. The principle is far simpler: no correlated subqueries are required, just relational calculus. I used numbers for dates, etc.

// SCALA 
// Slightly ambiguous on hols vs. weekend, as you stated treated as 1

import spark.implicits._
import org.apache.spark.sql.functions._

val dfE = Seq(
  ("NIC", 1, false, false),
  ("NIC", 2, false, false),
  ("NIC", 3, true, false),
  ("NIC", 4, true, true),
  ("NIC", 5, false, false),
  ("NIC", 6, false, false),
  ("XYZ", 1, false, true)
).toDF("e", "d", "w", "h")
//dfE.show(false)

val dfE2 = dfE.withColumn("wh", when ($"w" or $"h", 1) otherwise (0)).drop("w").drop("h")
//dfE2.show()

//Assuming more dfD's can exist
val dfD = Seq(
  ("NIC", 1, 4, "k1"),
  ("NIC", 2, 3, "k2"),
  ("NIC", 1, 1, "k3"),
  ("NIC", 7, 10, "k4")
).toDF("e", "pd", "dd", "k")
//dfD.show(false)

dfE2.createOrReplaceTempView("E2")
dfD.createOrReplaceTempView("D1")

// This is done per record; if you need it over identical keys, strip k and aggregate. I added k so each entry can be checked.
// The point is that this is far easier. The key k acts as a synthetic grouping key.

val q = spark.sql("""
  SELECT d1.k, d1.e, d1.pd, d1.dd, sum(e2.wh)
  FROM D1, E2
  WHERE D1.e = E2.e
    AND E2.d >= D1.pd
    AND E2.d <= D1.dd
  GROUP BY d1.k, d1.e, d1.pd, d1.dd
  ORDER BY d1.k, d1.e, d1.pd, d1.dd
""")
q.show

returns:

+---+---+---+---+-------+
|  k|  e| pd| dd|sum(wh)|
+---+---+---+---+-------+
| k1|NIC|  1|  4|      2|
| k2|NIC|  2|  3|      1|
| k3|NIC|  1|  1|      0|
+---+---+---+---+-------+

I think a simple performance improvement can be made this way. No correlated subquery is required, in fact.

Can use AND E2.d BETWEEN D1.pd AND D1.dd if you want.


