Credit Card Data Analysis using PySpark: total amount spent by each user, per card, per category (with execution plan)

Input details:

● The file contains JSON records.
● Each record has the following fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

The analysis below is performed on these records; a schema sketch follows.
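Since the fields are known up front, the read can supply an explicit schema instead of letting Spark scan the file to infer one. This is a minimal sketch, not part of the original post: the types are assumed from the sample data (amount and ts look integral; the plan further down does read amount as bigint), and the variable name txnSchema is my own.

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Assumed types: amount and ts appear integral in the sample; the rest are strings.
txnSchema = StructType([
    StructField('user_id', StringType()),
    StructField('card_num', StringType()),
    StructField('merchant', StringType()),
    StructField('category', StringType()),
    StructField('amount', LongType()),
    StructField('ts', LongType()),
])

# Reading with an explicit schema skips the inference pass over the file.
cardTnDF = spark.read.schema(txnSchema).json('card_transactions.json')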

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: get the total amount spent by each user, for each of their cards, in each category.

cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.groupBy('user_id', 'card_num', 'category').sum('amount').collect()

Output:

[Row(user_id='U_104', card_num='C_107', category='cosmetics', sum(amount)=11818),
 Row(user_id='U_104', card_num='C_108', category='cosmetics', sum(amount)=9522),
 Row(user_id='U_103', card_num='C_105', category='children', sum(amount)=5151),
 Row(user_id='U_103', card_num='C_104', category='groceries', sum(amount)=8536),
 Row(user_id='U_101', card_num='C_102', category='groceries', sum(amount)=8074),
 Row(user_id='U_103', card_num='C_104', category='food', sum(amount)=10510),
 Row(user_id='U_101', card_num='C_101', category='children', sum(amount)=11695),
 Row(user_id='U_103', card_num='C_105', category='entertainment', sum(amount)=12966),
 Row(user_id='U_101', card_num='C_101', category='entertainment', sum(amount)=14478),
 Row(user_id='U_103', card_num='C_106', category='groceries', sum(amount)=6389),
 Row(user_id='U_103', card_num='C_105', category='food', sum(amount)=6937),
 Row(user_id='U_104', card_num='C_107', category='groceries', sum(amount)=12774),
 Row(user_id='U_104', card_num='C_107', category='entertainment', sum(amount)=7106),
 Row(user_id='U_101', card_num='C_101', category='groceries', sum(amount)=13529),
 Row(user_id='U_101', card_num='C_102', category='children', sum(amount)=14437),
 Row(user_id='U_101', card_num='C_101', category='food', sum(amount)=11525),
 Row(user_id='U_103', card_num='C_106', category='food', sum(amount)=11578),
 Row(user_id='U_104', card_num='C_107', category='children', sum(amount)=10151),
 Row(user_id='U_102', card_num='C_103', category='cosmetics', sum(amount)=25226),
 Row(user_id='U_101', card_num='C_102', category='food', sum(amount)=11946),
 Row(user_id='U_101', card_num='C_102', category='entertainment', sum(amount)=15598),
 Row(user_id='U_102', card_num='C_103', category='children', sum(amount)=29825),
 Row(user_id='U_104', card_num='C_108', category='food', sum(amount)=9905),
 Row(user_id='U_104', card_num='C_108', category='entertainment', sum(amount)=11241),
 Row(user_id='U_104', card_num='C_107', category='food', sum(amount)=14362),
 Row(user_id='U_103', card_num='C_105', category='cosmetics', sum(amount)=9336),
 Row(user_id='U_103', card_num='C_106', category='cosmetics', sum(amount)=8833),
 Row(user_id='U_104', card_num='C_108', category='children', sum(amount)=12042),
 Row(user_id='U_102', card_num='C_103', category='groceries', sum(amount)=24745),
 Row(user_id='U_103', card_num='C_106', category='children', sum(amount)=10154),
 Row(user_id='U_103', card_num='C_104', category='cosmetics', sum(amount)=8316),
 Row(user_id='U_104', card_num='C_108', category='groceries', sum(amount)=12018),
 Row(user_id='U_101', card_num='C_101', category='cosmetics', sum(amount)=15354),
 Row(user_id='U_103', card_num='C_104', category='entertainment', sum(amount)=5794),
 Row(user_id='U_102', card_num='C_103', category='food', sum(amount)=14314),
 Row(user_id='U_103', card_num='C_104', category='children', sum(amount)=10053),
 Row(user_id='U_102', card_num='C_103', category='entertainment', sum(amount)=32365),
 Row(user_id='U_101', card_num='C_102', category='cosmetics', sum(amount)=9148),
 Row(user_id='U_103', card_num='C_105', category='groceries', sum(amount)=9976),
 Row(user_id='U_103', card_num='C_106', category='entertainment', sum(amount)=7010)]
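Note that collect() pulls the entire result to the driver, which is fine for a small demo but risky at scale. Here is a sketch of a slightly more idiomatic variant of the same aggregation (the alias total_amount and the variable resultDF are my own names, not from the original), plus the explain() call that produces the execution plan shown next:

from pyspark.sql.functions import sum as sum_

# Same aggregation, but with a named result column instead of sum(amount).
resultDF = (cardTnDF
    .groupBy('user_id', 'card_num', 'category')
    .agg(sum_('amount').alias('total_amount')))

# show() prints a handful of rows without collecting everything to the driver.
resultDF.show(5)

# On Spark 3.x, explain('formatted') prints the numbered operator breakdown
# seen in the plan below.
cardTnDF.groupBy('user_id', 'card_num', 'category').sum('amount').explain('formatted')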
Execution plan:

== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Final Plan ==
   * HashAggregate (6)
   +- AQEShuffleRead (5)
      +- ShuffleQueryStage (4)
         +- Exchange (3)
            +- * HashAggregate (2)
               +- Scan json  (1)
+- == Initial Plan ==
   HashAggregate (9)
   +- Exchange (8)
      +- HashAggregate (7)
         +- Scan json  (1)

(1) Scan json
Output [4]: [amount#480L, card_num#481, category#482, user_id#485]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
ReadSchema: struct<amount:bigint,card_num:string,category:string,user_id:string>

(2) HashAggregate [codegen id : 1]
Input [4]: [amount#480L, card_num#481, category#482, user_id#485]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [partial_sum(amount#480L)]
Aggregate Attributes [1]: [sum#504L]
Results [4]: [user_id#485, card_num#481, category#482, sum#505L]

(3) Exchange
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: hashpartitioning(user_id#485, card_num#481, category#482, 200), ENSURE_REQUIREMENTS, [id=#643]

(4) ShuffleQueryStage
Output [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: 0

(5) AQEShuffleRead
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: coalesced

(6) HashAggregate [codegen id : 2]
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [sum(amount#480L)]
Aggregate Attributes [1]: [sum(amount#480L)#498L]
Results [4]: [user_id#485, card_num#481, category#482, sum(amount#480L)#498L AS sum(amount)#499L]

(7) HashAggregate
Input [4]: [amount#480L, card_num#481, category#482, user_id#485]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [partial_sum(amount#480L)]
Aggregate Attributes [1]: [sum#504L]
Results [4]: [user_id#485, card_num#481, category#482, sum#505L]

(8) Exchange
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: hashpartitioning(user_id#485, card_num#481, category#482, 200), ENSURE_REQUIREMENTS, [id=#633]

(9) HashAggregate
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [sum(amount#480L)]
Aggregate Attributes [1]: [sum(amount#480L)#498L]
Results [4]: [user_id#485, card_num#481, category#482, sum(amount#480L)#498L AS sum(amount)#499L]

(10) AdaptiveSparkPlan
Output [4]: [user_id#485, card_num#481, category#482, sum(amount)#499L]
Arguments: isFinalPlan=true
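The plan is wrapped in AdaptiveSparkPlan because Adaptive Query Execution (AQE) is active: the initial plan shuffles into the default 200 partitions (the Exchange in steps 3 and 8), and at runtime the AQEShuffleRead step (5) coalesces them based on actual data sizes. As a sketch of the settings involved (AQE is on by default from Spark 3.2; on older 3.x versions it may need enabling):

# AQE re-optimizes the plan at runtime; here it coalesces the 200 shuffle
# partitions produced by the Exchange into fewer, larger partitions.
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.coalescePartitions.enabled', 'true')

# The 200 in hashpartitioning(..., 200) is the default shuffle partition count:
spark.conf.get('spark.sql.shuffle.partitions')   # '200'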

 
