Everything about Cartesian Product in Spark
A Cartesian Product join works very similarly to a Broadcast Nested Loop join, except that the dataset is not broadcast.
It is also called a Shuffle-and-Replication join, but that does not mean a “true” shuffle, in which records with the same key are sent to the same partition. Instead, each entire partition of one dataset is sent to (replicated across) all the partitions of the other dataset so that a full cross, or nested-loop, join can be performed.
Let's go through each of these points in detail with an example.
We set spark.sql.autoBroadcastJoinThreshold to -1 to disable broadcast joins.
scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = -1

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]
Note that this is a non-equi join: the join condition uses >= rather than an equality.
scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
The executed plan confirms that CartesianProduct is used:
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
CartesianProduct (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- LocalTableScan [id2#8]

scala> dfJoined.show
+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 30| 30|
| 30| 20|
| 40| 30|
| 40| 20|
| 40| 40|
| 40| 30|
| 40| 20|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 30|
| 50| 20|
| 50| 40|
| 50| 50|
+---+---+
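To sanity-check this output, here is a minimal plain-Scala sketch (no Spark involved; the object name `NonEquiJoinCheck` is hypothetical) that applies the same `id1 >= id2` condition to every pair of values. It considers all 12 * 4 = 48 pairs and keeps 18 of them, which matches the 18 rows printed by `dfJoined.show` (row order aside).

```scala
// Plain-Scala check of the non-equi join above; no Spark involved.
object NonEquiJoinCheck {
  val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
  val data2 = Seq(30, 20, 40, 50)

  // Consider every (id1, id2) combination and keep those where id1 >= id2.
  val joined: Seq[(Int, Int)] =
    for {
      id1 <- data1
      id2 <- data2
      if id1 >= id2
    } yield (id1, id2)

  def main(args: Array[String]): Unit = {
    println(s"pairs considered: ${data1.size * data2.size}") // 48
    println(s"rows returned: ${joined.size}")                // 18
  }
}
```

Note that both 10s disappear entirely, because no value in `data2` is less than or equal to 10.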
Stages involved in a Cartesian Product Join
This join is executed in a single stage. Although it is also called a Shuffle-and-Replication join, there is no “true” shuffle in which records with the same key are sent to the same partition. Instead, each entire partition of one dataset is replicated to every partition of the other dataset for a full cross, or nested-loop, join.
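As a rough model of this replication, assume the left side has p1 partitions and the right side has p2. At the RDD level, Spark's `CartesianRDD` creates one output partition for every pair of input partitions, so there are p1 * p2 tasks and each right-side partition is read once per left-side partition. The sketch below only counts those pairings in plain Scala; `PartitionPairing` and its methods are hypothetical names, not Spark APIs.

```scala
// Hypothetical plain-Scala model of how a Cartesian product pairs partitions.
object PartitionPairing {
  // Each task is identified by the (left partition, right partition) indices it combines.
  def tasks(leftParts: Int, rightParts: Int): Vector[(Int, Int)] =
    for {
      i <- (0 until leftParts).toVector
      j <- (0 until rightParts).toVector
    } yield (i, j)

  // Number of tasks that need right-side partition j: one per left partition.
  def readsOfRightPartition(leftParts: Int, rightParts: Int, j: Int): Int =
    tasks(leftParts, rightParts).count(_._2 == j)
}
```

For example, with 3 left partitions and 2 right partitions, `tasks(3, 2)` yields 6 pairs and each right partition is read by 3 of them. No key-based redistribution happens; the data is simply replicated into every pairing.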
Inner workings of a Cartesian Product Join
- Both datasets are read, and every partition of one dataset is sent to every partition of the other dataset.
- Once a partition from each dataset is available in the same place, a nested loop join is performed on the pair.
- If one dataset has N records and the other has M records, the nested loop evaluates the join condition on N * M record pairs.
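The steps above can be sketched in plain Scala, assuming each dataset is modeled as a vector of partitions. This is a simplified illustration, not Spark's `CartesianProductExec`, and `NestedLoopCartesian` is a hypothetical name. Every (left partition, right partition) pair is joined with a nested loop, and the total number of condition evaluations adds up to N * M regardless of how the rows are partitioned.

```scala
// Simplified plain-Scala model of a partitioned Cartesian product join.
object NestedLoopCartesian {
  // A dataset is modeled as a vector of partitions, each holding some rows.
  type Partitioned[A] = Vector[Vector[A]]

  // Pair every left partition with every right partition, then run a nested
  // loop over the rows of each pair, keeping the pairs that satisfy `cond`.
  def join[A, B](left: Partitioned[A], right: Partitioned[B])(cond: (A, B) => Boolean): Vector[(A, B)] =
    for {
      lp <- left  // left partition of the current pair
      rp <- right // right partition of the current pair
      a  <- lp    // outer loop over left rows
      b  <- rp    // inner loop over right rows
      if cond(a, b)
    } yield (a, b)

  // Condition evaluations: |lp| * |rp| summed over all partition pairs, i.e. N * M.
  def comparisons[A, B](left: Partitioned[A], right: Partitioned[B]): Long =
    (for {
      lp <- left
      rp <- right
    } yield lp.size.toLong * rp.size).sum
}
```

Splitting the example data into three left partitions (`Vector(10, 20, 20, 30)`, `Vector(40, 10, 40, 20)`, `Vector(20, 20, 20, 50)`) and two right partitions (`Vector(30, 20)`, `Vector(40, 50)`), `comparisons` returns 48 (12 * 4) and `join(left, right)(_ >= _)` returns the same 18 rows as the Spark query.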
When does Cartesian Product Join work?
- Works for both equi and non-equi joins
- Works only for inner-like joins (inner and cross joins)
When doesn’t Cartesian Product Join work?
- Doesn’t work for non-inner-like joins (for example, outer joins)
- It is a very expensive join algorithm. Expect heavy network load, since partitions are moved across the network.
- There is a high risk of Out of Memory exceptions.
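One way to see why this approach is limited to inner-like joins: each task sees only one partition of each side, so it cannot tell whether a left row found a match in some other right partition. The hypothetical sketch below (plain Scala, rows simplified to `Int`, names invented for illustration) compares a left outer join evaluated independently per partition pair with a correct global one.

```scala
// Hypothetical illustration: why per-partition-pair evaluation breaks outer joins.
object OuterJoinPitfall {
  type Partitioned[A] = Vector[Vector[A]]

  // Emit matched pairs, or a single (row, None) when there is no match.
  private def outerRows(a: Int, matches: Vector[Int]): Vector[(Int, Option[Int])] =
    if (matches.isEmpty) Vector((a, None)) else matches.map(b => (a, Some(b)))

  // Left outer join computed independently inside each (left, right) partition pair,
  // as a Cartesian-style task would have to do.
  def perPairLeftOuter(left: Partitioned[Int], right: Partitioned[Int])(cond: (Int, Int) => Boolean): Vector[(Int, Option[Int])] =
    for {
      lp  <- left
      rp  <- right
      a   <- lp
      out <- outerRows(a, rp.filter(b => cond(a, b)))
    } yield out

  // Correct left outer join: a row is unmatched only if no right row anywhere matches.
  def globalLeftOuter(left: Partitioned[Int], right: Partitioned[Int])(cond: (Int, Int) => Boolean): Vector[(Int, Option[Int])] = {
    val allRight = right.flatten
    for {
      a   <- left.flatten
      out <- outerRows(a, allRight.filter(b => cond(a, b)))
    } yield out
  }
}
```

With `left = Vector(Vector(5))`, `right = Vector(Vector(1), Vector(9))` and `_ >= _`, the per-pair version returns `(5, Some(1))` plus a spurious `(5, None)`, while the global version returns only `(5, Some(1))`. For an inner join, every matching pair is found by exactly one task, so simply combining the per-pair results is correct.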