What is the Catalyst query optimizer?

Spark SQL was designed with an optimizer called Catalyst, based on functional programming constructs in Scala. Its two main purposes are: first, to make it easy to add new optimization techniques that address problems specific to big data; and second, to allow developers to extend and customize the optimizer's functionality.

Spark SQL architecture and Catalyst optimizer integration

Catalyst components

The main components of the Catalyst optimizer are as follows:

Trees

The main data type in Catalyst is the tree. Each tree is composed of nodes; each node has a node type and zero or more children. Node objects are immutable and can be manipulated with functional transformations.

As an example, consider the following expression tree built from three node classes:

Merge(Attribute(x), Merge(Literal(1), Literal(2)))

Where:

  • Literal(value: Int): a constant value
  • Attribute(name: String): an attribute of an input row
  • Merge(left: TreeNode, right: TreeNode): a combination of two expressions
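
A minimal sketch of these node types as immutable Scala case classes might look like the following. These are illustrative classes for this article's toy example, not Spark's actual Catalyst node classes (which live under org.apache.spark.sql.catalyst):

// Toy Catalyst-style tree nodes (illustrative only, not Spark's real classes)
sealed trait TreeNode
case class Literal(value: Int) extends TreeNode                     // a constant value
case class Attribute(name: String) extends TreeNode                 // an attribute of an input row
case class Merge(left: TreeNode, right: TreeNode) extends TreeNode  // combines two expressions

// The example expression above, built as an immutable tree of nodes
val expr: TreeNode = Merge(Attribute("x"), Merge(Literal(1), Literal(2)))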

Rules

Trees can be manipulated using rules, which are functions from one tree to another tree. The transform method applies a pattern-matching function recursively to every node of the tree, transforming each node that matches a pattern into its result. Below is an example of a rule applied to a tree.

tree.transform { case Merge(Literal(c1), Literal(c2)) => Literal(c1 + c2) }
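
To make the rule concrete, here is a sketch of a simplified transform over the toy case classes from the previous sketch. Catalyst's real transform works on its own TreeNode type, so this is only meant to show the pattern-matching mechanics:

// Simplified bottom-up transform over the toy TreeNode classes defined above
def transform(node: TreeNode)(rule: PartialFunction[TreeNode, TreeNode]): TreeNode = {
  val withNewChildren = node match {
    case Merge(l, r) => Merge(transform(l)(rule), transform(r)(rule))
    case leaf        => leaf
  }
  if (rule.isDefinedAt(withNewChildren)) rule(withNewChildren) else withNewChildren
}

// Constant folding: a Merge of two literals becomes a single literal
val folded = transform(Merge(Attribute("x"), Merge(Literal(1), Literal(2)))) {
  case Merge(Literal(a), Literal(b)) => Literal(a + b)
}
// folded == Merge(Attribute("x"), Literal(3))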

Using Catalyst in Spark SQL

The Catalyst optimizer in Spark offers both rule-based and cost-based optimization. Rule-based optimization decides how to execute the query by applying a set of defined rules, while cost-based optimization generates multiple execution plans, compares them, and chooses the one with the lowest cost.
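
As a quick way to see what the optimizer produces, recent Spark versions (3.0 or later, an assumption in this sketch) accept an explain mode, and cost-based features are controlled by the spark.sql.cbo.enabled setting:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("catalyst-demo").getOrCreate()
import spark.implicits._

// Cost-based optimization relies on table statistics and is toggled with this setting
spark.conf.set("spark.sql.cbo.enabled", "true")

val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "letra").filter($"id" > 1)

df.explain("extended")  // parsed, analyzed and optimized logical plans plus the physical plan
df.explain("cost")      // optimized logical plan annotated with statistics, when available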

Phases

The four phases of the transformation that Catalyst performs are as follows:

1. Analysis

The first phase of Spark SQL optimization is analysis. Spark SQL starts with a relation to be processed, which can come in two forms: as an abstract syntax tree (AST) returned by the SQL parser, or as a DataFrame object built with the Spark SQL API. In either case the relation may still contain unresolved attribute references, which the analyzer resolves against the catalog.
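
Both starting points can be inspected from the API. The sketch below uses Spark's developer-facing queryExecution field and an illustrative personas table to print the analyzed logical plan for the same relation reached from SQL and from the DataFrame API:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq(("001", "Bob", 28), ("002", "Joe", 34))
  .toDF("id", "nombre", "edad")
  .createOrReplaceTempView("personas")

// The same relation, once from the SQL parser and once from the DataFrame API
val fromSql = spark.sql("SELECT nombre FROM personas WHERE edad > 30")
val fromApi = spark.table("personas").where($"edad" > 30).select("nombre")

// Analyzed logical plans, after unresolved attribute references have been resolved
println(fromSql.queryExecution.analyzed)
println(fromApi.queryExecution.analyzed)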

2. Logical optimization

The second phase is logical optimization of the plan. In this phase, rule-based optimizations are applied to the logical plan, and new rules can easily be added.
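
As a sketch of how a rule can be added, Spark exposes an experimental hook (spark.experimental.extraOptimizations) for extra logical optimizer rules. The rule below, which removes a multiplication by the literal 1, is only illustrative, and the exact expression classes and their signatures vary across Spark versions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative rule: rewrite "expr * 1" to just "expr"
object RemoveMultiplyByOne extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case mul: Multiply if mul.right == Literal(1) => mul.left
  }
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.experimental.extraOptimizations = Seq(RemoveMultiplyByOne)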

3. Physical plan

In the physical planning phase, Spark SQL takes the logical plan and generates one or more physical plans using physical operators that match the Spark execution engine. The plan to execute is then selected with a cost model, by comparing the estimated cost of each candidate plan.
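
The intermediate and selected plans can be inspected through the same developer-facing queryExecution field; a small sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val query = Seq(("Bob", 28), ("Joe", 34)).toDF("nombre", "edad").groupBy("nombre").count()

val qe = query.queryExecution
println(qe.optimizedPlan)  // logical plan after rule-based optimization
println(qe.sparkPlan)      // physical plan selected from the candidates
println(qe.executedPlan)   // physical plan after execution preparation rules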

4. Code generation

Code generation is the final phase of Spark SQL optimization. To run on each machine, Spark SQL generates Java bytecode from the selected physical plan.
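
The generated code itself can be inspected. Reusing the query Dataset from the previous sketch (debugCodegen comes from a debug helper import, and the "codegen" explain mode assumes Spark 3.0 or later):

import org.apache.spark.sql.execution.debug._  // adds debugCodegen() to Datasets

query.explain("codegen")  // whole-stage generated code, via the explain mode
query.debugCodegen()      // prints the generated Java source for each codegen stage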

Phases of the query plan in Spark SQL. Rounded squares represent the Catalyst trees

 

Example

The Catalyst optimizer is enabled by default as of Spark 2.0 and includes optimizations for working with Datasets. Below is an example of the plan generated for a Dataset query using the Spark SQL Scala API:

// Business object
case class Persona(id: String, nombre: String, edad: Int)

// Needed for the toDS conversion (already available in spark-shell)
import spark.implicits._

// The dataset to query
val peopleDataset = Seq(
  Persona("001", "Bob", 28),
  Persona("002", "Joe", 34)).toDS

// The query to execute
val query = peopleDataset.groupBy("nombre").count().as("total")

// Get the Catalyst optimization plan
query.explain(extended = true)

As a result, the detailed plan for the query is obtained:

== Analyzed Logical Plan ==
nombre: string, count: bigint
SubqueryAlias total
+- Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
   +- LocalRelation [id#3, nombre#4, edad#5]

== Optimized Logical Plan ==
Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
+- LocalRelation [nombre#4]

== Physical Plan ==
*(2) HashAggregate(keys=[nombre#4], functions=[count(1)], output=[nombre#4, count#11L])
+- Exchange hashpartitioning(nombre#4, 200)
   +- *(1) HashAggregate(keys=[nombre#4], functions=[partial_count(1)], output=[nombre#4, count#17L])
      +- LocalTableScan [nombre#4]

Conclusions

The Spark SQL Catalyst optimizer improves developer productivity and the performance of the queries they write. Catalyst automatically transforms relational queries so that they execute more efficiently, using techniques such as filtering, indexing, and ensuring that joins between data sources are performed in the most efficient order. In addition, its design allows the Spark community to implement and extend the optimizer with new features.
