
Showing posts with the label Scala

Broadcast Nested Loop in detail in Spark

Broadcast Nested Loop join works by broadcasting one of the datasets in its entirety and performing a nested loop to join the data. So essentially every record from dataset 1 is attempted to be joined with every record from dataset 2. As you can guess, Broadcast Nested Loop is not preferred and can be quite slow. It works for both equi and non-equi joins, and it is picked by default when you have a non-equi join.

Example

We don't change the default values for either spark.sql.join.preferSortMergeJoin or spark.sql.autoBroadcastJoinThreshold.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res0: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, …
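The excerpt cuts off above; here is a minimal sketch of how the join can be observed, assuming df1 is the DataFrame defined in the transcript and that data2 matches the Seq(30, 20, 40, 50) used in the other posts. The non-equi condition plus a small, broadcastable right side is what steers Spark to BroadcastNestedLoopJoin under the default settings.

// assumes the spark-shell session above; data2 assumed to match the other examples
val data2 = Seq(30, 20, 40, 50)
val df2 = data2.toDF("id2")

// a non-equi condition with a small right side
val joined = df1.join(df2, df1("id1") >= df2("id2"))
joined.explain()   // the physical plan should show BroadcastNestedLoopJoin
joined.show()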

Everything about Cartesian Product in Spark

Cartesian Product join works very similarly to a Broadcast Nested Loop join, except the dataset is not broadcasted. Shuffle-and-Replication does not mean a "true" shuffle, where records with the same keys are sent to the same partition. Instead, the entire partition of the dataset is sent over, or replicated, to all the partitions for a full cross or nested-loop join. We will understand all the above points with examples in detail.

We are setting spark.sql.autoBroadcastJoinThreshold to -1 to disable broadcast.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = -1

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 2…
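A minimal sketch of producing a Cartesian Product in the same session, assuming df1 is built from data1 above and df2 from data2. crossJoin makes the cross join explicit; with broadcasting disabled, the physical plan falls back to CartesianProduct.

// assumes the spark-shell session above, with broadcasting disabled
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val df2 = Seq(30, 20, 40, 50).toDF("id2")
val crossed = df1.crossJoin(df2)
crossed.explain()   // the physical plan should show CartesianProduct
crossed.count()     // 12 rows x 4 rows = 48 rows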

Everything in detail about "Shuffle Hash join" in Spark

Shuffle Hash Join, as the name indicates, works by shuffling both datasets, so the same keys from both sides end up in the same partition or task. Once the data is shuffled, the smaller of the two is hashed into buckets and a hash join is performed within the partition. Shuffle Hash Join is different from Broadcast Hash Join because the entire dataset is not broadcasted; instead both datasets are shuffled, and then the smaller side is hashed and bucketed and hash joined with the bigger side in all the partitions. Shuffle Hash Join is divided into 2 phases:

Shuffle phase – both datasets are shuffled
Hash Join phase – the smaller side is hashed and bucketed and hash joined with the bigger side in all the partitions

Sorting is not needed with Shuffle Hash Join inside the partitions.

Example

spark.sql.join.preferSortMergeJoin should be set to false and spark.sql.autoBroadcastJoinThreshold should be set to a lower value so Spark can choose Shuffle Hash Join over Sort Merge …
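A minimal sketch of nudging Spark into a Shuffle Hash Join, assuming two small DataFrames df1 and df2 with columns id1 and id2 as in the other examples. The SHUFFLE_HASH join hint used here is the Spark 3.0+ way of requesting the strategy; the configuration route described in the post (preferSortMergeJoin = false plus a low broadcast threshold) is the alternative.

// assumes a spark-shell session on Spark 3.0 or later
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

val df1 = Seq(10, 20, 20, 30, 40, 10, 40, 20).toDF("id1")
val df2 = Seq(30, 20, 40, 50).toDF("id2")

// ask for a shuffle hash join explicitly via the join hint
val joined = df1.join(df2.hint("SHUFFLE_HASH"), df1("id1") === df2("id2"))
joined.explain()   // the physical plan should show ShuffledHashJoin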

Everything about "Broadcast hash join" in spark

Broadcast Hash Join in Spark works by broadcasting the small dataset to all the executors; once the data is broadcasted, a standard hash join is performed in all the executors. Broadcast Hash Join happens in 2 phases:

Broadcast phase – the small dataset is broadcasted to all executors
Hash Join phase – the small dataset is hashed in all the executors and joined with the partitioned big dataset

Broadcast Hash Join doesn't involve a sort operation, and that is one of the reasons it is the fastest join algorithm. We will see in detail how it works with an example.

Example

spark.sql.autoBroadcastJoinThreshold – the maximum size of a DataFrame that can be broadcasted. The default is 10 MB, which means only datasets below 10 MB can be broadcasted. We have 2 DataFrames, df1 and df2, with one column each – id1 and id2 respectively. We are doing a simple join on id1 and id2.

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760

scala> val data1 = Seq(10, 20, …
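A minimal sketch of the scenario described above, assuming two small DataFrames with columns id1 and id2. Under the 10 MB default threshold Spark picks Broadcast Hash Join on its own for an equi-join like this; the broadcast() function only makes the intent explicit.

import org.apache.spark.sql.functions.broadcast

val df1 = Seq(10, 20, 20, 30, 40, 10, 40, 20).toDF("id1")
val df2 = Seq(30, 20, 40, 50).toDF("id2")

// broadcast the small side and hash join it with the partitioned big side
val joined = df1.join(broadcast(df2), df1("id1") === df2("id2"))
joined.explain()   // the physical plan should show BroadcastHashJoin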

Understand "Shuffle Sort Merge Join" in spark

Shuffle Sort Merge Join, as the name indicates, involves a sort operation. Shuffle Sort Merge Join has 3 phases:

Shuffle phase – both datasets are shuffled
Sort phase – records are sorted by key on both sides
Merge phase – iterate over both sides and join based on the join key

Shuffle Sort Merge Join is preferred when both datasets are big and cannot fit in memory – with or without shuffle. We will understand all the above points with examples and elaborate DAGs.

Example

spark.sql.join.preferSortMergeJoin is set to true by default, as this join is preferred when datasets are big on both sides. Spark will pick Broadcast Hash Join if a dataset is small. In our case both datasets are small, so to force a Sort Merge Join we are setting spark.sql.autoBroadcastJoinThreshold to -1, which disables Broadcast Hash Join.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: …
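A minimal sketch of forcing a Sort Merge Join on small DataFrames, assuming columns id1 and id2 as in the other posts. With the broadcast threshold disabled, Spark falls back to its preferred sort merge strategy for the equi-join.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val df1 = Seq(10, 20, 20, 30, 40, 10, 40, 20).toDF("id1")
val df2 = Seq(30, 20, 40, 50).toDF("id2")

val joined = df1.join(df2, df1("id1") === df2("id2"))
joined.explain()   // the physical plan should show SortMergeJoin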

Accumulators explained in detail in Spark (Java and Scala)

Accumulators are shared variables in Spark. They are only "added" to through an associative and commutative operation and are used to implement counters (similar to MapReduce counters) or sum operations. Spark by default supports creating accumulators of any numeric type and provides the capability to add custom accumulator types. Developers can create the following accumulators:

named accumulators – can be seen in the Spark web UI under the "Accumulator" tab. On this tab you will see two tables: the first table, "accumulable", consists of all named accumulator variables and their values; the second table, "Tasks", shows the value of each accumulator modified by a task
unnamed accumulators – these are not shown in the UI, so for all practical purposes it is advisable to use named accumulators

Spark by default provides accumulator methods for long, double and collection types. For example, you can create a long accumulator in spark-shell using:

scala> val accum = sc.longAccumulator("SumAccumulator")
a…
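The excerpt cuts off at the accumulator definition; a minimal sketch of using a named LongAccumulator in spark-shell, under the assumption that sc is the usual SparkContext.

val accum = sc.longAccumulator("SumAccumulator")

// add() may run on any executor; the value is read back on the driver after the action completes
sc.parallelize(Seq(1, 2, 3, 4)).foreach(x => accum.add(x))

println(accum.value)   // 10 – the sum of the added values
println(accum.count)   // 4  – the number of add() calls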

What is the difference between foreach and foreachPartition in Spark?

foreach() and foreachPartition() are actions, not transformations. Since they are actions, neither of them returns an RDD.

foreach()

Use foreach() when you want to apply a function to every element in an RDD. But note, you are not transforming the elements in the RDD. With foreach() you are usually changing the state of something outside the RDD based on the elements in the RDD, thereby causing side effects. For example, you can use foreach() to update a column in a database table for every element in the RDD. A common use case for foreach() is to update an accumulator for every element in the RDD.

scala> val acc = sc.longAccumulator("sample-accumulator")
scala> sc.parallelize(Array(10, 20, 30, 40)).foreach(element => acc.add(element))

In the above example the foreach function is applied 4 times, once per element in the RDD.

foreachPartition()

foreachPartition() is very similar to mapPartitions(), as it is also used to perform initialization once per partition…
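The foreachPartition() example is cut off above; here is a minimal sketch of the once-per-partition initialization pattern it describes, with a hypothetical connection standing in for a real resource such as a database connection.

val rdd = sc.parallelize(Array(10, 20, 30, 40), 2)   // 2 partitions

rdd.foreachPartition { elements =>
  // hypothetical setup, runs once per partition (e.g. open a database connection)
  val connection = new StringBuilder("fake-connection")
  elements.foreach(x => connection.append(s" wrote $x"))
  // teardown would go here (e.g. close the connection)
}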

Getting started with Spark's Word Count program in Scala

Hadoop MapReduce's word count becomes very simple with the Spark shell. In this recipe, we are going to create a simple one-line text file, upload it to the Hadoop Distributed File System (HDFS), and use Spark to count occurrences of words. Let's see how:

1. Create the words directory by using the following command:
$ mkdir words
2. Get into the words directory:
$ cd words
3. Create a sh.txt text file and enter "to be or not to be" in it:
$ echo "to be or not to be" > sh.txt
4. Start the Spark shell:
$ spark-shell
5. Load the words directory as an RDD:
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
6. Count the number of lines (result: 1):
scala> words.count
7. Divide the line (or lines) into multiple words:
scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
8. Convert each word to (word, 1) – that is, output 1 as the value for each occurrence of a word as a key:
scala> val wordsMap = wordsFlatMap.map(w => (w, 1))
9. Use the reduceByKey method to a…
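Step 9 is cut off above; a minimal sketch of the reduceByKey step it starts to describe, assuming the wordsMap pair RDD built in step 8.

scala> val wordCount = wordsMap.reduceByKey(_ + _)   // sum the 1s for each word
scala> wordCount.collect().foreach(println)          // for "to be or not to be": (to,2), (be,2), (or,1), (not,1)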