All about Broadcast Variables and How to Use Them

In this post, we will see how to use broadcast variables in Spark. Broadcast variables can be tricky if the concepts behind them are not clearly understood, and that confusion creates errors down the line.

Broadcast variables are used to implement a map-side join, i.e. a join performed inside a map transformation instead of via a shuffle.

E.g. lookup tables or reference data are distributed across the nodes of a cluster using broadcast, and are then used inside map (to do the join implicitly).
When you broadcast some data, the data is copied to all the executors only once (so we avoid copying the same data again and again for each task). Hence broadcasting makes your Spark application faster when you have a large value to use in tasks, or when there are many more tasks than executors.
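The map-side join described above can be sketched as follows (a minimal sketch, assuming an active SparkContext named sc and an RDD named orders of (countryCode, amount) pairs; the data and names here are illustrative, not from the original post):

```scala
// Small lookup table we want available on every executor (illustrative data).
val countryLookup: Map[String, String] =
  Map("IN" -> "India", "US" -> "United States", "DE" -> "Germany")

// Ship the lookup table to all executors once.
val bcLookup = sc.broadcast(countryLookup)

// orders: RDD[(String, Double)] of (countryCode, amount).
// This is the map-side join: each task reads the broadcast copy
// via .value instead of shuffling the lookup table across the cluster.
val enriched = orders.map { case (code, amount) =>
  (bcLookup.value.getOrElse(code, "Unknown"), amount)
}
```

Because the lookup map travels once per executor rather than once per task, this pattern pays off as the number of tasks grows.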

To use broadcast variables correctly, note the points below and cross-check them against your usage.

  • Broadcast Type errors – A broadcast variable is not necessarily an RDD or a collection. Its type is simply whatever type you assign to it. Think of a broadcast variable as a local variable that is local to every machine: every worker holds a copy of whatever you have broadcast, so you don't need to worry about assigning it to specific RDD values. It is simply a data structure, like a global read-only variable, that is read (and not written to) by all the workers.

Consider the example

var2 = sc.broadcast(var1)

In this case, the type of the broadcast variable var2 follows whatever var1's type is!

A broadcast variable can wrap any class (an Integer, any other object, etc.). It is by no means restricted to a Scala collection.

The best time to use a broadcast variable is when you have a fairly large object that you're going to need for most values in the RDD.
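To illustrate the typing point, a minimal sketch (assuming an active SparkContext named sc; the Product case class is made up for this example):

```scala
case class Product(id: Int, name: String)

// The broadcast variable's type tracks whatever you pass in:
val bcInt  = sc.broadcast(42)                        // Broadcast[Int]
val bcProd = sc.broadcast(Product(1, "widget"))      // Broadcast[Product]
val bcMap  = sc.broadcast(Map(1 -> "a", 2 -> "b"))   // Broadcast[Map[Int, String]]

// Access the wrapped value (read-only) with .value:
val name: String = bcProd.value.name
```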

  • Broadcast Join Errors – You should not use standard broadcasts to handle distributed data structures. To perform a broadcast join on a DataFrame, the DataFrame needs to be prepped for broadcasting beforehand. See the example below –

import org.apache.spark.sql.functions.broadcast

val productType: DataFrame = <SOME_DATAFRAME>

val tmpDf: DataFrame = broadcast(
  productType.withColumnRenamed("PROD_ID", "PROD_ID_SOLD").as("soldProducts"))

dataTable.as("somedf").join(
  broadcast(tmpDf),
  $"somedf.SOLD_PROD" === $"soldProducts.PROD_ID_SOLD",
  "inner")

Note that the broadcast value is evaluated locally before join is called. This ensures tmpDf is ready to be broadcast to the executors when the join runs.

  • Broadcast Join Plans – If you want to see the plan of a broadcast join, use "explain". Example for reference –

Df1.join(
  broadcast(Df2),
  Df1("col1") <=> Df2("col2")
).explain()
  • To release a broadcast variable, first unpersist it and then destroy it. Unpersist removes the cached copies from the executors (the variable can still be re-broadcast if used again), while destroy releases all resources, after which the variable cannot be used.

broadcastVar.unpersist
broadcastVar.destroy
