Does spark predicate pushdown work with JDBC?
Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used in a strict SQL sense: it covers only the WHERE
clause. Moreover, it looks like it is limited to logical conjunctions (no IN
or OR
, I am afraid) and simple predicates.
Everything else, such as limits, counts, ordering, grouping and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count()
or sqlContext.sql("SELECT COUNT(*) FROM df")
is translated to SELECT 1 FROM df
and requires both substantial data transfer and processing in Spark.
Does that mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as the table
argument. It is less convenient than predicate pushdown but otherwise works pretty well:
n = ... # Number of rows to take
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))
df = sqlContext.read.jdbc(url=url, table=sql)
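Since any subquery can serve as the table argument, operations Spark will not push down (LIMIT, GROUP BY, ORDER BY) can still be executed on the database side by wrapping them this way. A minimal sketch of the pattern; `subquery_table` is a hypothetical helper name of my own, not a Spark API:

```python
def subquery_table(inner_sql, alias="tmp"):
    """Wrap an arbitrary SELECT so it can be passed as the `table`
    argument of spark.read.jdbc (hypothetical helper, not a Spark API)."""
    return "({0}) AS {1}".format(inner_sql, alias)

# The database executes the LIMIT / GROUP BY, not Spark:
limited = subquery_table("SELECT * FROM dummy LIMIT 10")
grouped = subquery_table("SELECT category, COUNT(*) AS cnt FROM dummy GROUP BY category")

# df = sqlContext.read.jdbc(url=url, table=limited)
```

The trade-off is that the subquery is opaque to Spark, so column pruning and further filter pushdown into the inner query are up to the database's own optimizer.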
Note:
This behavior may be improved in the future, once Data Source API v2 is ready:
- SPARK-15689
- SPIP: Data Source API V2
How to prevent predicate pushdown?
A JIRA ticket has been opened for this issue. You can follow it here:
https://issues.apache.org/jira/browse/SPARK-24288
Spark Predicate pushdown not working on date
Not all filters can be pushed down. In general, most filters that contain function calls like substring
or unix_timestamp
cannot be pushed down. The complete logic for which filters get pushed down is implemented in DataSourceStrategy.
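The actual decision lives in Spark's Scala code (DataSourceStrategy and its filter translation), but the principle can be illustrated with a toy sketch: a predicate is a pushdown candidate only when it compares a bare column to a literal, while anything that wraps the column in a function call is not translatable. This is purely illustrative and not Spark's real implementation:

```python
import re

# Toy illustration of the idea behind Spark's filter translation:
# pushable predicates compare a bare column name to a literal value.
# A function call around the column makes the predicate untranslatable.
SIMPLE_PREDICATE = re.compile(r"^\s*\w+\s*(=|!=|<>|<=|>=|<|>)\s*[\w'.]+\s*$")

def can_push_down(predicate):
    return bool(SIMPLE_PREDICATE.match(predicate))

can_push_down("eventDTLocal >= 1618704000000")            # True: bare column vs literal
can_push_down("substring(eventDTLocal, 1, 10) = '2021'")  # False: function call on column
```

Real filters that survive translation show up in the `PushedFilters` list of the physical plan, as in the example below.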
A way to work around this limitation in this case is to store the values of eventDTLocal
as Unix timestamps instead of strings in the Parquet file and then filter on the specific milliseconds.
# create some test data
data = [(52.5151923, 13.3824107, 1618760421000),
        (1.0, 1.0, 1)]
spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
    .write.mode("overwrite").parquet("dataWithUnixTime")

# get the first and last millisecond of the day
# (the timezone may need to be adjusted)
from datetime import datetime, timezone
dt = datetime(2021, 4, 18)
start = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
end = start + 24 * 60 * 60 * 1000 - 1

# run the query
df = spark.read.parquet("dataWithUnixTime") \
    .filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")
The physical plan of df:
== Physical Plan ==
*(1) Project [lat#9, lng#10, eventDTLocal#11L]
+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
+- *(1) ColumnarToRow
+- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: struct<lat:double,lng:double,eventDTLocal:bigint>
It now contains the pushed filters GreaterThanOrEqual
and LessThanOrEqual
for the date column.
SPARK datasource V2 API clarification for Filter Push Down
Filter Pushdown in Data Source V2 API
In the Data Source V2 API, only data sources whose DataSourceReader implements the SupportsPushDownFilters interface support the filter pushdown optimization.
Whether a data source supports filter pushdown in the Data Source V2 API is just a matter of checking the underlying DataSourceReader.
For MySQL that would be the JDBC data source, which is represented by JdbcRelationProvider and does not seem to support the Data Source V2 API (via ReadSupport). In other words, I doubt that MySQL is backed by a Data Source V2 data source, so no filter pushdown via the new Data Source V2 API is expected.
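The contract itself is small: Spark hands the reader a set of filters, the reader keeps the ones it can evaluate itself and returns the rest, which Spark then re-applies after the scan. A rough Python analogy of the Scala SupportsPushDownFilters interface (the real one is JVM-side; the class name and the filter strings here are illustrative only):

```python
class ToyPushDownReader:
    """Python analogy of Data Source V2's SupportsPushDownFilters
    contract; the real interface is Scala/Java."""

    HANDLED = {"EqualTo", "GreaterThanOrEqual", "LessThanOrEqual", "IsNotNull"}

    def __init__(self):
        self._pushed = []

    def pushFilters(self, filters):
        # Accept the filters this source can evaluate itself and return
        # the rest; Spark re-applies whatever comes back.
        rejected = []
        for f in filters:
            (self._pushed if f in self.HANDLED else rejected).append(f)
        return rejected

    def pushedFilters(self):
        # Filters the source promises to evaluate during the scan.
        return list(self._pushed)

reader = ToyPushDownReader()
leftover = reader.pushFilters(["GreaterThanOrEqual", "StringContains"])
```

Returning the unsupported filters (rather than the supported ones) is the key design choice: Spark can always safely re-evaluate everything it gets back, so a reader that pushes nothing is still correct, just slower.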
Filter Pushdown in Data Source V1 API
That does not preclude the filter pushdown optimization being used via other, non-Data Source V2 APIs, i.e. the Data Source V1 API.
In the case of the JDBC data source, filter pushdown is indeed supported through the older PrunedFilteredScan
contract (which, nota bene, is used by JDBCRelation only). That, however, is the Data Source V1 API.