Posts

How to rename columns in a DataFrame and select a few columns using PySpark, with example

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
    from pyspark.sql.functions import col

    # This can be done without using a window function.
    creditCardData = spark.read.json("card_transactions.json")

    # Maximum amount per user; rename the columns so they do not clash in the join.
    useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

    # Join condition: same user and the amount equals that user's maximum.
    cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
            creditCardData.amount == useridMaxSpendDF.max_amount]
    joinedData = c…

Window function in PySpark with a join example using 2 DataFrames (inner join)

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
    from pyspark.sql.functions import col

    # This can be done without using a window function.
    creditCardData = spark.read.json("card_transactions.json")

    # Maximum amount per user; rename the columns so they do not clash in the join.
    useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

    # Join condition: same user and the amount equals that user's maximum.
    cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
            creditCardData.amount == useridMaxSpendDF.max_amount]
    joinedData = c…

How to join 2 DataFrames in PySpark (inner join example)

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
    from pyspark.sql.functions import col

    # This can be done without using a window function.
    creditCardData = spark.read.json("card_transactions.json")

    # Maximum amount per user; rename the columns so they do not clash in the join.
    useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
    useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

    # Join condition: same user and the amount equals that user's maximum.
    cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
            creditCardData.amount == useridMaxSpendDF.max_amount]
    joinedData = c…
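To make the join condition concrete, here is a minimal plain-Python sketch (no Spark required) of what the excerpt above appears to compute: the maximum amount per user, then an inner join back to the transactions on both user_id and amount. The rows are the three sample rows shown in the post; the helper names are hypothetical and are not part of the PySpark API.

```python
# Plain-Python illustration only; function names are hypothetical, not PySpark API.
transactions = [
    {"amount": 243, "card_num": "C_108", "category": "food",
     "merchant": "M_102", "ts": 1579532902, "user_id": "U_104"},
    {"amount": 699, "card_num": "C_106", "category": "cosmetics",
     "merchant": "M_103", "ts": 1581759040, "user_id": "U_103"},
    {"amount": 228, "card_num": "C_104", "category": "children",
     "merchant": "M_110", "ts": 1584161986, "user_id": "U_103"},
]


def max_amount_per_user(rows):
    """Mirror groupby('user_id').max('amount'): one maximum per user."""
    best = {}
    for row in rows:
        uid = row["user_id"]
        if uid not in best or row["amount"] > best[uid]:
            best[uid] = row["amount"]
    return best


def inner_join_on_max(rows, best):
    """Keep rows whose (user_id, amount) pair matches that user's maximum."""
    return [row for row in rows if best.get(row["user_id"]) == row["amount"]]


max_spend = max_amount_per_user(transactions)
joined = inner_join_on_max(transactions, max_spend)
for row in joined:
    print(row["user_id"], row["category"], row["amount"])
```

Because the match is on the amount value, a user with two transactions tied at the maximum would get two rows back from this kind of join.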

Credit Card Data Analysis using PySpark (how to use auto broadcast join after disabling it)

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Here we disable auto broadcast, then join 2 DataFrames and look at the execution plan, which should still use a broadcast join.

Solution:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.master('local[2]') \
        .appName('RDD_Methods_Examples') \
        .getOrCreate()

    print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    print(spark.conf.get("spark.sql.autoBroadcastJoin…
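The interaction between the threshold and an explicit broadcast hint can be summarized with a deliberately simplified toy model. This is not Spark's planner code: the function name and its parameters are hypothetical, and it ignores join types that cannot be broadcast as well as runtime re-planning. It only encodes three things: a negative threshold disables automatic broadcasting, the default threshold is 10 MB, and an explicit hint is assumed to take precedence.

```python
# Toy model of the broadcast decision; NOT Spark's actual planner logic.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # default of spark.sql.autoBroadcastJoinThreshold


def uses_broadcast(estimated_size_bytes, threshold_bytes, has_broadcast_hint=False):
    """Return True if this simplified model would pick a broadcast join."""
    if has_broadcast_hint:
        # Assumption: an explicit broadcast() hint wins over the threshold.
        return True
    if threshold_bytes < 0:
        # A threshold of -1 switches automatic broadcasting off.
        return False
    return estimated_size_bytes <= threshold_bytes


small_table = 4 * 1024  # hypothetical 4 KB lookup table

print(uses_broadcast(small_table, DEFAULT_THRESHOLD_BYTES))          # auto broadcast on
print(uses_broadcast(small_table, -1))                               # auto broadcast disabled
print(uses_broadcast(small_table, -1, has_broadcast_hint=True))      # hint after disabling
```

In the real session, the plan printed by explain() is what shows which join strategy was actually chosen.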

Credit Card Data Analysis using PySpark: disabling auto broadcast, with explanation

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Here we disable auto broadcast, then join 2 DataFrames and look at the execution plan, which should not use a broadcast join.

Solution:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.master('local[2]') \
        .appName('RDD_Methods_Examples') \
        .getOrCreate()

    #print(spark.version)
    print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    print(spark.conf.get("s…

Credit Card Data Analysis using PySpark: auto broadcast explanation

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: AQE auto broadcast in a join. Even if we do not ask for a broadcast join explicitly, a broadcast join still happens because of AdaptiveSparkPlan.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.master('local[2]') \
        .appName('RDD_Methods_Examples') \
        .getOrCreate()

    #print(spark.version)
    print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    creditCardData = spark.read.json("card_transactions.json")
    userIdToAmountMapping = {}
    us…
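For readers unfamiliar with the strategy itself, the following is a single-process sketch of the idea behind a broadcast hash join: the small side is turned into a hash table once, and every partition of the large side probes that same table locally, so the large side never has to be shuffled. The function name, the partition layout, and the small per-user table are all hypothetical; the transaction rows are the sample rows from the post.

```python
# Conceptual sketch of a broadcast hash join in plain Python; names are hypothetical.
def broadcast_hash_join(large_partitions, small_rows, key):
    # Build the hash table once from the small side (the "broadcast" copy).
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined = []
    # Each partition of the large side probes the same lookup table.
    for partition in large_partitions:
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**row, **match})
    return joined


# Large side: the sample transactions, split into two made-up partitions.
large_partitions = [
    [
        {"user_id": "U_104", "category": "food", "amount": 243},
        {"user_id": "U_103", "category": "cosmetics", "amount": 699},
    ],
    [
        {"user_id": "U_103", "category": "children", "amount": 228},
    ],
]

# Small side: a hypothetical per-user lookup table.
small_rows = [
    {"user_id": "U_104", "max_amount": 243},
    {"user_id": "U_103", "max_amount": 699},
]

result = broadcast_hash_join(large_partitions, small_rows, key="user_id")
for row in result:
    print(row)
```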

Credit Card Data Analysis using PySpark (Get the category in which the user has made the maximum expenditure) without using PySpark window function

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the category in which the user has made the maximum expenditure

Solution:
    #5. Get the category in which the user has made the maximum expenditure
    # this can be done by using window function
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number

    df = spark.read.json("card_transactions.json")

    #getting max amount spent by user
    print(df.groupby('user_id').max('amount').collect())

    windowDept = Window.partition…

Credit Card Data Analysis using PySpark (Get the category in which the user has made the maximum expenditure) using window function

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the category in which the user has made the maximum expenditure

Solution:
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number

    df = spark.read.json("card_transactions.json")

    #getting max amount spent by user
    print(df.groupby('user_id').max('amount').collect())

    windowDept = Window.partitionBy("user_id").orderBy(col("amount").desc())
    df.withColumn("row", row_number().over(windowDept)) \
        .filter(col("row") == 1).drop("row") \
        .show()

Output:
    [R…
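As a way to check the window logic by hand, here is a plain-Python sketch of what partitionBy("user_id"), orderBy amount descending, and row_number() == 1 select. It runs on the three sample rows from the post; the function name is hypothetical and nothing here is PySpark API.

```python
# Plain-Python sketch of "row_number() over (partition by user_id order by amount desc) == 1".
from itertools import groupby
from operator import itemgetter

sample_rows = [
    {"user_id": "U_104", "category": "food", "amount": 243},
    {"user_id": "U_103", "category": "cosmetics", "amount": 699},
    {"user_id": "U_103", "category": "children", "amount": 228},
]


def top_row_per_user(rows):
    # Sort by partition key, then by amount descending within each partition.
    ordered = sorted(rows, key=lambda r: (r["user_id"], -r["amount"]))
    result = []
    for _, group in groupby(ordered, key=itemgetter("user_id")):
        # Number the rows inside each partition starting at 1, keep only row 1.
        for row_number, row in enumerate(group, start=1):
            if row_number == 1:
                result.append(row)
    return result


top_rows = top_row_per_user(sample_rows)
for row in top_rows:
    print(row["user_id"], row["category"], row["amount"])
```

One design point worth noting: row_number() keeps exactly one row per user even when two transactions tie for the maximum, whereas joining on the maximum amount would return every tied row.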

Credit Card Data Analysis using PySpark (Get the distinct list of categories in which the user has made expenditure) with execution plan

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the distinct list of categories in which the user has made expenditure

Solution:
    from pyspark.sql import functions as F

    cardTnDF = spark.read.json("card_transactions.json")
    #spark.read.json("card_transactions.json").show()
    #cardTnDF.groupBy('user_id')['category'].apply(list)
    cardTnDF.groupby('user_id').agg(F.collect_list('category')).collect()

Output:
    Row(user_id='U_102', collect_list(category)=['children', 'groceries', 'entertainment', 'c…
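Since the stated goal is a distinct list, it may help to see the difference between keeping every category and keeping each category once. In PySpark, collect_list keeps duplicates, while pyspark.sql.functions also provides collect_set, which removes them. The sketch below shows the same distinction in plain Python; the function name is hypothetical, and the last row is a made-up repeat purchase added only to produce a duplicate.

```python
# Plain-Python sketch of list-with-duplicates versus distinct categories per user.
from collections import defaultdict

rows = [
    {"user_id": "U_104", "category": "food"},
    {"user_id": "U_103", "category": "cosmetics"},
    {"user_id": "U_103", "category": "children"},
    {"user_id": "U_103", "category": "cosmetics"},  # hypothetical repeat, not in the sample data
]


def categories_per_user(rows, distinct):
    grouped = defaultdict(list)
    for row in rows:
        categories = grouped[row["user_id"]]
        if distinct and row["category"] in categories:
            continue  # skip categories already recorded for this user
        categories.append(row["category"])
    return dict(grouped)


all_categories = categories_per_user(rows, distinct=False)
distinct_categories = categories_per_user(rows, distinct=True)
print(all_categories)
print(distinct_categories)
```

This sketch preserves first-seen order; Spark does not guarantee any particular order for the elements that collect_set returns.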

Credit Card Data Analysis using PySpark (Get the total amount spent by each user for each of their cards on each category) with execution plan

Input details:
● File has JSON records
● Each record has fields:
    ○ user_id
    ○ card_num
    ○ merchant
    ○ category
    ○ amount
    ○ ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the total amount spent by each user for each of their cards on each category

    cardTnDF = spark.read.json("card_transactions.json")
    cardTnDF.groupBy('user_id', 'card_num', 'category').sum('amount').collect()

Output:
    [Row(user_id='U_104', card_num='C_107', category='cosmetics', sum(amount)=11818),
     Row(user_id='U_104', card_num='C_108', category='cosmetics', sum(amount)=9522),
     Row(user_id='U_103', card_num='C_105', ca…
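The grouping on three columns can be pictured as summing amounts under a composite key. Here is a minimal plain-Python sketch of that idea, using the sample rows from the post plus one made-up row (marked in the code) so that at least one group has more than one transaction to add up. The function name is hypothetical.

```python
# Plain-Python sketch of groupBy('user_id', 'card_num', 'category').sum('amount').
from collections import defaultdict

rows = [
    {"user_id": "U_104", "card_num": "C_108", "category": "food", "amount": 243},
    {"user_id": "U_103", "card_num": "C_106", "category": "cosmetics", "amount": 699},
    {"user_id": "U_103", "card_num": "C_104", "category": "children", "amount": 228},
    # Hypothetical extra transaction, not in the sample data, to show the summing.
    {"user_id": "U_104", "card_num": "C_108", "category": "food", "amount": 100},
]


def total_by_user_card_category(rows):
    totals = defaultdict(int)
    for row in rows:
        # The composite key plays the role of the three grouping columns.
        key = (row["user_id"], row["card_num"], row["category"])
        totals[key] += row["amount"]
    return dict(totals)


totals = total_by_user_card_category(rows)
for key, amount in totals.items():
    print(key, amount)
```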