Input details:
- The file contains JSON records.
- Each record has the fields:
  - user_id
  - card_num
  - merchant
  - category
  - amount
  - ts
### Analysis to be performed
Sample data:
```
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+
```
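Since `spark.read.json` reads one JSON object per line (JSON Lines) by default, each record in `card_transactions.json` presumably looks like the following (values taken from the first sample row; the field order on disk may differ):

```json
{"user_id": "U_104", "card_num": "C_108", "merchant": "M_102", "category": "food", "amount": 243, "ts": 1579532902}
```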
Application: Get the total amount spent by each user
Solution:
```python
cardTnDF = spark.read.json("card_transactions.json")   # read the JSON-lines file
cardTnDF.show(3)                                       # preview the first 3 records
cardTnDF.groupBy('user_id').sum('amount').collect()    # total amount spent per user
```
Output:
```
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+
only showing top 3 rows
```
Out[3]:
```
[Row(user_id='U_102', sum(amount)=126475),
 Row(user_id='U_104', sum(amount)=110939),
 Row(user_id='U_101', sum(amount)=125784),
 Row(user_id='U_103', sum(amount)=131539)]
```
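Note that `collect()` pulls every grouped row to the driver; keeping the aggregate as a DataFrame lets Spark display or write it directly. A minimal equivalent sketch, assuming the same `cardTnDF`, that also gives the result column a friendlier name than `sum(amount)` (the name `totals` is illustrative):

```python
from pyspark.sql import functions as F

# Same per-user total, with an explicit alias for the result column
totals = cardTnDF.groupBy("user_id").agg(F.sum("amount").alias("total_amount"))
totals.show()  # display on the cluster instead of collect()-ing to the driver
```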
Execution Plan:
```
== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Final Plan ==
   * HashAggregate (6)
   +- AQEShuffleRead (5)
      +- ShuffleQueryStage (4)
         +- Exchange (3)
            +- * HashAggregate (2)
               +- Scan json (1)
+- == Initial Plan ==
   HashAggregate (9)
   +- Exchange (8)
      +- HashAggregate (7)
         +- Scan json (1)

(1) Scan json
Output [2]: [amount#277L, user_id#282]
Batched: false
Location: InMemoryFileIndex [file:/Users/dpq/Practice/card_transactions.json]
ReadSchema: struct<amount:bigint,user_id:string>

(2) HashAggregate [codegen id : 1]
Input [2]: [amount#277L, user_id#282]
Keys [1]: [user_id#282]
Functions [1]: [partial_sum(amount#277L)]
Aggregate Attributes [1]: [sum#330L]
Results [2]: [user_id#282, sum#331L]

(3) Exchange
Input [2]: [user_id#282, sum#331L]
Arguments: hashpartitioning(user_id#282, 200), ENSURE_REQUIREMENTS, [id=#469]

(4) ShuffleQueryStage
Output [2]: [user_id#282, sum#331L]
Arguments: 0

(5) AQEShuffleRead
Input [2]: [user_id#282, sum#331L]
Arguments: coalesced

(6) HashAggregate [codegen id : 2]
Input [2]: [user_id#282, sum#331L]
Keys [1]: [user_id#282]
Functions [1]: [sum(amount#277L)]
Aggregate Attributes [1]: [sum(amount#277L)#326L]
Results [2]: [user_id#282, sum(amount#277L)#326L AS sum(amount)#327L]

(7) HashAggregate
Input [2]: [amount#277L, user_id#282]
Keys [1]: [user_id#282]
Functions [1]: [partial_sum(amount#277L)]
Aggregate Attributes [1]: [sum#330L]
Results [2]: [user_id#282, sum#331L]

(8) Exchange
Input [2]: [user_id#282, sum#331L]
Arguments: hashpartitioning(user_id#282, 200), ENSURE_REQUIREMENTS, [id=#459]

(9) HashAggregate
Input [2]: [user_id#282, sum#331L]
Keys [1]: [user_id#282]
Functions [1]: [sum(amount#277L)]
Aggregate Attributes [1]: [sum(amount#277L)#326L]
Results [2]: [user_id#282, sum(amount#277L)#326L AS sum(amount)#327L]

(10) AdaptiveSparkPlan
Output [2]: [user_id#282, sum(amount)#327L]
Arguments: isFinalPlan=true
```
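The plan shows the classic two-phase aggregation: HashAggregate (2) computes a map-side partial_sum per user_id, Exchange (3) hash-partitions the partial sums on user_id across 200 shuffle partitions, and HashAggregate (6) merges them into the final sum; with AQE enabled, AQEShuffleRead (5) coalesces those shuffle partitions at runtime, which is why the final plan differs from the initial one. A plan in this formatted style can be printed with `explain` in Spark 3.x; presumably the `isFinalPlan=true` marker appears because the query was executed first, letting AQE finish re-optimizing (the name `q` below is illustrative):

```python
q = cardTnDF.groupBy("user_id").sum("amount")
q.collect()                  # run the query so AQE can finalize the plan
q.explain(mode="formatted")  # print the formatted physical plan
```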