Adaptive Query Execution (AQE)
Spark is one of the most widely used frameworks in data engineering for processing large datasets, and at that scale it has to plan its work intelligently. The Spark developers continually work on making query processing more efficient. Out of the box, Spark ships with the Catalyst optimizer, which supports both rule-based and cost-based optimization. Cost-based optimization uses data statistics such as row count, minimum and maximum values to help Spark choose a better plan for executing a job. Examples of cost-based optimization techniques are selecting the right join type (broadcast hash join vs. sort merge join), selecting the correct build side in a hash join, and adjusting the join order in a multi-way join. However, the statistics used to choose the plan may be out of date or may no longer hold once processing is under way, and imperfect cardinality estimates can lead to inefficient plans. Spark 3.0 addresses this problem with a new feature called Adaptive Query Execution (AQE). AQE uses runtime statistics to reoptimize the query and adjust the query plan while the query is running.
How does the Adaptive Query Execution framework work?
As the name suggests, AQE reoptimizes the query plan while execution is in progress, but the framework needs to know when it can reoptimize. Spark is designed for parallel processing, and the physical plan is divided into multiple phases, each called a stage. Spark creates a stage boundary wherever the plan contains a shuffle or a broadcast exchange. A stage's intermediate data can be passed to the next stage only after all of the stage's parallel tasks have completed. These boundaries give the framework a natural opportunity to reoptimize, because statistics from all the parallel tasks are available before the successive stages start.
AQE first launches all the leaf stages, that is, stages that do not depend on any other stages. Once these complete, it marks them as completed in the physical plan and updates the logical plan with the runtime statistics received from the completed stages. Based on these new statistics, Spark reruns the logical optimizer, the physical planner, and the physical optimization rules, such as coalescing partitions and skew join handling. The result is a newly optimized plan in which some stages are already complete. The AQE framework then searches for and executes the stages whose child stages have all been materialized. This process repeats until the entire query is finished.
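The loop described above can be illustrated with a small toy model in plain Python. This is only a sketch of the scheduling idea, not Spark's implementation: the function `run_adaptively`, the stage names, and the byte counts are all hypothetical, and the reoptimization step is reduced to a comment.

```python
from typing import Callable, Dict, List, Set, Tuple


def run_adaptively(
    deps: Dict[str, List[str]],
    run_stage: Callable[[str], int],
) -> Tuple[List[List[str]], Dict[str, int]]:
    """Run stages in waves; a stage is runnable once all its child stages are done.

    deps maps each stage name to the child stages it reads from.
    run_stage executes one stage and returns a runtime statistic (output bytes).
    Returns the executed waves in order, plus the statistics collected.
    """
    done: Set[str] = set()
    stats: Dict[str, int] = {}
    waves: List[List[str]] = []
    while len(done) < len(deps):
        # Leaf stages have no children, so they are ready in the first wave.
        ready = sorted(
            stage
            for stage, children in deps.items()
            if stage not in done and all(child in done for child in children)
        )
        if not ready:
            raise ValueError("cycle or unknown stage in dependency graph")
        for stage in ready:
            stats[stage] = run_stage(stage)
        done.update(ready)
        waves.append(ready)
        # A real AQE framework would rerun the optimizer here using `stats`
        # and possibly change the remaining stages before continuing.
    return waves, stats


# Hypothetical plan: two scans feed a join, which feeds an aggregation.
deps = {"scan_a": [], "scan_b": [], "join": ["scan_a", "scan_b"], "agg": ["join"]}
fake_output_bytes = {"scan_a": 500, "scan_b": 8, "join": 40, "agg": 1}

waves, stats = run_adaptively(deps, lambda stage: fake_output_bytes[stage])
print(waves)  # [['scan_a', 'scan_b'], ['join'], ['agg']]
```

The point of the sketch is the ordering: statistics for `scan_a` and `scan_b` exist before `join` is started, which is the moment at which a replanning step could act on them.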
In Spark 3.0, the AQE framework provides three features:
- Dynamically coalescing shuffle partitions
- Dynamically switching join strategies
- Dynamically optimizing skew joins
Dynamically coalescing shuffle partitions
Shuffles are costly operations in Spark because they move data over the network, which affects job performance. A key property of a shuffle is the number of partitions. The best number depends on the data size, but data sizes vary from stage to stage, which makes it hard for the developer to pick a single number:
- If there are too few partitions, each partition holds a lot of data, which can force tasks to spill data to disk. Disk I/O is also costly, so performance suffers.
- If there are too many partitions, each partition holds very little data, and reading the shuffle blocks requires many small network fetches, which again hurts performance.
We can solve this problem by setting a relatively large number of shuffle partitions initially and then reducing that number during job execution by combining adjacent small partitions into one.
Assume that we are running an aggregation in SQL that groups the data. The input table tbl is rather small, so there are only two partitions before grouping. The initial shuffle partition number is set to five, so after local grouping the partially grouped data is shuffled into five partitions. Without AQE, Spark starts five tasks to do the final aggregation. However, three of these partitions are very small, and it would be wasteful to start a separate task for each of them.
With AQE, Spark coalesces the three small partitions into one, so the final aggregation runs on three partitions rather than five.
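To make the coalescing step concrete, here is a minimal sketch in plain Python that greedily merges adjacent partitions up to a target size. The function `coalesce_partitions` and the partition sizes are hypothetical and chosen to mirror the five-partition example. In Spark itself, this behavior is governed by settings such as spark.sql.adaptive.coalescePartitions.enabled and spark.sql.adaptive.advisoryPartitionSizeInBytes, and the real rule is more involved than this.

```python
from typing import List


def coalesce_partitions(sizes: List[int], target_size: int) -> List[List[int]]:
    """Greedily merge adjacent partitions while the merged size fits target_size.

    Returns groups of original partition indices; each group becomes one task.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical shuffle partition sizes in MB: partitions 1, 2 and 3 are small.
sizes_mb = [70, 30, 20, 10, 80]
groups = coalesce_partitions(sizes_mb, target_size=64)
print(groups)       # [[0], [1, 2, 3], [4]]
print(len(groups))  # 3 tasks instead of 5
```

Only adjacent partitions are merged, which matches the description above and keeps each coalesced partition a contiguous range of the original ones.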
Dynamically switching join strategies
Spark supports several join strategies. Among them, the broadcast hash join usually gives the best performance. When one dataset in a join fits comfortably in memory, Spark can broadcast it so that every executor holds a complete copy. For this reason, Spark plans a broadcast hash join if the estimated size of a join relation is lower than the broadcast-size threshold. However, a number of things can make this size estimate wrong, such as the presence of a very selective filter, or the join relation being a series of complex operators rather than just a scan.
To avoid this problem, AQE replans the join strategy at runtime based on the most accurate join relation size.
Consider an example in which the right side of the join turns out to be far smaller than the estimate and small enough to be broadcast. After AQE reoptimization, the statically planned sort merge join is converted to a broadcast hash join.
For the broadcast hash join converted at runtime, we may further optimize the regular shuffle to a localized shuffle (i.e., shuffle that reads on a per mapper basis instead of a per reducer basis) to reduce the network traffic.
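The decision itself can be sketched as a size comparison that is made twice: once with the planner's estimate and once with the size observed at runtime. The function `choose_join_strategy` and the sizes below are hypothetical. The 10 MB default is an assumption meant to mirror the usual default of spark.sql.autoBroadcastJoinThreshold, and Spark's actual planner considers more than this single check.

```python
MB = 1024 * 1024


def choose_join_strategy(
    left_bytes: int,
    right_bytes: int,
    broadcast_threshold: int = 10 * MB,  # assumed default threshold
) -> str:
    """Pick a join strategy from the sizes of the two join relations."""
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        # Broadcast the smaller relation and use it as the build side.
        build_side = "left" if left_bytes <= right_bytes else "right"
        return f"broadcast_hash_join(build={build_side})"
    return "sort_merge_join"


# Planning time: the right side is estimated at 25 MB, above the threshold.
static_plan = choose_join_strategy(1024 * MB, 25 * MB)

# Runtime: after a selective filter, the right side is actually 8 MB.
runtime_plan = choose_join_strategy(1024 * MB, 8 * MB)

print(static_plan)   # sort_merge_join
print(runtime_plan)  # broadcast_hash_join(build=right)
```

The same rule gives a different answer once the estimate is replaced by the measured size, which is all that the runtime replanning step needs.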
Dynamically optimizing skew joins
Data skew happens when data is distributed unevenly among the partitions. If the skew is severe, it can significantly degrade Spark performance, particularly for joins.
The AQE framework in Spark 3.0 detects such skew from the runtime shuffle statistics and reoptimizes the plan. AQE splits the skewed partitions into smaller subpartitions, each of which is joined with the corresponding partition from the other side of the join.
Let’s take the example of table A joined with table B, in which table A has a partition A0 that is significantly bigger than its other partitions.
The skew join optimization will thus split partition A0 into two subpartitions and join each of them to the corresponding partition B0 of table B.
Without this optimization, Spark has to process the skewed partition A0 as is, which takes significantly longer than processing the other partitions. With this optimization, the two subpartitions are processed as separate tasks that each take roughly the same amount of time, so no single task dominates the join.
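The detection and splitting logic can be sketched in plain Python as follows. The function `plan_skew_join`, the partition sizes, and the `split_target` parameter are hypothetical. The default factor of 5 and threshold of 256 MB are assumptions intended to mirror spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. For simplicity the sketch handles skew on table A only.

```python
import math
from statistics import median
from typing import List, Tuple


def plan_skew_join(
    a_sizes: List[int],
    skew_factor: int = 5,      # assumed default factor
    skew_threshold: int = 256,  # assumed default threshold, in MB
    split_target: int = 300,    # hypothetical target size per subpartition, in MB
) -> List[Tuple[str, str]]:
    """Return the (A partition, B partition) pairs to join, splitting skewed A partitions."""
    median_size = median(a_sizes)
    tasks: List[Tuple[str, str]] = []
    for i, size in enumerate(a_sizes):
        # Skewed means much larger than the median AND larger than an absolute threshold.
        is_skewed = size > skew_factor * median_size and size > skew_threshold
        if is_skewed:
            n_splits = math.ceil(size / split_target)
            # Every subpartition of A{i} is joined with the same partition B{i}.
            tasks.extend((f"A{i}-{k}", f"B{i}") for k in range(n_splits))
        else:
            tasks.append((f"A{i}", f"B{i}"))
    return tasks


# Hypothetical partition sizes of table A in MB; A0 is heavily skewed.
tasks = plan_skew_join([600, 60, 50, 40])
print(tasks)
# [('A0-0', 'B0'), ('A0-1', 'B0'), ('A1', 'B1'), ('A2', 'B2'), ('A3', 'B3')]
```

With these numbers the median is 55 MB, so A0 at 600 MB passes both checks and is split in two, while the other partitions are joined unchanged.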
How to enable Adaptive Query Execution (AQE) in Spark
AQE can be enabled by setting the SQL config spark.sql.adaptive.enabled to true (the default is false in Spark 3.0). It applies if the query meets the following criteria:
- It is not a streaming query
- It contains at least one exchange (usually when there’s a join, aggregate or window operator) or one subquery
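As a minimal sketch, the settings could be applied from PySpark like this. The helper `enable_aqe` and the `AQE_SETTINGS` dictionary are hypothetical names. The helper assumes it is given an existing SparkSession and relies only on its `conf.set` method. The two feature flags besides spark.sql.adaptive.enabled are listed for illustration and only take effect once AQE itself is enabled.

```python
# Config keys for AQE and two of its features in Spark 3.0.
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def enable_aqe(spark) -> None:
    """Apply the AQE settings to an existing SparkSession (passed in by the caller)."""
    for key, value in AQE_SETTINGS.items():
        spark.conf.set(key, value)
```

With a live session named `spark`, calling `enable_aqe(spark)` should be enough. The same keys can also be passed on the command line, for example with `--conf spark.sql.adaptive.enabled=true` on `spark-submit`.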