Credit Card Data Analysis using PySpark (how to use a broadcast join after disabling auto broadcast)
Input details:
● File has JSON records
● Each record has fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts
### Below analysis to be done
Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
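To try the example locally, a small input file can be produced from the sample rows. The sketch below is only an illustration: it assumes the input is laid out as one JSON object per line (the layout spark.read.json reads by default), and the file name card_transactions_sample.json and the helper write_json_lines are hypothetical names, not part of the original data set.

```python
import json
import tempfile
from pathlib import Path

# The three sample rows shown above; a real input file would hold many more.
sample_records = [
    {"user_id": "U_104", "card_num": "C_108", "merchant": "M_102",
     "category": "food", "amount": 243, "ts": 1579532902},
    {"user_id": "U_103", "card_num": "C_106", "merchant": "M_103",
     "category": "cosmetics", "amount": 699, "ts": 1581759040},
    {"user_id": "U_103", "card_num": "C_104", "merchant": "M_110",
     "category": "children", "amount": 228, "ts": 1584161986},
]


def write_json_lines(records, path):
    """Write one JSON object per line (hypothetical helper for this sketch)."""
    with open(path, "w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")


# Hypothetical file name, written to the temp directory to avoid clobbering real data.
output_path = Path(tempfile.gettempdir()) / "card_transactions_sample.json"
write_json_lines(sample_records, output_path)
print(f"wrote {len(sample_records)} records to {output_path}")
```

Pointing spark.read.json at this file instead of card_transactions.json should give a DataFrame with the same six columns as the sample table.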
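Application: We disable automatic broadcast joins by setting spark.sql.autoBroadcastJoinThreshold to -1, then join two DataFrames using an explicit broadcast() hint. The execution plan should still show a broadcast join (BroadcastHashJoin), because an explicit hint is honored even when the automatic threshold is disabled.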
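To make the idea concrete, here is a minimal single-process sketch of what a broadcast hash join does conceptually: the small table is turned into a hash structure, a copy of it is made available to every partition of the large table, and each partition probes it locally without shuffling the large side. This is plain Python, not Spark code; the function names, the two-partition split and the trimmed-down rows are assumptions made for illustration only.

```python
# Large side: a few transactions (only the columns needed for the join).
transactions = [
    {"user_id": "U_104", "amount": 243, "card_num": "C_108"},
    {"user_id": "U_103", "amount": 699, "card_num": "C_106"},
    {"user_id": "U_103", "amount": 228, "card_num": "C_104"},
]


def max_amount_per_user(rows):
    """Small side: the maximum amount seen for each user_id."""
    result = {}
    for row in rows:
        uid = row["user_id"]
        result[uid] = max(result.get(uid, row["amount"]), row["amount"])
    return result


def broadcast_hash_join(partitions, small_table):
    """Probe every partition of the large side against one shared hash set."""
    # Build phase: hash the join keys (user_id, max_amount) of the small side.
    build_side = {(uid, amount) for uid, amount in small_table.items()}
    joined = []
    # Probe phase: each partition is scanned independently; no rows are moved.
    for partition in partitions:
        for row in partition:
            if (row["user_id"], row["amount"]) in build_side:
                joined.append(
                    {**row, "m_user_id": row["user_id"], "max_amount": row["amount"]}
                )
    return joined


# Pretend the large side is split into two partitions.
partitions = [transactions[:2], transactions[2:]]
joined_rows = broadcast_hash_join(partitions, max_amount_per_user(transactions))
for row in joined_rows:
    print(row)
```

Only the rows whose amount equals the user's maximum survive the join, which mirrors the two-key join condition used in the solution.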
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()

# Print the current threshold, disable automatic broadcast joins (-1), then print it again
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

creditCardData = spark.read.json("card_transactions.json")

# Maximum amount spent by each user; columns are renamed to avoid name clashes in the join
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

# Keep only the transactions whose amount equals the user's maximum.
# broadcast() is an explicit hint, so it applies even with auto broadcast disabled.
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id, creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(broadcast(useridMaxSpendDF), cond, "inner")
joinedData.show()
-1
-1
+------+--------+-------------+--------+----------+-------+---------+----------+
|amount|card_num|     category|merchant|        ts|user_id|m_user_id|max_amount|
+------+--------+-------------+--------+----------+-------+---------+----------+
|  1000|   C_101|entertainment|   M_100|1580163399|  U_101|    U_101|      1000|
|   997|   C_103|    groceries|   M_103|1582876481|  U_102|    U_102|       997|
|   977|   C_104|    groceries|   M_101|1579402924|  U_103|    U_103|       977|
|   977|   C_105|         food|   M_102|1581369586|  U_103|    U_103|       977|
|   977|   C_106|         food|   M_100|1580897199|  U_103|    U_103|       977|
|   996|   C_108|         food|   M_106|1581391534|  U_104|    U_104|       996|
|   996|   C_107|     children|   M_107|1580776821|  U_104|    U_104|       996|
+------+--------+-------------+--------+----------+-------+---------+----------+
== Physical Plan ==
AdaptiveSparkPlan (26)
+- == Final Plan ==
   CollectLimit (15)
   +- * Project (14)
      +- * BroadcastHashJoin Inner BuildRight (13)
         :- * Filter (2)
         :  +- Scan json (1)
         +- BroadcastQueryStage (12)
            +- BroadcastExchange (11)
               +- * Filter (10)
                  +- * HashAggregate (9)
                     +- AQEShuffleRead (8)
                        +- ShuffleQueryStage (7)
                           +- Exchange (6)
                              +- * HashAggregate (5)
                                 +- * Filter (4)
                                    +- Scan json (3)
+- == Initial Plan ==
   CollectLimit (25)
   +- Project (24)
      +- BroadcastHashJoin Inner BuildRight (23)
         :- Filter (16)
         :  +- Scan json (1)
         +- BroadcastExchange (22)
            +- Filter (21)
               +- HashAggregate (20)
                  +- Exchange (19)
                     +- HashAggregate (18)
                        +- Filter (17)
                           +- Scan json (3)

(1) Scan json
Output [6]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id), IsNotNull(amount)]
ReadSchema: struct<amount:bigint,card_num:string,category:string,merchant:string,ts:bigint,user_id:string>

(2) Filter [codegen id : 3]
Input [6]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299]
Condition : (isnotnull(user_id#1299) AND isnotnull(amount#1294L))

(3) Scan json
Output [2]: [amount#1322L, user_id#1327]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id)]
ReadSchema: struct<amount:bigint,user_id:string>

(4) Filter [codegen id : 1]
Input [2]: [amount#1322L, user_id#1327]
Condition : isnotnull(user_id#1327)

(5) HashAggregate [codegen id : 1]
Input [2]: [amount#1322L, user_id#1327]
Keys [1]: [user_id#1327]
Functions [1]: [partial_max(amount#1322L)]
Aggregate Attributes [1]: [max#1368L]
Results [2]: [user_id#1327, max#1369L]

(6) Exchange
Input [2]: [user_id#1327, max#1369L]
Arguments: hashpartitioning(user_id#1327, 200), ENSURE_REQUIREMENTS, [id=#2250]

(7) ShuffleQueryStage
Output [2]: [user_id#1327, max#1369L]
Arguments: 0

(8) AQEShuffleRead
Input [2]: [user_id#1327, max#1369L]
Arguments: coalesced

(9) HashAggregate [codegen id : 2]
Input [2]: [user_id#1327, max#1369L]
Keys [1]: [user_id#1327]
Functions [1]: [max(amount#1322L)]
Aggregate Attributes [1]: [max(amount#1322L)#1312L]
Results [2]: [user_id#1327 AS m_user_id#1319, max(amount#1322L)#1312L AS max_amount#1316L]

(10) Filter [codegen id : 2]
Input [2]: [m_user_id#1319, max_amount#1316L]
Condition : isnotnull(max_amount#1316L)

(11) BroadcastExchange
Input [2]: [m_user_id#1319, max_amount#1316L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#2303]

(12) BroadcastQueryStage
Output [2]: [m_user_id#1319, max_amount#1316L]
Arguments: 1

(13) BroadcastHashJoin [codegen id : 3]
Left keys [2]: [user_id#1299, amount#1294L]
Right keys [2]: [m_user_id#1319, max_amount#1316L]
Join condition: None

(14) Project [codegen id : 3]
Output [8]: [cast(amount#1294L as string) AS amount#1352, card_num#1295, category#1296, merchant#1297, cast(ts#1298L as string) AS ts#1356, user_id#1299, m_user_id#1319, cast(max_amount#1316L as string) AS max_amount#1359]
Input [8]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299, m_user_id#1319, max_amount#1316L]

(15) CollectLimit
Input [8]: [amount#1352, card_num#1295, category#1296, merchant#1297, ts#1356, user_id#1299, m_user_id#1319, max_amount#1359]
Arguments: 21

(16) Filter
Input [6]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299]
Condition : (isnotnull(user_id#1299) AND isnotnull(amount#1294L))

(17) Filter
Input [2]: [amount#1322L, user_id#1327]
Condition : isnotnull(user_id#1327)

(18) HashAggregate
Input [2]: [amount#1322L, user_id#1327]
Keys [1]: [user_id#1327]
Functions [1]: [partial_max(amount#1322L)]
Aggregate Attributes [1]: [max#1368L]
Results [2]: [user_id#1327, max#1369L]

(19) Exchange
Input [2]: [user_id#1327, max#1369L]
Arguments: hashpartitioning(user_id#1327, 200), ENSURE_REQUIREMENTS, [id=#2226]

(20) HashAggregate
Input [2]: [user_id#1327, max#1369L]
Keys [1]: [user_id#1327]
Functions [1]: [max(amount#1322L)]
Aggregate Attributes [1]: [max(amount#1322L)#1312L]
Results [2]: [user_id#1327 AS m_user_id#1319, max(amount#1322L)#1312L AS max_amount#1316L]

(21) Filter
Input [2]: [m_user_id#1319, max_amount#1316L]
Condition : isnotnull(max_amount#1316L)

(22) BroadcastExchange
Input [2]: [m_user_id#1319, max_amount#1316L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#2230]

(23) BroadcastHashJoin
Left keys [2]: [user_id#1299, amount#1294L]
Right keys [2]: [m_user_id#1319, max_amount#1316L]
Join condition: None

(24) Project
Output [8]: [cast(amount#1294L as string) AS amount#1352, card_num#1295, category#1296, merchant#1297, cast(ts#1298L as string) AS ts#1356, user_id#1299, m_user_id#1319, cast(max_amount#1316L as string) AS max_amount#1359]
Input [8]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299, m_user_id#1319, max_amount#1316L]

(25) CollectLimit
Input [8]: [amount#1352, card_num#1295, category#1296, merchant#1297, ts#1356, user_id#1299, m_user_id#1319, max_amount#1359]
Arguments: 21

(26) AdaptiveSparkPlan
Output [8]: [amount#1352, card_num#1295, category#1296, merchant#1297, ts#1356, user_id#1299, m_user_id#1319, max_amount#1359]
Arguments: isFinalPlan=true