Credit Card Data Analysis using PySpark: AQE auto broadcast join explanation

Input details:

● The file contains JSON records
● Each record has the following fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

The analysis below is performed on this data.

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+
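For reference, spark.read.json by default expects one JSON object per line. A minimal, hypothetical helper to create such a card_transactions.json for local testing, with values copied from the sample rows above:

# Hypothetical helper: write a tiny JSON-lines file for local testing.
# Values are taken from the sample rows shown above; the real input file is
# assumed to contain one JSON object per line with the same six fields.
sample_records = [
    '{"user_id": "U_104", "card_num": "C_108", "merchant": "M_102", "category": "food", "amount": 243, "ts": 1579532902}',
    '{"user_id": "U_103", "card_num": "C_106", "merchant": "M_103", "category": "cosmetics", "amount": 699, "ts": 1581759040}',
    '{"user_id": "U_103", "card_num": "C_104", "merchant": "M_110", "category": "children", "amount": 228, "ts": 1584161986}',
]
with open("card_transactions.json", "w") as f:
    f.write("\n".join(sample_records) + "\n")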

Application: AQE auto broadcast join

Here, even though we do not explicitly call broadcast() on the smaller DataFrame, a broadcast hash join still happens, because Adaptive Query Execution (the AdaptiveSparkPlan node in the physical plan) detects at runtime that the aggregated side is below spark.sql.autoBroadcastJoinThreshold and broadcasts it automatically.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast  # imported but intentionally NOT used below

spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
# print(spark.version)

# Default auto broadcast threshold (10485760 bytes = 10 MB unless overridden)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

creditCardData = spark.read.json("card_transactions.json")

# Maximum amount spent per user
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

# Join the aggregated (small) DataFrame back to the transactions to get the
# full record of each user's maximum spend
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(useridMaxSpendDF, cond, "inner")
joinedData.show()
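The formatted plan shown under Output below was presumably produced with DataFrame.explain(); a minimal sketch that also confirms AQE is enabled (spark.sql.adaptive.enabled is a standard Spark SQL config, on by default in recent Spark 3.x releases):

# Sketch: check that AQE is active and print the plan in the formatted style
# reproduced below. explain(mode="formatted") is available from Spark 3.0 onwards.
print(spark.conf.get("spark.sql.adaptive.enabled"))  # expected 'true' when AQE is on
joinedData.explain(mode="formatted")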

Output:

+------+--------+-------------+--------+----------+-------+---------+----------+
|amount|card_num|     category|merchant|        ts|user_id|m_user_id|max_amount|
+------+--------+-------------+--------+----------+-------+---------+----------+
|  1000|   C_101|entertainment|   M_100|1580163399|  U_101|    U_101|      1000|
|   997|   C_103|    groceries|   M_103|1582876481|  U_102|    U_102|       997|
|   977|   C_104|    groceries|   M_101|1579402924|  U_103|    U_103|       977|
|   977|   C_105|         food|   M_102|1581369586|  U_103|    U_103|       977|
|   977|   C_106|         food|   M_100|1580897199|  U_103|    U_103|       977|
|   996|   C_108|         food|   M_106|1581391534|  U_104|    U_104|       996|
|   996|   C_107|     children|   M_107|1580776821|  U_104|    U_104|       996|
+------+--------+-------------+--------+----------+-------+---------+----------+
== Physical Plan ==
AdaptiveSparkPlan (26)
+- == Final Plan ==
   CollectLimit (15)
   +- * Project (14)
      +- * BroadcastHashJoin Inner BuildRight (13)
         :- * Filter (2)
         :  +- Scan json  (1)
         +- BroadcastQueryStage (12)
            +- BroadcastExchange (11)
               +- * Filter (10)
                  +- * HashAggregate (9)
                     +- AQEShuffleRead (8)
                        +- ShuffleQueryStage (7)
                           +- Exchange (6)
                              +- * HashAggregate (5)
                                 +- * Filter (4)
                                    +- Scan json  (3)
+- == Initial Plan ==
   CollectLimit (25)
   +- Project (24)
      +- BroadcastHashJoin Inner BuildRight (23)
         :- Filter (16)
         :  +- Scan json  (1)
         +- BroadcastExchange (22)
            +- Filter (21)
               +- HashAggregate (20)
                  +- Exchange (19)
                     +- HashAggregate (18)
                        +- Filter (17)
                           +- Scan json  (3)

(1) Scan json
Output [6]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id), IsNotNull(amount)]
ReadSchema: struct<amount:bigint,card_num:string,category:string,merchant:string,ts:bigint,user_id:string>

(2) Filter [codegen id : 3]
Input [6]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12]
Condition : (isnotnull(user_id#12) AND isnotnull(amount#7L))

(3) Scan json
Output [2]: [amount#35L, user_id#40]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id)]
ReadSchema: struct<amount:bigint,user_id:string>

(4) Filter [codegen id : 1]
Input [2]: [amount#35L, user_id#40]
Condition : isnotnull(user_id#40)

(5) HashAggregate [codegen id : 1]
Input [2]: [amount#35L, user_id#40]
Keys [1]: [user_id#40]
Functions [1]: [partial_max(amount#35L)]
Aggregate Attributes [1]: [max#81L]
Results [2]: [user_id#40, max#82L]

(6) Exchange
Input [2]: [user_id#40, max#82L]
Arguments: hashpartitioning(user_id#40, 200), ENSURE_REQUIREMENTS, [id=#71]

(7) ShuffleQueryStage
Output [2]: [user_id#40, max#82L]
Arguments: 0

(8) AQEShuffleRead
Input [2]: [user_id#40, max#82L]
Arguments: coalesced

(9) HashAggregate [codegen id : 2]
Input [2]: [user_id#40, max#82L]
Keys [1]: [user_id#40]
Functions [1]: [max(amount#35L)]
Aggregate Attributes [1]: [max(amount#35L)#25L]
Results [2]: [user_id#40 AS m_user_id#32, max(amount#35L)#25L AS max_amount#29L]

(10) Filter [codegen id : 2]
Input [2]: [m_user_id#32, max_amount#29L]
Condition : isnotnull(max_amount#29L)

(11) BroadcastExchange
Input [2]: [m_user_id#32, max_amount#29L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#124]

(12) BroadcastQueryStage
Output [2]: [m_user_id#32, max_amount#29L]
Arguments: 1

(13) BroadcastHashJoin [codegen id : 3]
Left keys [2]: [user_id#12, amount#7L]
Right keys [2]: [m_user_id#32, max_amount#29L]
Join condition: None

(14) Project [codegen id : 3]
Output [8]: [cast(amount#7L as string) AS amount#65, card_num#8, category#9, merchant#10, cast(ts#11L as string) AS ts#69, user_id#12, m_user_id#32, cast(max_amount#29L as string) AS max_amount#72]
Input [8]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12, m_user_id#32, max_amount#29L]

(15) CollectLimit
Input [8]: [amount#65, card_num#8, category#9, merchant#10, ts#69, user_id#12, m_user_id#32, max_amount#72]
Arguments: 21

(16) Filter
Input [6]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12]
Condition : (isnotnull(user_id#12) AND isnotnull(amount#7L))

(17) Filter
Input [2]: [amount#35L, user_id#40]
Condition : isnotnull(user_id#40)

(18) HashAggregate
Input [2]: [amount#35L, user_id#40]
Keys [1]: [user_id#40]
Functions [1]: [partial_max(amount#35L)]
Aggregate Attributes [1]: [max#81L]
Results [2]: [user_id#40, max#82L]

(19) Exchange
Input [2]: [user_id#40, max#82L]
Arguments: hashpartitioning(user_id#40, 200), ENSURE_REQUIREMENTS, [id=#47]

(20) HashAggregate
Input [2]: [user_id#40, max#82L]
Keys [1]: [user_id#40]
Functions [1]: [max(amount#35L)]
Aggregate Attributes [1]: [max(amount#35L)#25L]
Results [2]: [user_id#40 AS m_user_id#32, max(amount#35L)#25L AS max_amount#29L]

(21) Filter
Input [2]: [m_user_id#32, max_amount#29L]
Condition : isnotnull(max_amount#29L)

(22) BroadcastExchange
Input [2]: [m_user_id#32, max_amount#29L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#51]

(23) BroadcastHashJoin
Left keys [2]: [user_id#12, amount#7L]
Right keys [2]: [m_user_id#32, max_amount#29L]
Join condition: None

(24) Project
Output [8]: [cast(amount#7L as string) AS amount#65, card_num#8, category#9, merchant#10, cast(ts#11L as string) AS ts#69, user_id#12, m_user_id#32, cast(max_amount#29L as string) AS max_amount#72]
Input [8]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12, m_user_id#32, max_amount#29L]

(25) CollectLimit
Input [8]: [amount#65, card_num#8, category#9, merchant#10, ts#69, user_id#12, m_user_id#32, max_amount#72]
Arguments: 21

(26) AdaptiveSparkPlan
Output [8]: [amount#65, card_num#8, category#9, merchant#10, ts#69, user_id#12, m_user_id#32, max_amount#72]
Arguments: isFinalPlan=true
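To see the difference AQE's auto broadcast makes, one option is to turn off the size-based broadcast threshold and rerun the same join; Spark then typically falls back to a sort-merge join. A hedged sketch (note that Spark 3.2+ also has a separate spark.sql.adaptive.autoBroadcastJoinThreshold that AQE consults, which by default follows the regular threshold):

# Sketch: disable automatic broadcasting and rerun the same join for comparison.
# Setting the threshold to -1 turns off size-based auto broadcast; the join is
# then usually planned as a SortMergeJoin instead of a BroadcastHashJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
joinedDataNoBroadcast = creditCardData.join(useridMaxSpendDF, cond, "inner")
joinedDataNoBroadcast.explain(mode="formatted")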
