Credit Card Data Analysis using PySpark: auto broadcast explanation

Input details:

● The file contains JSON records
● Each record has the following fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

The analysis to be done is described below.

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: AQE auto broadcast join

Even though we never call broadcast() explicitly here, the join is still executed as a broadcast hash join, because the AdaptiveSparkPlan (Adaptive Query Execution) converts it at runtime once the build side turns out to be smaller than spark.sql.autoBroadcastJoinThreshold.
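
The conversion is controlled by two ordinary Spark SQL configurations. As a minimal sketch (the app name here is made up, and the values shown are the usual Spark 3.x defaults rather than anything specific to this run), they can be set explicitly when building the session:

from pyspark.sql import SparkSession

# Sketch only: the two configs behind AQE auto broadcast, set to their usual defaults
spark = (SparkSession.builder
         .master("local[2]")
         .appName("aqe_auto_broadcast_demo")  # hypothetical name, not the one used below
         .config("spark.sql.adaptive.enabled", "true")            # AQE on (default from Spark 3.2)
         .config("spark.sql.autoBroadcastJoinThreshold", "10MB")  # broadcast if a side is below this; -1 disables
         .getOrCreate())

With both left at their defaults, any join whose build side is estimated (or, under AQE, measured at runtime) to be under the threshold becomes a broadcast hash join, which is exactly what the plan at the end of this post shows.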

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast  # imported for reference; intentionally not used below

spark = SparkSession.builder.master("local[2]") \
    .appName("RDD_Methods_Examples") \
    .getOrCreate()
# print(spark.version)

# Default auto broadcast threshold (10 MB unless overridden)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

creditCardData = spark.read.json("card_transactions.json")
userIdToAmountMapping = {}  # not used in this example
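
As an optional sanity check that is not in the original listing, printing the inferred schema shows the same column types that appear later in the plan's ReadSchema (amount and ts come out as long, i.e. bigint):

# Optional: verify the schema Spark infers from the JSON file
creditCardData.printSchema()
# root
#  |-- amount: long (nullable = true)
#  |-- card_num: string (nullable = true)
#  |-- category: string (nullable = true)
#  |-- merchant: string (nullable = true)
#  |-- ts: long (nullable = true)
#  |-- user_id: string (nullable = true)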

 

# Max spend per user
useridMaxSpendDF = creditCardData.groupBy("user_id").max("amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")

# Join back to the full data on user_id and the per-user max amount
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(useridMaxSpendDF, cond, "inner")
joinedData.show()
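
The physical plan shown under the output below was presumably produced with a formatted explain call along these lines (mode="formatted" is part of the standard DataFrame.explain API in Spark 3.x):

# Print both the initial plan and the final plan chosen by AQE at runtime
joinedData.explain(mode="formatted")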

Output:

+------+--------+-------------+--------+----------+-------+---------+----------+
|amount|card_num|     category|merchant|        ts|user_id|m_user_id|max_amount|
+------+--------+-------------+--------+----------+-------+---------+----------+
|  1000|   C_101|entertainment|   M_100|1580163399|  U_101|    U_101|      1000|
|   997|   C_103|    groceries|   M_103|1582876481|  U_102|    U_102|       997|
|   977|   C_104|    groceries|   M_101|1579402924|  U_103|    U_103|       977|
|   977|   C_105|         food|   M_102|1581369586|  U_103|    U_103|       977|
|   977|   C_106|         food|   M_100|1580897199|  U_103|    U_103|       977|
|   996|   C_108|         food|   M_106|1581391534|  U_104|    U_104|       996|
|   996|   C_107|     children|   M_107|1580776821|  U_104|    U_104|       996|
+------+--------+-------------+--------+----------+-------+---------+----------+
== Physical Plan ==
AdaptiveSparkPlan (26)
+- == Final Plan ==
   CollectLimit (15)
   +- * Project (14)
      +- * BroadcastHashJoin Inner BuildRight (13)
         :- * Filter (2)
         :  +- Scan json (1)
         +- BroadcastQueryStage (12)
            +- BroadcastExchange (11)
               +- * Filter (10)
                  +- * HashAggregate (9)
                     +- AQEShuffleRead (8)
                        +- ShuffleQueryStage (7)
                           +- Exchange (6)
                              +- * HashAggregate (5)
                                 +- * Filter (4)
                                    +- Scan json (3)
+- == Initial Plan ==
   CollectLimit (25)
   +- Project (24)
      +- BroadcastHashJoin Inner BuildRight (23)
         :- Filter (16)
         :  +- Scan json (1)
         +- BroadcastExchange (22)
            +- Filter (21)
               +- HashAggregate (20)
                  +- Exchange (19)
                     +- HashAggregate (18)
                        +- Filter (17)
                           +- Scan json (3)

(1) Scan json
Output [6]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id), IsNotNull(amount)]
ReadSchema: struct<amount:bigint,card_num:string,category:string,merchant:string,ts:bigint,user_id:string>

(2) Filter [codegen id : 3]
Input [6]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12]
Condition : (isnotnull(user_id#12) AND isnotnull(amount#7L))

(3) Scan json
Output [2]: [amount#35L, user_id#40]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
PushedFilters: [IsNotNull(user_id)]
ReadSchema: struct<amount:bigint,user_id:string>

(4) Filter [codegen id : 1]
Input [2]: [amount#35L, user_id#40]
Condition : isnotnull(user_id#40)

(5) HashAggregate [codegen id : 1]
Input [2]: [amount#35L, user_id#40]
Keys [1]: [user_id#40]
Functions [1]: [partial_max(amount#35L)]
Aggregate Attributes [1]: [max#81L]
Results [2]: [user_id#40, max#82L]

(6) Exchange
Input [2]: [user_id#40, max#82L]
Arguments: hashpartitioning(user_id#40, 200), ENSURE_REQUIREMENTS, [id=#71]

(7) ShuffleQueryStage
Output [2]: [user_id#40, max#82L]
Arguments: 0

(8) AQEShuffleRead
Input [2]: [user_id#40, max#82L]
Arguments: coalesced

(9) HashAggregate [codegen id : 2]
Input [2]: [user_id#40, max#82L]
Keys [1]: [user_id#40]
Functions [1]: [max(amount#35L)]
Aggregate Attributes [1]: [max(amount#35L)#25L]
Results [2]: [user_id#40 AS m_user_id#32, max(amount#35L)#25L AS max_amount#29L]

(10) Filter [codegen id : 2]
Input [2]: [m_user_id#32, max_amount#29L]
Condition : isnotnull(max_amount#29L)

(11) BroadcastExchange
Input [2]: [m_user_id#32, max_amount#29L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#124]

(12) BroadcastQueryStage
Output [2]: [m_user_id#32, max_amount#29L]
Arguments: 1

(13) BroadcastHashJoin [codegen id : 3]
Left keys [2]: [user_id#12, amount#7L]
Right keys [2]: [m_user_id#32, max_amount#29L]
Join condition: None

(14) Project [codegen id : 3]
Output [8]: [cast(amount#7L as string) AS amount#65, card_num#8, category#9, merchant#10, cast(ts#11L as string) AS ts#69, user_id#12, m_user_id#32, cast(max_amount#29L as string) AS max_amount#72]
Input [8]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12, m_user_id#32, max_amount#29L]

(15) CollectLimit
Input [8]: [amount#65, card_num#8, category#9, merchant#10, ts#69, user_id#12, m_user_id#32, max_amount#72]
Arguments: 21

(16) Filter
Input [6]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12]
Condition : (isnotnull(user_id#12) AND isnotnull(amount#7L))

(17) Filter
Input [2]: [amount#35L, user_id#40]
Condition : isnotnull(user_id#40)

(18) HashAggregate
Input [2]: [amount#35L, user_id#40]
Keys [1]: [user_id#40]
Functions [1]: [partial_max(amount#35L)]
Aggregate Attributes [1]: [max#81L]
Results [2]: [user_id#40, max#82L]

(19) Exchange
Input [2]: [user_id#40, max#82L]
Arguments: hashpartitioning(user_id#40, 200), ENSURE_REQUIREMENTS, [id=#47]

(20) HashAggregate
Input [2]: [user_id#40, max#82L]
Keys [1]: [user_id#40]
Functions [1]: [max(amount#35L)]
Aggregate Attributes [1]: [max(amount#35L)#25L]
Results [2]: [user_id#40 AS m_user_id#32, max(amount#35L)#25L AS max_amount#29L]

(21) Filter
Input [2]: [m_user_id#32, max_amount#29L]
Condition : isnotnull(max_amount#29L)

(22) BroadcastExchange
Input [2]: [m_user_id#32, max_amount#29L]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, false]),false), [id=#51]

(23) BroadcastHashJoin
Left keys [2]: [user_id#12, amount#7L]
Right keys [2]: [m_user_id#32, max_amount#29L]
Join condition: None

(24) Project
Output [8]: [cast(amount#7L as string) AS amount#65, card_num#8, category#9, merchant#10, cast(ts#11L as string) AS ts#69, user_id#12, m_user_id#32, cast(max_amount#29L as string) AS max_amount#72]
Input [8]: [amount#7L, card_num#8, category#9, merchant#10, ts#11L, user_id#12, m_user_id#32, max_amount#29L]

(25) CollectLimit
Input [8]: [amount#65, card_num#8, category#9, merchant#10, ts#69, user_id#12, m_user_id#32, max_amount#72]
Arguments: 21

(26) AdaptiveSparkPlan
Output [8]: [amount#65, card_num#8, category#9, merchant#10, ts#69, user_id#12, m_user_id#32, max_amount#72]
Arguments: isFinalPlan=true
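
To confirm that the BroadcastHashJoin above really comes from the auto broadcast mechanism rather than an explicit broadcast() hint, a quick counter-experiment is to switch auto broadcast and AQE off and re-run the same join; the plan then falls back to a SortMergeJoin. A minimal sketch, assuming the same session and DataFrames defined earlier:

# Disable auto broadcast and AQE, then re-plan the same join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")
creditCardData.join(useridMaxSpendDF, cond, "inner").explain(mode="formatted")
# Expect a SortMergeJoin instead of a BroadcastHashJoin in this plan

# Restore the defaults afterwards
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
spark.conf.set("spark.sql.adaptive.enabled", "true")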
