Posts

Showing posts from October, 2021

Model Data Analysis Using DataFrame and SparkSQL in Java

Here we will analyze model data using pure Spark SQL and DataFrames, and apply the most commonly used methods to sample data.

package com.dpq.model.data.driver;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ModelDataAnalysis {
    public static void main(String[] args) throws InterruptedException {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
        SparkSession spark = SparkSession.builder().appName("spark-bigquery-demo").getOrCreate();
        Dataset<Row> row = spark.read().csv("/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv");
        // way 1 to change column name
        row = row.withColumnRenamed("_c0", "CountryName");
        row = row.withColumnRenamed("_c1", "ReportingPurpuse");
        row = row.withColumnRenamed("_c2", "Quarter");
        row = row.withColumnR
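The preview above is cut off, so here is a minimal, self-contained sketch of the same idea: reading the CSV into a DataFrame, renaming the default columns, and querying the result with Spark SQL. The aggregation query, the class name, and the local master setting are illustrative additions, not part of the original post.

package com.dpq.model.data.driver;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ModelDataAnalysisSketch {
    public static void main(String[] args) {
        // Local session; app name and master are placeholders
        SparkSession spark = SparkSession.builder()
                .appName("ModelDataAnalysis")
                .master("local[*]")
                .getOrCreate();

        // Same CSV path as in the excerpt above
        Dataset<Row> df = spark.read().csv("/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv");

        // Rename the default _c0.._cN columns to meaningful names
        df = df.withColumnRenamed("_c0", "CountryName")
               .withColumnRenamed("_c1", "ReportingPurpuse")
               .withColumnRenamed("_c2", "Quarter");

        // Register a temp view so the same data can also be queried with Spark SQL
        df.createOrReplaceTempView("model_data");
        spark.sql("SELECT CountryName, Quarter, COUNT(*) AS cnt FROM model_data GROUP BY CountryName, Quarter").show();

        spark.stop();
    }
}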

Broadcast Nested Loop Join in Detail in Spark

Broadcast Nested Loop join works by broadcasting one of the entire datasets and performing a nested loop to join the data. So essentially every record from dataset 1 is attempted to join with every record from dataset 2. As you can guess, Broadcast Nested Loop is not preferred and can be quite slow. It works for both equi and non-equi joins, and it is picked by default when you have a non-equi join.

Example: we don't change the default values for either spark.sql.join.preferSortMergeJoin or spark.sql.autoBroadcastJoinThreshold.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res0: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30,
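Since the shell transcript above is truncated, here is a hedged sketch in Java of a join that ends up as a BroadcastNestedLoopJoin: the join condition is a non-equi comparison, so Spark cannot hash or sort-merge on a key and, with the default broadcast threshold, falls back to broadcasting the smaller side and nested-looping. Class and column names are illustrative.

package com.dpq.joins;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class BroadcastNestedLoopJoinSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("bnlj-demo").master("local[*]").getOrCreate();

        Dataset<Row> df1 = spark.createDataset(
                Arrays.asList(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50),
                Encoders.INT()).toDF("id1");
        Dataset<Row> df2 = spark.createDataset(
                Arrays.asList(30, 20, 40, 50), Encoders.INT()).toDF("id2");

        // Non-equi condition (>=), so Spark cannot use a hash or sort-merge join
        // and picks BroadcastNestedLoopJoin while one side fits the broadcast threshold
        Dataset<Row> joined = df1.join(df2, col("id1").geq(col("id2")));

        // The physical plan should show BroadcastNestedLoopJoin
        joined.explain();
        joined.show();

        spark.stop();
    }
}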

Everything about Cartesian Product in Spark

Cartesian Product join works very similarly to a Broadcast Nested Loop join, except the dataset is not broadcasted. Shuffle-and-Replication does not mean a "true" shuffle, as in records with the same keys being sent to the same partition. Instead, the entire partition of the dataset is sent over, or replicated, to all the partitions for a full cross or nested-loop join. We will understand all the above points with examples in detail.

We are setting spark.sql.autoBroadcastJoinThreshold to -1 to disable broadcast.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = -1

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 2
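To make the truncated example concrete, here is a minimal Java sketch along the same lines: broadcast joins are disabled by setting spark.sql.autoBroadcastJoinThreshold to -1, and an explicit crossJoin produces a CartesianProduct in the physical plan. The class name, the local master, and the printed count are illustrative.

package com.dpq.joins;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CartesianProductSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("cartesian-demo").master("local[*]").getOrCreate();

        // Disable broadcast joins so Spark cannot pick a broadcast strategy
        spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");

        Dataset<Row> df1 = spark.createDataset(
                Arrays.asList(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50),
                Encoders.INT()).toDF("id1");
        Dataset<Row> df2 = spark.createDataset(
                Arrays.asList(30, 20, 40, 50), Encoders.INT()).toDF("id2");

        // crossJoin pairs every row of df1 with every row of df2
        Dataset<Row> crossed = df1.crossJoin(df2);

        // The physical plan should show CartesianProduct
        crossed.explain();
        System.out.println("row count = " + crossed.count()); // 12 * 4 = 48

        spark.stop();
    }
}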

Country Risk Data Analysis with Spark Datasets

Here we will analyse country risk data and do some manipulation on it; to do that manipulation we will apply FX rate data to this dataset, and to achieve a performance benefit we will use a broadcast variable.

Sample Data:
AU,,2017Q1,Account,100.1020,2000.1040
KR,,2017Q1,Account,100.1020,2000.1040
US,,2017Q1,Account,100.1020,2000.1040
AU,,2018Q1,Account,100.1020,2000.1040
US,,2018Q1,Account,100.1020,2000.1040
AU,,2019Q1,Account,100.1020,2000.1040
KR,,2019Q1,Account,100.1020,2000.1040
AU,,2016Q1,Account,100.1020,2000.1040
KR,,2016Q1,Account,100.1020,2000.1040
AU,,2017Q1,Segment,100.1020,2000.1040
AU,,2017Q1,Segment,100.1020,2000.1040
US,,2017Q1,Account,100.1020,2000.1040

package com.dpq.country.data.driver;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import or
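Because the preview cuts off before the driver code, here is a minimal sketch of the broadcast-variable approach the post describes: a small map of FX rates is broadcast once and looked up on each record to convert the amounts, instead of joining against a separate FX rate dataset. The rates, the input path, and the choice of which amount column to convert are assumptions for illustration.

package com.dpq.country.data.driver;

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class CountryRiskFxSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("CountryRiskFx").setMaster("local[*]"));

        // Small FX-rate lookup table, broadcast once to every executor (rates are illustrative)
        Map<String, Double> fxRates = new HashMap<>();
        fxRates.put("AU", 0.75);
        fxRates.put("KR", 0.00085);
        fxRates.put("US", 1.0);
        Broadcast<Map<String, Double>> fxBroadcast = sc.broadcast(fxRates);

        // Each line: country,<blank>,quarter,purpose,amount1,amount2 (placeholder path)
        JavaRDD<String> lines = sc.textFile("/path/to/country_risk.csv");

        // Convert the first amount using the broadcast lookup instead of a join
        JavaRDD<String> converted = lines.map(line -> {
            String[] parts = line.split(",");
            double rate = fxBroadcast.value().getOrDefault(parts[0], 1.0);
            double amount = Double.parseDouble(parts[4]) * rate;
            return parts[0] + "," + parts[2] + "," + parts[3] + "," + amount;
        });

        converted.collect().forEach(System.out::println);
        sc.close();
    }
}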

Stock Analysis with Stock Dataset Using Spark

Below is a sample dataset for doing analysis of stocks.

Input Data: here we have to find out the maximum selling price of all stocks.

ABCSE B6J 2009-08-14 7.93 7.94 7.70 4.55 64600 7.68
ABCSE B8J 2009-08-14 7.93 7.94 7.70 6.85 64600 7.68
ABCSE B9J 2009-08-14 7.93 7.94 7.70 8.85 64600 7.68
ABCSE A7J 2009-08-14 7.93 7.94 7.70 9.85 64600 7.68
ABCSE S7J 2009-08-14 7.93 7.94 7.70 1.85 64600 7.68
ABCSE D7J 2009-08-14 7.93 7.94 7.70 2.85 64600 7.68
ABCSE F7J 2009-08-14 7.93 7.94 7.70 3.85 64600 7.68
ABCSE G7J 2009-08-14 7.93 7.94 7.70 11.85 64600 7.68
ABCSE H7J 2009-08-14 7.93 7.94 7.70 12.85 64600 7.68
ABCSE B7J 2009-08-14 7.93 7.94 7.70 7.85 64600 7.68
ABCSE B6J 2009-08-14 7.93 7.94 7.70 4.55 64600 7.68

package com.dpq.stocks.driver;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.
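Since the preview stops at the imports, here is a minimal sketch of one way to compute the maximum price with the RDD API, reading the task as "maximum selling price per stock symbol". It assumes the second whitespace-separated field is the symbol and the seventh is the selling price; the input path and class name are placeholders.

package com.dpq.stocks.driver;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class StockMaxPriceSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("StockMaxPrice").setMaster("local[*]"));

        // Each record: exchange symbol date open high low price volume adjClose
        JavaRDD<String> lines = sc.textFile("/path/to/stocks.txt");

        // Key by stock symbol, value is the price column we want the maximum of
        JavaPairRDD<String, Double> symbolToPrice = lines.mapToPair(line -> {
            String[] parts = line.split("\\s+");
            return new Tuple2<>(parts[1], Double.parseDouble(parts[6]));
        });

        // Keep the maximum price observed for each symbol
        JavaPairRDD<String, Double> maxBySymbol = symbolToPrice.reduceByKey(Math::max);

        maxBySymbol.collect().forEach(t -> System.out.println(t._1 + " -> " + t._2));
        sc.close();
    }
}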