PySpark RDD examples

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
sc = spark.sparkContext  # RDD methods like parallelize() live on the SparkContext, not the SparkSession
print(sc.version)

Output: 3.2.1

rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rddNum = rddNum.map(lambda x: x + 10)         # add 10 to each element -> 11..20
rddNum = rddNum.filter(lambda x: x % 2 == 0)  # keep the even values -> 12,14,16,18,20
print(rddNum.reduce(lambda a, b: a + b))      # sum them up

Output: 80
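
The same pipeline also reads as one chained expression; a minimal equivalent sketch, assuming the same sc:

print(sc.parallelize(range(1, 11))
        .map(lambda x: x + 10)
        .filter(lambda x: x % 2 == 0)
        .reduce(lambda a, b: a + b))  # also prints 80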

nameRdd = sc.parallelize(['Deepak', 'Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper'])
rddNum = nameRdd.filter(lambda name: name.startswith('S'))      # names beginning with 'S'
print(rddNum.collect())
rddNum = nameRdd.filter(lambda name: not name.startswith('S'))  # everything else
print(rddNum.collect())

Output:
['Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper']
['Deepak']
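
Note that startswith() is case-sensitive, so a lowercase 'simran' would slip past the first filter. A minimal case-insensitive variant, lower-casing the name before the check:

rddNum = nameRdd.filter(lambda name: name.lower().startswith('s'))  # matches both 'S' and 's'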

# union example
rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10,30,21,45,23,22,77,44])
divisibleByTwo = rddNum.filter(lambda x: x % 2 == 0)
divisibleByThree = rddNum.filter(lambda x: x % 3 == 0)
print(divisibleByTwo.collect())
print(divisibleByThree.collect())
rddUnion = divisibleByTwo.union(divisibleByThree)  # union() concatenates; duplicates are kept
print(rddUnion.collect())

Output:
[2, 4, 6, 8, 10, 30, 22, 44]
[3, 6, 9, 30, 21, 45]
[2, 4, 6, 8, 10, 30, 22, 44, 3, 6, 9, 30, 21, 45]
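
Because union() keeps duplicates (6 and 30 appear twice above), a set-style union needs a distinct() on top; a minimal sketch:

print(rddUnion.distinct().collect())  # each element once; ordering is not guaranteed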

rddLines = sc.parallelize(["I am not a good person because i did so mistakes in my life and i do not care about it"])
rddflatMap = rddLines.flatMap(lambda x: x.split(" "))  # one output element per word
print(rddflatMap.collect())

Output:
['I', 'am', 'not', 'a', 'good', 'person', 'because', 'i', 'did', 'so', 'mistakes', 'in', 'my', 'life', 'and', 'i', 'do', 'not', 'care', 'about', 'it']
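
For contrast, map() with the same lambda keeps one output element per input element, so it yields a single nested list of words rather than a flat word list:

print(rddLines.map(lambda x: x.split(" ")).collect())  # [['I', 'am', 'not', ...]]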

rddNameSalary = sc.parallelize([('deepak', 80), ('deepak', 90), ('simran', 100), ('simran', 30), ('kuhu', 100)])
print(rddNameSalary.collect())
print(rddNameSalary.reduceByKey(lambda v1, v2: v1 + v2).collect())  # sum the values per key
print(rddNameSalary.sortByKey(ascending=True).collect())
grpRdd = rddNameSalary.groupByKey().collect()
for key, value in grpRdd:
    print(key, list(value))

print(rddNameSalary.countByKey().items())

Output:
[('deepak', 80), ('deepak', 90), ('simran', 100), ('simran', 30), ('kuhu', 100)]
[('deepak', 170), ('simran', 130), ('kuhu', 100)]
[('deepak', 80), ('deepak', 90), ('kuhu', 100), ('simran', 100), ('simran', 30)]
deepak [80, 90]
simran [100, 30]
kuhu [100]
dict_items([('deepak', 2), ('simran', 2), ('kuhu', 1)])
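
A natural extension of reduceByKey() is the per-key average, which needs a (sum, count) pair as the running state; a minimal sketch over the same rddNameSalary:

avg = rddNameSalary.mapValues(lambda v: (v, 1)) \
                   .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                   .mapValues(lambda s: s[0] / s[1])
print(avg.collect())  # deepak -> 85.0, simran -> 65.0, kuhu -> 100.0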

rddLines = sc.parallelize(["I am not a good person because i did so mistakes in my life and i do not care about it"])
rddflatMap = rddLines.flatMap(lambda x: x.split(" "))
print(rddflatMap.collect())
mapRDD = rddflatMap.map(lambda x: (x, 1))                # pair each word with a count of 1
print(mapRDD.collect())
print(mapRDD.reduceByKey(lambda x, y: x + y).collect())  # classic word count

Output:
['I', 'am', 'not', 'a', 'good', 'person', 'because', 'i', 'did', 'so', 'mistakes', 'in', 'my', 'life', 'and', 'i', 'do', 'not', 'care', 'about', 'it']
[('I', 1), ('am', 1), ('not', 1), ('a', 1), ('good', 1), ('person', 1), ('because', 1), ('i', 1), ('did', 1), ('so', 1), ('mistakes', 1), ('in', 1), ('my', 1), ('life', 1), ('and', 1), ('i', 1), ('do', 1), ('not', 1), ('care', 1), ('about', 1), ('it', 1)]
[('good', 1), ('i', 2), ('in', 1), ('do', 1), ('a', 1), ('did', 1), ('mistakes', 1), ('life', 1), ('and', 1), ('am', 1), ('person', 1), ('I', 1), ('not', 2), ('because', 1), ('so', 1), ('my', 1), ('care', 1), ('about', 1), ('it', 1)]
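
To report the most frequent words, the counts can be ordered with sortBy(); a minimal sketch reusing mapRDD from above:

counts = mapRDD.reduceByKey(lambda x, y: x + y)
print(counts.sortBy(lambda kv: kv[1], ascending=False).take(3))  # e.g. [('i', 2), ('not', 2), ...]; ties break arbitrarily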

 
