Posts

Showing posts with the label PySpark

How to change a column name in a DataFrame and select a few columns in a DataFrame using PySpark, with example

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
    from pyspark.sql.functions import col

    # this can be done without using a window function
    creditCardData = spark.read.json("card_transactions.json")
    useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
    cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
            creditCardData.amount == useridMaxSpendDF.max_amount]
    joinedData = c

Window function in PySpark with Joins example using 2 Dataframes (inner join)

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
    from pyspark.sql.functions import col

    # this can be done without using a window function
    creditCardData = spark.read.json("card_transactions.json")
    useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
    cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
            creditCardData.amount == useridMaxSpendDF.max_amount]
    joinedData = c

How to join 2 DataFrames in PySpark (inner join example)

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
    from pyspark.sql.functions import col

    # this can be done without using a window function
    creditCardData = spark.read.json("card_transactions.json")
    useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
    cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
            creditCardData.amount == useridMaxSpendDF.max_amount]
    joinedData = c

Credit Card Data Analysis using PySpark (how to use auto broadcast join after disabling it)

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Here we disable auto broadcast, then join two DataFrames and check the execution plan, which should still use a broadcast join.

Solution:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.master('local[2]')\
        .appName('RDD_Methods_Examples')\
        .getOrCreate()
    print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    print(spark.conf.get("spark.sql.autoBroadcastJoin

Credit Card Data Analysis using PySpark: disabling auto broadcast, with explanation

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Here we disable auto broadcast, then join two DataFrames and check the execution plan, which should not use a broadcast join.

Solution:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.master('local[2]')\
        .appName('RDD_Methods_Examples')\
        .getOrCreate()
    #print(spark.version)
    print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    print(spark.conf.get("s

Credit Card Data Analysis using PySpark: auto broadcast explanation

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: AQE auto broadcast using join. Here, even if we do not ask for a broadcast join, a broadcast join still happens because of AdaptiveSparkPlan.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.master('local[2]')\
        .appName('RDD_Methods_Examples')\
        .getOrCreate()
    #print(spark.version)
    print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    creditCardData = spark.read.json("card_transactions.json")
    userIdToAmountMapping = {}
    us

Credit Card Data Analysis using PySpark (Get the category in which the user has made the maximum expenditure) without using PySpark window function

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the category in which the user has made the maximum expenditure

Solution:
    #5. Get the category in which the user has made the maximum expenditure
    # this can be done by using a window function
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number

    df = spark.read.json("card_transactions.json")
    # getting max amount spent by user
    print(df.groupby('user_id').max('amount').collect())
    windowDept = Window.partition

Credit Card Data Analysis using PySpark (Get the category in which the user has made the maximum expenditure) using window function

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the category in which the user has made the maximum expenditure

Solution:
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number

    df = spark.read.json("card_transactions.json")
    # getting max amount spent by user
    print(df.groupby('user_id').max('amount').collect())
    windowDept = Window.partitionBy("user_id").orderBy(col("amount").desc())
    df.withColumn("row", row_number().over(windowDept)) \
        .filter(col("row") == 1).drop("row") \
        .show()

Output:
[R

Credit Card Data Analysis using PySpark (Get the distinct list of categories in which the user has made expenditure) with execution plan

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the distinct list of categories in which the user has made expenditure

Solution:
    from pyspark.sql import functions as F

    cardTnDF = spark.read.json("card_transactions.json")
    #spark.read.json("card_transactions.json").show()
    #cardTnDF.groupBy('user_id')['category'].apply(list)
    cardTnDF.groupby('user_id').agg(F.collect_list('category')).collect()

Output:
Row(user_id='U_102', collect_list(category)=['children', 'groceries', 'entertainment', 'c

Credit Card Data Analysis using PySpark (Get the total amount spent by each user for each of their cards on each category) with execution plan

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the total amount spent by each user for each of their cards on each category

    cardTnDF = spark.read.json("card_transactions.json")
    cardTnDF.groupBy('user_id', 'card_num', 'category').sum('amount').collect()

Output:
[Row(user_id='U_104', card_num='C_107', category='cosmetics', sum(amount)=11818),
 Row(user_id='U_104', card_num='C_108', category='cosmetics', sum(amount)=9522),
 Row(user_id='U_103', card_num='C_105', ca

Credit Card Data Analysis using PySpark (Get the total amount spent by each user for each of their cards) with execution plan

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the total amount spent by each user for each of their cards

Solution:
    cardTnDF = spark.read.json("card_transactions.json")
    cardTnDF.groupBy('user_id', 'card_num').sum('amount').collect()

Output:
[Row(user_id='U_101', card_num='C_102', sum(amount)=59203),
 Row(user_id='U_104', card_num='C_108', sum(amount)=54728),
 Row(user_id='U_101', card_num='C_101', sum(amount)=66581),
 Row(user_id='U_104', card_num='C_107', sum(amou

Credit Card Data Analysis using PySpark (Get the total amount spent by each user) with execution plan

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the total amount spent by each user

Solution:
    cardTnDF = spark.read.json("card_transactions.json")
    cardTnDF.show(3)
    cardTnDF.groupBy('user_id').sum('amount').collect()

Output:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|

Analyse Weather input Dataset and apply Aggregation RDD functions

    #Input weather dataset
    fileRdd = sc.sparkContext.textFile('weather.csv')
    fileRdd.collect()

Output:
['2016-05-09,234893,34', '2019-09-08,234896,3', '2019-11-19,234895,24', '2017-04-04,234900,43',
 '2013-12-04,234900,47', '2019-08-28,234894,5', '2013-11-29,234897,-5', '2018-08-19,234895,40',
 '2019-06-06,234890,9', '2017-02-09,234900,21', '2017-03-30,234893,36', '2019-01-01,234895,-9',
 '2010-12-23,234898,-1', '2015-09-03,234890,13', '2011-07-19,234898,25', '2014-09-27,234897,29',
 '2013-11-18,234891,-9', '2012-02-09,234893,10', '2014-07-03,234897,29', '2011-11-07,234895,38',
 '2014-02-14,234891,24', '2012-02-18,234893,5', '2010-01-31,234896,-8', '2015-08-19,234890,-7',
 '2017-03-26,234891,-1', '2011-04-23,234894,23', '2014-09-15,234898,-8', '2011-06-16,234890,33',
 '201

RDD joins using PySpark examples

Full Outer Join: the final result returns all records from both RDDs

    rdd1 = sc.parallelize([(101,'John'),(102,'Matthew'),(103,'Aakash'),(104,'Sandeep'),
                           (105,'Sakura'),(106,'Abed'),(107,'Mary'),(108,'Kamla')],2)
    rdd2 = sc.parallelize([(101,'USA'),(108,'UK'),(105,'Japan'),(102,'Sri Lanka'),
                           (103,'India'),(104,'UAE'),(107,'Germany'),(110,'Australia')],2)
    #print(rdd1.glom().collect())
    #print(rdd2.glom().collect())
    fullOuterJoin = rdd1.fullOuterJoin(rdd2)
    print(fullOuterJoin.collect())

Output:
[(104, ('Sandeep', 'UAE')), (108, ('Kamla', 'UK')), (101, ('John', 'USA')), (105, ('Sakura', 'Japan')), (102, ('Matthew', 'Sri Lanka')), (106, ('Abed', None)), (110, (None, 'Australia')), (103, ('Aakash', 'India')), (107, ('Mary', 'Germany'))]

Inner Join: the final result returns matching records from both RDDs

    rdd1 = sc.parallelize([(101,'John'),(102,'Matthew'),(103,'Aa

PySpark RDD advanced examples

1.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master('local[1]')\
        .appName('RDD_Methods_Examples')\
        .getOrCreate()
    print(spark.version)

Output: 3.2.1

2.
    # Creating RDD
    lst = [i for i in range(1, 10)]
    print(lst)
    listRdd = spark.sparkContext.parallelize(lst, 3)
    print(listRdd.glom().collect())
    print(listRdd.count())

    # creating empty RDD
    emptyRDD = spark.sparkContext.emptyRDD()
    print(type(emptyRDD))
    print("Empty RDD: ", emptyRDD.collect())

    # emptyRDD with partition
    emptyPartionedRDD = spark.sparkContext.parallelize([], 2)
    print(type(emptyPartionedRDD))
    print("Empty emptyPartionedRDD: ", emptyPartionedRDD.glom().collect())

    listDoubledRdd = listRdd.map(lambda x: x*2)
    print("Output by map function:", listDoubledRdd.collect())

Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
9
<class 'pyspark.rdd.RDD'>
Empty RDD: []
<class 'pyspark.rdd.RDD'>
Empty emptyPartionedRDD: [[], []]
Output by map function: [2, 4, 6, 8, 10, 12, 14, 16, 18]

PySpark RDD examples

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master('local[1]')\
        .appName('RDD_Methods_Examples')\
        .getOrCreate()
    # parallelize() is a SparkContext method, so take the context from the session
    sc = spark.sparkContext
    print(sc.version)

Output: 3.2.1

    rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
    rddNum = rddNum.map(lambda x : x+10)
    rddNum = rddNum.filter(lambda x : x % 2 == 0)
    print(rddNum.reduce(lambda a,b : a+b))

Output: 80

    nameRdd = sc.parallelize(['Deepak','Simmi','Simran','Sukhwinder','Sanki','ShotTemper'])
    rddNum = nameRdd.filter(lambda name : name.startswith('S'))
    print(rddNum.collect())
    rddNum = nameRdd.filter(lambda name : not name.startswith('S'))
    print(rddNum.collect())

Output:
['Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper']
['Deepak']

    #union example
    rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10,30,21,45,23,22,77,44])
    divisibleByTwo = rddNum.filter(lambda x : x%2 == 0)
    divisibleByThree = rddNum.filter(lambda x : x%3 == 0)
    print(divisibleByTwo.collect())
    print(divisibleByThree.collect())
    rddUnion = divisibleByTw

RDD examples in PySpark

Note: All examples are run from the PySpark console.

Creating RDD from List:

    >>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]
    >>> rdd1 = spark.sparkContext.parallelize(data)
    >>> rdd1.glom().collect()
    [[10, 20, 20], [30, 40, 10], [40, 20, 20], [20, 20, 50, 99, 88]]

The glom method on an RDD returns the data grouped by partition.

    >>> rdd1.collect()
    [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]

Creating RDD with a predefined number of partitions:

    >>> data = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50, 99, 88]
    >>> rdd1 = spark.sparkContext.parallelize(data, 5)

Creating RDD from file: There are two ways to create an RDD from a file.

Using the wholeTextFiles() method: wholeTextFiles returns a paired RDD in which the key is the file name and the value is the content of that file. It runs on a directory that contains multiple files.

    filesPairedRDD = sc.wholeTextFiles("/home/deepak/data/")

Using the textFile() method: this will return rdd which

All about Broadcast Variable and How to use it

In this post, we will see how to use broadcast variables in Spark. Broadcast variables can be tricky if the concepts behind them are not clearly understood, and that leads to errors when they are used further down the line.

Broadcast variables are used to implement a map-side join, i.e. a join done inside a map. For example, lookup tables or data are distributed across the nodes of a cluster using broadcast, and they are then used inside map to do the join implicitly.

When you broadcast some data, it is copied to each executor only once, so the same data is not copied again and again for every task. Hence broadcasting makes your Spark application faster when tasks use a large value or when there are more tasks than executors.

To use broadcast variables correctly, note the points below and cross-check them against your usage.

Broadcast type errors – A broadcast variable is not necessarily an RDD or a collection. It's just whatever type you as