
Showing posts with the label Spark

How to change a column name and select a few columns in a DataFrame using PySpark, with an example

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Solution:

from pyspark.sql.functions import col

# this can be done without using a window function
creditCardData = spark.read.json("card_transactions.json")
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = c
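The preview cuts off at the join. A minimal sketch of how the example could be completed to match the post title; the inner join, the selected columns and the alias below are illustrative, not taken from the original post:

joinedData = creditCardData.join(useridMaxSpendDF, cond, 'inner')
# keep only a few columns and rename one of them while selecting
result = joinedData.select(
    col('user_id'),
    col('card_num'),
    col('max_amount').alias('highest_amount')
)
result.show()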

Window functions in PySpark with a join example using two DataFrames (inner join)

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Solution:

from pyspark.sql.functions import col

# this can be done without using a window function
creditCardData = spark.read.json("card_transactions.json")
useridMaxSpendDF = creditCardData.groupby('user_id').max('amount')
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("max(amount)", "max_amount")
useridMaxSpendDF = useridMaxSpendDF.withColumnRenamed("user_id", "m_user_id")
cond = [creditCardData.user_id == useridMaxSpendDF.m_user_id,
        creditCardData.amount == useridMaxSpendDF.max_amount]
joinedData = c
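The post title refers to a window function, while the preview shows the groupBy-plus-join approach. A minimal sketch of the window-function alternative (illustrative, not taken from the original post):

from pyspark.sql import Window
from pyspark.sql.functions import col, max as max_

# compute each user's maximum amount over a window partitioned by user_id,
# then keep only the rows where the amount equals that maximum
w = Window.partitionBy('user_id')
maxSpendDF = creditCardData.withColumn('max_amount', max_('amount').over(w)) \
                           .filter(col('amount') == col('max_amount'))
maxSpendDF.show()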

Credit Card Data Analysis using PySpark (how to use a broadcast join after disabling auto broadcast)

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: here we disable auto broadcast, then join two DataFrames and look at the execution plan, which should still use a broadcast join.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()

print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
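The preview stops after re-reading the threshold. A minimal sketch of how the join itself could look (illustrative, not from the original post): with auto broadcast disabled, an explicit broadcast() hint still forces a broadcast hash join.

creditCardData = spark.read.json("card_transactions.json")
userMaxSpend = creditCardData.groupBy("user_id").max("amount")

# broadcast() hint on the small side; works even with the threshold set to -1
joined = creditCardData.join(broadcast(userMaxSpend), "user_id")
joined.explain()   # the physical plan should show a BroadcastHashJoin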

Credit Card Data Analysis using PySpark: auto broadcast explanation

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: AQE auto broadcast during a join. Here, even if we do not ask for a broadcast join, a broadcast join still happens because of the AdaptiveSparkPlan.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()

#print(spark.version)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
creditCardData = spark.read.json("card_transactions.json")
userIdToAmountMapping = {}
us
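A minimal sketch (not in the preview) of one way to watch AQE pick the broadcast join on its own:

# with AQE enabled, Spark can switch to a broadcast join at runtime even
# without an explicit broadcast() hint
print(spark.conf.get("spark.sql.adaptive.enabled"))

userMaxSpend = creditCardData.groupBy("user_id").max("amount")   # small DataFrame
joined = creditCardData.join(userMaxSpend, "user_id")            # no hint used
joined.explain()   # look for AdaptiveSparkPlan / BroadcastHashJoin in the plan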

Credit Card Data Analysis using PySpark (Get the distinct list of categories in which the user has made expenditure) with execution plan

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: get the distinct list of categories in which the user has made expenditure.

Solution:

from pyspark.sql import functions as F

cardTnDF = spark.read.json("card_transactions.json")
#spark.read.json("card_transactions.json").show()
#cardTnDF.groupBy('user_id')['category'].apply(list)
cardTnDF.groupby('user_id').agg(F.collect_list('category')).collect()

Output:

Row(user_id='U_102', collect_list(category)=['children', 'groceries', 'entertainment', 'c
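Note that collect_list keeps duplicates; for a truly distinct list of categories per user, collect_set is the usual choice. A minimal sketch (illustrative, not from the original post), including the explain() call for the execution plan the title mentions:

distinctCategories = cardTnDF.groupby('user_id') \
    .agg(F.collect_set('category').alias('categories'))   # collect_set drops duplicates
distinctCategories.show(truncate=False)
distinctCategories.explain()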

Credit Card Data Analysis using PySpark (Get the total amount spent by each user for each of their cards on each category) with execution plan

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: get the total amount spent by each user for each of their cards on each category.

Solution:

cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.groupBy('user_id','card_num','category').sum('amount').collect()

Output:

[Row(user_id='U_104', card_num='C_107', category='cosmetics', sum(amount)=11818),
 Row(user_id='U_104', card_num='C_108', category='cosmetics', sum(amount)=9522),
 Row(user_id='U_103', card_num='C_105', ca
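The same aggregation can be written with agg() so the result column gets a friendlier name than sum(amount); a minimal sketch (illustrative, not from the original post):

from pyspark.sql import functions as F

totals = cardTnDF.groupBy('user_id', 'card_num', 'category') \
    .agg(F.sum('amount').alias('total_amount'))
totals.show(5)
totals.explain()   # view the execution plan mentioned in the title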

Credit Card Data Analysis using PySpark (Get the total amount spent by each user for each of their cards) with execution plan

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: get the total amount spent by each user for each of their cards.

Solution:

cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.groupBy('user_id','card_num').sum('amount').collect()

Output:

[Row(user_id='U_101', card_num='C_102', sum(amount)=59203),
 Row(user_id='U_104', card_num='C_108', sum(amount)=54728),
 Row(user_id='U_101', card_num='C_101', sum(amount)=66581),
 Row(user_id='U_104', card_num='C_107', sum(amou

Credit Card Data Analysis using PySpark (Get the total amount spent by each user) with execution plan

Input details:
● File has JSON records
● Each record has the fields:
○ user_id
○ card_num
○ merchant
○ category
○ amount
○ ts

Below analysis to be done. Sample data:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|   M_110|1584161986|  U_103|
+------+--------+---------+--------+----------+-------+

Application: get the total amount spent by each user.

Solution:

cardTnDF = spark.read.json("card_transactions.json")
cardTnDF.show(3)
cardTnDF.groupBy('user_id').sum('amount').collect()

Output:

+------+--------+---------+--------+----------+-------+
|amount|card_num| category|merchant|        ts|user_id|
+------+--------+---------+--------+----------+-------+
|   243|   C_108|     food|   M_102|1579532902|  U_104|
|   699|   C_106|cosmetics|   M_103|1581759040|  U_103|
|   228|   C_104| children|
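The post title also mentions the execution plan; a minimal, illustrative way to inspect it for this aggregation (not shown in the preview):

totalByUser = cardTnDF.groupBy('user_id').sum('amount')
totalByUser.explain()   # typically shows HashAggregate stages with an Exchange in between
totalByUser.show()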

Analyse the input weather dataset and apply RDD aggregation functions

# Input weather dataset
fileRdd = sc.sparkContext.textFile('weather.csv')
fileRdd.collect()

Output:

['2016-05-09,234893,34', '2019-09-08,234896,3', '2019-11-19,234895,24', '2017-04-04,234900,43', '2013-12-04,234900,47', '2019-08-28,234894,5', '2013-11-29,234897,-5', '2018-08-19,234895,40', '2019-06-06,234890,9', '2017-02-09,234900,21', '2017-03-30,234893,36', '2019-01-01,234895,-9', '2010-12-23,234898,-1', '2015-09-03,234890,13', '2011-07-19,234898,25', '2014-09-27,234897,29', '2013-11-18,234891,-9', '2012-02-09,234893,10', '2014-07-03,234897,29', '2011-11-07,234895,38', '2014-02-14,234891,24', '2012-02-18,234893,5', '2010-01-31,234896,-8', '2015-08-19,234890,-7', '2017-03-26,234891,-1', '2011-04-23,234894,23', '2014-09-15,234898,-8', '2011-06-16,234890,33', '201
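The preview cuts off in the middle of the sample output. A minimal sketch of the kind of aggregation the title refers to, assuming each line is "date,station_id,temperature" as in the sample above (illustrative, not from the original post):

# parse each CSV line into (station_id, temperature)
parsedRdd = fileRdd.map(lambda line: line.split(',')) \
                   .map(lambda f: (f[1], int(f[2])))

maxTempByStation = parsedRdd.reduceByKey(lambda a, b: max(a, b))
minTempByStation = parsedRdd.reduceByKey(lambda a, b: min(a, b))

# average temperature per station via aggregateByKey: carry (sum, count)
sumCount = parsedRdd.aggregateByKey((0, 0),
                                    lambda acc, v: (acc[0] + v, acc[1] + 1),
                                    lambda a, b: (a[0] + b[0], a[1] + b[1]))
avgTempByStation = sumCount.mapValues(lambda s: s[0] / s[1])

print(maxTempByStation.collect())
print(minTempByStation.collect())
print(avgTempByStation.collect())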

RDD joins using PySpark examples

Full Outer Join: the final result returns all records from both RDDs.

rdd1 = sc.parallelize([(101,'John'),(102,'Matthew'),(103,'Aakash'),(104,'Sandeep'),
                       (105,'Sakura'),(106,'Abed'),(107,'Mary'),(108,'Kamla')], 2)
rdd2 = sc.parallelize([(101,'USA'),(108,'UK'),(105,'Japan'),(102,'Sri Lanka'),
                       (103,'India'),(104,'UAE'),(107,'Germany'),(110,'Australia')], 2)
#print(rdd1.glom().collect())
#print(rdd2.glom().collect())
fullOuterJoin = rdd1.fullOuterJoin(rdd2)
print(fullOuterJoin.collect())

Output:

[(104, ('Sandeep', 'UAE')), (108, ('Kamla', 'UK')), (101, ('John', 'USA')), (105, ('Sakura', 'Japan')), (102, ('Matthew', 'Sri Lanka')), (106, ('Abed', None)), (110, (None, 'Australia')), (103, ('Aakash', 'India')), (107, ('Mary', 'Germany'))]

Inner Join: the final result returns only the matching records from both RDDs.

rdd1 = sc.parallelize([(101,'John'),(102,'Matthew'),(103,'Aa
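The preview cuts off in the middle of the inner join. A minimal sketch of the remaining join types on the same two RDDs defined above (illustrative, not taken from the original post):

innerJoin = rdd1.join(rdd2)             # only keys present in both RDDs
leftJoin = rdd1.leftOuterJoin(rdd2)     # all keys from rdd1, None where rdd2 has no match
rightJoin = rdd1.rightOuterJoin(rdd2)   # all keys from rdd2, None where rdd1 has no match
print(innerJoin.collect())
print(leftJoin.collect())
print(rightJoin.collect())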

PySpark RDD advanced examples

1. Create a SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
print(spark.version)

Output:
3.2.1

2. Creating RDDs

lst = [i for i in range(1, 10)]
print(lst)
listRdd = spark.sparkContext.parallelize(lst, 3)
print(listRdd.glom().collect())
print(listRdd.count())

# creating an empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
print(type(emptyRDD))
print("Empty RDD: ", emptyRDD.collect())

# empty RDD with partitions
emptyPartionedRDD = spark.sparkContext.parallelize([], 2)
print(type(emptyPartionedRDD))
print("Empty emptyPartionedRDD: ", emptyPartionedRDD.glom().collect())

listDoubledRdd = listRdd.map(lambda x: x*2)
print("Output by map function:", listDoubledRdd.collect())

Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
9
<class 'pyspark.rdd.RDD'>
Empty RDD: []
<class 'pyspark.rdd.RDD'>
Empty emptyPartionedRDD: [[], []]
Output by map function: [2, 4, 6, 8, 10, 12, 14, 16, 18]

PySpark RDD examples

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]') \
    .appName('RDD_Methods_Examples') \
    .getOrCreate()
print(spark.version)

Output:
3.2.1

# RDD methods live on the SparkContext, not the SparkSession
sc = spark.sparkContext

rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rddNum = rddNum.map(lambda x: x + 10)
rddNum = rddNum.filter(lambda x: x % 2 == 0)
print(rddNum.reduce(lambda a, b: a + b))

Output:
80

nameRdd = sc.parallelize(['Deepak','Simmi','Simran','Sukhwinder','Sanki','ShotTemper'])
rddNum = nameRdd.filter(lambda name: name.startswith('S'))
print(rddNum.collect())
rddNum = nameRdd.filter(lambda name: not name.startswith('S'))
print(rddNum.collect())

Output:
['Simmi', 'Simran', 'Sukhwinder', 'Sanki', 'ShotTemper']
['Deepak']

# union example
rddNum = sc.parallelize([1,2,3,4,5,6,7,8,9,10,30,21,45,23,22,77,44])
divisibleByTwo = rddNum.filter(lambda x: x % 2 == 0)
divisibleByThree = rddNum.filter(lambda x: x % 3 == 0)
print(divisibleByTwo.collect())
print(divisibleByThree.collect())
rddUnion = divisibleByTw
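The preview cuts off at the union. A minimal sketch of how the example presumably continues (illustrative, not taken from the original post):

rddUnion = divisibleByTwo.union(divisibleByThree)
print(rddUnion.collect())              # union keeps duplicates (numbers divisible by 6 appear twice)
print(rddUnion.distinct().collect())   # distinct() removes them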

Spark Join Strategies (Internals of Spark Joins & Spark’s choice of Join Strategy)

While dealing with data, we have to handle different kinds of joins, be it inner, outer, left or (maybe) left-semi. This article covers the different join strategies employed by Spark to perform join operations. Knowing Spark's join internals comes in handy for optimizing tricky join operations, finding the root cause of some out-of-memory errors, and improving the performance of Spark jobs (we all want that, don't we?). Please read on to find out.

Broadcast Hash Join

Before getting to Spark's broadcast hash join, let's first understand the hash join in general: as the name suggests, a hash join is performed by first creating a hash table based on the join_key of the smaller relation and then looping over the larger relation to match the hashed join_key values. Also, this is only supported for '=' joins. In Spark, the hash join plays a role at the per-node level, and the strategy is used to join the partitions available on the node. Now, coming to the broadcast hash join: in a broadcast hash join, a copy of one of
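A minimal sketch of how a broadcast hash join is typically requested explicitly (illustrative; it assumes an existing SparkSession named spark, and the DataFrame names and input file are placeholders, not from the article):

from pyspark.sql.functions import broadcast

largeDF = spark.read.json("card_transactions.json")    # larger relation
smallDF = largeDF.select("user_id").distinct()         # smaller relation to broadcast
joined = largeDF.join(broadcast(smallDF), "user_id")   # hint Spark to broadcast the small side
joined.explain()   # the physical plan should show BroadcastHashJoin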

Covid Data Analysis with Bed availability and other details

Covid data analysis: I have provided a small sample dataset and run the same program with 10 GB of data on a cluster with 10 mappers; it took around 25 seconds to process the data. We added a partitioner just to understand how the data is partitioned and how a mapper is assigned to process a particular partition, and we implemented caching as a performance booster. The analysis covers:

● Country-wise total cases
● Country-wise new cases
● Country-wise other details, like available beds, booster details, etc.

For more details please follow the Git repository: https://github.com/Deepak-Bhardwaj-Architect/CovidDataAnalysis

Implementation:

package com.citi.covid.spark.driver;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CovidDataAnalysis {
    public static void main(String[] args) throws InterruptedException {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName

Everything about YARN and its modes

Yet Another Resource Negotiator (YARN) is Hadoop's compute framework that runs on top of HDFS, which is Hadoop's storage layer. YARN follows the master-slave architecture: the master daemon is called the ResourceManager and the slave daemon is called the NodeManager. Besides these, application life-cycle management is done by the ApplicationMaster, which can be spawned on any slave node and is alive for the lifetime of an application. When Spark is run on YARN, the ResourceManager performs the role of the Spark master and the NodeManagers work as executor nodes. While running Spark with YARN, each Spark executor runs as a YARN container. Spark applications on YARN run in two modes:

● yarn-client: the Spark driver runs in the client process outside of the YARN cluster, and the ApplicationMaster is only used to negotiate resources from the ResourceManager.
● yarn-cluster: the Spark driver runs in the ApplicationMaster spawned by a NodeManager on a slave node.

The yarn-cluster mode is recommended for production deployments, while the yarn-
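A minimal sketch of starting a PySpark session in yarn-client mode (illustrative; it assumes HADOOP_CONF_DIR/YARN_CONF_DIR point at a working cluster, and yarn-cluster mode is normally launched through spark-submit rather than in code):

from pyspark.sql import SparkSession

# client mode: the driver runs in this local process, and the ApplicationMaster
# only negotiates executor containers from the ResourceManager
spark = SparkSession.builder \
    .master("yarn") \
    .appName("Yarn_Client_Example") \
    .getOrCreate()
print(spark.sparkContext.master)   # prints 'yarn'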

How to load data from a relational database?

A lot of important data lies in relational databases that Spark needs to query. JdbcRDD is a Spark feature that allows relational tables to be loaded as RDDs; this recipe explains how to use JdbcRDD. Another option is Spark SQL, which includes a JDBC data source. It should be preferred over the current recipe, as results are returned as DataFrames (to be introduced in the next chapter), which can be easily processed by Spark SQL and also joined with other data sources. Please make sure that the JDBC driver JAR is visible on the client node and on all slave nodes on which executors will run.

Perform the following steps to load data from relational databases:

1. Create a table named person in MySQL using the following DDL:

CREATE TABLE `person` (
  `person_id` int(11) NOT NULL AUTO_INCREMENT,
  `first_name` varchar(30) DEFAULT NULL,
  `last_name` varchar(30) DEFAULT NULL,
  `gender` char(1) DEFAULT NULL,
  PRI
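The preview stops inside the DDL. As a complement, a minimal sketch of reading the same table through Spark SQL's JDBC data source in PySpark (illustrative; it assumes a SparkSession named spark, and the host, database name, credentials and driver class are placeholders):

personDF = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "person") \
    .option("user", "dbuser") \
    .option("password", "dbpassword") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()
personDF.show()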

Loading data from Apache Cassandra vs. HDFS storage, and why Cassandra?

Apache Cassandra is a NoSQL database with a masterless ring cluster structure. While HDFS is a good fit for streaming data access, it does not work well with random access. For example, HDFS will work well when your average file size is 100 MB and you want to read the whole file; if you frequently access the nth line in a file, or some other part of it as a record, HDFS would be too slow. Relational databases have traditionally provided a solution to that, offering low latency and random access, but they do not work well with big data. NoSQL databases such as Cassandra fill the gap by providing relational-database-type access, but in a distributed architecture on commodity servers. To load data from Cassandra as a Spark RDD, DataStax, the company behind Cassandra, has contributed the spark-cassandra-connector. This connector lets you load Cassandra tables as Spark RDDs, write Spark RDDs back to Cassandra, and execute CQL queries.

Commands to use to load data from Cassandra: perform the
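The preview cuts off before the commands. A minimal PySpark sketch of reading and writing a Cassandra table through the spark-cassandra-connector's DataFrame API (illustrative; it assumes a SparkSession named spark, the keyspace and table names are placeholders, and the connector package has to be supplied, e.g. via --packages):

df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(keyspace="people", table="person") \
    .load()
df.show()

# writing a DataFrame back to Cassandra goes through the same format
df.write.format("org.apache.spark.sql.cassandra") \
    .options(keyspace="people", table="person_copy") \
    .mode("append") \
    .save()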

Twitter data streaming using a pipeline in PySpark

Twitter data analysis using PySpark along with a Pipeline. We are processing Twitter data using PySpark and have tried to use all possible methods to understand it. The Twitter data is parsed in sequential stages, which is why we chain those stages in a pipeline: calling the fit function on the pipeline trains the model, after which the computations are done.

from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row, Column
import sys

# define the function to get the predicted sentiment on the data received
def get_prediction(tweet_text):
    t
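The preview cuts off inside get_prediction. A minimal sketch of the kind of pipeline described above, built from the imported stages (illustrative; the column names 'tweet' and 'label' and the training DataFrame are assumptions, not from the original post):

# tokenize tweets, drop stop words, embed with Word2Vec, classify with logistic regression
stage_1 = RegexTokenizer(inputCol='tweet', outputCol='tokens', pattern='\\W')
stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100)
model_stage = LogisticRegression(featuresCol='vector', labelCol='label')

pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, model_stage])
# pipelineFit = pipeline.fit(training_df)            # training_df: labeled tweets DataFrame
# predictions = pipelineFit.transform(new_tweets_df) # score newly received tweets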