Posts

Showing posts with the label Spark Performance Tuning Questions

Window function in PySpark with a join example using 2 DataFrames (inner join)

Input details:
● The file has JSON records
● Each record has the fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Solution:

from pyspark.sql.functions import col

# this can be done without using a window function
creditCardData = spark.read.json("card_transactions.json")
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = c
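For completeness, a minimal sketch of how this analysis can be finished with the inner join described above, alongside the window-function variant the title refers to; only the file name card_transactions.json comes from the snippet, everything else is illustrative.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("max_spend_sketch").getOrCreate()
creditCardData = spark.read.json("card_transactions.json")

# groupBy + inner join variant (as in the snippet above)
useridMaxSpendDF = (creditCardData.groupby("user_id").max("amount")
                    .withColumnRenamed("max(amount)", "max_amount")
                    .withColumnRenamed("user_id", "m_user_id"))
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = creditCardData.join(useridMaxSpendDF, cond, "inner")
joinedData.show()

# window-function variant: rank each user's transactions by amount
w = Window.partitionBy("user_id").orderBy(F.col("amount").desc())
maxSpendPerUser = (creditCardData
                   .withColumn("rn", F.row_number().over(w))
                   .filter(F.col("rn") == 1)
                   .drop("rn"))
maxSpendPerUser.show()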

Credit Card Data Analysis using PySpark (how to use a broadcast join after disabling auto broadcast)

Input details:
● The file has JSON records
● Each record has the fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: here we will disable auto broadcast, then join 2 DataFrames and inspect the execution plan, which should still use a broadcast join.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()

print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
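A minimal, self-contained sketch of the idea described above: disable the automatic threshold and then force the broadcast with an explicit broadcast() hint; the DataFrames and app name below are invented for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master("local[2]").appName("forced_broadcast_sketch").getOrCreate()

# turn off automatic broadcast-join selection
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# toy DataFrames standing in for the credit-card data
big = spark.range(0, 1000000).withColumnRenamed("id", "user_id")
small = spark.createDataFrame([(1, "gold"), (2, "silver")], ["user_id", "tier"])

# the explicit broadcast() hint still yields a BroadcastHashJoin in the plan
big.join(broadcast(small), "user_id").explain()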

Credit Card Data Analysis using PySpark: disabling auto broadcast explained

Input details:
● The file has JSON records
● Each record has the fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: here we will disable auto broadcast, then join 2 DataFrames and inspect the execution plan, which should not use a broadcast join.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()

# print(spark.version)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
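A minimal sketch of the behaviour described above, assuming Spark 3.x defaults: with the threshold set to -1 and no broadcast() hint, explain() should show a sort-merge join rather than a broadcast join; the DataFrames are made up.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("disabled_broadcast_sketch").getOrCreate()

# -1 disables automatic broadcasting entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

left = spark.range(0, 1000000).withColumnRenamed("id", "user_id")
right = spark.range(0, 1000).withColumnRenamed("id", "user_id")

# with no broadcast() hint the planner falls back to SortMergeJoin
left.join(right, "user_id").explain()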

Credit Card Data Analysis using PySpark: auto broadcast explained

Input details:
● The file has JSON records
● Each record has the fields: user_id, card_num, merchant, category, amount, ts

The analysis below is to be done on this data.

Sample data:
+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|

Application: AQE auto broadcast using a join. Here, even if we do not request a broadcast join, a broadcast join still happens because of the AdaptiveSparkPlan.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()

# print(spark.version)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
creditCardData = spark.read.json("card_transactions.json")
userIdToAmountMapping = {}
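A minimal sketch of the AQE behaviour mentioned above: with adaptive execution enabled and the default threshold left in place, the small side of the join is broadcast automatically and the plan appears under an AdaptiveSparkPlan node; the DataFrames and sizes here are illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("aqe_broadcast_sketch").getOrCreate()

# AQE is on by default in recent Spark versions; set explicitly for clarity
spark.conf.set("spark.sql.adaptive.enabled", "true")
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

transactions = spark.range(0, 1000000).withColumnRenamed("id", "user_id")
users = spark.range(0, 100).withColumnRenamed("id", "user_id")

# no broadcast() hint: the small side is still broadcast automatically,
# and explain() shows the join wrapped in AdaptiveSparkPlan
joined = transactions.join(users, "user_id")
joined.explain()
joined.count()   # triggers execution so the final adaptive plan is resolved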

Loading data from Apache Cassandra vs. HDFS storage, and why Cassandra?

Apache Cassandra is a NoSQL database with a masterless ring cluster structure. While HDFS is a good fit for streaming data access, it does not work well with random access. For example, HDFS works well when your average file size is 100 MB and you want to read the whole file; if you frequently access the nth line of a file, or some other part of it as a record, HDFS is too slow. Relational databases have traditionally provided a solution to that, offering low latency and random access, but they do not work well with big data. NoSQL databases such as Cassandra fill the gap by providing relational-database-style access, but in a distributed architecture on commodity servers. You can load data from Cassandra as a Spark RDD. To make that happen, Datastax, the company behind Cassandra, has contributed the spark-cassandra-connector. This connector lets you load Cassandra tables as Spark RDDs, write Spark RDDs back to Cassandra, and execute CQL queries. Command to use to load data from Cassandra: Perform the
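A minimal PySpark sketch of reading a Cassandra table through the spark-cassandra-connector; the connector coordinates, host, keyspace, and table names below are assumptions for illustration.

from pyspark.sql import SparkSession

# assumes the spark-cassandra-connector package is supplied on the classpath,
# e.g. via --packages com.datastax.spark:spark-cassandra-connector_2.12:<version>
spark = (SparkSession.builder
         .appName("cassandra_read_sketch")
         .config("spark.cassandra.connection.host", "127.0.0.1")
         .getOrCreate())

# load a Cassandra table as a DataFrame (keyspace/table names are illustrative)
df = (spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(keyspace="my_keyspace", table="transactions")
      .load())

df.show(5)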

Twitter data streaming using a Pipeline in PySpark

Twitter data analysis using PySpark along with a Pipeline. We are processing Twitter data using PySpark, and the tweets are parsed in sequential stages, which is why we chain those stages into a Pipeline. Calling fit on the pipeline trains the model, and the computations are then done with the trained model.

from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row, Column
import sys

# define the function to get the predicted sentiment on the data received
def get_prediction(tweet_text):
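A minimal sketch of the Pipeline idea (not the full streaming job): tokenize, remove stop words, embed with Word2Vec, and classify with LogisticRegression, all trained with one fit() call; the toy tweets and column names are invented.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.master("local[2]").appName("tweet_pipeline_sketch").getOrCreate()

# toy labelled tweets standing in for the real training data
train = spark.createDataFrame(
    [("spark streaming is great", 1.0), ("this pipeline is terrible", 0.0)],
    ["tweet", "label"])

# the sequential parsing stages chained into a single Pipeline
tokenizer = RegexTokenizer(inputCol="tweet", outputCol="tokens", pattern="\\W")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
word2vec = Word2Vec(vectorSize=50, minCount=1, inputCol="filtered", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

pipeline = Pipeline(stages=[tokenizer, remover, word2vec, lr])

# fit() trains every stage in order and returns a single PipelineModel
model = pipeline.fit(train)
model.transform(train).select("tweet", "prediction").show()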

Broadcast Nested Loop join in Spark in detail

Broadcast Nested Loop join works by broadcasting one of the entire datasets and performing a nested loop to join the data. So essentially every record from dataset 1 is attempted to join with every record from dataset 2. As you could guess, Broadcast Nested Loop is not preferred and can be quite slow. It works for both equi and non-equi joins, and it is picked by default when you have a non-equi join.

Example: we don't change the default values for either spark.sql.join.preferSortMergeJoin or spark.sql.autoBroadcastJoinThreshold.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res0: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30,
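A short PySpark sketch of the same behaviour: a non-equi join condition with the default settings, where the small side is broadcast and the plan shows BroadcastNestedLoopJoin; the toy data loosely mirrors the Scala example but is otherwise illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("bnlj_sketch").getOrCreate()

df1 = spark.createDataFrame([(10,), (20,), (30,), (40,)], ["id1"])
df2 = spark.createDataFrame([(30,), (50,)], ["id2"])

# non-equi condition: there is no equality key to hash or sort on, so Spark
# picks BroadcastNestedLoopJoin (the small side fits under the default
# autoBroadcastJoinThreshold and is broadcast)
joined = df1.join(df2, df1.id1 < df2.id2)
joined.explain()
joined.show()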

Everything about Cartesian Product in Spark

Cartesian Product join works very similarly to a Broadcast Nested Loop join, except the dataset is not broadcasted. Shuffle-and-Replication does not mean a "true" shuffle, as in records with the same keys being sent to the same partition; instead, the entire partition of the dataset is sent over, or replicated, to all the partitions for a full cross or nested-loop join. We will understand all of the above points with examples in detail. We are setting spark.sql.autoBroadcastJoinThreshold to -1 to disable broadcast.

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = -1

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 2
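A short PySpark sketch of the same setup: broadcasting disabled and an explicit cross join, which surfaces as CartesianProduct in the physical plan; the toy data is illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("cartesian_sketch").getOrCreate()

# disable broadcasting so Spark cannot choose BroadcastNestedLoopJoin
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df1 = spark.createDataFrame([(10,), (20,), (30,)], ["id1"])
df2 = spark.createDataFrame([(30,), (50,)], ["id2"])

# every partition of df1 is combined with every partition of df2,
# which shows up as CartesianProduct in explain()
crossed = df1.crossJoin(df2)
crossed.explain()
crossed.show()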

Country Risk Data Analysis with Spark Datasets

Here we will analyse country risk data and do some manipulation on it: we will apply FX-rate data to this dataset, and to achieve performance benefits we will use a broadcast variable.

Sample Data:
AU,,2017Q1,Account,100.1020,2000.1040
KR,,2017Q1,Account,100.1020,2000.1040
US,,2017Q1,Account,100.1020,2000.1040
AU,,2018Q1,Account,100.1020,2000.1040
US,,2018Q1,Account,100.1020,2000.1040
AU,,2019Q1,Account,100.1020,2000.1040
KR,,2019Q1,Account,100.1020,2000.1040
AU,,2016Q1,Account,100.1020,2000.1040
KR,,2016Q1,Account,100.1020,2000.1040
AU,,2017Q1,Segment,100.1020,2000.1040
AU,,2017Q1,Segment,100.1020,2000.1040
US,,2017Q1,Account,100.1020,2000.1040

package com.dpq.country.data.driver;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import or
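The post itself is written in Java; as a language-neutral illustration of the broadcast-variable idea, here is a small PySpark sketch that ships an FX-rate map to the executors once and applies it per row. The file name, rate values, and column layout are assumptions based on the sample rows above.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("fx_broadcast_sketch").getOrCreate()

# hypothetical FX rates per country code (illustrative values only)
fx_rates = {"AU": 0.65, "KR": 0.00075, "US": 1.0}
fx_broadcast = spark.sparkContext.broadcast(fx_rates)

# column names inferred from the sample rows above (file name is assumed)
cols = ["country", "blank", "quarter", "level", "val1", "val2"]
df = spark.read.csv("country_risk.csv", inferSchema=True).toDF(*cols)

# a UDF that reads the broadcast map on each executor
@F.udf("double")
def to_usd(country, amount):
    return float(amount) * fx_broadcast.value.get(country, 1.0)

df.withColumn("val1_usd", to_usd("country", "val1")).show()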