What is Catalyst query optimizer?
Spark SQL was designed with an optimizer called Catalyst, built on the functional programming constructs of Scala. Catalyst has two main purposes: first, to make it easy to add new optimization techniques that address problems specific to "big data"; and second, to allow developers to extend and customize the optimizer's functionality.
Figure: Spark SQL architecture and Catalyst optimizer integration
Catalyst components
The main components of the Catalyst optimizer are as follows:
Trees
The main data type in Catalyst is the tree. Each tree is composed of nodes; each node has a node type and zero or more children. These objects are immutable and are manipulated using functional transformations.
As an example, consider a tree built from the following node types (a runnable sketch follows the list):
Merge(Attribute(x), Merge(Literal(1), Literal(2)))
Where:
- Literal(value: Int): a constant value
- Attribute(name: String): an attribute of an input row
- Merge(left: TreeNode, right: TreeNode): the combination of two expressions
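Here is a minimal sketch of these node types in Scala. It is illustrative only: the names TreeNode, Literal, Attribute, and Merge mirror the example above, not the actual Catalyst classes (which live in org.apache.spark.sql.catalyst).

// Hypothetical, simplified node types mirroring the example above.
sealed trait TreeNode
case class Literal(value: Int) extends TreeNode                     // a constant value
case class Attribute(name: String) extends TreeNode                 // an attribute of an input row
case class Merge(left: TreeNode, right: TreeNode) extends TreeNode  // combines two expressions

// The tree from the example above:
val tree: TreeNode = Merge(Attribute("x"), Merge(Literal(1), Literal(2)))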
Rules
Trees can be manipulated using rules, which are functions from one tree to another tree. The transform method applies a pattern-matching function recursively to every node of the tree, rewriting each matching pattern to its result. Below is an example of a rule applied to a tree.
tree.transform { case Merge(Literal(c1), Literal(c2)) => Literal(c1 + c2) }
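Continuing the sketch above, a transform of this kind can be written in a few lines: rebuild the children recursively, then apply the rule wherever it matches. This is a simplified model of Catalyst's behavior, not its actual implementation.

// Simplified model of Catalyst's transform: rewrite children first
// (bottom-up), then apply the rule to the current node if it matches.
def transform(node: TreeNode)(rule: PartialFunction[TreeNode, TreeNode]): TreeNode = {
  val rewritten = node match {
    case Merge(left, right) => Merge(transform(left)(rule), transform(right)(rule))
    case leaf               => leaf
  }
  rule.applyOrElse(rewritten, identity[TreeNode])
}

// Applying the constant-folding rule from the example:
val folded = transform(tree) { case Merge(Literal(c1), Literal(c2)) => Literal(c1 + c2) }
// folded == Merge(Attribute("x"), Literal(3))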
Using Catalyst in Spark SQL
The Catalyst optimizer in Spark offers both rule-based and cost-based optimization. Rule-based optimization determines how to execute the query from a set of defined rules, while cost-based optimization generates multiple execution plans, compares them, and chooses the one with the lowest cost.
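In Spark 2.2 and later, the cost-based side can be switched on explicitly and fed with table statistics. A brief sketch, assuming a SparkSession named spark and a table named people already registered in the catalog:

// Enable the cost-based optimizer (off by default) and collect the
// statistics it relies on for cost estimates.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.sql("ANALYZE TABLE people COMPUTE STATISTICS FOR COLUMNS nombre, edad")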
Phases
The four phases of the transformation that Catalyst performs are as follows:
1. Analysis
The first phase of Spark SQL optimization is analysis. Spark SQL starts with a relation to be processed, which can come in two forms: as an AST (abstract syntax tree) returned by a SQL parser, or as a DataFrame object built with the Spark SQL API. In both cases, the relation may contain unresolved attribute references, which the analyzer resolves by looking them up in the catalog.
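Both entry points converge on the same analyzed plan, which can be inspected through queryExecution. A quick sketch, assuming a running SparkSession named spark:

// The same relation expressed as SQL text and through the DataFrame API:
val fromSql = spark.sql("SELECT id FROM range(10)")
val fromApi = spark.range(10).select("id")

// queryExecution exposes the result of the analysis phase:
println(fromSql.queryExecution.analyzed)
println(fromApi.queryExecution.analyzed)  // equivalent resolved logical plan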
2. Logical optimization
The second phase is logical plan optimization. In this phase, rule-based optimizations, such as constant folding, predicate pushdown, and projection pruning, are applied to the logical plan, and it is easy to add new rules.
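For example, a session-local rule can be registered through Spark's experimental API. The rule below is a toy sketch (Spark's built-in PruneFilters already performs this rewrite); what matters is the shape: a Rule[LogicalPlan] that pattern-matches on tree nodes.

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.BooleanType

// A toy rule for illustration: drop filters whose condition is literally true.
object RemoveTrivialFilter extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, BooleanType), child) => child
  }
}

// Register the rule with the current session's optimizer.
spark.experimental.extraOptimizations ++= Seq(RemoveTrivialFilter)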
3. Physical plan
In the physical plan phase, Spark SQL takes the logical plan and generates one or more physical plans using the physical operators that match the Spark execution engine. The plan to execute is then selected using the cost model, that is, by comparing the estimated costs of the candidate plans.
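One place where this cost comparison is visible to users is join-strategy selection: whether a join executes as a broadcast join or a shuffle join depends on the estimated size of its inputs, which can be tuned:

// Tables with an estimated size below this threshold are broadcast to the
// executors instead of shuffled (the default is 10 MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)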
4. Code generation
Code generation is the final phase of Spark SQL optimization. To run on each machine, the selected physical plan is compiled down to Java bytecode.
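The generated code can be inspected directly, assuming query is a Dataset such as the one defined in the Example section below:

// Print the Java source produced by whole-stage code generation.
import org.apache.spark.sql.execution.debug._
query.debugCodegen()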
Figure: Phases of the query plan in Spark SQL. Rounded squares represent Catalyst trees
Example
The Catalyst optimizer is enabled by default as of Spark 2.0 and contains optimizations for manipulating Datasets. Below is an example of the plan generated for a query on a Dataset using the Scala Spark SQL API:
import spark.implicits._

// Business object
case class Persona(id: String, nombre: String, edad: Int)

// The dataset to query
val peopleDataset = Seq(
  Persona("001", "Bob", 28),
  Persona("002", "Joe", 34)).toDS

// The query to execute
val query = peopleDataset.groupBy("nombre").count().as("total")

// Get Catalyst optimization plan
query.explain(extended = true)
As a result, the detailed plan for the query is obtained:
== Analyzed Logical Plan ==
nombre: string, count: bigint
SubqueryAlias total
+- Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
   +- LocalRelation [id#3, nombre#4, edad#5]

== Optimized Logical Plan ==
Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
+- LocalRelation [nombre#4]

== Physical Plan ==
*(2) HashAggregate(keys=[nombre#4], functions=[count(1)], output=[nombre#4, count#11L])
+- Exchange hashpartitioning(nombre#4, 200)
   +- *(1) HashAggregate(keys=[nombre#4], functions=[partial_count(1)], output=[nombre#4, count#17L])
      +- LocalTableScan [nombre#4]
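Note how the optimized logical plan prunes unused columns: the LocalRelation that originally carried id, nombre, and edad is reduced to nombre alone, since that is the only column the aggregation needs.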
Conclusions
The Spark SQL Catalyst optimizer improves developer productivity and the performance of the queries they write. Catalyst automatically transforms relational queries to execute them more efficiently, using techniques such as early filtering of data and ensuring that joins over data sources are performed in the most efficient order. In addition, its design allows the Spark community to implement and extend the optimizer with new features.