Posted by Rakesh Thakoordyal on July 5, 2018


How to Improve Spark SQL Query Times


Superior Machine Learning and Artificial Intelligence solutions thrive on their ability to find those “golden nuggets” of value-added insights inside deep lakes of heterogeneous data. To do this, you often have to cruise through terabytes or petabytes of data, running complicated SQL queries with aggregations, analytical functions, subselects, and numerous table joins, just to name a few. Run times for such queries against big data sets can be frustrating. It’s not uncommon to hear the “start your query tonight and check it in the morning” story.

Performance issues like these affect any database as it scales. Improving query performance usually boils down to one of two options:

  1. Optimize your SQL query
  2. Modify your database configuration

I’ve spent quite a bit of time over the last few weeks working on Spark SQL performance issues, and I wanted to share what I’ve learned in case it helps others doing similar work. This applies to running Spark SQL against parquet files backed by a Hive metastore on an Azure HDInsight cluster, either in code or in Jupyter, particularly for queries that either don’t complete at all or take a considerable amount of time to complete.

There are a number of foundational/architectural concepts that come into play when you are faced with Spark performance issues. Specifically, I’m referring to things like:

  • Partitioning
  • Sorting
  • Bucketing
  • Storage I/O
  • Network I/O
  • Throttling

I won’t dive into these concepts today, but it’s important to be aware of them and how they can impact your reads and writes. These are decisions that are typically made at the time of table creation, and can impact how data is physically stored on disk, which in turn will have an impact on query run times.

Note: You should always make sure that your query returns an accurate result set (for either a full or scoped down input data set) before considering performance improvements.

Optimize Your Query

From what I’ve seen, this should always be your first play. Often, the query is the only piece you have control over, so make sure you’ve tuned it as best you can before sending it off to Spark. Here are some tips that I’ve found useful for query tuning. Note that these tips are not limited to Spark SQL; they apply to any SQL-based application, regardless of data size:

  • Table joins can be expensive. Join on numeric fields whenever possible, and cast string join keys to integers when the values allow it (a cast turns both “012” and “12” into 12, so check that this cannot create false matches)
  • The substring operation is expensive. If you use it a lot, try moving the associated logic into a Common Table Expression (CTE), also known as a “WITH” clause. This is especially true if the substring is used in a join condition. In the example below, I’ve moved the substring operations on the product table into a CTE. In the original version of the query, the substrings were recomputed in the join conditions between product and the other tables, which caused the runtime to explode. By creating a version of product with these operations already done (via the CTE), each substring only has to be computed once per product row. You can then refer to the CTE in the “main” part of the query. Here is an example:

WITH product_mod AS
(SELECT pg.*,
        CAST(SUBSTRING(prod_group, 5, 4) AS INTEGER) AS pg_5_4,
        CAST(SUBSTRING(prod_group, 1, 4) AS INTEGER) AS pg_1_4,
        -- 12 digits can exceed the INTEGER range, so BIGINT is used here
        CAST(SUBSTRING(prod_group, 1, 12) AS BIGINT) AS pg_1_12,
        CAST(SUBSTRING(prod_group, 1, 8) AS INTEGER) AS pg_1_8,
        SUBSTRING(prod_group, 5, 4) AS pg_5_4_s,
        SUBSTRING(prod_group, 1, 4) AS pg_1_4_s,
        SUBSTRING(prod_group, 1, 12) AS pg_1_12_s,
        SUBSTRING(prod_group, 1, 8) AS pg_1_8_s
 FROM product pg)
-- main query: join to product_mod and its precomputed columns instead of product
SELECT *
FROM product_mod
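The same “compute once, reuse” idea can be sketched in plain Python. This is only an analogy for what the CTE buys you, not how Spark executes it: the derived keys are computed once per product row (mirroring product_mod above), cast to integers, and the join then becomes a dictionary lookup on an integer key. The row layouts, the `group_code` column, and the sample values are hypothetical. Note that SQL SUBSTRING is 1-based, so SUBSTRING(prod_group, 5, 4) corresponds to prod_group[4:8] in Python.

```python
from typing import Dict, List, Optional


def sql_substring(s: str, pos: int, length: int) -> str:
    """Emulate SQL SUBSTRING(s, pos, length) for a 1-based pos >= 1."""
    return s[pos - 1 : pos - 1 + length]


def to_int(s: str) -> Optional[int]:
    """Cast to int, returning None for non-numeric text (like a NULL cast result)."""
    try:
        return int(s)
    except ValueError:
        return None


def build_product_mod(products: List[dict]) -> List[dict]:
    """Derive the prefix keys once per product row, as the CTE does."""
    product_mod = []
    for p in products:
        pg = p["prod_group"]
        row = dict(p)  # keep all original columns, like pg.*
        row["pg_1_4"] = to_int(sql_substring(pg, 1, 4))
        row["pg_5_4"] = to_int(sql_substring(pg, 5, 4))
        row["pg_1_8"] = to_int(sql_substring(pg, 1, 8))
        product_mod.append(row)
    return product_mod


def join_on_pg_1_4(transactions: List[dict], product_mod: List[dict]) -> List[tuple]:
    """Equi-join on the precomputed integer key instead of recomputing substrings."""
    index: Dict[int, List[dict]] = {}
    for p in product_mod:
        if p["pg_1_4"] is not None:  # NULL keys never match in an equi-join
            index.setdefault(p["pg_1_4"], []).append(p)
    return [
        (t["trans_id"], p["prod_group"])
        for t in transactions
        for p in index.get(t["group_code"], [])
    ]


# Hypothetical sample rows
products = [{"prod_group": "000112340000"}, {"prod_group": "000298760000"}]
transactions = [
    {"trans_id": 1, "group_code": 1},
    {"trans_id": 2, "group_code": 2},
    {"trans_id": 3, "group_code": 9},
]
print(join_on_pg_1_4(transactions, build_product_mod(products)))
# prints [(1, '000112340000'), (2, '000298760000')]
```

Because “0001” becomes 1 after the cast, the integer key no longer distinguishes codes that differ only in leading zeros. That is harmless here because every prefix has a fixed width, but it is worth checking before casting join keys in your own data.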
  • If your query involves recalculating a complicated subset of data multiple times, move this calculation into a CTE
  • If you find that CTEs are not helping, try creating a separate dataframe per join to the common table. At the end, union the dataframes to get the full data set:

# assumes an existing SparkSession named `spark` (as in a Jupyter/PySpark session);
# colA ... colN stand for the columns you actually need

# type 1 or type 2 transactions
df1 = spark.sql("""
SELECT colA, colB, colC,... colN
FROM transaction t
JOIN product prod
  ON (t.type = 1 OR t.type = 2)
 AND t.product_code = prod.product_code
 AND (prod.item_type = 1 OR prod.item_type = 2)
""")

# type 3 transactions
df2 = spark.sql("""
SELECT colA, colB, colC,... colN
FROM transaction t
JOIN product prod
  ON (t.type = 3 AND prod.product_code = t.product_code AND prod.item_type = 3)
""")

# type 4 transactions
df3 = spark.sql("""
SELECT colA, colB, colC,... colN
FROM transaction t
JOIN product prod
  ON (t.type = 4 AND prod.product_code = t.product_code AND prod.item_type = 4)
""")

# union all dataframes to get the full data set
# (DataFrame.union keeps duplicates like UNION ALL and matches columns by position)
df4 = df1.union(df2).union(df3)
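Before comparing run times, it’s worth confirming that the split queries return exactly the rows the single query did. The plain-Python sketch below (hypothetical rows and branch definitions, not Spark code) compares one join whose condition ORs the branches together with one equality join per branch followed by a UNION ALL. The two only agree when no row can satisfy more than one branch; otherwise UNION ALL returns that row twice, so the sketch includes a simple disjointness check.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical branches: (allowed transaction types, allowed item types),
# mirroring df1 (types 1 and 2), df2 (type 3) and df3 (type 4) above.
BRANCHES: List[Tuple[Set[int], Set[int]]] = [({1, 2}, {1, 2}), ({3}, {3}), ({4}, {4})]


def single_join(transactions: List[dict], products: List[dict]) -> List[tuple]:
    """One join whose condition ORs all branches together (compares every pair)."""
    out = []
    for t in transactions:
        for p in products:
            if t["product_code"] != p["product_code"]:
                continue
            if any(t["type"] in tt and p["item_type"] in it for tt, it in BRANCHES):
                out.append((t["trans_id"], p["product_code"], p["item_type"]))
    return out


def split_and_union(transactions: List[dict], products: List[dict]) -> List[tuple]:
    """One filtered equi-join per branch on product_code, then UNION ALL."""
    out = []
    for trans_types, item_types in BRANCHES:
        index: Dict[str, List[dict]] = {}
        for p in products:
            if p["item_type"] in item_types:
                index.setdefault(p["product_code"], []).append(p)
        for t in transactions:
            if t["type"] in trans_types:
                for p in index.get(t["product_code"], []):
                    out.append((t["trans_id"], p["product_code"], p["item_type"]))
    return out


def branches_are_disjoint(branches: List[Tuple[Set[int], Set[int]]]) -> bool:
    """True if no transaction type appears in two branches (enough to rule out duplicates)."""
    seen: Set[int] = set()
    for trans_types, _ in branches:
        if seen & trans_types:
            return False
        seen |= trans_types
    return True


transactions = [
    {"trans_id": 1, "type": 1, "product_code": "A"},
    {"trans_id": 2, "type": 3, "product_code": "B"},
    {"trans_id": 3, "type": 4, "product_code": "A"},
    {"trans_id": 4, "type": 2, "product_code": "B"},
]
products = [
    {"product_code": "A", "item_type": 1},
    {"product_code": "A", "item_type": 4},
    {"product_code": "B", "item_type": 3},
]
print(branches_are_disjoint(BRANCHES))  # prints True
print(sorted(single_join(transactions, products)) == sorted(split_and_union(transactions, products)))  # prints True
```

On a real cluster the equivalent check is to compare the row counts (or an EXCEPT in both directions) of the original query and df4 on a scoped-down input before timing either one.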
  • Divide a large query into multiple queries
  • Limit large fact tables as much as possible (e.g., by a date field), but be sure that this will not negatively impact your final result set. Check with downstream users/applications.
  • Limit the number of fields in the select clause
  • Use hints in your query when joining to smaller tables. These “smaller tables” are typically dimensions that are cheap to copy to every node. Broadcasting them avoids shuffling the big fact tables across the network just to join them to the smaller dimensions.
    • In Spark SQL, the query planner automatically broadcasts tables whose estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default), which it can do when size statistics are available, for example from a metastore like Hive
    • Automatic broadcasting therefore depends on statistics being collected on tables when they are inserted/updated (for example, with ANALYZE TABLE ... COMPUTE STATISTICS)
    • You can manually instruct a query to broadcast a table (e.g., if you are querying a set of parquet files sitting in a repository not backed by a metastore):

SELECT /*+ BROADCAST(prod), BROADCAST(s) */
       td.trans_id, td.trans_timestamp, td.customer_id,
       prod.product_name, s.store_name, s.region
FROM transaction_detail td
JOIN product prod ON (td.product_code = prod.product_code)
JOIN store s ON (td.store_code = s.store_code)
    • Spark SQL also accepts MAPJOIN and BROADCASTJOIN as hint names; both are aliases for BROADCAST. Refer to the Spark SQL documentation on hints for more details on using hints in Spark.
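The reason broadcasting a small dimension helps can be shown with a plain-Python analogy (not Spark’s actual broadcast hash join): every partition of a hypothetical transaction_detail table is joined against the same small in-memory copies of product and store, so the large fact rows never have to be redistributed by join key. The sketch also applies two of the earlier tips by skipping rows before a cutoff date and emitting only the selected columns. All table contents are made up for illustration.

```python
from datetime import datetime
from typing import Dict, Iterator, List, Tuple


def join_partition(
    fact_rows: List[dict],
    products: Dict[str, str],
    stores: Dict[str, Tuple[str, str]],
    since: datetime,
) -> Iterator[tuple]:
    """Join one fact partition against small lookup tables held in local memory."""
    for td in fact_rows:
        if td["trans_timestamp"] < since:  # limit the fact table by date
            continue
        if td["product_code"] not in products or td["store_code"] not in stores:
            continue  # inner join: drop rows with no matching dimension row
        store_name, region = stores[td["store_code"]]
        # emit only the columns the query selects
        yield (td["trans_id"], td["trans_timestamp"], td["customer_id"],
               products[td["product_code"]], store_name, region)


def broadcast_join(partitions: List[List[dict]], products, stores, since: datetime) -> List[tuple]:
    """Every partition reuses the same small lookup tables; fact rows stay put."""
    return [row for part in partitions for row in join_partition(part, products, stores, since)]


# Hypothetical dimension data: product_code -> product_name, store_code -> (store_name, region)
products = {"P1": "Widget", "P2": "Gadget"}
stores = {"S1": ("Downtown", "East")}
# Hypothetical transaction_detail rows, already split into two partitions
partitions = [
    [{"trans_id": 1, "trans_timestamp": datetime(2018, 6, 1), "customer_id": 10,
      "product_code": "P1", "store_code": "S1"}],
    [{"trans_id": 2, "trans_timestamp": datetime(2017, 1, 1), "customer_id": 11,
      "product_code": "P2", "store_code": "S1"},
     {"trans_id": 3, "trans_timestamp": datetime(2018, 7, 1), "customer_id": 12,
      "product_code": "P3", "store_code": "S1"}],
]
# only trans_id 1 survives: 2 is older than the cutoff, 3 has no matching product
for row in broadcast_join(partitions, products, stores, since=datetime(2018, 1, 1)):
    print(row)
```

The trade-off is memory: every node holds a full copy of each broadcast table, which is why this only pays off for genuinely small dimensions.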

Tweak Spark Parameters

The right values depend on your cluster configuration, node types, and the resources available on the cluster:

  • Add more executors:  --num-executors 10
  • Add more memory to executors: --executor-memory 5g
  • Add more cores per executor:  --executor-cores 10
  • Add more driver memory: --driver-memory 2g
  • Set --conf spark.sql.shuffle.partitions=x, where x should be the same number as numPartitions in your Spark “config.json” file (if this value is set in config.json)
  • Scale up the cluster but remember to scale back down afterward to avoid excessive costs
  • If the query stalls while adding/removing broadcast variables, you can try disabling automatic broadcast joins via --conf spark.sql.autoBroadcastJoinThreshold=-1. This may reduce performance, but it should at least let the job complete
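Since the right values depend on the cluster, it can help to keep them together and sanity-check them before submitting. This is a rough sketch under stated assumptions: the node count, cores, and memory per node are hypothetical inputs you would take from your own HDInsight node type, the shuffle-partition value and my_job.py are placeholders, and the memory check assumes Spark’s default executor memory overhead on YARN (the larger of 384 MiB and 10% of executor memory). The flag-to-property mapping (for example, --num-executors corresponds to spark.executor.instances) is handy when the same settings have to go into a conf dictionary instead of onto the command line.

```python
from typing import Dict, List

# Spark property names behind the spark-submit flags listed above
FLAG_TO_PROPERTY = {
    "--num-executors": "spark.executor.instances",
    "--executor-memory": "spark.executor.memory",
    "--executor-cores": "spark.executor.cores",
    "--driver-memory": "spark.driver.memory",
}


def parse_gib(size: str) -> float:
    """Parse a size such as '5g' or '512m' into GiB (only g/m suffixes handled)."""
    size = size.strip().lower()
    if size.endswith("g"):
        return float(size[:-1])
    if size.endswith("m"):
        return float(size[:-1]) / 1024
    raise ValueError(f"unsupported size: {size!r}")


def as_properties(settings: Dict[str, str]) -> Dict[str, str]:
    """Translate spark-submit flags into Spark property names."""
    return {FLAG_TO_PROPERTY[flag]: value for flag, value in settings.items()}


def fits_cluster(settings: Dict[str, str], nodes: int, cores_per_node: int, gib_per_node: float) -> bool:
    """Rough executor check, assuming executors are spread as evenly as possible across nodes."""
    executors = int(settings["--num-executors"])
    cores = int(settings["--executor-cores"])
    memory = parse_gib(settings["--executor-memory"])
    overhead = max(384 / 1024, 0.10 * memory)  # assumed default YARN memory overhead
    per_node = -(-executors // nodes)  # ceiling division: executors on the busiest node
    return per_node * cores <= cores_per_node and per_node * (memory + overhead) <= gib_per_node


def spark_submit_args(settings: Dict[str, str], conf: Dict[str, str]) -> List[str]:
    """Build the argument list, passing extra properties as --conf key=value."""
    args: List[str] = []
    for flag, value in settings.items():
        args += [flag, value]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


settings = {"--num-executors": "10", "--executor-memory": "5g",
            "--executor-cores": "10", "--driver-memory": "2g"}
conf = {"spark.sql.shuffle.partitions": "200"}  # placeholder: match numPartitions if set
# conf["spark.sql.autoBroadcastJoinThreshold"] = "-1"  # only if broadcasting stalls

print(fits_cluster(settings, nodes=10, cores_per_node=16, gib_per_node=56))  # prints True
print(" ".join(["spark-submit", *spark_submit_args(settings, conf), "my_job.py"]))
```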

When it comes to querying large data sets, Spark’s in-memory data processing capabilities bring tremendous benefits compared to more traditional data processing frameworks. It’s one thing to be able to import and store data from files, databases, real-time streaming applications, web logs, etc., but the real “win” comes from extracting meaningful insights that drive business value from such data lakes, and doing so in an acceptable amount of time.

Although Spark is inherently fast, it’s not a magic wand. Traditional database aspects such as partitioning, indexing, sorting, and distribution all play a role in achieving acceptable query run times as your database scales. This article touched on some of the “lower-hanging fruit” options for speeding up your big data queries: tuning the Spark SQL query itself and modifying Spark run-time parameters.