Posts

Showing posts with the label Interview Questions

How to change a column name and select a few columns in a DataFrame using PySpark, with example

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

Below analysis to be done on the sample data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
from pyspark.sql.functions import col
# this can be done without using a window function
creditCardData = spark.read.json("card_transactions.json")
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id, creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = c…
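For reference, a minimal runnable sketch of the rename-and-select steps this preview describes; the file name card_transactions.json comes from the excerpt, while the particular columns selected at the end are only an illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rename_select_sketch").getOrCreate()

# Read the JSON transaction records described above.
creditCardData = spark.read.json("card_transactions.json")

# groupBy().max() names the new column "max(amount)"; rename it (and user_id) to friendlier names.
useridMaxSpendDF = (creditCardData.groupBy("user_id").max("amount")
                    .withColumnRenamed("max(amount)", "max_amount")
                    .withColumnRenamed("user_id", "m_user_id"))

# Selecting only a few columns from the original DataFrame.
fewColumnsDF = creditCardData.select("user_id", "merchant", "amount")

useridMaxSpendDF.show()
fewColumnsDF.show()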

How to join two DataFrames in PySpark (inner join example)

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

Below analysis to be done on the sample data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:
from pyspark.sql.functions import col
# this can be done without using a window function
creditCardData = spark.read.json("card_transactions.json")
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id, creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = c…
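The preview cuts off at the join itself; here is a minimal sketch of how the inner join described above can be completed, assuming the same card_transactions.json input and the renamed aggregate DataFrame from the excerpt.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inner_join_sketch").getOrCreate()
creditCardData = spark.read.json("card_transactions.json")

# Per-user maximum spend, with columns renamed so the join condition is unambiguous.
useridMaxSpendDF = (creditCardData.groupBy("user_id").max("amount")
                    .withColumnRenamed("max(amount)", "max_amount")
                    .withColumnRenamed("user_id", "m_user_id"))

# Inner join back to the transactions to keep only each user's maximum-spend rows.
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(useridMaxSpendDF, on=cond, how="inner")
joinedData.show()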

Credit Card Data Analysis using PySpark (how to use an explicit broadcast join after disabling auto broadcast)

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Here we will disable auto broadcast, then join two DataFrames and inspect the execution plan, which should still use a broadcast join.

Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.master('local[2]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoin…
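A sketch of how this scenario can be finished and verified, under the assumption (suggested by the broadcast import in the excerpt) that the smaller side is broadcast explicitly; the aggregated per-user DataFrame is only an illustrative choice of "small" side.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master("local[2]").appName("explicit_broadcast_sketch").getOrCreate()

# Disable size-based auto broadcast entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

creditCardData = spark.read.json("card_transactions.json")
perUserSpend = creditCardData.groupBy("user_id").sum("amount")

# Explicit broadcast hint: the physical plan should still show a BroadcastHashJoin.
joined = creditCardData.join(broadcast(perUserSpend), on="user_id", how="inner")
joined.explain()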

Credit Card Data Analysis using PySpark: disabling auto broadcast, with explanation

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Here we will disable auto broadcast, then join two DataFrames and inspect the execution plan, which should not use a broadcast join.

Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.master('local[2]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()
#print(spark.version)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("s…
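A sketch of how the "no broadcast" outcome can be observed once auto broadcast is disabled; disabling AQE as well is an extra assumption added here so the runtime engine cannot re-introduce a broadcast, letting the plan fall back to a SortMergeJoin.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("no_broadcast_sketch").getOrCreate()

# -1 disables the size threshold for automatic broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Assumption: also turn off AQE so it cannot convert the join to a broadcast at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "false")

creditCardData = spark.read.json("card_transactions.json")
perUserSpend = creditCardData.groupBy("user_id").sum("amount")

# No broadcast() hint and no auto broadcast: expect a SortMergeJoin in the plan.
joined = creditCardData.join(perUserSpend, on="user_id", how="inner")
joined.explain()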

Credit Card Data Analysis using PySpark: auto broadcast explanation

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: AQE auto broadcast during a join. Here, even if we do not explicitly request a broadcast join, a broadcast join still happens because of AdaptiveSparkPlan.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.master('local[2]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()
#print(spark.version)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
creditCardData = spark.read.json("card_transactions.json")
userIdToAmountMapping = {}
us…
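A sketch of how the AQE behaviour described above can be observed: no explicit broadcast() call, AQE left on, and the plan printed so the AdaptiveSparkPlan node is visible. The join key and the choice of the aggregated DataFrame as the small side are illustrative assumptions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("aqe_broadcast_sketch").getOrCreate()

# AQE is enabled by default in recent Spark versions; set explicitly for clarity.
spark.conf.set("spark.sql.adaptive.enabled", "true")
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

creditCardData = spark.read.json("card_transactions.json")
perUserSpend = creditCardData.groupBy("user_id").sum("amount")

# No broadcast() hint: with AdaptiveSparkPlan, the small aggregated side can still
# be broadcast at runtime if its actual size falls under the threshold.
joined = creditCardData.join(perUserSpend, on="user_id", how="inner")
joined.explain()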

Credit Card Data Analysis using PySpark (Get the total amount spent by each user) with execution plan

Input details:
● File has JSON records
● Each record has fields: user_id, card_num, merchant, category, amount, ts

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: Get the total amount spent by each user.

Solution:
cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.show(3)
cardTnDF.groupBy('user_id').sum('amount').collect()

Output:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|…
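The title also promises the execution plan; here is a short sketch of how the same aggregation can be written with a renamed result column and its physical plan printed, assuming the same input file.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("total_spend_sketch").getOrCreate()

cardTnDF = spark.read.json("card_transactions.json")

# Total amount spent by each user, with the generated column renamed for readability.
totalPerUser = (cardTnDF.groupBy("user_id").sum("amount")
                .withColumnRenamed("sum(amount)", "total_amount"))

totalPerUser.show()
totalPerUser.explain()   # prints the physical plan for the aggregation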

Analyse a weather input dataset and apply RDD aggregation functions

# Input weather dataset
fileRdd = sc.sparkContext.textFile('weather.csv')
fileRdd.collect()

Output:
['2016-05-09,234893,34', '2019-09-08,234896,3', '2019-11-19,234895,24', '2017-04-04,234900,43', '2013-12-04,234900,47', '2019-08-28,234894,5', '2013-11-29,234897,-5', '2018-08-19,234895,40', '2019-06-06,234890,9', '2017-02-09,234900,21', '2017-03-30,234893,36', '2019-01-01,234895,-9', '2010-12-23,234898,-1', '2015-09-03,234890,13', '2011-07-19,234898,25', '2014-09-27,234897,29', '2013-11-18,234891,-9', '2012-02-09,234893,10', '2014-07-03,234897,29', '2011-11-07,234895,38', '2014-02-14,234891,24', '2012-02-18,234893,5', '2010-01-31,234896,-8', '2015-08-19,234890,-7', '2017-03-26,234891,-1', '2011-04-23,234894,23', '2014-09-15,234898,-8', '2011-06-16,234890,33', '201…
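The preview stops before any aggregation; below is a hedged sketch of one aggregation the post's title points at, assuming each line is "date,station_id,temperature" as the sample output suggests (the post's actual aggregations may differ).

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("weather_agg_sketch").getOrCreate()
sc = spark.sparkContext

# Each line looks like "2016-05-09,234893,34": date, station id, temperature.
fileRdd = sc.textFile("weather.csv")

# (station_id, temperature) pairs.
stationTemp = fileRdd.map(lambda line: line.split(",")) \
                     .map(lambda parts: (parts[1], int(parts[2])))

# Maximum temperature observed per station.
maxPerStation = stationTemp.reduceByKey(lambda a, b: max(a, b))
print(maxPerStation.collect())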

PySpark RDD advanced examples

1. from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()
print(spark.version)

Output:
3.2.1

2. # Creating an RDD
lst = [(i) for i in range(1,10)]
print(lst)
listRdd = spark.sparkContext.parallelize(lst, 3)
print(listRdd.glom().collect())
print(listRdd.count())

# creating an empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
print(type(emptyRDD))
print("Empty RDD: ", emptyRDD.collect())

# empty RDD with partitions
emptyPartionedRDD = spark.sparkContext.parallelize([], 2)
print(type(emptyPartionedRDD))
print("Empty emptyPartionedRDD: ", emptyPartionedRDD.glom().collect())

listDoubledRdd = listRdd.map(lambda x: x*2)
print("Output by map function:", listDoubledRdd.collect())

Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
9
<class 'pyspark.rdd.RDD'>
Empty RDD: []
<class 'pyspark.rdd.RDD'>
Empty emptyPartionedRDD: [[], []]
Output by map function: [2, 4, 6, 8, 10, 12, 14, 16, 18]

All advanced sorting techniques using Java 8 and Streams

package com.dpq.interview.Q;

import java.math.BigInteger;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

public class SortingDemo {

    public static void main(String[] args) throws ParseException {
        allSortingTechniquesAfterJava8();
    }

    public static void allSortingTechniquesAfterJava8() throws ParseException {
        List<Employee> list = populateList();

        System.out.println("####################################### Natural sorting by Employee Id ###############################################");
        list.stream().sorted().forEach(e -> System.out.println(e));

        System.out.println("####################################### Natural sorting by Employee Id but in descending order ###############################################");
        list.stream().sorted(Collections.reverseOrder()).forEach(e -> System.out.println(e));

        List<Employee> sortedList = list…

Java 8 - complex programs asked in interviews

package com.dpq.movie.ratings.datalayer.dao.impl;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RatingDaoImpl {

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Deepak", "Deepak", "Kuhu", "Kuhu", "Garv");
        System.out.println(names.stream().collect(Collectors.toMap(k -> k, v -> 1, Integer::sum)));

        List<Student> student = Arrays.asList(
                new RatingDaoImpl().new Student("Math", 98),
                new RatingDaoImpl().new Student("Science", 98),
                new RatingDaoImpl().new Student("English", 98),
                new RatingDaoImpl().new Student("Hindi", 98));
        System.out.println(student.stream().map(e -> e.getMarks()).collect(Collectors.summingInt(e -> e.intValue())));

        List<StudentDetails> studentDetails = Arrays.asList(new RatingDaoImpl().new Stud…

PySpark RDD examples

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]')\
    .appName('RDD_Methods_Examples')\
    .getOrCreate()
sc = spark.sparkContext   # RDD methods such as parallelize() live on the SparkContext
print(spark.version)

Output:
3.2.1

rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rddNum = rddNum.map(lambda x : x+10)
rddNum = rddNum.filter(lambda x : x % 2 == 0)
print(rddNum.reduce(lambda a,b : a+b))

Output:
80

nameRdd = sc.parallelize(['Deepak','Simmi','Simran','Sukhwinder','Sanki','ShotTemper'])
rddNum = nameRdd.filter(lambda name : name.startswith('S'))
print(rddNum.collect())
rddNum = nameRdd.filter(lambda name : not name.startswith('S'))
print(rddNum.collect())

['Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper']
['Deepak']

# union example
rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10,30,21,45,23,22,77,44])
divisibleByTwo = rddNum.filter(lambda x : x%2 == 0)
divisibleByThree = rddNum.filter(lambda x : x%3 == 0)
print(divisibleByTwo.collect())
print(divisibleByThree.collect())
rddUnion = divisibleByTw…

Everything about binary trees and all their traversal techniques (recursive and iterative), with examples

package org.dpq.ds.tree;

import java.util.Stack;

public class Tree<T> {

    public static void main(String[] args) {
        TreeNode<Integer> root = new TreeNode<Integer>(1);
        root.setLeft(new TreeNode<Integer>(2));
        root.setRight(new TreeNode<Integer>(3));
        root.getLeft().setLeft(new TreeNode<Integer>(4));
        root.getLeft().setRight(new TreeNode<Integer>(5));
        root.getRight().setLeft(new TreeNode<Integer>(6));
        root.getRight().setRight(new TreeNode<Integer>(7));
        Tree<Integer> tree = new Tree<Integer>();

        // Tree
        //        1
        //       / \
        //      2   3
        //     /\   /\
        //    4  5 6  7
        // expected result for inorder (LNR):   4 2 5 1 6 3 7
        // expected result for preorder (NLR):  1 2 4 5 3 6 7
        // expected result for postorder (LRN): 4 5 2 6 7 3 1

        System.out.println("recursive inorder \n");
        tree.inOrder(root);
        System.out.println("recursive pre…
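The Java class above walks the tree with an explicit Stack; for comparison, here is a compact Python sketch of the same iterative in-order traversal over the tree drawn in the comments (the TreeNode class below is an assumption that mirrors the Java one).

class TreeNode:
    def __init__(self, val):
        self.val = val
        self.left = None
        self.right = None

def inorder_iterative(root):
    # Left-Node-Right using an explicit stack instead of recursion.
    result, stack, node = [], [], root
    while stack or node:
        while node:                 # push the whole left spine
            stack.append(node)
            node = node.left
        node = stack.pop()          # visit the node
        result.append(node.val)
        node = node.right           # then move into its right subtree
    return result

# Same shape as the tree in the post; expected in-order output: 4 2 5 1 6 3 7
root = TreeNode(1)
root.left, root.right = TreeNode(2), TreeNode(3)
root.left.left, root.left.right = TreeNode(4), TreeNode(5)
root.right.left, root.right.right = TreeNode(6), TreeNode(7)
print(inorder_iterative(root))      # [4, 2, 5, 1, 6, 3, 7]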

COMPLEX QUERIES: ALL POSSIBLE QUERIES

--find duplicate records: ALL WAYS
SELECT * FROM USERS;

--USING ROW_NUMBER
SELECT * FROM(
  SELECT USER_ID, USER_NAME, EMAIL,
         ROW_NUMBER() OVER (PARTITION BY USER_NAME, EMAIL ORDER BY USER_ID) AS RN
  FROM USERS
) WHERE RN=2;
--OUTPUT: THIS FETCHES EACH DUPLICATE RECORD ONLY ONCE; ALL DUPLICATE ROWS WON'T BE RETURNED. BELOW IS THE SOLUTION.

--RETURN ALL DUPLICATE RECORDS
SELECT * FROM USERS WHERE (USER_NAME, EMAIL) IN (
  SELECT USER_NAME, EMAIL FROM(
    SELECT USER_ID, USER_NAME, EMAIL,
           ROW_NUMBER() OVER (PARTITION BY USER_NAME, EMAIL ORDER BY USER_ID) AS RN
    FROM USERS
  ) WHERE RN=2
);

--USING GROUP BY
SELECT * FROM USERS WHERE (USER_NAME, EMAIL) IN (
  SELECT USER_NAME, EMAIL FROM(
    SELECT USER_NAME, EMAIL, COUNT(1) CNT
    FROM USERS
    GROUP BY USER_NAME, EMAIL
  ) WHERE CNT>1
);

--second last record
--USING WINDOW FUNCTION
SELECT * FROM(
  SELECT EMPLOYEE_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE_NUMBER, HIRE_DATE, JOB_ID, SALARY, MANAGER_ID, DEPARTMENT_ID,
         ROW_NUMBER() OVER (ORDER BY EMPLOYEE_ID DESC) AS RN
  FROM EMPLOYEES
) WHERE RN=2;
--…
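For readers working in PySpark, the same duplicate-record idea can be expressed with DataFrame window functions; the sample rows and column names below are illustrative, mirroring the USERS table in the SQL above.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("duplicate_records_sketch").getOrCreate()

# Illustrative stand-in for the USERS table used in the SQL above.
users = spark.createDataFrame(
    [(1, "deepak", "d@example.com"),
     (2, "deepak", "d@example.com"),
     (3, "kuhu", "k@example.com")],
    ["USER_ID", "USER_NAME", "EMAIL"])

# Number the copies within each (USER_NAME, EMAIL) group, as ROW_NUMBER() does.
w = Window.partitionBy("USER_NAME", "EMAIL").orderBy("USER_ID")
duplicates = users.withColumn("RN", row_number().over(w)).filter(col("RN") >= 2)
duplicates.show()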

COMPLEX QUERY: Prefer the account id with the least value in case of same number of unique patients

Note: Prefer the account id with the least value in case of the same number of unique patients.
Table Name: PATIENT_LOGS

Approach: First convert the date to a month format, since we need the output specific to each month. Then group all data by month and account id, so you get the number of unique patients belonging to each account per month. Then rank this data by number of patients in descending order and account id in ascending order, so that if the same number of patients falls under multiple account ids, the ranking prefers the account id with the lower value. Finally, keep up to 2 records per month to arrive at the final output.

SOLUTION:
SELECT ACCOUNT_ID, MONTH, PATINET_PER_MONTH FROM(
  SELECT ACCOUNT_ID, MONTH, PATINET_PER_MONTH,
         ROW_NUMBER() OVER (PARTITION BY MONTH ORDER BY PATINET_PER_MONTH DESC, ACCOUNT_ID) AS RN
  FROM (
    SELECT ACCOUNT_ID, MONTH, COUNT(2) PATINET_PER_MONTH FROM(
      SELECT DISTINCT ACCOUNT_ID, PATIENT_ID, TO_CHAR(date1,'MONTH') MONTH FROM patient_logs
    ) GROU…
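A PySpark rendering of the approach described above (month bucket, distinct patients per account, rank by patient count descending then account id ascending, keep the top two per month); the tiny patient_logs DataFrame is only illustrative.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("top_accounts_sketch").getOrCreate()

# Illustrative stand-in for the PATIENT_LOGS table.
patient_logs = spark.createDataFrame(
    [(101, "P_1", "2023-01-05"), (101, "P_2", "2023-01-09"),
     (102, "P_3", "2023-01-11"), (102, "P_4", "2023-01-21")],
    ["account_id", "patient_id", "date1"]).withColumn("date1", F.to_date("date1"))

# Month bucket, then distinct patients per (account, month).
monthly = (patient_logs
           .withColumn("month", F.date_format("date1", "MMMM"))
           .groupBy("account_id", "month")
           .agg(F.countDistinct("patient_id").alias("patients_per_month")))

# Rank: most patients first; ties broken by the smaller account id.
w = Window.partitionBy("month").orderBy(F.desc("patients_per_month"), F.asc("account_id"))
top2 = monthly.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") <= 2).drop("rn")
top2.show()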