How to use JDBC source to write and read data in (Py)Spark?
Writing data
Include the applicable JDBC driver when you submit the application or start the shell. You can for example use --packages:

    bin/pyspark --packages group:name:version

or combine driver-class-path and jars:

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance is started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
Choose the desired mode. The Spark JDBC writer supports the following modes:

- append: Append contents of this DataFrame to existing data.
- overwrite: Overwrite existing data.
- ignore: Silently ignore this operation if data already exists.
- error (default case): Throw an exception if data already exists.

Upserts or other fine-grained modifications are not supported:

    mode = ...
Prepare the JDBC URI, for example:

    # You can encode credentials in the URI or pass them
    # separately using the properties argument
    # of the jdbc method or options
    url = "jdbc:postgresql://localhost/foobar"

(Optional) Create a dictionary of JDBC arguments:

    properties = {
        "user": "foo",
        "password": "bar"
    }

properties / options can also be used to set supported JDBC connection properties.

Use DataFrame.write.jdbc:

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

to save the data (see pyspark.sql.DataFrameWriter for details).
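Steps 1-4 can be assembled into a minimal end-to-end sketch (the URL, credentials and table name below are placeholders, not a real database):

```python
# Minimal write sketch assembling steps 1-4; all connection details
# are placeholders for illustration only.
mode = "append"                                # step 1: write mode
url = "jdbc:postgresql://localhost/foobar"     # step 2: JDBC URI
properties = {                                 # step 3: JDBC arguments
    "user": "foo",
    "password": "bar",
    "driver": "org.postgresql.Driver",         # helps avoid "No suitable driver"
}

# Step 4 -- with a running PostgreSQL instance and the driver on the
# classpath, this call would persist the frame:
# df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
```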
Known issues:

- A suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...). Assuming there is no driver version mismatch, you can solve this by adding the driver class to the properties. For example:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }

- Using df.write.format("jdbc").options(...).save() may result in:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

  Solution unknown.

- In PySpark 1.3 you can try calling the Java method directly:

    df._jdf.insertIntoJDBC(url, "baz", True)
Reading data
Follow steps 1-4 from Writing data.

Use sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)

or sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
Known issues and gotchas:
- Suitable driver cannot be found - see: Writing data.
- Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery. See for example:
  - Does spark predicate pushdown work with JDBC?
  - More than one hour to execute pyspark.sql.DataFrame.take(4)
  - How to use SQL query to define table in dbtable?
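The subquery workaround can be sketched as follows (the table and column names here are hypothetical): wrap the query in parentheses and give it an alias, so that the database evaluates the filter instead of Spark.

```python
# Hypothetical subquery pushed down to the database: the filter runs
# in SQL, and Spark only sees the already-reduced result set.
query = "(SELECT id, name FROM baz WHERE created < '2017-01-01') AS baz_filtered"

# With a live connection you would pass it in place of a table name:
# df = (sqlContext.read.format("jdbc")
#       .options(url=url, dbtable=query, **properties)
#       .load())
```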
- By default JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can:
  - Provide a partitioning column (must be IntegerType), lowerBound, upperBound, and numPartitions.
  - Provide a list of mutually exclusive predicates (the predicates argument), one for each desired partition.
  See:
  - Partitioning in spark while reading from RDBMS via JDBC
  - How to optimize partitioning when migrating data from JDBC source?
  - How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
  - How to partition Spark RDD when importing Postgres using JDBC?
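The two partitioned-read variants can be sketched as follows (the URL, column names and bounds are hypothetical):

```python
# Variant 1: numeric partitioning column. Spark issues numPartitions
# parallel queries of the form  WHERE id >= x AND id < y.
partitioned_opts = {
    "url": "jdbc:postgresql://localhost/foobar",   # hypothetical
    "dbtable": "baz",
    "partitionColumn": "id",      # must be an integer column
    "lowerBound": "1",
    "upperBound": "100000",
    "numPartitions": "8",
}

# Variant 2: explicit, mutually exclusive predicates, one per partition.
predicates = [
    "created < '2017-01-01'",
    "created >= '2017-01-01'",
]

# With a live database either variant loads in parallel, e.g.:
# df = sqlContext.read.format("jdbc").options(**partitioned_opts).load()
# df = sqlContext.read.jdbc(url=url, table="baz",
#                           predicates=predicates, properties=properties)
```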
- In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time, there is no guarantee that the final view will be consistent.
Where to find suitable drivers:

- Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab, in the form compile-group:name:version, substituting the respective fields) or Maven Central Repository:
  - PostgreSQL
  - MySQL
Other options
Depending on the database, a specialized source might exist and be preferred in some cases:
- Greenplum - Pivotal Greenplum-Spark Connector
- Apache Phoenix - Apache Spark Plugin
- Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
- Amazon Redshift - Databricks Redshift connector (current versions are available only in the proprietary Databricks Runtime; the discontinued open source version is available on GitHub).
PySpark pyspark.sql.DataFrameReader.jdbc() doesn't accept a datetime-type upperBound argument as the documentation says
It is supported. The issue here is that the spark.read.jdbc method currently only supports upper/lower bound parameters for integral type columns. But you can use the load method and DataFrameReader.option to specify upperBound and lowerBound for other column types such as date/timestamp:
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://server/db") \
        .option("dbtable", "table_name") \
        .option("user", "user") \
        .option("password", "xxxx") \
        .option("partitionColumn", "EVENT_CAPTURED") \
        .option("lowerBound", "2016-01-01 00:00:00") \
        .option("upperBound", "2016-01-10 00:00:00") \
        .option("numPartitions", "8") \
        .load()
Or by passing a dict of options:

    df = spark.read.format("jdbc") \
        .options(**sql_conn_params) \
        .load()
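Here sql_conn_params is assumed to be a plain dict whose keys match the JDBC data source option names, e.g.:

```python
# Hypothetical contents of sql_conn_params; the keys are the option
# names understood by the JDBC data source.
sql_conn_params = {
    "url": "jdbc:mysql://server/db",
    "dbtable": "table_name",
    "user": "user",
    "password": "xxxx",
    "partitionColumn": "EVENT_CAPTURED",
    "lowerBound": "2016-01-01 00:00:00",
    "upperBound": "2016-01-10 00:00:00",
    "numPartitions": "8",
}

# Equivalent to the chained .option(...) calls shown above:
# df = spark.read.format("jdbc").options(**sql_conn_params).load()
```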
You can see all available options and examples here: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
PySpark & JDBC: When should I use spark with JDBC?
- Yes, you can install Spark locally and use JDBC to connect to your databases. Here is a function to help you connect to MySQL, which you can generalize to any JDBC source by changing the JDBC connection string:
    def connect_to_sql(
        spark, jdbc_hostname, jdbc_port, database, data_table, username, password
    ):
        jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)
        connection_details = {
            "user": username,
            "password": password,
            "driver": "com.mysql.cj.jdbc.Driver",
        }
        df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
        return df
Spark is better at handling big data than Pandas, even on local machines, but it comes with a performance overhead due to parallelism and distributed computing. It will definitely serve your purpose on a cluster, but local mode should be used for development only.
Rest assured, Spark (installed locally) will push down queries and limits, and handle transformations well if done correctly. Search, sort and filter operations are going to be expensive, since the DataFrame is a non-indexed distributed data structure.
I am unaware of the speed difference between Presto and Spark; I have not tried a comparison.
Hope this helps.
Note: Performance improvement is not guaranteed on a local machine even with an optimal parallel workload, since it provides no opportunity for distribution.
Spark structured streaming from JDBC source
No, there is no such built-in support in Spark Structured Streaming. The main reason is that most databases don't provide a unified interface for obtaining changes.
It's possible to get changes from some databases using archive logs, write-ahead logs, etc., but it's database-specific. For many databases the popular choice is Debezium, which can read such logs and push the list of changes into Kafka, or something similar, from which it can be consumed by Spark.
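A sketch of the Debezium-based pipeline described above, consuming the change topic from Kafka with Structured Streaming (the broker address and topic name are hypothetical; Debezium topics typically follow the server.schema.table pattern):

```python
# Hypothetical Kafka source options for a Debezium change topic.
kafka_options = {
    "kafka.bootstrap.servers": "broker:9092",
    "subscribe": "dbserver1.inventory.customers",  # server.schema.table
    "startingOffsets": "earliest",
}

# With a running broker the change feed becomes a streaming DataFrame:
# changes = (spark.readStream.format("kafka")
#            .options(**kafka_options)
#            .load())
# Each row's `value` column holds the Debezium change event as JSON.
```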
Loading data from Oracle table using spark JDBC is extremely slow
As your query has a lot of filters, you don't even need to bring in the whole dataset and then apply the filters to it. You can push this query down to the database engine, which will in turn filter the data and return only the result to the Glue job.
This can be done as explained in https://stackoverflow.com/a/54375010/4326922 and below is an example for MySQL, which can be applied to Oracle too with a few changes.

    query = ("(select ab.id, ab.name, ab.date1, bb.tStartDate "
             "from test.test12 ab join test.test34 bb on ab.id = bb.id "
             "where ab.date1 > '" + args['start_date'] + "') as testresult")

    datasource0 = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://host.test.us-east-2.rds.amazonaws.com:3306/test") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", query) \
        .option("user", "test") \
        .option("password", "Password1234") \
        .load()