Does Spark Predicate Pushdown Work with JDBC?

Does Spark predicate pushdown work with JDBC?

Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used here in the strict SQL sense: it covers only the WHERE clause. Moreover, it appears to be limited to logical conjunctions (no IN and OR, I am afraid) and simple predicates.

Everything else, such as limits, counts, ordering, groups, and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count() or sqlContext.sql("SELECT COUNT(*) FROM df") is translated to SELECT 1 FROM df and therefore requires both substantial data transfer and processing in Spark.

Does that mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as the table argument. It is less convenient than predicate pushdown, but otherwise works pretty well:

n = ...  # Number of rows to take
# the whole subquery, including the LIMIT, is executed on the database side
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))
df = sqlContext.read.jdbc(url=url, table=sql)
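
On newer Spark versions the same idea can be expressed through the DataFrameReader option API; the JDBC source also accepts a query option (Spark 2.4+) that takes the bare subquery. A minimal sketch, assuming a reachable MySQL instance; the url, credentials and the dummy table are placeholders:

n = 1000  # number of rows to take

df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("user", "user")
      .option("password", "password")
      .option("query", f"SELECT * FROM dummy LIMIT {n}")  # executed on the database side
      .load())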

Note:

This behavior may be improved in the future, once Data Source API v2 is ready:

  • SPARK-15689
  • SPIP: Data Source API V2

How to prevent predicate pushdown?

A JIRA ticket has been opened for this issue. You can follow it here:
https://issues.apache.org/jira/browse/SPARK-24288
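
If you are on a release where that ticket has been resolved, the JDBC source exposes a pushDownPredicate option (default true); setting it to false makes Spark evaluate all filters itself instead of sending them to the database. A minimal sketch with placeholder connection details and a hypothetical dummy table:

# pushDownPredicate=false disables WHERE-clause pushdown for this read
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("dbtable", "dummy")
      .option("user", "user")
      .option("password", "password")
      .option("pushDownPredicate", "false")
      .load()
      .filter("id > 100"))  # evaluated by Spark, not by the database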

Spark Predicate pushdown not working on date

Not all filters can be pushed down. In general, most filters that contain function calls such as substring or unix_timestamp cannot be pushed down. The complete logic that decides which filters are pushed down is implemented in DataSourceStrategy.
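
You can see the difference in the physical plan when the column is wrapped in a function call; a small sketch with a hypothetical Parquet file that stores eventDTLocal as a string:

from pyspark.sql import functions as F

# hypothetical test file where eventDTLocal is stored as a string
spark.createDataFrame(
    [(52.5151923, 13.3824107, "2021-04-18 16:20:21")],
    schema=['lat', 'lng', 'eventDTLocal']
).write.mode("overwrite").parquet("dataWithStringTime")

df = spark.read.parquet("dataWithStringTime")

# column wrapped in a function call: the predicate does not appear under PushedFilters
df.filter(F.to_date("eventDTLocal") == "2021-04-18").explain()

# plain comparison on the raw column: GreaterThanOrEqual shows up under PushedFilters
df.filter(F.col("eventDTLocal") >= "2021-04-18").explain()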

A way to work around this limitation in this case is to store the values of eventDTLocal as unix timestamps (milliseconds) instead of strings in the Parquet file and then filter on the millisecond values directly.

# create some test data
data = [(52.5151923, 13.3824107, 1618760421000),
        (1.0, 1.0, 1)]
spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
    .write.mode("overwrite").parquet("dataWithUnixTime")

# get the first and last millisecond of the day
# the timezone may need to be adjusted
from datetime import datetime, timezone
dt = datetime(2021, 4, 18)
start = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
end = start + 24 * 60 * 60 * 1000 - 1

# run the query
df = spark.read.parquet("dataWithUnixTime") \
    .filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")

The physical plan of df


== Physical Plan ==
*(1) Project [lat#9, lng#10, eventDTLocal#11L]
+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
   +- *(1) ColumnarToRow
      +- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: struct<lat:double,lng:double,eventDTLocal:bigint>

now contains the pushed filters GreaterThanOrEqual and LessThanOrEqual for the date column.

SPARK datasource V2 API clarification for Filter Push Down

Filter Pushdown in Data Source V2 API

In the Data Source V2 API, only data sources whose DataSourceReader implements the SupportsPushDownFilters interface support the filter pushdown optimization.

Whether a data source supports filter pushdown in the Data Source V2 API is therefore just a matter of checking the underlying DataSourceReader.

For MySQL that would be the JDBC data source, which is represented by JdbcRelationProvider and does not seem to support the Data Source V2 API (via ReadSupport). In other words, I doubt that MySQL is backed by a Data Source V2 API data source, so no filter pushdown through the new Data Source V2 API should be expected.

Filter Pushdown in Data Source V1 API

That does not preclude the filter pushdown optimization from being used via other, non-Data Source V2 APIs, i.e. the Data Source V1 API.

In the case of the JDBC data source, filter pushdown is indeed supported, but through the older PrunedFilteredScan contract (which, nota bene, is implemented by JDBCRelation only). That, however, is the Data Source V1 API.
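
In practice you can confirm that this V1 pushdown happens for a JDBC read by inspecting the physical plan; a sketch with placeholder MySQL connection details and a hypothetical orders table:

# requires the MySQL JDBC driver on the classpath; url and credentials are placeholders
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("dbtable", "orders")
      .option("user", "user")
      .option("password", "password")
      .load()
      .filter("amount >= 100"))

# the JDBCRelation scan node should list the pushed predicate,
# e.g. PushedFilters: [*GreaterThanOrEqual(amount,100)]
df.explain()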


