RDD examples in PySpark
Note: all examples are run from the PySpark console.
Creating RDD from List:
>>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]
>>> rdd1 = spark.sparkContext.parallelize(data)
>>> rdd1.glom().collect()
[[10, 20, 20], [30, 40, 10], [40, 20, 20], [20, 20, 50, 99, 88]]
The glom() method on an RDD returns the data grouped by partition.
>>> rdd1.collect()
[10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]
Creating an RDD with a pre-defined number of partitions:
>>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]
>>> rdd1 = spark.sparkContext.parallelize(data, 5)
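A quick check of the resulting layout (a sketch; the exact grouping of elements per partition may vary):
print("partition count: " + str(rdd1.getNumPartitions()))  # partition count: 5
print(rdd1.glom().collect())  # five lists, one per partition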
Creating RDD from file:
There are two ways to create an RDD from a file.
Using wholeTextFiles() method:
wholeTextFiles() returns a pair RDD in which the key is the file name and the value is the content of that file; it is run on a directory that contains multiple files.
filesPairedRDD = sc.wholeTextFiles("/home/deepak/data/")
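For example, the (file name, content) pairs can be inspected like this (a sketch, reusing filesPairedRDD from above):
for fname, content in filesPairedRDD.collect():
    # fname is the fully qualified file name, content is the whole file as one string
    print(fname, len(content))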
Using textFile() method:
textFile() returns an RDD in which each element is one line of the file, with the data spread across partitions.
rdd2 = spark.sparkContext.textFile("/path/textFile.txt")
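For example, the first few lines can be inspected with (a sketch, reusing rdd2 from above):
print(rdd2.take(3))  # the first three lines of the file, one line per RDD element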
Creating empty RDD:
rdd = spark.sparkContext.emptyRDD()
Creating an empty RDD with a given number of partitions:
rdd2 = spark.sparkContext.parallelize([],10)
Getting the number of partitions of an RDD:
print("initial partition count:"+str(rdd2.getNumPartitions()))#Outputs: initial partition count:10
repartition():
The repartition() method is used to increase or decrease the number of partitions; it always performs a full shuffle.
>>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]
>>> rdd1 = spark.sparkContext.parallelize(data)
>>> rdd1.glom().collect()
[[10, 20, 20], [30, 40, 10], [40, 20, 20], [20, 20, 50, 99, 88]]
>>> print("partition count:"+str(rdd1.getNumPartitions()))
partition count:4
>>> rdd1 = rdd1.repartition(5)
>>> rdd1.glom().collect()
[[], [10, 20, 20], [40, 20, 20, 20, 20, 50, 99, 88], [30, 40, 10], []]
>>> print("re-partition count:"+str(rdd1.getNumPartitions()))
re-partition count:5
coalesce(): is used to decrease the number of partitions
>>> rdd1.coalesce(2)
CoalescedRDD[22] at coalesce at NativeMethodAccessorImpl.java:0
>>> rdd1.coalesce(2).glom().collect()
[[10, 20, 20], [30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]]
>>> rdd1.coalesce(10).glom().collect()
[[10, 20, 20], [30, 40, 10], [], [40, 20, 20, 20, 20, 50, 99, 88], []]
>>> print("re-partition count:"+str(rdd1.getNumPartitions()))
re-partition count:5
(rdd1 still reports 5 partitions because coalesce() returns a new RDD; it does not modify rdd1 in place.)
repartition() vs coalesce():
coalesce() uses existing partitions to minimize the amount of data that is shuffled, while repartition() creates new partitions and does a full shuffle. As a result, coalesce() produces partitions with different amounts of data (sometimes of very different sizes), whereas repartition() produces roughly equal-sized partitions. The same behaviour can be seen in the coalesce() examples above.
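Note that coalesce() on its own never increases the partition count, which is why the coalesce(10) call above still shows 5 partitions; passing shuffle=True forces a full shuffle, which is effectively what repartition() does. A small sketch:
rdd_more = rdd1.coalesce(10, shuffle=True)
print(rdd_more.getNumPartitions())  # 10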
RDD transformation examples:
map():
In this example, we map an RDD of strings to an RDD of integers, where each element of the resulting RDD is the number of words in the corresponding line of the input RDD. Conceptually, the mapping is RDD[String] -> RDD[Int].
Contents of data/rdd/input/sample.txt:
Welcome to TutorialKart
Learn Apache Spark
Learn to work with RDD
# read input text file to RDD
lines = sc.textFile("/home/arjun/workspace/spark/sample.txt")
# map lines to n_words
n_words = lines.map(lambda line: len(line.split()))
# collect the RDD to a list
llist = n_words.collect()
Another map() example, computing the base-10 logarithm of each number (this needs the math module):
import math
numbers = sc.parallelize([14, 21, 88, 99, 455])
# map each number to its base-10 logarithm
log_values = numbers.map(lambda n: math.log10(n))
# collect the RDD to a list
llist = log_values.collect()
flatMap():
The flatMap() transformation applies a function to each element and flattens the result into a new RDD. In the example below, each record is split by space and the pieces are flattened, so the resulting RDD contains a single word per record.
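A minimal PySpark sketch (the file path is reused from the map() example above; the variable names are illustrative):
rdd_file = sc.textFile("/home/arjun/workspace/spark/sample.txt")
# split each line by space and flatten, so every record is a single word
words = rdd_file.flatMap(lambda line: line.split(" "))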
map():
The map() transformation is used to apply an operation to every element, such as adding a field or updating a value; the output of a map transformation always has the same number of records as the input. In our word count example, we map each word to the pair (word, 1), producing a pair RDD with the word (a string) as the key and 1 (an int) as the value, as shown in the sketch below.
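A sketch, assuming words is the RDD produced in the flatMap() sketch above:
# pair every word with the count 1, giving (word, 1) tuples
pairs = words.map(lambda word: (word, 1))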
reduceByKey():
reduceByKey() merges the values for each key using the specified function. In our example it sums the values for each word, so the resulting RDD contains the unique words and their counts.
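A sketch, continuing from pairs above:
# sum the 1s for each word, giving (word, total_count) pairs
word_counts = pairs.reduceByKey(lambda a, b: a + b)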
sortByKey():
The sortByKey() transformation sorts the elements of a pair RDD by key. In our example we first convert RDD[(String, Int)] to RDD[(Int, String)] with a map transformation and then apply sortByKey(), so the sort happens on the integer count. Finally, collecting and printing returns every word and its count as a key-value pair.
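A sketch, continuing from word_counts above:
# swap to (count, word) so the sort key is the integer count, then sort by key
sorted_counts = word_counts.map(lambda wc: (wc[1], wc[0])).sortByKey()
for count, word in sorted_counts.collect():
    print(word, count)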
filter():
The filter() transformation is used to filter the records in an RDD. In our example we keep only the words that start with "a".
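A sketch, assuming words from the flatMap() sketch above:
# keep only the words that start with "a"
a_words = words.filter(lambda word: word.startswith("a"))
print(a_words.collect())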