Everything about Cartesian Product in Spark

A Cartesian Product join works very similarly to a Broadcast Nested Loop join, except that the dataset is not broadcast.

Shuffle-and-Replication does not mean a “true” shuffle, in which records with the same keys are sent to the same partition. Instead, entire partitions of one dataset are replicated to every partition of the other for a full cross or nested-loop join.

We will look at all the above points in detail with examples.

We set spark.sql.autoBroadcastJoinThreshold to -1 to disable broadcast joins.
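A minimal sketch of how this can be set from the shell:

// Disable broadcast joins so the planner cannot choose a broadcast
// strategy based on size estimates.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)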

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = -1

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]

Note that here we are performing a non-equi join operation.

scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

When we inspect the plan that will be executed, we can see that CartesianProduct is used.

scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
CartesianProduct (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- LocalTableScan [id2#8]

scala> dfJoined.show
+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 30| 30|
| 30| 20|
| 40| 30|
| 40| 20|
| 40| 40|
| 40| 30|
| 40| 20|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 30|
| 50| 20|
| 50| 40|
| 50| 50|
+---+---+
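A true cross join, with no join condition at all, can also be requested explicitly. A minimal sketch, assuming the same df1 and df2 as above:

// Dataset.crossJoin (available since Spark 2.1) also plans as a
// CartesianProduct, just without a condition to filter the pairs.
val dfCross = df1.crossJoin(df2)

// 12 rows in df1 x 4 rows in df2 = 48 rows in the full product.
dfCross.count   // expected: 48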


Stages involved in a Cartesian Product Join

This join is executed entirely in a single stage. As noted above, even though this join is also called Shuffle-and-Replication, there is no “true” shuffle in which records with the same keys are sent to the same partition; instead, entire partitions are replicated across the network for the full cross or nested-loop join.
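One rough way to confirm this from the shell is to inspect the RDD lineage; a sketch, with output details varying by Spark version:

// toDebugString prints the lineage of the underlying RDD; for this join
// it should show a cartesian-style RDD over the two scans, with no
// shuffle boundary in between.
dfJoined.rdd.toDebugString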


Inner workings of a Cartesian Product Join

  • Both datasets are read. Every partition of one dataset is sent to (replicated across) all the partitions of the other dataset.
  • Once partitions from both datasets are available on one side, a nested-loop join is performed.
  • If there are N records in one dataset and M records in the other, the nested loop evaluates N * M record pairs (see the sketch after this list).
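To make the N * M behavior concrete, here is a plain-Scala illustration of a nested-loop join (a sketch for intuition only, not Spark's actual implementation):

// Compares every record on the left against every record on the right,
// keeping the pairs that satisfy the join condition.
def nestedLoopJoin(left: Seq[Int], right: Seq[Int])
                  (cond: (Int, Int) => Boolean): Seq[(Int, Int)] =
  for {
    l <- left   // N records
    r <- right  // M records => N * M condition evaluations
    if cond(l, r)
  } yield (l, r)

nestedLoopJoin(Seq(10, 20, 30), Seq(20, 40))(_ >= _)
// result: List((20,20), (30,20))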

When does Cartesian Product Join work?

  • Works with both equi and non-equi join conditions (see the hint example after this list)
  • Works only on inner-like joins
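On Spark 3.0 and later, a join hint can force the cartesian product even for an equi inner join; a sketch, assuming the same df1 and df2 as above:

// The shuffle_replicate_nl hint (Spark 3.0+) asks the planner to use the
// shuffle-and-replicate nested loop join, i.e. CartesianProduct.
val dfHinted = df1.hint("shuffle_replicate_nl").join(df2, $"id1" === $"id2")
dfHinted.queryExecution.executedPlan
// expected: a CartesianProduct node carrying the equality condition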

When doesn’t Cartesian Product Join work?

  • Doesn’t work on non-inner-like joins (see the sketch after this list)
  • This is a very expensive join algorithm: expect heavy load on the network, as entire partitions are moved across it.
  • There is a high possibility of an Out of Memory exception.
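To see what happens on a non-inner join type, we can rerun the same non-equi condition as a left outer join; a sketch, with exact plan text varying by Spark version:

// With broadcasting disabled and a non-equi condition, a left outer join
// cannot be planned as CartesianProduct; Spark falls back to
// BroadcastNestedLoopJoin as the strategy of last resort.
val dfLeft = df1.join(df2, $"id1" >= $"id2", "left")
dfLeft.queryExecution.executedPlan
// expected: BroadcastNestedLoopJoin instead of CartesianProduct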
