PySpark RDD advanced examples

1. Creating a SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()
print(spark.version)

Output: 3.2.1

2. Creating RDDs

# Creating RDD

lst = [i for i in range(1, 10)]
print(lst)

listRdd = spark.sparkContext.parallelize(lst, 3)
print(listRdd.glom().collect())
print(listRdd.count())

# Creating an empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
print(type(emptyRDD))
print("Empty RDD: ", emptyRDD.collect())

# Empty RDD with explicit partitions
emptyPartitionedRDD = spark.sparkContext.parallelize([], 2)
print(type(emptyPartitionedRDD))
print("Empty partitioned RDD: ", emptyPartitionedRDD.glom().collect())

listDoubledRdd = listRdd.map(lambda x: x*2)
print("Output by map function:", listDoubledRdd.collect())

Output:

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
9
<class 'pyspark.rdd.RDD'>
Empty RDD:  []
<class 'pyspark.rdd.RDD'>
Empty partitioned RDD:  [[], []]
Output by map function: [2, 4, 6, 8, 10, 12, 14, 16, 18]
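
A couple of other common RDD operations can be exercised on the same listRdd. The snippet below is a minimal sketch (not part of the original run); the values in the comments are what these calls would be expected to return for listRdd = [1..9].

# Illustrative sketch: filter and reduce on the same listRdd
evenRdd = listRdd.filter(lambda x: x % 2 == 0)
print("Even elements:", evenRdd.collect())                     # expected: [2, 4, 6, 8]
print("Sum of elements:", listRdd.reduce(lambda a, b: a + b))  # expected: 45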

3. Creating an RDD from a file

# Create RDD using file

fileRDD = spark.sparkContext.textFile("/Users/dpq/Python_Practice/input/input.txt", 5)
print(type(fileRDD))
print(fileRDD.count())
print("num of partitions: ", fileRDD.getNumPartitions())
#print(fileRDD.glom().collect())
for e in fileRDD.collect():
    print(e)

Output:

<class 'pyspark.rdd.RDD'>
5
num of partitions:  5
In other words, RDDs are a collection of objects similar to list in Python, with the difference being RDD is computed on several processes scattered across multiple physical servers also called nodes in a cluster while a Python collection lives and process in just one process.
Additionally, RDDs provide data abstraction of partitioning and distribution of the data designed to run computations in parallel on several nodes, while doing transformations on RDD we don't have to worry about the parallelism as PySpark by default provides.
This Apache PySpark RDD tutorial describes the basic operations available on RDDs, such as map(), filter(), and persist() and many more. In addition, this tutorial also explains Pair RDD functions that operate on RDDs of key-value pairs such as groupByKey() and join() etc.
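
Since the sample text above mentions Pair RDD functions, a small key-value example can be built on top of the same fileRDD. This is a minimal sketch (not part of the original run); the actual pairs returned depend on the contents of input.txt.

# Illustrative sketch: word count with a Pair RDD built from fileRDD
wordCounts = fileRDD.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
print(wordCounts.take(5))   # a sample of (word, count) pairs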

4. Repartition vs Coalesce Example

numRDD = spark.sparkContext.parallelize([x for x in range(1, 20)], 5)

print("main RDD with 5 partitions")
for e in numRDD.glom().collect():
    print(e)
repartitionedRDD = numRDD.repartition(2)

print("main RDD with 2 repartitions")
for e in repartitionedRDD.glom().collect():
    print(e)

print("main RDD coalesced to 2 partitions")
coalescedRDD = numRDD.coalesce(2)
for e in coalescedRDD.glom().collect():
    print(e)

repartitionedRDD = numRDD.repartition(5)

print("main RDD with 5 repartitions")
for e in repartitionedRDD.glom().collect():
    print(e)

Output:

main RDD with 5 partitions
[1, 2, 3]
[4, 5, 6]
[7, 8, 9, 10, 11, 12]
[13, 14, 15]
[16, 17, 18, 19]
main RDD with 2 repartitions
[1, 2, 3, 7, 8, 9, 10, 11, 12, 13, 14, 15]
[4, 5, 6, 16, 17, 18, 19]
main RDD coalesced to 2 partitions
[1, 2, 3, 4, 5, 6]
[7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
main RDD with 5 repartitions
[]
[1, 2, 3]
[7, 8, 9, 10, 11, 12, 13, 14, 15]
[4, 5, 6]
[16, 17, 18, 19]
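
The output above shows the practical difference: repartition() triggers a full shuffle and redistributes records across the requested number of partitions (which is why the element order changes and an empty partition can appear), while coalesce() only merges existing partitions without a full shuffle and therefore keeps the original grouping. A minimal sketch of the resulting partition counts (not part of the original run):

print(numRDD.repartition(2).getNumPartitions())   # 2 - full shuffle
print(numRDD.coalesce(2).getNumPartitions())      # 2 - merges existing partitions, no full shuffle
print(numRDD.coalesce(10).getNumPartitions())     # stays at 5 - coalesce cannot increase partitions unless shuffle=True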

 
