Accumulator details in PySpark
Accumulators are:
- Shared variables that all executors can update to add information through aggregation or computation
- Only “added” to, through an associative and commutative operation
- Used to implement counters (similar to MapReduce counters) or sum operations
- Usable with both RDDs and DataFrames to perform sum and counter operations
By default, Spark supports creating accumulators of any numeric type and provides the capability to add custom accumulator types.
Accumulators are write-only, initialize-once variables: only tasks running on the workers can update them, and the workers' updates are propagated automatically to the driver program. Only the driver program is allowed to read an accumulator through its value property.
Accumulator creation
We can create an Accumulator in PySpark using accumulator() from the SparkContext class.
- sparkContext.accumulator() is used to define accumulator variables.
- add() is used to add/update a value in the accumulator.
- value property on the accumulator variable is used to retrieve the value from the accumulator.
We can create Accumulators in PySpark for the primitive types int and float. Users can also create Accumulators for custom types using the AccumulatorParam class of PySpark, as sketched after the basic example below.
Example: create an accumulator variable “accum” of type int and use it to sum all values in an RDD.
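A minimal sketch of this example, assuming a local SparkSession named spark; the sample data is illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AccumulatorExample").getOrCreate()

# Create an accumulator with initial value 0
accum = spark.sparkContext.accumulator(0)

# Illustrative sample data
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# foreach() runs on the workers; each task adds its elements to the accumulator
rdd.foreach(lambda x: accum.add(x))

# Only the driver can read the accumulated value
print(accum.value)  # 15
```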
Here, we create an accumulator variable accum using spark.sparkContext.accumulator(0) with an initial value of 0. We then iterate over each element of the RDD using the foreach() action, adding each element to the accum variable. Finally, we read the accumulated value using the accum.value property.
Note that in this example, rdd.foreach() is executed on the workers and accum.value is called from the PySpark driver program.
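For custom types, a subclass of AccumulatorParam supplies the zero value and the merge operation. The sketch below accumulates Python lists; the class name ListAccumulatorParam and the sample data are illustrative assumptions.

```python
from pyspark.sql import SparkSession
from pyspark.accumulators import AccumulatorParam

class ListAccumulatorParam(AccumulatorParam):
    """Hypothetical custom accumulator that merges Python lists."""

    def zero(self, value):
        # Initial (empty) value for each task's local copy
        return []

    def addInPlace(self, value1, value2):
        # Merge two partial results
        value1.extend(value2)
        return value1

spark = SparkSession.builder.appName("CustomAccumulator").getOrCreate()
sc = spark.sparkContext

list_accum = sc.accumulator([], ListAccumulatorParam())
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: list_accum.add([x]))

print(list_accum.value)  # e.g. [1, 2, 3, 4]; order may vary across partitions
```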
Updating an accumulator through a function:
We can also update an accumulator from a regular Python function passed to foreach(), and we can use accumulators as counters, as sketched below.
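A minimal sketch of both patterns, assuming the same kind of local SparkSession and illustrative data as before:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AccumulatorFunction").getOrCreate()
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# Update the accumulator from a named function instead of a lambda
accum_sum = spark.sparkContext.accumulator(0)

def add_to_sum(x):
    global accum_sum
    accum_sum += x  # equivalent to accum_sum.add(x)

rdd.foreach(add_to_sum)
print(accum_sum.value)   # 15

# Use an accumulator as a counter: add 1 for every record processed
accum_count = spark.sparkContext.accumulator(0)
rdd.foreach(lambda x: accum_count.add(1))
print(accum_count.value) # 5
```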