Posts

Credit Card Data Analysis using PySpark (Get the total amount spent by each user for each of their cards) with execution plan

Input details:
● File has JSON records
● Each record has the fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the total amount spent by each user for each of their cards.

Solution:
cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.groupBy('user_id', 'card_num').sum('amount').collect()

Output:
[Row(user_id='U_101', card_num='C_102', sum(amount)=59203), Row(user_id='U_104', card_num='C_108', sum(amount)=54728), Row(user_id='U_101', card_num='C_101', sum(amount)=66581), Row(user_id='U_104', card_num='C_107', sum(amou…
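Since the post title promises an execution plan, a minimal sketch of the same per-user, per-card aggregation written with an explicit alias and an explain() call is shown below; it assumes the same card_transactions.json layout and the app name is illustrative.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("card_totals_sketch").getOrCreate()

# Assumes card_transactions.json has the fields shown above
cardTnDF = spark.read.json("card_transactions.json")

# Same grouping as the post, with an explicit alias and ordering for readability
perCardTotals = (
    cardTnDF.groupBy("user_id", "card_num")
            .agg(F.sum("amount").alias("total_amount"))
            .orderBy("user_id", "card_num")
)
perCardTotals.show()

# Print the physical execution plan for the aggregation
perCardTotals.explain()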

Credit Card Data Analysis using PySpark (Get the total amount spent by each user) with execution plan

Input details:
● File has JSON records
● Each record has the fields:
  ○ user_id
  ○ card_num
  ○ merchant
  ○ category
  ○ amount
  ○ ts

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the total amount spent by each user.

Solution:
cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.show(3)
cardTnDF.groupBy('user_id').sum('amount').collect()

Output:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|…
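For comparison, a hedged sketch of the same per-user total expressed through Spark SQL on a temporary view follows; the view name and app name are illustrative, and the same card_transactions.json schema is assumed.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("user_totals_sketch").getOrCreate()

# Assumes the same card_transactions.json schema as above
cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.createOrReplaceTempView("card_transactions")

# Equivalent of groupBy('user_id').sum('amount'), written as SQL
userTotals = spark.sql("""
    SELECT user_id, SUM(amount) AS total_amount
    FROM card_transactions
    GROUP BY user_id
""")
userTotals.show()
userTotals.explain()   # prints the physical execution plan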

Analyse a weather input dataset and apply RDD aggregation functions

# Input weather dataset
fileRdd = sc.sparkContext.textFile('weather.csv')
fileRdd.collect()

Output:
['2016-05-09,234893,34', '2019-09-08,234896,3', '2019-11-19,234895,24', '2017-04-04,234900,43', '2013-12-04,234900,47', '2019-08-28,234894,5', '2013-11-29,234897,-5', '2018-08-19,234895,40', '2019-06-06,234890,9', '2017-02-09,234900,21', '2017-03-30,234893,36', '2019-01-01,234895,-9', '2010-12-23,234898,-1', '2015-09-03,234890,13', '2011-07-19,234898,25', '2014-09-27,234897,29', '2013-11-18,234891,-9', '2012-02-09,234893,10', '2014-07-03,234897,29', '2011-11-07,234895,38', '2014-02-14,234891,24', '2012-02-18,234893,5', '2010-01-31,234896,-8', '2015-08-19,234890,-7', '2017-03-26,234891,-1', '2011-04-23,234894,23', '2014-09-15,234898,-8', '2011-06-16,234890,33', '201…
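The excerpt cuts off before the aggregations themselves. As a hedged sketch of where such an analysis typically goes, here is one way to compute the maximum and minimum temperature per station with reduceByKey, assuming each row follows the date,station_id,temperature layout shown above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("weather_agg_sketch").getOrCreate()
sc = spark.sparkContext

# Assumes rows look like '2016-05-09,234893,34' (date, station_id, temperature)
fileRdd = sc.textFile("weather.csv")

stationTemp = fileRdd.map(lambda line: line.split(",")) \
                     .map(lambda parts: (parts[1], int(parts[2])))

# Keep the highest / lowest temperature seen per station id
maxPerStation = stationTemp.reduceByKey(lambda a, b: max(a, b))
minPerStation = stationTemp.reduceByKey(lambda a, b: min(a, b))

print(maxPerStation.collect())
print(minPerStation.collect())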

RDD joins using PySpark examples

Full Outer Join: the result contains every key from both RDDs; a key missing on either side is paired with None.

rdd1 = sc.parallelize([(101, 'John'), (102, 'Matthew'), (103, 'Aakash'), (104, 'Sandeep'),
                       (105, 'Sakura'), (106, 'Abed'), (107, 'Mary'), (108, 'Kamla')], 2)
rdd2 = sc.parallelize([(101, 'USA'), (108, 'UK'), (105, 'Japan'), (102, 'Sri Lanka'),
                       (103, 'India'), (104, 'UAE'), (107, 'Germany'), (110, 'Australia')], 2)
# print(rdd1.glom().collect())
# print(rdd2.glom().collect())
fullOuterJoin = rdd1.fullOuterJoin(rdd2)
print(fullOuterJoin.collect())

Output:
[(104, ('Sandeep', 'UAE')), (108, ('Kamla', 'UK')), (101, ('John', 'USA')), (105, ('Sakura', 'Japan')), (102, ('Matthew', 'Sri Lanka')), (106, ('Abed', None)), (110, (None, 'Australia')), (103, ('Aakash', 'India')), (107, ('Mary', 'Germany'))]

Inner Join: the result contains only the keys present in both RDDs.

rdd1 = sc.parallelize([(101, 'John'), (102, 'Matthew'), (103, 'Aa…
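The excerpt truncates during the inner join. For completeness, a small sketch of the remaining join flavours on the same pair RDDs (inner and left outer) is given below; it assumes a SparkContext built the same way as in the other posts.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd_join_sketch").getOrCreate()
sc = spark.sparkContext

rdd1 = sc.parallelize([(101, 'John'), (102, 'Matthew'), (103, 'Aakash'), (104, 'Sandeep'),
                       (105, 'Sakura'), (106, 'Abed'), (107, 'Mary'), (108, 'Kamla')], 2)
rdd2 = sc.parallelize([(101, 'USA'), (108, 'UK'), (105, 'Japan'), (102, 'Sri Lanka'),
                       (103, 'India'), (104, 'UAE'), (107, 'Germany'), (110, 'Australia')], 2)

# Inner join: only keys present in both RDDs (106 and 110 are dropped)
print(rdd1.join(rdd2).collect())

# Left outer join: every key from rdd1, with None where rdd2 has no match (106 -> None)
print(rdd1.leftOuterJoin(rdd2).collect())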

PySpark RDD advanced examples

1. Creating the SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
print(spark.version)

Output:
3.2.1

2. Creating RDDs and applying map:

# Creating an RDD
lst = [i for i in range(1, 10)]
print(lst)
listRdd = spark.sparkContext.parallelize(lst, 3)
print(listRdd.glom().collect())
print(listRdd.count())

# Creating an empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
print(type(emptyRDD))
print("Empty RDD: ", emptyRDD.collect())

# Empty RDD with partitions
emptyPartionedRDD = spark.sparkContext.parallelize([], 2)
print(type(emptyPartionedRDD))
print("Empty emptyPartionedRDD: ", emptyPartionedRDD.glom().collect())

listDoubledRdd = listRdd.map(lambda x: x * 2)
print("Output by map function:", listDoubledRdd.collect())

Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
9
<class 'pyspark.rdd.RDD'>
Empty RDD: []
<class 'pyspark.rdd.RDD'>
Empty emptyPartionedRDD: [[], []]
Output by map function: [2, 4, 6, 8, 10, 12, 14, 16, 18]
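As a hedged continuation in the same style (not part of the original post), two more RDD methods that often appear alongside map are mapPartitions and aggregate; the sketch below reuses the same 3-partition RDD of 1..9.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
listRdd = spark.sparkContext.parallelize(list(range(1, 10)), 3)

# mapPartitions: the function receives one iterator per partition
def sum_partition(it):
    yield sum(it)

print("Per-partition sums:", listRdd.mapPartitions(sum_partition).collect())   # [6, 15, 24]

# aggregate(zero value, within-partition op, cross-partition op)
total = listRdd.aggregate(0, lambda acc, x: acc + x, lambda a, b: a + b)
print("Aggregate total:", total)   # 45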

All advanced sorting techniques using Java 8 and Streams

package com.dpq.interview.Q;

import java.math.BigInteger;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

public class SortingDemo {

    public static void main(String[] args) throws ParseException {
        allSortingTechniquesAfterJava8();
    }

    public static void allSortingTechniquesAfterJava8() throws ParseException {
        List<Employee> list = populateList();

        System.out.println("############################ Natural sorting by Employee Id ############################");
        list.stream().sorted().forEach(e -> System.out.println(e));

        System.out.println("############################ Natural sorting by Employee Id, in descending order ############################");
        list.stream().sorted(Collections.reverseOrder()).forEach(e -> System.out.println(e));

        List<Employee> sortedList = list…
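The Java excerpt is truncated mid-example. Purely as an illustrative, language-neutral analogue of the same ideas (natural ordering, reversed ordering, and sorting by a chosen key), here is a short Python sketch; the Employee fields and sample values are hypothetical and not taken from the post.

from dataclasses import dataclass
from operator import attrgetter

@dataclass
class Employee:
    emp_id: int      # hypothetical fields, for illustration only
    name: str
    salary: float

employees = [
    Employee(3, "Kuhu", 72000.0),
    Employee(1, "Deepak", 95000.0),
    Employee(2, "Garv", 81000.0),
]

# "Natural" ordering by employee id (analogous to sorted() on a Comparable)
print(sorted(employees, key=attrgetter("emp_id")))

# The same ordering reversed (analogous to Collections.reverseOrder())
print(sorted(employees, key=attrgetter("emp_id"), reverse=True))

# Sorting by a different key, e.g. salary (analogous to Comparator.comparing(Employee::getSalary))
print(sorted(employees, key=attrgetter("salary")))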

Java 8 - complex programs asked in interviews

package com.dpq.movie.ratings.datalayer.dao.impl;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RatingDaoImpl {

    public static void main(String[] args) {
        // Frequency map: each name mapped to how many times it occurs
        List<String> names = Arrays.asList("Deepak", "Deepak", "Kuhu", "Kuhu", "Garv");
        System.out.println(names.stream().collect(Collectors.toMap(k -> k, v -> 1, Integer::sum)));

        // Total marks across all subjects
        List<Student> student = Arrays.asList(
                new RatingDaoImpl().new Student("Math", 98),
                new RatingDaoImpl().new Student("Science", 98),
                new RatingDaoImpl().new Student("English", 98),
                new RatingDaoImpl().new Student("Hindi", 98));
        System.out.println(student.stream().map(e -> e.getMarks()).collect(Collectors.summingInt(e -> e.intValue())));

        List<StudentDetails> studentDetails = Arrays.asList(new RatingDaoImpl().new Stud…
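As an illustrative rendering of the same two patterns in Python (a frequency map of duplicate names and the sum of a mapped field), not part of the original post:

from collections import Counter

# Frequency map of duplicate names
# (analogous to Collectors.toMap(k -> k, v -> 1, Integer::sum))
names = ["Deepak", "Deepak", "Kuhu", "Kuhu", "Garv"]
print(Counter(names))                        # Counter({'Deepak': 2, 'Kuhu': 2, 'Garv': 1})

# Summing a mapped field
# (analogous to map(Student::getMarks) followed by Collectors.summingInt)
students = [("Math", 98), ("Science", 98), ("English", 98), ("Hindi", 98)]
print(sum(marks for _, marks in students))   # 392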

PySpark RDD examples

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
sc = spark.sparkContext
print(spark.version)

Output:
3.2.1

rddNum = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rddNum = rddNum.map(lambda x: x + 10)
rddNum = rddNum.filter(lambda x: x % 2 == 0)
print(rddNum.reduce(lambda a, b: a + b))

Output:
80

nameRdd = sc.parallelize(['Deepak', 'Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper'])
rddNum = nameRdd.filter(lambda name: name.startswith('S'))
print(rddNum.collect())
rddNum = nameRdd.filter(lambda name: not name.startswith('S'))
print(rddNum.collect())

Output:
['Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper']
['Deepak']

# union example
rddNum = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 30, 21, 45, 23, 22, 77, 44])
divisibleByTwo = rddNum.filter(lambda x: x % 2 == 0)
divisibleByThree = rddNum.filter(lambda x: x % 3 == 0)
print(divisibleByTwo.collect())
print(divisibleByThree.collect())
rddUnion = divisibleByTw…
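The excerpt cuts off just before the union result. A hedged, self-contained sketch of how that example typically continues (union, distinct, and intersection on the two filtered RDDs) is shown below.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
sc = spark.sparkContext

rddNum = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 30, 21, 45, 23, 22, 77, 44])
divisibleByTwo = rddNum.filter(lambda x: x % 2 == 0)
divisibleByThree = rddNum.filter(lambda x: x % 3 == 0)

# union keeps duplicates: values divisible by both 2 and 3 appear twice
rddUnion = divisibleByTwo.union(divisibleByThree)
print(rddUnion.collect())
print(rddUnion.distinct().collect())   # distinct() removes the repeats

# intersection: values divisible by both 2 and 3
print(divisibleByTwo.intersection(divisibleByThree).collect())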

Everything about Binary Trees and all their traversal techniques (recursive and iterative) with examples

package org.dpq.ds.tree;

import java.util.Stack;

public class Tree<T> {

    public static void main(String[] args) {
        TreeNode<Integer> root = new TreeNode<Integer>(1);
        root.setLeft(new TreeNode<Integer>(2));
        root.setRight(new TreeNode<Integer>(3));
        root.getLeft().setLeft(new TreeNode<Integer>(4));
        root.getLeft().setRight(new TreeNode<Integer>(5));
        root.getRight().setLeft(new TreeNode<Integer>(6));
        root.getRight().setRight(new TreeNode<Integer>(7));
        Tree<Integer> tree = new Tree<Integer>();
        // Tree
        //        1
        //       / \
        //      2   3
        //     / \ / \
        //    4  5 6  7
        // expected result for inorder (LNR):   4 2 5 1 6 3 7
        // expected result for preorder (NLR):  1 2 4 5 3 6 7
        // expected result for postorder (LRN): 4 5 2 6 7 3 1
        System.out.println("recursive inorder \n");
        tree.inOrder(root);
        System.out.println("recursive pre…
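The excerpt is cut off before the iterative versions. As a compact, language-neutral sketch of the stack-based iterative inorder traversal described in the post (same 1..7 tree, expected output 4 2 5 1 6 3 7), here is a Python illustration with a minimal node type defined for the sketch.

# Minimal node type for this sketch only
class TreeNode:
    def __init__(self, val, left=None, right=None):
        self.val, self.left, self.right = val, left, right

def inorder_iterative(root):
    """Stack-based inorder (LNR) traversal."""
    result, stack, node = [], [], root
    while stack or node:
        while node:                 # push the whole left spine
            stack.append(node)
            node = node.left
        node = stack.pop()          # visit the node
        result.append(node.val)
        node = node.right           # then traverse its right subtree
    return result

# Same tree as in the post: 1(2(4,5), 3(6,7))
root = TreeNode(1, TreeNode(2, TreeNode(4), TreeNode(5)), TreeNode(3, TreeNode(6), TreeNode(7)))
print(inorder_iterative(root))      # [4, 2, 5, 1, 6, 3, 7]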

COMPLEX SQL QUERIES: COMMON INTERVIEW PATTERNS

-- Find duplicate records: all ways
SELECT * FROM USERS;

-- Using ROW_NUMBER()
SELECT * FROM (
    SELECT USER_ID, USER_NAME, EMAIL,
           ROW_NUMBER() OVER (PARTITION BY USER_NAME, EMAIL ORDER BY USER_ID) AS RN
    FROM USERS
) WHERE RN = 2;
-- Output: this fetches each duplicate only once; not every duplicate row is returned.
-- To return all duplicate records:
SELECT * FROM USERS
WHERE (USER_NAME, EMAIL) IN (
    SELECT USER_NAME, EMAIL FROM (
        SELECT USER_ID, USER_NAME, EMAIL,
               ROW_NUMBER() OVER (PARTITION BY USER_NAME, EMAIL ORDER BY USER_ID) AS RN
        FROM USERS
    ) WHERE RN = 2
);

-- Using GROUP BY
SELECT * FROM USERS
WHERE (USER_NAME, EMAIL) IN (
    SELECT USER_NAME, EMAIL FROM (
        SELECT USER_NAME, EMAIL, COUNT(1) CNT
        FROM USERS
        GROUP BY USER_NAME, EMAIL
    ) WHERE CNT > 1
);

-- Second-to-last record
-- Using a window function
SELECT * FROM (
    SELECT EMPLOYEE_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE_NUMBER, HIRE_DATE,
           JOB_ID, SALARY, MANAGER_ID, DEPARTMENT_ID,
           ROW_NUMBER() OVER (ORDER BY EMPLOYEE_ID DESC) AS RN
    FROM EMPLOYEES
) WHERE RN = 2;
--…
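For readers following the PySpark posts above, a hedged sketch of the same duplicate-finding idea expressed with a PySpark window function is given below; the column names mirror the USERS example, and the sample rows are made-up stand-ins for illustration only.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("duplicate_rows_sketch").getOrCreate()

# Illustrative stand-in for the USERS table (values invented for this sketch)
users = spark.createDataFrame(
    [(1, "Deepak", "deepak@example.com"),
     (2, "Deepak", "deepak@example.com"),
     (3, "Kuhu", "kuhu@example.com")],
    ["USER_ID", "USER_NAME", "EMAIL"],
)

# ROW_NUMBER() OVER (PARTITION BY USER_NAME, EMAIL ORDER BY USER_ID)
w = Window.partitionBy("USER_NAME", "EMAIL").orderBy("USER_ID")

# RN >= 2 keeps every extra copy of a duplicated (USER_NAME, EMAIL) pair
users.withColumn("RN", F.row_number().over(w)).filter("RN >= 2").show()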