Broadcast Nested Loop Join in Detail in Spark

Broadcast Nested Loop Join works by broadcasting one of the two datasets in its entirety and running a nested loop to join the data. In essence, every record from dataset 1 is compared against every record from dataset 2.

As you might guess, Broadcast Nested Loop Join is not a preferred strategy and can be quite slow. It works for both equi and non-equi joins, and it is picked by default when you have a non-equi join.

Example

We keep the default values for both spark.sql.join.preferSortMergeJoin and spark.sql.autoBroadcastJoinThreshold.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res0: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760

 

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]

 

Note that the join condition here (>=) makes this a non-equi join.

scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

Looking at the plan that will be executed, we can see that BroadcastNestedLoopJoin is used.

scala> dfJoined.queryExecution.executedPlan
res2: org.apache.spark.sql.execution.SparkPlan =
BroadcastNestedLoopJoin BuildRight, Inner, (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [id2#8]

scala> dfJoined.show
+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 30| 30|
| 30| 20|
| 40| 30|
| 40| 20|
| 40| 40|
| 40| 30|
| 40| 20|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 30|
| 50| 20|
| 50| 40|
| 50| 50|
+---+---+
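To reproduce the same check outside the spark-shell, a minimal standalone sketch could look like the following. It assumes Spark SQL is on the classpath and a local master is acceptable. The object name BnljPlanSketch and the helper planFor are made up for illustration, and the explicit broadcast(...) hint is an addition that is not part of the original example; it only makes the choice of broadcast side explicit.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper object, not part of Spark.
object BnljPlanSketch {
  // Builds the same non-equi join as in the example and returns the physical plan as text.
  def planFor(spark: SparkSession): String = {
    import spark.implicits._
    val df1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50).toDF("id1")
    val df2 = Seq(30, 20, 40, 50).toDF("id2")
    // Assumption: hint that df2 (the smaller side) should be the broadcast side.
    val joined = df1.join(broadcast(df2), $"id1" >= $"id2")
    joined.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("bnlj-plan-sketch")
      .getOrCreate()
    try println(planFor(spark))
    finally spark.stop()
  }
}
```

The exact plan text depends on the Spark version (newer versions may wrap it in an adaptive plan node), so it is safer to look for the BroadcastNestedLoopJoin node than to compare the whole string.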

 

Stages involved in a Broadcast Nested Loop Join

Broadcast Nested Loop Join does not involve a shuffle or a sort. The smaller of the two datasets is broadcast to all partitions of the other dataset, and a nested loop between the two datasets performs the join. Every record from dataset 1 is compared against every record from dataset 2.

Internal workings of Broadcast Nested Loop Join

There are 2 phases in a Broadcast Nested Loop Join.

Broadcast phase

  • The smaller dataset is broadcast to all executors (or tasks) processing the bigger dataset.
  • In a right outer join, the left side is broadcast.
  • In a left outer, left semi, left anti or existence join, the right side is broadcast.
  • In an inner-like join, either side can be broadcast.
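The rules above can be written down as a small lookup. This is only an illustrative sketch in plain Scala: BuildSideSketch, broadcastCandidates and the join-type strings are hypothetical names chosen here, not Spark's internal API, and "inner-like" is taken to mean inner and cross joins. Join types that the list does not mention return an empty set rather than a guess.

```scala
// Hypothetical model of the broadcast-side rules listed above; not Spark's own code.
object BuildSideSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Returns the sides that may be broadcast for a given join type.
  def broadcastCandidates(joinType: String): Set[BuildSide] =
    joinType.toLowerCase match {
      case "right_outer" => Set(BuildLeft)
      case "left_outer" | "left_semi" | "left_anti" | "existence" => Set(BuildRight)
      case "inner" | "cross" => Set(BuildLeft, BuildRight) // inner-like joins
      case _ => Set.empty // not covered by the rules listed above
    }

  def main(args: Array[String]): Unit =
    Seq("inner", "left_outer", "right_outer", "left_anti").foreach { jt =>
      println(s"$jt -> ${broadcastCandidates(jt).mkString(", ")}")
    }
}
```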

Nested Loop Join phase

  • Once the dataset is broadcast, every record from one dataset is compared against every record from the other dataset in a nested loop.
  • Since this join is used for non-equi conditions, the iteration cannot stop as soon as a match is found, as it can in Sort Merge Join. It has to go through the entire dataset.
  • Note that a sort is not involved in this join.
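The nested loop phase can be sketched in plain Scala, without Spark, to show why the cost grows with the product of the two sizes. This is a simplified model and not Spark's actual implementation; NestedLoopJoinSketch and nestedLoopJoin are names invented for this illustration, and the data is the same as in the example above.

```scala
// Simplified, single-machine model of the nested loop phase; not Spark's implementation.
object NestedLoopJoinSketch {
  // Compares every left row with every right row and keeps the pairs that satisfy `cond`.
  def nestedLoopJoin[A, B](left: Seq[A], right: Seq[B])(cond: (A, B) => Boolean): Seq[(A, B)] =
    for {
      l <- left   // outer loop: the streamed (bigger) side
      r <- right  // inner loop: the broadcast side, scanned in full for every streamed row
      if cond(l, r)
    } yield (l, r)

  def main(args: Array[String]): Unit = {
    val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
    val data2 = Seq(30, 20, 40, 50)
    val joined = nestedLoopJoin(data1, data2)(_ >= _) // same condition as id1 >= id2
    println(s"comparisons: ${data1.size * data2.size}, matches: ${joined.size}")
  }
}
```

For the sample data this performs 12 × 4 = 48 comparisons and keeps 18 pairs, the same number of rows that dfJoined.show printed in the example.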

When does Broadcast Nested Loop Join work?

  • Works for both equi and non-equi joins
  • Works for all join types

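When does Broadcast Nested Loop Join not work?

  • This join is slow, because the number of comparisons grows with the product of the two datasets' row counts.
  • This join will not work when neither side is small enough to be broadcast. Broadcasting a dataset that is too big can lead to Out Of Memory exceptions.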
