Credit Card Data Analysis using PySpark: disabling auto broadcast, with explanation

Input details:

● File has JSON records
● Each record has fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Here we disable automatic broadcast joins by setting spark.sql.autoBroadcastJoinThreshold to -1, join two DataFrames, and then look at the execution plan, which should not use a broadcast join.
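If the setting should only be off for one experiment, it can be wrapped so that the previous value is restored afterwards. The following is a minimal sketch, not part of the original solution: the helper name auto_broadcast_disabled and the constant BROADCAST_KEY are made up for illustration, and the only Spark calls it relies on are spark.conf.get and spark.conf.set.

```python
from contextlib import contextmanager

# Config key used throughout this post.
BROADCAST_KEY = "spark.sql.autoBroadcastJoinThreshold"


@contextmanager
def auto_broadcast_disabled(spark):
    """Hypothetical helper: turn off automatic broadcast joins, then restore."""
    previous = spark.conf.get(BROADCAST_KEY)  # remember the value in effect
    spark.conf.set(BROADCAST_KEY, "-1")       # -1 disables automatic broadcast joins
    try:
        yield spark
    finally:
        spark.conf.set(BROADCAST_KEY, previous)  # put the old value back


# Usage sketch (assumes an existing SparkSession named spark and the
# DataFrames built in the solution below):
#
# with auto_broadcast_disabled(spark):
#     joinedData = creditCardData.join(useridMaxSpendDF, cond, "inner")
#     joinedData.show()
```

Because Spark plans a join lazily, the action (show() here) has to run inside the with block; otherwise the restored threshold is the one in effect when the plan is built.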

Solution:

from pyspark.sql import SparkSession

# Start a local Spark session with two worker threads.
spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
#print(spark.version)

# Print the current threshold, disable automatic broadcast joins (-1), and print it again.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

creditCardData = spark.read.json("card_transactions.json")

# Maximum amount per user; rename the columns so they do not clash in the join.
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

# Join back to the transactions to keep only each user's maximum-amount rows.
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id, creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(useridMaxSpendDF, cond, "inner")
joinedData.show()

-1
-1
+------+--------+-------------+--------+----------+-------+---------+----------+
|amount|card_num|     category|merchant|        ts|user_id|m_user_id|max_amount|
+------+--------+-------------+--------+----------+-------+---------+----------+
|  1000|   C_101|entertainment|   M_100|1580163399|  U_101|    U_101|      1000|
|   997|   C_103|    groceries|   M_103|1582876481|  U_102|    U_102|       997|
|   977|   C_104|    groceries|   M_101|1579402924|  U_103|    U_103|       977|
|   977|   C_105|         food|   M_102|1581369586|  U_103|    U_103|       977|
|   977|   C_106|         food|   M_100|1580897199|  U_103|    U_103|       977|
|   996|   C_108|         food|   M_106|1581391534|  U_104|    U_104|       996|
|   996|   C_107|     children|   M_107|1580776821|  U_104|    U_104|       996|
+------+--------+-------------+--------+----------+-------+---------+----------+

== Physical Plan ==
AdaptiveSparkPlan (35)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin Inner (19)
         :- * Sort (6)
         :  +- AQEShuffleRead (5)
         :     +- ShuffleQueryStage (4)
         :        +- Exchange (3)
         :           +- * Filter (2)
         :              +- Scan json  (1)
         +- * Sort (18)
            +- AQEShuffleRead (17)
               +- ShuffleQueryStage (16)
                  +- Exchange (15)
                     +- * Filter (14)
                        +- * HashAggregate (13)
                           +- AQEShuffleRead (12)
                              +- ShuffleQueryStage (11)
                                 +- Exchange (10)
                                    +- * HashAggregate (9)
                                       +- * Filter (8)
                                          +- Scan json  (7)
+- == Initial Plan ==
   CollectLimit (34)
   +- Project (33)
      +- SortMergeJoin Inner (32)
         :- Sort (24)
         :  +- Exchange (23)
         :     +- Filter (22)
         :        +- Scan json  (1)
         +- Sort (31)
            +- Exchange (30)
               +- Filter (29)
                  +- HashAggregate (28)
                     +- Exchange (27)
                        +- HashAggregate (26)
                           +- Filter (25)
                              +- Scan json  (7)

(1) Scan json
Output [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id), IsNotNull(amount)]
ReadSchema: struct<amount:bigint,card_num:string,category:string,merchant:string,ts:bigint,user_id:string>

(2) Filter [codegen id : 1]
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Condition : (isnotnull(user_id#1195) AND isnotnull(amount#1190L))

(3) Exchange
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: hashpartitioning(user_id#1195, amount#1190L, 200), ENSURE_REQUIREMENTS, [id=#2033]

(4) ShuffleQueryStage
Output [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: 0

(5) AQEShuffleRead
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: coalesced

(6) Sort [codegen id : 4]
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: [user_id#1195 ASC NULLS FIRST, amount#1190L ASC NULLS FIRST], false, 0

(7) Scan json
Output [2]: [amount#1218L, user_id#1223]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id)]
ReadSchema: struct<amount:bigint,user_id:string>

(8) Filter [codegen id : 2]
Input [2]: [amount#1218L, user_id#1223]
Condition : isnotnull(user_id#1223)

(9) HashAggregate [codegen id : 2]
Input [2]: [amount#1218L, user_id#1223]
Keys [1]: [user_id#1223]
Functions [1]: [partial_max(amount#1218L)]
Aggregate Attributes [1]: [max#1264L]
Results [2]: [user_id#1223, max#1265L]

(10) Exchange
Input [2]: [user_id#1223, max#1265L]
Arguments: hashpartitioning(user_id#1223, 200), ENSURE_REQUIREMENTS, [id=#2059]

(11) ShuffleQueryStage
Output [2]: [user_id#1223, max#1265L]
Arguments: 1

(12) AQEShuffleRead
Input [2]: [user_id#1223, max#1265L]
Arguments: coalesced

(13) HashAggregate [codegen id : 3]
Input [2]: [user_id#1223, max#1265L]
Keys [1]: [user_id#1223]
Functions [1]: [max(amount#1218L)]
Aggregate Attributes [1]: [max(amount#1218L)#1208L]
Results [2]: [user_id#1223 AS m_user_id#1215, max(amount#1218L)#1208L AS max_amount#1212L]

(14) Filter [codegen id : 3]
Input [2]: [m_user_id#1215, max_amount#1212L]
Condition : isnotnull(max_amount#1212L)

(15) Exchange
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: hashpartitioning(m_user_id#1215, max_amount#1212L, 200), ENSURE_REQUIREMENTS, [id=#2111]

(16) ShuffleQueryStage
Output [2]: [m_user_id#1215, max_amount#1212L]
Arguments: 2

(17) AQEShuffleRead
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: coalesced

(18) Sort [codegen id : 5]
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: [m_user_id#1215 ASC NULLS FIRST, max_amount#1212L ASC NULLS FIRST], false, 0

(19) SortMergeJoin [codegen id : 6]
Left keys [2]: [user_id#1195, amount#1190L]
Right keys [2]: [m_user_id#1215, max_amount#1212L]
Join condition: None

(20) Project [codegen id : 6]
Output [8]: [cast(amount#1190L as string) AS amount#1248, card_num#1191, category#1192, merchant#1193, cast(ts#1194L as string) AS ts#1252, user_id#1195, m_user_id#1215, cast(max_amount#1212L as string) AS max_amount#1255]
Input [8]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195, m_user_id#1215, max_amount#1212L]

(21) CollectLimit
Input [8]: [amount#1248, card_num#1191, category#1192, merchant#1193, ts#1252, user_id#1195, m_user_id#1215, max_amount#1255]
Arguments: 21

(22) Filter
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Condition : (isnotnull(user_id#1195) AND isnotnull(amount#1190L))

(23) Exchange
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: hashpartitioning(user_id#1195, amount#1190L, 200), ENSURE_REQUIREMENTS, [id=#2018]

(24) Sort
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: [user_id#1195 ASC NULLS FIRST, amount#1190L ASC NULLS FIRST], false, 0

(25) Filter
Input [2]: [amount#1218L, user_id#1223]
Condition : isnotnull(user_id#1223)

(26) HashAggregate
Input [2]: [amount#1218L, user_id#1223]
Keys [1]: [user_id#1223]
Functions [1]: [partial_max(amount#1218L)]
Aggregate Attributes [1]: [max#1264L]
Results [2]: [user_id#1223, max#1265L]

(27) Exchange
Input [2]: [user_id#1223, max#1265L]
Arguments: hashpartitioning(user_id#1223, 200), ENSURE_REQUIREMENTS, [id=#2013]

(28) HashAggregate
Input [2]: [user_id#1223, max#1265L]
Keys [1]: [user_id#1223]
Functions [1]: [max(amount#1218L)]
Aggregate Attributes [1]: [max(amount#1218L)#1208L]
Results [2]: [user_id#1223 AS m_user_id#1215, max(amount#1218L)#1208L AS max_amount#1212L]

(29) Filter
Input [2]: [m_user_id#1215, max_amount#1212L]
Condition : isnotnull(max_amount#1212L)

(30) Exchange
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: hashpartitioning(m_user_id#1215, max_amount#1212L, 200), ENSURE_REQUIREMENTS, [id=#2019]

(31) Sort
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: [m_user_id#1215 ASC NULLS FIRST, max_amount#1212L ASC NULLS FIRST], false, 0

(32) SortMergeJoin
Left keys [2]: [user_id#1195, amount#1190L]
Right keys [2]: [m_user_id#1215, max_amount#1212L]
Join condition: None

(33) Project
Output [8]: [cast(amount#1190L as string) AS amount#1248, card_num#1191, category#1192, merchant#1193, cast(ts#1194L as string) AS ts#1252, user_id#1195, m_user_id#1215, cast(max_amount#1212L as string) AS max_amount#1255]
Input [8]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195, m_user_id#1215, max_amount#1212L]

(34) CollectLimit
Input [8]: [amount#1248, card_num#1191, category#1192, merchant#1193, ts#1252, user_id#1195, m_user_id#1215, max_amount#1255]
Arguments: 21

(35) AdaptiveSparkPlan
Output [8]: [amount#1248, card_num#1191, category#1192, merchant#1193, ts#1252, user_id#1195, m_user_id#1215, max_amount#1255]
Arguments: isFinalPlan=true
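
In the plan above the only join operator is SortMergeJoin; no BroadcastHashJoin appears, which is what we expect once auto broadcast is disabled. Reading a long plan by eye is error-prone, so here is a rough sketch of checking it in code. The helper names plan_text and join_operators are made up for this post. It assumes that DataFrame.explain(mode="formatted") (Spark 3.0 and later) writes its text through Python's print, so the text can be captured from stdout; if that does not hold in your environment, paste the plan text into join_operators directly.

```python
import io
import re
from contextlib import redirect_stdout

# Matches operator names that end in "Join", e.g. SortMergeJoin or
# BroadcastHashJoin. The bare word "Join" (as in "Join condition") is skipped.
JOIN_OPERATOR = re.compile(r"\b(\w+Join)\b")


def plan_text(df, mode="formatted"):
    """Hypothetical helper: capture what df.explain(...) prints as a string."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def join_operators(text):
    """Return the distinct join operator names found in a plan, sorted."""
    return sorted(set(JOIN_OPERATOR.findall(text)))


# Usage sketch (assumes joinedData from the solution above):
#
# operators = join_operators(plan_text(joinedData))
# print(operators)
# print("BroadcastHashJoin" in operators)
```

A simple text match like this is only a convenience for experiments; it looks at operator names and does not inspect the plan structure.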
