PySpark RDD Examples

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()
# RDD methods such as parallelize() live on the SparkContext, not the SparkSession
sc = spark.sparkContext
print(spark.version)

Output: 3.2.1

# add 10 to every element, keep only the even results, then sum them
rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rddNum = rddNum.map(lambda x : x+10)
rddNum = rddNum.filter(lambda x : x % 2 == 0)
print(rddNum.reduce(lambda a,b : a+b))

Output: 80
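As a side note (not part of the original walkthrough), the same total can be produced with the built-in sum() action instead of an explicit reduce; a minimal sketch reusing rddNum after the filter step above:

print(rddNum.sum())   # 80, equivalent to the reduce(lambda a,b : a+b) above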

nameRdd = sc.parallelize(['Deepak','Simmi','Simran','Sukhwinder','Sanki','ShotTemper'])
# names that start with 'S'
rddNum = nameRdd.filter(lambda name : name.startswith('S'))
print(rddNum.collect())
# names that do not start with 'S'
rddNum = nameRdd.filter(lambda name : not name.startswith('S'))
print(rddNum.collect())

Output:
['Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper']
['Deepak']
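A subtract()-based alternative to the negated filter (not used in the original post); a minimal sketch reusing nameRdd from above:

sNames = nameRdd.filter(lambda name : name.startswith('S'))
print(nameRdd.subtract(sNames).collect())   # ['Deepak'], the only name not starting with 'S'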

#union example
rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10,30,21,45,23,22,77,44])
divisibleByTwo = rddNum.filter(lambda x : x%2 == 0)
divisibleByThree = rddNum.filter(lambda x : x%3 == 0)
print(divisibleByTwo.collect())
print(divisibleByThree.collect())
rddUnion = divisibleByTwo.union(divisibleByThree)
print(rddUnion.collect())

Output:
[2, 4, 6, 8, 10, 30, 22, 44]
[3, 6, 9, 30, 21, 45]
[2, 4, 6, 8, 10, 30, 22, 44, 3, 6, 9, 30, 21, 45]
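Note that union() keeps duplicates (6 and 30 appear twice above). If duplicates are not wanted, distinct() can be chained on; a minimal sketch reusing the two RDDs from above:

print(divisibleByTwo.union(divisibleByThree).distinct().collect())
# 6 and 30 now appear only once; element order is not guaranteed after the shuffle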

rddLines = sc.parallelize(["I am not a good person because i did so mistakes in my life and i do not care about it"])
# flatMap splits every line and flattens the pieces into a single RDD of words
rddflatMap = rddLines.flatMap(lambda x : x.split(" "))
print(rddflatMap.collect())

Output:
['I', 'am', 'not', 'a', 'good', 'person', 'because', 'i', 'did', 'so', 'mistakes', 'in', 'my', 'life', 'and', 'i', 'do', 'not', 'care', 'about', 'it']
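For contrast, a plain map() on the same input would return one list per line instead of a flattened RDD of words; a minimal sketch reusing rddLines from above:

print(rddLines.map(lambda x : x.split(" ")).collect())
# a single nested list: [['I', 'am', 'not', ...]]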

rddNameSalary = sc.parallelize([('deepak',80),('deepak',90),('simran',100),('simran',30),('kuhu',100)])
print(rddNameSalary.collect())
# total salary per name
print(rddNameSalary.reduceByKey(lambda v1,v2 : v1+v2).collect())
# sort the pairs by key in ascending order
print(rddNameSalary.sortByKey(ascending=True).collect())
grpRdd = rddNameSalary.groupByKey().collect()
for key,value in grpRdd:
    print(key, list(value))

print(rddNameSalary.countByKey().items())

Output:
[('deepak', 80), ('deepak', 90), ('simran', 100), ('simran', 30), ('kuhu', 100)]
[('deepak', 170), ('simran', 130), ('kuhu', 100)]
[('deepak', 80), ('deepak', 90), ('kuhu', 100), ('simran', 100), ('simran', 30)]
deepak [80, 90]
simran [100, 30]
kuhu [100]
dict_items([('deepak', 2), ('simran', 2), ('kuhu', 1)])
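Building on reduceByKey, the average salary per name can also be computed by pairing each value with a count (this step is not in the original post); a minimal sketch reusing rddNameSalary from above:

sumCount = rddNameSalary.mapValues(lambda v : (v, 1)) \
    .reduceByKey(lambda a, b : (a[0] + b[0], a[1] + b[1]))
print(sumCount.mapValues(lambda p : p[0] / p[1]).collect())
# e.g. ('deepak', 85.0), ('simran', 65.0), ('kuhu', 100.0); key order is not guaranteed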

rddLines = sc.parallelize(["I am not a good person because i did so mistakes in my life and i do not care about it"])
rddflatMap = rddLines.flatMap(lambda x : x.split(" "))
print(rddflatMap.collect())
# classic word count: pair each word with 1, then sum the counts per word
mapRDD = rddflatMap.map(lambda x: (x,1))
print(mapRDD.collect())
print(mapRDD.reduceByKey(lambda x,y : x+y).collect())

Output:
['I', 'am', 'not', 'a', 'good', 'person', 'because', 'i', 'did', 'so', 'mistakes', 'in', 'my', 'life', 'and', 'i', 'do', 'not', 'care', 'about', 'it']
[('I', 1), ('am', 1), ('not', 1), ('a', 1), ('good', 1), ('person', 1), ('because', 1), ('i', 1), ('did', 1), ('so', 1), ('mistakes', 1), ('in', 1), ('my', 1), ('life', 1), ('and', 1), ('i', 1), ('do', 1), ('not', 1), ('care', 1), ('about', 1), ('it', 1)]
[('good', 1), ('i', 2), ('in', 1), ('do', 1), ('a', 1), ('did', 1), ('mistakes', 1), ('life', 1), ('and', 1), ('am', 1), ('person', 1), ('I', 1), ('not', 2), ('because', 1), ('so', 1), ('my', 1), ('care', 1), ('about', 1), ('it', 1)]
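To list the most frequent words first, the word counts can be ordered with sortBy (an extra step, not in the original post); a minimal sketch reusing mapRDD from above:

wordCounts = mapRDD.reduceByKey(lambda x, y : x + y)
print(wordCounts.sortBy(lambda kv : kv[1], ascending=False).take(3))
# e.g. [('i', 2), ('not', 2), ('I', 1)]; ties may come back in either order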

 
