Spark - repartition() vs coalesce()

Spark repartition() vs coalesce() – repartition() is used to increase or decrease the number of partitions of an RDD, DataFrame, or Dataset, whereas coalesce() is used only to decrease the number of partitions, in an efficient way.

One important point to note is that repartition() is a very expensive operation because it shuffles data across all partitions; coalesce() moves much less data, but both involve data movement, so try to minimize how often you re-partition.
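Both methods are also available on DataFrame/Dataset. As a minimal sketch (assuming the same SparkSession named spark that is created in the RDD examples below), you can verify the effect with rdd.getNumPartitions:

// Minimal DataFrame sketch; reuses the `spark` session created in the examples below.
val df = spark.range(0, 20).toDF("id")
println("Initial partitions : " + df.rdd.getNumPartitions)

// repartition() performs a full shuffle and can increase or decrease partitions
val dfRepart = df.repartition(6)
println("repartition(6) : " + dfRepart.rdd.getNumPartitions)

// coalesce() only merges existing partitions, avoiding a full shuffle
val dfCoalesce = dfRepart.coalesce(2)
println("coalesce(2) : " + dfCoalesce.rdd.getNumPartitions)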

For an RDD, you can set the number of partitions (the level of parallelism) at creation time using parallelize(), textFile() and wholeTextFiles().

val spark:SparkSession = SparkSession.builder()
  .master("local[5]")
  .appName("SparkByExamples.com")
  .getOrCreate()

val rdd = spark.sparkContext.parallelize(Range(0,20))
println("From local[5] : "+rdd.partitions.size)

val rdd1 = spark.sparkContext.parallelize(Range(0,20), 6)
println("parallelize : "+rdd1.partitions.size)

val rddFromFile = spark.sparkContext.textFile("src/main/resources/test.txt",10)
println("TextFile : "+rddFromFile.partitions.size)

output:

From local[5] : 5
parallelize : 6
TextFile : 10
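wholeTextFiles() mentioned above also accepts a minimum number of partitions. A small sketch, assuming a folder of text files at the hypothetical path src/main/resources/files/:

// wholeTextFiles returns (fileName, fileContent) pairs; the second argument is a
// minimum-partitions hint. The directory path here is only an example.
val rddWhole = spark.sparkContext.wholeTextFiles("src/main/resources/files/", 3)
println("wholeTextFiles : " + rddWhole.partitions.size)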

spark.sparkContext.parallelize(Range(0,20),6) distributes the RDD into 6 partitions, and the data is distributed as shown below.

rdd1.saveAsTextFile("/tmp/partition")
// Writes 6 part files, one for each partition

Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19
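You can also print a per-partition listing like the one above without writing part files; a quick sketch using glom(), which turns each partition into an array (only sensible for small RDDs, since it collects everything to the driver):

// Print each partition's contents from the driver.
rdd1.glom().collect().zipWithIndex.foreach { case (part, i) =>
  println(s"Partition ${i + 1} : " + part.mkString(" "))
}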

RDD repartition()

Spark RDD repartition() method is used to increase or decrease the number of partitions. The below example decreases the partitions of rdd1 from 6 to 4 by moving data from all partitions.

val rdd2 = rdd1.repartition(4)
println("Repartition size : "+rdd2.partitions.size)
rdd2.saveAsTextFile("/tmp/re-partition")

This yields the output Repartition size : 4. repartition() re-distributes the data from all partitions (as shown below), which is a full shuffle and therefore a very expensive operation when dealing with billions of rows.

Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18
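One way to see the shuffle is to print the RDD lineage; the debug string for the repartitioned RDD should include a ShuffledRDD stage:

// The lineage of rdd2 lists the shuffle introduced by repartition(4).
println(rdd2.toDebugString)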

RDD coalesce()

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition(), where the movement of data across partitions is lower.

val rdd3 = rdd1.coalesce(4)
println("Coalesce size : "+rdd3.partitions.size)
rdd3.saveAsTextFile("/tmp/coalesce")

If you compare the below output with the 6-partition listing above, you will notice that the data from partition 3 has been merged into partition 2 and the data from partition 6 has been merged into partition 5, so data was moved from just two partitions.

Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15 16 17 18 19
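Note that RDD coalesce() also takes an optional shuffle flag; with shuffle = true it performs a full shuffle just like repartition() (which is implemented on top of it), and that is the only way coalesce() can increase the number of partitions. A minimal sketch:

// With shuffle = true, coalesce behaves like repartition and can grow the partition count.
val rdd4 = rdd1.coalesce(8, shuffle = true)
println("Coalesce with shuffle : " + rdd4.partitions.size)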

Complete Example of Spark RDD repartition and coalesce

package com.sparkbyexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDRepartitionExample extends App {

  val spark:SparkSession = SparkSession.builder()
    .master("local[5]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val rdd = spark.sparkContext.parallelize(Range(0,20))
  println("From local[5] : "+rdd.partitions.size)

  val rdd1 = spark.sparkContext.parallelize(Range(0,20), 6)
  println("parallelize : "+rdd1.partitions.size)

  val rddFromFile = spark.sparkContext.textFile("src/main/resources/test.txt",10)
  println("TextFile : "+rddFromFile.partitions.size)

  rdd1.saveAsTextFile("c:/tmp/partition")

  val rdd2 = rdd1.repartition(4)
  println("Repartition size : "+rdd2.partitions.size)
  rdd2.saveAsTextFile("c:/tmp/re-partition")

  val rdd3 = rdd1.coalesce(4)
  println("Coalesce size : "+rdd3.partitions.size)
  rdd3.saveAsTextFile("c:/tmp/coalesce")
}

Conclusion

In this Spark repartition and coalesce article, you have learned how to create an RDD with a specified number of partitions, how to re-distribute the data of an RDD (and DataFrame) using the repartition() and coalesce() methods, and the difference between repartition and coalesce.

