Credit Card Data Analysis using PySpark (forcing a broadcast join after disabling auto broadcast)

Input details:

● File has JSON records
● Each record has the fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

The analysis to be done is described below.

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+
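If you don't have the original dataset handy, a minimal sketch like the following can generate a small card_transactions.json to follow along (the records are made-up values matching the schema above):

import json

# Hypothetical sample records matching the schema above (made-up values)
records = [
    {"user_id": "U_104", "card_num": "C_108", "merchant": "M_102",
     "category": "food", "amount": 243, "ts": 1579532902},
    {"user_id": "U_103", "card_num": "C_106", "merchant": "M_103",
     "category": "cosmetics", "amount": 699, "ts": 1581759040},
    {"user_id": "U_103", "card_num": "C_104", "merchant": "M_110",
     "category": "children", "amount": 228, "ts": 1584161986},
]

# spark.read.json expects one JSON object per line (JSON Lines format)
with open("card_transactions.json", "w") as f:
    for rec in records:
        f.write(json.dumps(rec) + "\n")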

Application: We will disable auto broadcast join, then join two DataFrames using an explicit broadcast() hint and inspect the execution plan, which should still use a broadcast join.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()

# Check the current threshold, then disable auto broadcast join entirely
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

creditCardData = spark.read.json("card_transactions.json")

# Maximum amount spent by each user
useridMaxSpendDF = creditCardData.groupBy('user_id').max('amount')

# Rename the columns so the join condition below is unambiguous
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

# Join each user's transactions against their max spend;
# broadcast() forces a broadcast join even though auto broadcast is disabled
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(broadcast(useridMaxSpendDF), cond, "inner")
joinedData.show()
joinedData.explain(mode="formatted")

-1
-1
+------+--------+-------------+--------+----------+-------+---------+----------+
|amount|card_num|     category|merchant|        ts|user_id|m_user_id|max_amount|
+------+--------+-------------+--------+----------+-------+---------+----------+
|  1000|   C_101|entertainment|   M_100|1580163399|  U_101|    U_101|      1000|
|   997|   C_103|    groceries|   M_103|1582876481|  U_102|    U_102|       997|
|   977|   C_104|    groceries|   M_101|1579402924|  U_103|    U_103|       977|
|   977|   C_105|         food|   M_102|1581369586|  U_103|    U_103|       977|
|   977|   C_106|         food|   M_100|1580897199|  U_103|    U_103|       977|
|   996|   C_108|         food|   M_106|1581391534|  U_104|    U_104|       996|
|   996|   C_107|     children|   M_107|1580776821|  U_104|    U_104|       996|
+------+--------+-------------+--------+----------+-------+---------+----------+
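To confirm that it is the explicit broadcast() hint keeping the broadcast join alive, you can run the same join without the hint; with the threshold set to -1, Spark should fall back to a sort-merge join (a quick check, not part of the original output):

# Same join without the broadcast() hint; with
# spark.sql.autoBroadcastJoinThreshold = -1 we expect a
# SortMergeJoin in the plan instead of a BroadcastHashJoin
noHintJoin = creditCardData.join(useridMaxSpendDF, cond, "inner")
noHintJoin.explain()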
== Physical Plan ==
AdaptiveSparkPlan (26)
+- == Final Plan ==
   CollectLimit (15)
   +- * Project (14)
      +- * BroadcastHashJoin Inner BuildRight (13)
         :- * Filter (2)
         :  +- Scan json (1)
         +- BroadcastQueryStage (12)
            +- BroadcastExchange (11)
               +- * Filter (10)
                  +- * HashAggregate (9)
                     +- AQEShuffleRead (8)
                        +- ShuffleQueryStage (7)
                           +- Exchange (6)
                              +- * HashAggregate (5)
                                 +- * Filter (4)
                                    +- Scan json (3)
+- == Initial Plan ==
   CollectLimit (25)
   +- Project (24)
      +- BroadcastHashJoin Inner BuildRight (23)
         :- Filter (16)
         :  +- Scan json (1)
         +- BroadcastExchange (22)
            +- Filter (21)
               +- HashAggregate (20)
                  +- Exchange (19)
                     +- HashAggregate (18)
                        +- Filter (17)
                           +- Scan json (3)

(1) Scan json
Output [6]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id), IsNotNull(amount)]
ReadSchema: struct<amount:bigint,card_num:string,category:string,merchant:string,ts:bigint,user_id:string>

(2) Filter [codegen id : 3]
Input [6]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299]
Condition : (isnotnull(user_id#1299) AND isnotnull(amount#1294L))

(3) Scan json
Output [2]: [amount#1322L, user_id#1327]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id)]
ReadSchema: struct<amount:bigint,user_id:string>

(4) Filter [codegen id : 1]
Input [2]: [amount#1322L, user_id#1327]
Condition : isnotnull(user_id#1327)

(5) HashAggregate [codegen id : 1]
Input [2]: [amount#1322L, user_id#1327]
Keys [1]: [user_id#1327]
Functions [1]: [partial_max(amount#1322L)]
Aggregate Attributes [1]: [max#1368L]
Results [2]: [user_id#1327, max#1369L]

(6) Exchange
Input [2]: [user_id#1327, max#1369L]
Arguments: hashpartitioning(user_id#1327, 200), ENSURE_REQUIREMENTS, [id=#2250]

(7) ShuffleQueryStage
Output [2]: [user_id#1327, max#1369L]
Arguments: 0

(8) AQEShuffleRead
Input [2]: [user_id#1327, max#1369L]
Arguments: coalesced

(9) HashAggregate [codegen id : 2]
Input [2]: [user_id#1327, max#1369L]
Keys [1]: [user_id#1327]
Functions [1]: [max(amount#1322L)]
Aggregate Attributes [1]: [max(amount#1322L)#1312L]
Results [2]: [user_id#1327 AS m_user_id#1319, max(amount#1322L)#1312L AS max_amount#1316L]

(10) Filter [codegen id : 2]
Input [2]: [m_user_id#1319, max_amount#1316L]
Condition : isnotnull(max_amount#1316L)

(11) BroadcastExchange
Input [2]: [m_user_id#1319, max_amount#1316L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#2303]

(12) BroadcastQueryStage
Output [2]: [m_user_id#1319, max_amount#1316L]
Arguments: 1

(13) BroadcastHashJoin [codegen id : 3]
Left keys [2]: [user_id#1299, amount#1294L]
Right keys [2]: [m_user_id#1319, max_amount#1316L]
Join condition: None

(14) Project [codegen id : 3]
Output [8]: [cast(amount#1294L as string) AS amount#1352, card_num#1295, category#1296, merchant#1297, cast(ts#1298L as string) AS ts#1356, user_id#1299, m_user_id#1319, cast(max_amount#1316L as string) AS max_amount#1359]
Input [8]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299, m_user_id#1319, max_amount#1316L]

(15) CollectLimit
Input [8]: [amount#1352, card_num#1295, category#1296, merchant#1297, ts#1356, user_id#1299, m_user_id#1319, max_amount#1359]
Arguments: 21

(16) Filter
Input [6]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299]
Condition : (isnotnull(user_id#1299) AND isnotnull(amount#1294L))

(17) Filter
Input [2]: [amount#1322L, user_id#1327]
Condition : isnotnull(user_id#1327)

(18) HashAggregate
Input [2]: [amount#1322L, user_id#1327]
Keys [1]: [user_id#1327]
Functions [1]: [partial_max(amount#1322L)]
Aggregate Attributes [1]: [max#1368L]
Results [2]: [user_id#1327, max#1369L]

(19) Exchange
Input [2]: [user_id#1327, max#1369L]
Arguments: hashpartitioning(user_id#1327, 200), ENSURE_REQUIREMENTS, [id=#2226]

(20) HashAggregate
Input [2]: [user_id#1327, max#1369L]
Keys [1]: [user_id#1327]
Functions [1]: [max(amount#1322L)]
Aggregate Attributes [1]: [max(amount#1322L)#1312L]
Results [2]: [user_id#1327 AS m_user_id#1319, max(amount#1322L)#1312L AS max_amount#1316L]

(21) Filter
Input [2]: [m_user_id#1319, max_amount#1316L]
Condition : isnotnull(max_amount#1316L)

(22) BroadcastExchange
Input [2]: [m_user_id#1319, max_amount#1316L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#2230]

(23) BroadcastHashJoin
Left keys [2]: [user_id#1299, amount#1294L]
Right keys [2]: [m_user_id#1319, max_amount#1316L]
Join condition: None

(24) Project
Output [8]: [cast(amount#1294L as string) AS amount#1352, card_num#1295, category#1296, merchant#1297, cast(ts#1298L as string) AS ts#1356, user_id#1299, m_user_id#1319, cast(max_amount#1316L as string) AS max_amount#1359]
Input [8]: [amount#1294L, card_num#1295, category#1296, merchant#1297, ts#1298L, user_id#1299, m_user_id#1319, max_amount#1316L]

(25) CollectLimit
Input [8]: [amount#1352, card_num#1295, category#1296, merchant#1297, ts#1356, user_id#1299, m_user_id#1319, max_amount#1359]
Arguments: 21

(26) AdaptiveSparkPlan
Output [8]: [amount#1352, card_num#1295, category#1296, merchant#1297, ts#1356, user_id#1299, m_user_id#1319, max_amount#1359]
Arguments: isFinalPlan=true
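As the plan shows, the BroadcastHashJoin with a BroadcastExchange on the aggregated (smaller) side survives even with auto broadcast disabled, because the explicit broadcast() hint overrides the threshold. To restore the default behaviour afterwards, the threshold can be reset (a sketch; 10 MB is the usual default in recent Spark versions):

# Re-enable auto broadcast join by restoring the threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)

# or drop the session-level override and fall back to the default
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")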
