RDD examples in PySpark

Note: All examples are run from the PySpark console

Creating RDD from List:

>>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]

>>> rdd1 = spark.sparkContext.parallelize(data)

>>> rdd1.glom().collect()

[[10, 20, 20], [30, 40, 10], [40, 20, 20], [20, 20, 50, 99, 88]]

The glom() method returns the RDD's data grouped by partition, as one list per partition.
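
A quick way to see how many elements ended up in each partition is to pair glom() with len(). The sizes below match the 4-partition output above, but on your machine they depend on the default parallelism, so treat them as illustrative:

>>> rdd1.glom().map(len).collect()

[3, 3, 3, 5]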

>>> rdd1.collect()

[10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]

Creating RDD with a pre-defined number of partitions:

>>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]

>>> rdd1 = spark.sparkContext.parallelize(data, 5)
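
A quick sanity check on the partitioning (how the 14 elements are split across the 5 partitions depends on Spark's slicing of the list, so the grouping shown is only indicative):

>>> rdd1.getNumPartitions()

5

>>> rdd1.glom().collect()

[[10, 20], [20, 30, 40], [10, 40, 20], [20, 20, 20], [50, 99, 88]]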

Creating RDD from file:

There are two ways to create an RDD from a file:

Using wholeTextFiles() method:

wholeTextFiles() returns a paired RDD in which the key is the file name (path) and the value is the content of that file. It is run on a directory that contains multiple files.

filesPairedRDD = sc.wholeTextFiles("/home/deepak/data/")
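
A small sketch of how the paired RDD can be inspected; the directory path is just a placeholder:

# file paths (the keys of the paired RDD)
filePaths = filesPairedRDD.keys().collect()
# (file path, content length) pairs
fileSizes = filesPairedRDD.map(lambda kv: (kv[0], len(kv[1]))).collect()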

Using textFile() method:

textFile() returns an RDD in which each element is one line of the file, with the lines spread across partitions.

rdd2 = spark.sparkContext.textFile("/path/textFile.txt")
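
Typical follow-ups on the line-based RDD (the path above is a placeholder):

# number of lines in the file
numLines = rdd2.count()
# first three lines
firstLines = rdd2.take(3)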

Creating empty RDD:

rdd = spark.sparkContext.emptyRDD()

Creating empty RDD with partitions:

rdd2 = spark.sparkContext.parallelize([],10)

Getting the number of partitions of an RDD:

print("initial partition count:"+str(rdd2.getNumPartitions()))#Outputs: initial partition count:10

repartition():

The repartition() method is used to increase or decrease the number of partitions.

>>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]

>>> rdd1 = spark.sparkContext.parallelize(data)

>>> rdd1.glom().collect()

[[10, 20, 20], [30, 40, 10], [40, 20, 20], [20, 20, 50, 99, 88]]

>>> print("initial partition count:"+str(rdd1.getNumPartitions()))

initial partition count:4

>>> rdd1 = rdd1.repartition(5)

>>> rdd1.glom().collect()

[[], [10, 20, 20], [40, 20, 20, 20, 20, 50, 99, 88], [30, 40, 10], []]          

>>> print("re-partition count:"+str(rdd1.getNumPartitions()))

re-partition count:5

coalesce():

The coalesce() method is used to decrease the number of partitions.

>>> rdd1.coalesce(2)

CoalescedRDD[22] at coalesce at NativeMethodAccessorImpl.java:0

>>> rdd1.coalesce(2).glom().collect()

[[10, 20, 20], [30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]]

>>> rdd1.coalesce(10).glom().collect()

[[10, 20, 20], [30, 40, 10], [], [40, 20, 20, 20, 20, 50, 99, 88], []]

>>> print("re-partition count:"+str(rdd1.getNumPartitions()))

re-partition count:5

repartition() vs coalesce():

coalesce uses existing partitions to minimize the amount of data that’s shuffled. repartition creates new partitions and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes partitions that have much different sizes) and repartition results in roughly equal sized partitions.

The same can be seen in the coalesce example above: coalesce(10) still left the RDD with 5 partitions, because coalesce (without a shuffle) can only reduce the number of partitions, not increase it.
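
A side-by-side sketch of the difference, reusing the same list (the exact sizes are illustrative, since element placement after a shuffle is not fixed):

data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]
base = spark.sparkContext.parallelize(data, 4)
# repartition(2): full shuffle, sizes come out roughly equal, e.g. [7, 7]
print(base.repartition(2).glom().map(len).collect())
# coalesce(2): merges existing partitions without a full shuffle, sizes can be uneven, e.g. [6, 8]
print(base.coalesce(2).glom().map(len).collect())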

RDD transformation examples:

map():

In this example, we map an RDD of strings to an RDD of integers, with each element of the mapped RDD representing the number of words in the corresponding line of the input RDD. The final mapping is RDD<String> -> RDD<Integer>.

data/rdd/input/sample.txt

Welcome to TutorialKart
Learn Apache Spark
Learn to work with RDD
Example from file:

# read input text file to RDD
lines = sc.textFile("/home/arjun/workspace/spark/sample.txt")
# map each line to the number of words it contains
n_words = lines.map(lambda line: len(line.split()))
# collect the RDD to a list
llist = n_words.collect()

Example from list:

import math

numbers = sc.parallelize([14, 21, 88, 99, 455])
# map each number to its base-10 logarithm
log_values = numbers.map(lambda n: math.log10(n))
# collect the RDD to a list
llist = log_values.collect()

The transformations below walk through a word count example, starting from an input text file:

rdd = spark.sparkContext.textFile("/tmp/test.txt")

flatMap – The flatMap() transformation flattens the RDD after applying the function and returns a new RDD. In the example below, it first splits each record by space and then flattens the result; the resulting RDD has a single word in each record.

rdd2 = rdd.flatMap(lambda x: x.split(" "))
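
A standalone illustration of flatMap() on a small in-memory RDD, using a couple of lines from the sample file above:

lines = sc.parallelize(["Learn Apache Spark", "Learn to work with RDD"])
words = lines.flatMap(lambda x: x.split(" "))
print(words.collect())
# ['Learn', 'Apache', 'Spark', 'Learn', 'to', 'work', 'with', 'RDD']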

map – The map() transformation is used to apply any complex operation, like adding a column or updating a column, etc. The output of a map transformation always has the same number of records as its input.

In our word count example, we add a new element with value 1 for each word; the result is a pair RDD of key-value pairs, with the word (a string) as the key and 1 (an int) as the value.

rdd3 = rdd2.map(lambda x: (x,1))
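
The same idea on a handful of words, showing the (word, 1) pairs that map() produces:

words = sc.parallelize(["Learn", "Apache", "Spark", "Learn"])
pairs = words.map(lambda x: (x, 1))
print(pairs.collect())
# [('Learn', 1), ('Apache', 1), ('Spark', 1), ('Learn', 1)]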

reduceByKey – reduceByKey() merges the values for each key using the specified function. In our example, it sums the value 1 for every occurrence of a word. The resulting RDD contains unique words and their counts.

rdd5 = rdd4.reduceByKey(lambda a,b: a+b)
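
A self-contained reduceByKey() demo on made-up pairs; the counts are exact but the order of the collected result can vary:

pairs = sc.parallelize([("Learn", 1), ("Spark", 1), ("Learn", 1), ("Spark", 1), ("RDD", 1)])
counts = pairs.reduceByKey(lambda a, b: a + b)
print(counts.collect())
# e.g. [('Learn', 2), ('Spark', 2), ('RDD', 1)]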

sortByKey – The sortByKey() transformation is used to sort RDD elements by key. In our example, we first convert RDD[(String, Int)] to RDD[(Int, String)] with a map transformation and then apply sortByKey(), which sorts on the integer count. Finally, print with collect() outputs all words in the RDD and their counts as key-value pairs.

rdd6 = rdd5.map(lambda x: (x[1], x[0])).sortByKey()
# Print rdd6 result to console
print(rdd6.collect())
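
A small sortByKey() demo on made-up (count, word) pairs; by default it sorts in ascending order of the key:

counts = sc.parallelize([(3, "spark"), (1, "rdd"), (2, "learn")])
print(counts.sortByKey().collect())
# [(1, 'rdd'), (2, 'learn'), (3, 'spark')]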

filter – The filter() transformation is used to filter the records in an RDD. In our example, we keep only the words that start with "a".

rdd4 = rdd3.filter(lambda x: x[0].startswith("a"))
print(rdd4.collect())
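
Putting the steps together, a minimal end-to-end sketch of the word count pipeline (the input path is a placeholder, and the filter on words starting with "a" is optional):

rdd = spark.sparkContext.textFile("/tmp/test.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" "))            # split each line into words
rdd3 = rdd2.map(lambda x: (x, 1))                     # (word, 1) pairs
rdd4 = rdd3.filter(lambda x: x[0].startswith("a"))    # keep words starting with "a"
rdd5 = rdd4.reduceByKey(lambda a, b: a + b)           # sum counts per word
rdd6 = rdd5.map(lambda x: (x[1], x[0])).sortByKey()   # sort by count
print(rdd6.collect())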
