Spark SQL Queries VS Dataframe Functions

Spark sql queries vs dataframe functions

There is no performance difference whatsoever. Both methods use exactly the same execution engine and internal data structures. At the end of the day, all boils down to personal preferences.

  • Arguably DataFrame queries are much easier to construct programmatically and provide a minimal type safety.

  • Plain SQL queries can be significantly more concise and easier to understand. They are also portable and can be used without any modifications with every supported language. With HiveContext, these can also be used to expose some functionalities which can be inaccessible in other ways (for example UDF without Spark wrappers).

Writing SQL vs using Dataframe APIs in Spark SQL

Couple more additions. Dataframe uses tungsten memory representation , catalyst optimizer used by sql as well as dataframe. With Dataset API, you have more control on the actual execution plan than with SparkSQL

Dataframe API vs Spark.sql

your dataframe transformations and spark sql querie will be translated to execution plan anyway and Catalyst will optimize it.

The main advantage of dataframe api is that you can use dataframe optimize fonction, for example : cache() , in general you will have more control of the execution plan.

I feel like it easier to test your code also, people tend to write 1 huge query ...

Apache Spark: using plain SQL queries vs using Spark SQL methods

Try .explain and check if pushdown predicate is used on your second query.

It should be in that second case. If so, it is equivalent technically in performance to passing the explicit query with pushdown already contained in the query option.

See a simulated version against mySQL, based on your approach.

CASE 1: select statement via passed query containing filter

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam").option("driver", "org.mariadb.jdbc.Driver").option("query","select * from family where rfam_acc = 'RF01527'").option("user", "rfamro").load().explain()

== Physical Plan ==
*(1) Scan JDBCRelation((select * from family where rfam_acc = 'RF01527') SPARK_GEN_SUBQ_4) [numPartitions=1] #[rfam_acc#867,rfam_id#868,auto_wiki#869L,description#870,author#871,seed_source#872,gathering_cutoff#873,trusted_cutoff#874,noise_cutoff#875,comment#876,previous_id#877,cmbuild#878,cmcalibrate#879,cmsearch#880,num_seed#881L,num_full#882L,num_genome_seq#883L,num_refseq#884L,type#885,structure_source#886,number_of_species#887L,number_3d_structures888,num_pseudonokts#889,tax_seed#890,... 11 more fields] PushedFilters: [], ReadSchema: struct<rfam_acc:string,rfam_id:string,auto_wiki:bigint,description:string,author:string,seed_sour...

Here PushedFilters is not used as a query is only used; it contains the filter in the actual passed to db query.

CASE 2: No select statement, rather using Spark SQL APIs referencing a filter

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam").option("driver", "org.mariadb.jdbc.Driver").option("dbtable", "family").option("user", "rfamro").load().select("*").filter(col("rfam_acc").equalTo("RF01527")).explain()

== Physical Plan ==
*(1) Scan JDBCRelation(family) [numPartitions=1] [rfam_acc#1149,rfam_id#1150,auto_wiki#1151L,description#1152,author#1153,seed_source#1154,gathering_cutoff#1155,trusted_cutoff#1156,noise_cutoff#1157,comment#1158,previous_id#1159,cmbuild#1160,cmcalibrate#1161,cmsearch#1162,num_seed#1163L,num_full#1164L,num_genome_seq#1165L,num_refseq#1166L,type#1167,structure_source#1168,number_of_species#1169L,number_3d_structures#1170,num_pseudonokts#1171,tax_seed#1172,... 11 more fields] PushedFilters: [*IsNotNull(rfam_acc), *EqualTo(rfam_acc,RF01527)], ReadSchema: struct<rfam_acc:string,rfam_id:string,auto_wiki:bigint,description:string,author:string,seed_sour...

PushedFilter is set to the criteria so filtering is applied in the database itself prior to returning data to Spark. Note the * on the PushedFilters, that signfies filtering at data source = database.

Summary

I ran both options and the timing was quick. They are equivalent in terms of what DB processing is done, only filtered results are returned to Spark, but via two different mechanisms that result in the same performance and results physically.



Related Topics



Leave a reply



Submit