Credit Card Data Analysis using PySpark (get the total amount spent by each user, for each of their cards, in each category) with execution plan

Input details:

● The file contains JSON records.
● Each record has the following fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

The analysis to be performed on this data is described below.

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+
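For reference, a DataFrame like the one above can be produced by reading the JSON file and showing a few rows. Defining the schema explicitly (a sketch below, with the column types assumed from the sample data: amount and ts as integers, everything else as strings) avoids the extra pass Spark would otherwise make to infer it:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.appName("card-transactions").getOrCreate()

# Explicit schema: types are assumptions based on the sample rows above.
schema = StructType([
    StructField("user_id", StringType()),
    StructField("card_num", StringType()),
    StructField("merchant", StringType()),
    StructField("category", StringType()),
    StructField("amount", LongType()),
    StructField("ts", LongType()),
])

cardTnDF = spark.read.schema(schema).json("card_transactions.json")
cardTnDF.show(3)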

Application: Get the total amount spent by each user, for each of their cards, in each category.

cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.groupBy('user_id', 'card_num', 'category').sum('amount').collect()
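An equivalent formulation (a sketch, not part of the original code) uses agg with an alias so the result column gets a friendlier name than sum(amount), and sorts the output for readability:

from pyspark.sql import functions as F

totalsDF = (
    cardTnDF
    .groupBy("user_id", "card_num", "category")
    .agg(F.sum("amount").alias("total_amount"))  # same aggregation, readable column name
    .orderBy("user_id", "card_num", "category")
)
totalsDF.show()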

Output:

[Row(user_id='U_104', card_num='C_107', category='cosmetics', sum(amount)=11818),
 Row(user_id='U_104', card_num='C_108', category='cosmetics', sum(amount)=9522),
 Row(user_id='U_103', card_num='C_105', category='children', sum(amount)=5151),
 Row(user_id='U_103', card_num='C_104', category='groceries', sum(amount)=8536),
 Row(user_id='U_101', card_num='C_102', category='groceries', sum(amount)=8074),
 Row(user_id='U_103', card_num='C_104', category='food', sum(amount)=10510),
 Row(user_id='U_101', card_num='C_101', category='children', sum(amount)=11695),
 Row(user_id='U_103', card_num='C_105', category='entertainment', sum(amount)=12966),
 Row(user_id='U_101', card_num='C_101', category='entertainment', sum(amount)=14478),
 Row(user_id='U_103', card_num='C_106', category='groceries', sum(amount)=6389),
 Row(user_id='U_103', card_num='C_105', category='food', sum(amount)=6937),
 Row(user_id='U_104', card_num='C_107', category='groceries', sum(amount)=12774),
 Row(user_id='U_104', card_num='C_107', category='entertainment', sum(amount)=7106),
 Row(user_id='U_101', card_num='C_101', category='groceries', sum(amount)=13529),
 Row(user_id='U_101', card_num='C_102', category='children', sum(amount)=14437),
 Row(user_id='U_101', card_num='C_101', category='food', sum(amount)=11525),
 Row(user_id='U_103', card_num='C_106', category='food', sum(amount)=11578),
 Row(user_id='U_104', card_num='C_107', category='children', sum(amount)=10151),
 Row(user_id='U_102', card_num='C_103', category='cosmetics', sum(amount)=25226),
 Row(user_id='U_101', card_num='C_102', category='food', sum(amount)=11946),
 Row(user_id='U_101', card_num='C_102', category='entertainment', sum(amount)=15598),
 Row(user_id='U_102', card_num='C_103', category='children', sum(amount)=29825),
 Row(user_id='U_104', card_num='C_108', category='food', sum(amount)=9905),
 Row(user_id='U_104', card_num='C_108', category='entertainment', sum(amount)=11241),
 Row(user_id='U_104', card_num='C_107', category='food', sum(amount)=14362),
 Row(user_id='U_103', card_num='C_105', category='cosmetics', sum(amount)=9336),
 Row(user_id='U_103', card_num='C_106', category='cosmetics', sum(amount)=8833),
 Row(user_id='U_104', card_num='C_108', category='children', sum(amount)=12042),
 Row(user_id='U_102', card_num='C_103', category='groceries', sum(amount)=24745),
 Row(user_id='U_103', card_num='C_106', category='children', sum(amount)=10154),
 Row(user_id='U_103', card_num='C_104', category='cosmetics', sum(amount)=8316),
 Row(user_id='U_104', card_num='C_108', category='groceries', sum(amount)=12018),
 Row(user_id='U_101', card_num='C_101', category='cosmetics', sum(amount)=15354),
 Row(user_id='U_103', card_num='C_104', category='entertainment', sum(amount)=5794),
 Row(user_id='U_102', card_num='C_103', category='food', sum(amount)=14314),
 Row(user_id='U_103', card_num='C_104', category='children', sum(amount)=10053),
 Row(user_id='U_102', card_num='C_103', category='entertainment', sum(amount)=32365),
 Row(user_id='U_101', card_num='C_102', category='cosmetics', sum(amount)=9148),
 Row(user_id='U_103', card_num='C_105', category='groceries', sum(amount)=9976),
 Row(user_id='U_103', card_num='C_106', category='entertainment', sum(amount)=7010)]
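The physical plan below comes from the same query. One way to print it in this numbered, formatted style (a sketch; the exact call used originally is not shown) is to call explain on the aggregated DataFrame, which is available in Spark 3.0 and later:

# Print the formatted operator tree for the aggregation (Spark 3.0+).
cardTnDF.groupBy('user_id', 'card_num', 'category') \
    .sum('amount') \
    .explain(mode="formatted")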
== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Final Plan ==
   * HashAggregate (6)
   +- AQEShuffleRead (5)
      +- ShuffleQueryStage (4)
         +- Exchange (3)
            +- * HashAggregate (2)
               +- Scan json  (1)
+- == Initial Plan ==
   HashAggregate (9)
   +- Exchange (8)
      +- HashAggregate (7)
         +- Scan json  (1)

(1) Scan json
Output [4]: [amount#480L, card_num#481, category#482, user_id#485]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
ReadSchema: struct<amount:bigint,card_num:string,category:string,user_id:string>

(2) HashAggregate [codegen id : 1]
Input [4]: [amount#480L, card_num#481, category#482, user_id#485]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [partial_sum(amount#480L)]
Aggregate Attributes [1]: [sum#504L]
Results [4]: [user_id#485, card_num#481, category#482, sum#505L]

(3) Exchange
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: hashpartitioning(user_id#485, card_num#481, category#482, 200), ENSURE_REQUIREMENTS, [id=#643]

(4) ShuffleQueryStage
Output [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: 0

(5) AQEShuffleRead
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: coalesced

(6) HashAggregate [codegen id : 2]
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [sum(amount#480L)]
Aggregate Attributes [1]: [sum(amount#480L)#498L]
Results [4]: [user_id#485, card_num#481, category#482, sum(amount#480L)#498L AS sum(amount)#499L]

(7) HashAggregate
Input [4]: [amount#480L, card_num#481, category#482, user_id#485]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [partial_sum(amount#480L)]
Aggregate Attributes [1]: [sum#504L]
Results [4]: [user_id#485, card_num#481, category#482, sum#505L]

(8) Exchange
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Arguments: hashpartitioning(user_id#485, card_num#481, category#482, 200), ENSURE_REQUIREMENTS, [id=#633]

(9) HashAggregate
Input [4]: [user_id#485, card_num#481, category#482, sum#505L]
Keys [3]: [user_id#485, card_num#481, category#482]
Functions [1]: [sum(amount#480L)]
Aggregate Attributes [1]: [sum(amount#480L)#498L]
Results [4]: [user_id#485, card_num#481, category#482, sum(amount#480L)#498L AS sum(amount)#499L]

(10) AdaptiveSparkPlan
Output [4]: [user_id#485, card_num#481, category#482, sum(amount)#499L]
Arguments: isFinalPlan=true
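Reading the plan bottom-up: Spark first computes a partial_sum per input partition (2), shuffles the partial results by the three grouping keys (3), and then merges them in the final HashAggregate (6); because adaptive query execution is enabled, the AQEShuffleRead step (5) coalesces the 200 shuffle partitions into fewer partitions at runtime. The same aggregation can also be written in Spark SQL (a sketch, assuming the DataFrame is registered as a temporary view named card_transactions), which produces an essentially identical physical plan:

# Register the DataFrame as a temporary view and run the equivalent SQL query.
cardTnDF.createOrReplaceTempView("card_transactions")

spark.sql("""
    SELECT user_id, card_num, category, SUM(amount) AS total_amount
    FROM card_transactions
    GROUP BY user_id, card_num, category
""").show()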

 
