Credit Card Data Analysis using PySpark (Get the total amount spent by each user for each of their cards) with execution plan

Input details:

● The file contains JSON records.
● Each record has the following fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts
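Spark can infer this schema when reading the JSON, but on larger files it is often worth declaring it explicitly so the read skips the schema-inference pass. A minimal sketch of an explicit-schema read (using LongType for amount and ts is an assumption based on the sample data below):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.appName("card-analysis").getOrCreate()

# Explicit schema; LongType for amount/ts is assumed from the sample data
schema = StructType([
    StructField("user_id", StringType()),
    StructField("card_num", StringType()),
    StructField("merchant", StringType()),
    StructField("category", StringType()),
    StructField("amount", LongType()),
    StructField("ts", LongType()),
])

cardTnDF = spark.read.schema(schema).json("card_transactions.json")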

Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: Get the total amount spent by each user for each of their cards

Solution:

# Read the JSON records, then total the amount per (user_id, card_num) pair
cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.groupBy("user_id", "card_num").sum("amount").show()

Output (collected as a list of Rows; .show() renders the same result as a table):

[Row(user_id='U_101', card_num='C_102', sum(amount)=59203),
 Row(user_id='U_104', card_num='C_108', sum(amount)=54728),
 Row(user_id='U_101', card_num='C_101', sum(amount)=66581),
 Row(user_id='U_104', card_num='C_107', sum(amount)=56211),
 Row(user_id='U_102', card_num='C_103', sum(amount)=126475),
 Row(user_id='U_103', card_num='C_104', sum(amount)=43209),
 Row(user_id='U_103', card_num='C_105', sum(amount)=44366),
 Row(user_id='U_103', card_num='C_106', sum(amount)=43964)]
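The default output column name sum(amount) is clumsy to reference downstream. An equivalent formulation using agg with an alias (a sketch; it produces the same physical plan, only the output column name changes):

from pyspark.sql import functions as F

cardTnDF.groupBy("user_id", "card_num") \
    .agg(F.sum("amount").alias("total_amount")) \
    .show()

The physical plan Spark generates for this aggregation is shown below.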
== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Final Plan ==
   * HashAggregate (6)
   +- AQEShuffleRead (5)
      +- ShuffleQueryStage (4)
         +- Exchange (3)
            +- * HashAggregate (2)
               +- Scan json  (1)
+- == Initial Plan ==
   HashAggregate (9)
   +- Exchange (8)
      +- HashAggregate (7)
         +- Scan json  (1)

(1) Scan json
Output [3]: [amount#443L, card_num#444, user_id#448]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
ReadSchema: struct<amount:bigint,card_num:string,user_id:string>

(2) HashAggregate [codegen id : 1]
Input [3]: [amount#443L, card_num#444, user_id#448]
Keys [2]: [user_id#448, card_num#444]
Functions [1]: [partial_sum(amount#443L)]
Aggregate Attributes [1]: [sum#466L]
Results [3]: [user_id#448, card_num#444, sum#467L]

(3) Exchange
Input [3]: [user_id#448, card_num#444, sum#467L]
Arguments: hashpartitioning(user_id#448, card_num#444, 200), ENSURE_REQUIREMENTS, [id=#604]

(4) ShuffleQueryStage
Output [3]: [user_id#448, card_num#444, sum#467L]
Arguments: 0

(5) AQEShuffleRead
Input [3]: [user_id#448, card_num#444, sum#467L]
Arguments: coalesced

(6) HashAggregate [codegen id : 2]
Input [3]: [user_id#448, card_num#444, sum#467L]
Keys [2]: [user_id#448, card_num#444]
Functions [1]: [sum(amount#443L)]
Aggregate Attributes [1]: [sum(amount#443L)#461L]
Results [3]: [user_id#448, card_num#444, sum(amount#443L)#461L AS sum(amount)#462L]

(7) HashAggregate
Input [3]: [amount#443L, card_num#444, user_id#448]
Keys [2]: [user_id#448, card_num#444]
Functions [1]: [partial_sum(amount#443L)]
Aggregate Attributes [1]: [sum#466L]
Results [3]: [user_id#448, card_num#444, sum#467L]

(8) Exchange
Input [3]: [user_id#448, card_num#444, sum#467L]
Arguments: hashpartitioning(user_id#448, card_num#444, 200), ENSURE_REQUIREMENTS, [id=#594]

(9) HashAggregate
Input [3]: [user_id#448, card_num#444, sum#467L]
Keys [2]: [user_id#448, card_num#444]
Functions [1]: [sum(amount#443L)]
Aggregate Attributes [1]: [sum(amount#443L)#461L]
Results [3]: [user_id#448, card_num#444, sum(amount#443L)#461L AS sum(amount)#462L]

(10) AdaptiveSparkPlan
Output [3]: [user_id#448, card_num#444, sum(amount)#462L]
Arguments: isFinalPlan=true
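Reading the final plan bottom-up: the JSON scan materializes only the three columns the query needs (amount, card_num, user_id), a first HashAggregate computes partial_sum(amount) within each input partition, the Exchange shuffles rows by hashpartitioning(user_id, card_num) into 200 partitions, and a second HashAggregate merges the partial sums per key. Because Adaptive Query Execution is enabled, AQEShuffleRead coalesces those 200 shuffle partitions at runtime, which is why the final plan differs from the initial one. Assuming Spark 3.x (where explain accepts a mode argument), the formatted plan above can be reproduced with:

# Print the formatted physical plan instead of executing the query
cardTnDF.groupBy("user_id", "card_num").sum("amount").explain(mode="formatted")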
