Accumulators in Spark Explained in Detail (Java and Scala)

Accumulators are

  • Shared variables in Spark
  • Only “added” through an associative and commutative operation
  • Used to implement counters (similar to MapReduce counters) or sums

Spark supports accumulators of any numeric type out of the box and also provides the capability to define custom accumulator types.
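
As an illustration of the custom-type capability, here is a minimal sketch of a custom accumulator built on Spark's AccumulatorV2 API; the class name, element type, and semantics are purely illustrative, not part of Spark itself.

import org.apache.spark.util.AccumulatorV2

// Illustrative custom accumulator that collects distinct strings into a Set.
class DistinctStringAccumulator extends AccumulatorV2[String, Set[String]] {
  private var _set: Set[String] = Set.empty

  override def isZero: Boolean = _set.isEmpty
  override def copy(): DistinctStringAccumulator = {
    val acc = new DistinctStringAccumulator
    acc._set = _set
    acc
  }
  override def reset(): Unit = _set = Set.empty
  override def add(v: String): Unit = _set += v
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = _set ++= other.value
  override def value: Set[String] = _set
}

// Register it with the SparkContext so it can be updated from tasks:
// val distinctAcc = new DistinctStringAccumulator
// sc.register(distinctAcc, "DistinctStrings")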

Developers can create the following kinds of accumulators:

  • named accumulators – visible in the Spark web UI under the “Accumulator” tab. This tab shows two tables: the first, “accumulable”, lists all named accumulator variables and their values; the second, “Tasks”, shows the value of each accumulator as modified by each task.
  • unnamed accumulators – not shown in the UI, so for all practical purposes it is advisable to use named accumulators. Both forms are shown in the example below.
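
For example, both forms can be created in spark-shell (the accumulator name “RecordCount” is just illustrative):

scala> val named = sc.longAccumulator("RecordCount")   // appears under the "Accumulator" tab
scala> val unnamed = sc.longAccumulator                // behaves the same, but is not visible in the UI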

Spark by default provides accumulator methods for the long, double, and collection types.

For example, you can create a long accumulator in spark-shell using:


scala> val accum = sc.longAccumulator("SumAccumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(SumAccumulator), value: 0)

The above statement creates a named accumulator called “SumAccumulator”. Now, let’s see how to add the elements of an array to this accumulator.

scala> sc.parallelize(Array(1, 2, 3)).foreach(x => accum.add(x))

scala> accum.value
res2: Long = 6

Each of these accumulator classes has several methods; among them, add() is called from tasks running on the cluster. Tasks cannot read the value of an accumulator; only the driver program can read an accumulator’s value, using the value() method.

Long Accumulator

The longAccumulator() method on SparkContext returns a LongAccumulator.

Syntax

//Long Accumulator
def longAccumulator : org.apache.spark.util.LongAccumulator
def longAccumulator(name : scala.Predef.String) : org.apache.spark.util.LongAccumulator

You can create a named accumulator for the long type using SparkContext.longAccumulator(name), and an unnamed one using the signature that doesn’t take an argument.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("SparkByExample")
    .master("local")
    .getOrCreate()

  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")

  val rdd = spark.sparkContext.parallelize(Array(1, 2, 3))
  rdd.foreach(x => longAcc.add(x))
  println(longAcc.value)

The LongAccumulator class provides the following methods (a short example follows the list):

  • isZero
  • copy
  • reset
  • add
  • count
  • sum
  • avg
  • merge
  • value
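
A brief sketch of a few of these methods in spark-shell (the accumulator name is illustrative):

scala> val statsAcc = sc.longAccumulator("StatsAccumulator")
scala> sc.parallelize(Array(1L, 2L, 3L, 4L)).foreach(x => statsAcc.add(x))
scala> statsAcc.sum     // 10  - total of all added values
scala> statsAcc.count   // 4   - number of values added
scala> statsAcc.avg     // 2.5 - sum divided by count
scala> statsAcc.value   // 10  - same as sum for LongAccumulator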

Double Accumulator

You can create a named accumulator for the double type using SparkContext.doubleAccumulator(name), and an unnamed one using the signature that doesn’t take an argument.

Syntax

//Double Accumulator
def doubleAccumulator : org.apache.spark.util.DoubleAccumulator
def doubleAccumulator(name : scala.Predef.String) : org.apache.spark.util.DoubleAccumulator

The DoubleAccumulator class provides methods similar to LongAccumulator.
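
For example, a minimal sketch analogous to the long accumulator example above (names and values are illustrative):

  val doubleAcc = spark.sparkContext.doubleAccumulator("DoubleSumAccumulator")
  val rdd = spark.sparkContext.parallelize(Array(1.5, 2.5, 3.0))
  rdd.foreach(x => doubleAcc.add(x))
  println(doubleAcc.value)  // 7.0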

Collection Accumulator

You can create a named accumulator for a collection type using SparkContext.collectionAccumulator(name), and an unnamed one using the signature that doesn’t take an argument.

Syntax

//Collection Accumulator
def collectionAccumulator[T] : org.apache.spark.util.CollectionAccumulator[T]
def collectionAccumulator[T](name : scala.Predef.String) : org.apache.spark.util.CollectionAccumulator[T]

The CollectionAccumulator class provides the following methods (a short example follows the list):

  • isZero
  • copyAndReset
  • copy
  • reset
  • add
  • merge
  • value
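
A minimal sketch of a named collection accumulator (variable names and data are illustrative); note that value returns a java.util.List[T]:

  val collAcc = spark.sparkContext.collectionAccumulator[String]("LongWords")
  val rdd = spark.sparkContext.parallelize(Array("a", "bb", "ccc"))
  rdd.foreach(x => if (x.length > 1) collAcc.add(x))
  println(collAcc.value)  // e.g. [bb, ccc]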

Note: as mentioned earlier, tasks can only add() to an accumulator; only the driver program can read its value, via the value() method.
