Credit Card Data Analysis using PySpark: Disabling Auto Broadcast Join Explained

Input details:

● The file contains JSON records.
● Each record has the following fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

The analysis to be performed is described below.

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+
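For reference, each line of card_transactions.json would look roughly like the following (reconstructed from the first sample row above; field order within a JSON object does not matter):

{"user_id": "U_104", "card_num": "C_108", "merchant": "M_102", "category": "food", "amount": 243, "ts": 1579532902}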

Application: Here we disable automatic broadcast joins, join two DataFrames, and then inspect the execution plan, which should use a sort-merge join rather than a broadcast join.

Solution:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()

# Print the current threshold, disable automatic broadcast joins by setting
# it to -1, then confirm the change.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

creditCardData = spark.read.json("card_transactions.json")

# Maximum spend per user; rename the columns so the join keys are unambiguous.
useridMaxSpendDF = creditCardData.groupBy('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

# Join each transaction back to its user's maximum, keeping only the
# transaction(s) in which the user hit that maximum spend.
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(useridMaxSpendDF, cond, "inner")
joinedData.show()
joinedData.explain(mode="formatted")
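As an aside, if you would rather assert on the join strategy in a test than eyeball the plan, note that explain() only prints to stdout, so one portable option is to capture its output. A minimal sketch using only public APIs (the assertion strings assume the Spark 3.x plan rendering):

import io
import contextlib

# Capture the printed physical plan as a string.
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    joinedData.explain()
plan_text = buf.getvalue()

# With autoBroadcastJoinThreshold = -1 we expect a shuffle-based join.
assert "SortMergeJoin" in plan_text
assert "BroadcastHashJoin" not in plan_text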

Output (the first two lines are the threshold printed before and after the change; it reads -1 both times here because the configuration had already been set earlier in the same session):

-1
-1
+------+--------+-------------+--------+----------+-------+---------+----------+
|amount|card_num|     category|merchant|        ts|user_id|m_user_id|max_amount|
+------+--------+-------------+--------+----------+-------+---------+----------+
|  1000|   C_101|entertainment|   M_100|1580163399|  U_101|    U_101|      1000|
|   997|   C_103|    groceries|   M_103|1582876481|  U_102|    U_102|       997|
|   977|   C_104|    groceries|   M_101|1579402924|  U_103|    U_103|       977|
|   977|   C_105|         food|   M_102|1581369586|  U_103|    U_103|       977|
|   977|   C_106|         food|   M_100|1580897199|  U_103|    U_103|       977|
|   996|   C_108|         food|   M_106|1581391534|  U_104|    U_104|       996|
|   996|   C_107|     children|   M_107|1580776821|  U_104|    U_104|       996|
+------+--------+-------------+--------+----------+-------+---------+----------+
== Physical Plan ==
AdaptiveSparkPlan (35)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin Inner (19)
         :- * Sort (6)
         :  +- AQEShuffleRead (5)
         :     +- ShuffleQueryStage (4)
         :        +- Exchange (3)
         :           +- * Filter (2)
         :              +- Scan json  (1)
         +- * Sort (18)
            +- AQEShuffleRead (17)
               +- ShuffleQueryStage (16)
                  +- Exchange (15)
                     +- * Filter (14)
                        +- * HashAggregate (13)
                           +- AQEShuffleRead (12)
                              +- ShuffleQueryStage (11)
                                 +- Exchange (10)
                                    +- * HashAggregate (9)
                                       +- * Filter (8)
                                          +- Scan json  (7)
+- == Initial Plan ==
   CollectLimit (34)
   +- Project (33)
      +- SortMergeJoin Inner (32)
         :- Sort (24)
         :  +- Exchange (23)
         :     +- Filter (22)
         :        +- Scan json  (1)
         +- Sort (31)
            +- Exchange (30)
               +- Filter (29)
                  +- HashAggregate (28)
                     +- Exchange (27)
                        +- HashAggregate (26)
                           +- Filter (25)
                              +- Scan json  (7)

(1) Scan json
Output [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id), IsNotNull(amount)]
ReadSchema: struct<amount:bigint,card_num:string,category:string,merchant:string,ts:bigint,user_id:string>

(2) Filter [codegen id : 1]
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Condition : (isnotnull(user_id#1195) AND isnotnull(amount#1190L))

(3) Exchange
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: hashpartitioning(user_id#1195, amount#1190L, 200), ENSURE_REQUIREMENTS, [id=#2033]

(4) ShuffleQueryStage
Output [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: 0

(5) AQEShuffleRead
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: coalesced

(6) Sort [codegen id : 4]
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: [user_id#1195 ASC NULLS FIRST, amount#1190L ASC NULLS FIRST], false, 0

(7) Scan json
Output [2]: [amount#1218L, user_id#1223]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id)]
ReadSchema: struct<amount:bigint,user_id:string>

(8) Filter [codegen id : 2]
Input [2]: [amount#1218L, user_id#1223]
Condition : isnotnull(user_id#1223)

(9) HashAggregate [codegen id : 2]
Input [2]: [amount#1218L, user_id#1223]
Keys [1]: [user_id#1223]
Functions [1]: [partial_max(amount#1218L)]
Aggregate Attributes [1]: [max#1264L]
Results [2]: [user_id#1223, max#1265L]

(10) Exchange
Input [2]: [user_id#1223, max#1265L]
Arguments: hashpartitioning(user_id#1223, 200), ENSURE_REQUIREMENTS, [id=#2059]

(11) ShuffleQueryStage
Output [2]: [user_id#1223, max#1265L]
Arguments: 1

(12) AQEShuffleRead
Input [2]: [user_id#1223, max#1265L]
Arguments: coalesced

(13) HashAggregate [codegen id : 3]
Input [2]: [user_id#1223, max#1265L]
Keys [1]: [user_id#1223]
Functions [1]: [max(amount#1218L)]
Aggregate Attributes [1]: [max(amount#1218L)#1208L]
Results [2]: [user_id#1223 AS m_user_id#1215, max(amount#1218L)#1208L AS max_amount#1212L]

(14) Filter [codegen id : 3]
Input [2]: [m_user_id#1215, max_amount#1212L]
Condition : isnotnull(max_amount#1212L)

(15) Exchange
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: hashpartitioning(m_user_id#1215, max_amount#1212L, 200), ENSURE_REQUIREMENTS, [id=#2111]

(16) ShuffleQueryStage
Output [2]: [m_user_id#1215, max_amount#1212L]
Arguments: 2

(17) AQEShuffleRead
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: coalesced

(18) Sort [codegen id : 5]
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: [m_user_id#1215 ASC NULLS FIRST, max_amount#1212L ASC NULLS FIRST], false, 0

(19) SortMergeJoin [codegen id : 6]
Left keys [2]: [user_id#1195, amount#1190L]
Right keys [2]: [m_user_id#1215, max_amount#1212L]
Join condition: None

(20) Project [codegen id : 6]
Output [8]: [cast(amount#1190L as string) AS amount#1248, card_num#1191, category#1192, merchant#1193, cast(ts#1194L as string) AS ts#1252, user_id#1195, m_user_id#1215, cast(max_amount#1212L as string) AS max_amount#1255]
Input [8]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195, m_user_id#1215, max_amount#1212L]

(21) CollectLimit
Input [8]: [amount#1248, card_num#1191, category#1192, merchant#1193, ts#1252, user_id#1195, m_user_id#1215, max_amount#1255]
Arguments: 21

(22) Filter
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Condition : (isnotnull(user_id#1195) AND isnotnull(amount#1190L))

(23) Exchange
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: hashpartitioning(user_id#1195, amount#1190L, 200), ENSURE_REQUIREMENTS, [id=#2018]

(24) Sort
Input [6]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195]
Arguments: [user_id#1195 ASC NULLS FIRST, amount#1190L ASC NULLS FIRST], false, 0

(25) Filter
Input [2]: [amount#1218L, user_id#1223]
Condition : isnotnull(user_id#1223)

(26) HashAggregate
Input [2]: [amount#1218L, user_id#1223]
Keys [1]: [user_id#1223]
Functions [1]: [partial_max(amount#1218L)]
Aggregate Attributes [1]: [max#1264L]
Results [2]: [user_id#1223, max#1265L]

(27) Exchange
Input [2]: [user_id#1223, max#1265L]
Arguments: hashpartitioning(user_id#1223, 200), ENSURE_REQUIREMENTS, [id=#2013]

(28) HashAggregate
Input [2]: [user_id#1223, max#1265L]
Keys [1]: [user_id#1223]
Functions [1]: [max(amount#1218L)]
Aggregate Attributes [1]: [max(amount#1218L)#1208L]
Results [2]: [user_id#1223 AS m_user_id#1215, max(amount#1218L)#1208L AS max_amount#1212L]

(29) Filter
Input [2]: [m_user_id#1215, max_amount#1212L]
Condition : isnotnull(max_amount#1212L)

(30) Exchange
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: hashpartitioning(m_user_id#1215, max_amount#1212L, 200), ENSURE_REQUIREMENTS, [id=#2019]

(31) Sort
Input [2]: [m_user_id#1215, max_amount#1212L]
Arguments: [m_user_id#1215 ASC NULLS FIRST, max_amount#1212L ASC NULLS FIRST], false, 0

(32) SortMergeJoin
Left keys [2]: [user_id#1195, amount#1190L]
Right keys [2]: [m_user_id#1215, max_amount#1212L]
Join condition: None

(33) Project
Output [8]: [cast(amount#1190L as string) AS amount#1248, card_num#1191, category#1192, merchant#1193, cast(ts#1194L as string) AS ts#1252, user_id#1195, m_user_id#1215, cast(max_amount#1212L as string) AS max_amount#1255]
Input [8]: [amount#1190L, card_num#1191, category#1192, merchant#1193, ts#1194L, user_id#1195, m_user_id#1215, max_amount#1212L]

(34) CollectLimit
Input [8]: [amount#1248, card_num#1191, category#1192, merchant#1193, ts#1252, user_id#1195, m_user_id#1215, max_amount#1255]
Arguments: 21

(35) AdaptiveSparkPlan
Output [8]: [amount#1248, card_num#1191, category#1192, merchant#1193, ts#1252, user_id#1195, m_user_id#1215, max_amount#1255]
Arguments: isFinalPlan=true

As expected, the join is executed as SortMergeJoin (nodes 19 and 32); no BroadcastHashJoin appears anywhere in the plan, confirming that automatic broadcasting was disabled.
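To see the contrast, you can re-enable automatic broadcasting (the default threshold is 10485760 bytes, i.e. 10 MB) or force a broadcast with an explicit hint. Note that the broadcast() hint is honored even while the threshold is -1, since -1 only disables the automatic, size-based decision. A minimal sketch, assuming the small aggregated side fits under the threshold:

from pyspark.sql.functions import broadcast

# Option 1: restore the default threshold so Spark may auto-broadcast again.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
creditCardData.join(useridMaxSpendDF, cond, "inner").explain()
# Expect BroadcastHashJoin instead of SortMergeJoin, if useridMaxSpendDF
# is estimated to be smaller than the threshold.

# Option 2: keep auto broadcast disabled but force it for this one join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
creditCardData.join(broadcast(useridMaxSpendDF), cond, "inner").explain()
# Expect BroadcastHashJoin, because the explicit hint overrides the threshold.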
