Model Data Analysis Using DataFrame and Spark SQL in Java

Here we will analyze model data using pure Spark SQL and the DataFrame API, demonstrating the most commonly used methods on sample data.

package com.dpq.model.data.driver;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ModelDataAnalysis {

    public static void main(String[] args) throws InterruptedException {

        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));

        SparkSession spark = SparkSession.builder().appName("spark-bigquery-demo").getOrCreate();

        // Read the CSV without a header, so columns arrive as _c0 .. _c5
        Dataset<Row> row = spark.read().csv("/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv");
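        // If the CSV carried a header row, names and types could be picked up directly
        // using the standard spark.read CSV options (a sketch, not in the original post):
        // Dataset<Row> typed = spark.read()
        //         .option("header", "true")
        //         .option("inferSchema", "true")
        //         .csv("/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv");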

        // Way 1 to rename columns: one withColumnRenamed call per column
        row = row.withColumnRenamed("_c0", "CountryName");
        row = row.withColumnRenamed("_c1", "ReportingPurpose");
        row = row.withColumnRenamed("_c2", "Quarter");
        row = row.withColumnRenamed("_c3", "Grain");
        row = row.withColumnRenamed("_c4", "PP0");
        row = row.withColumnRenamed("_c5", "PP1");

        // Cache the dataset, since it is reused several times below
        row = row.persist();
        row.show();

        // Print the renamed column names
        Arrays.stream(row.columns()).forEach(column -> System.out.println(column));

        // Way 2 to rename ALL columns in one call: Dataset.toDF(String...) takes the new
        // names in positional order (the original Seq/withColumn variant is not a valid
        // Spark Java API, so it is shown here with toDF instead):
        // row = row.toDF("COUNTRY", "REPORTING_PERIOD", "QUARTER", "GRAIN",
        //         "PROJECTED_1ST_MONTH", "PROJECTED_2ND_MONTH");

        row.dropDuplicates().show();
        System.out.println(row.count());
        System.out.println(row.dropDuplicates().count());
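        // dropDuplicates can also deduplicate on a subset of columns via the String[]
        // overload (a sketch; the column choice here is illustrative only):
        // row.dropDuplicates(new String[] {"CountryName", "Quarter"}).show();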

   

        // Removing duplicates
        row = row.dropDuplicates();

        // Creating the table Model_Output on top of the loaded file
        row.createOrReplaceTempView("Model_Output");

        // Way 3 to rename columns: alias them in a SQL SELECT
        Dataset<Row> sqlDF = spark.sql("SELECT CountryName AS COUNTRY, ReportingPurpose AS RPT_PRD, "
                + "Quarter AS QUARTER, Grain AS GRAIN, PP0 AS PRE_PROJECTED_1ST_MONTH, PP1 AS PRE_PROJECTED_2ND_MONTH "
                + "FROM Model_Output");

        // 1st way to filter records: the DataFrame API with isInCollection
        Dataset<Row> filteredRecords1 = sqlDF.filter(sqlDF.col("QUARTER").isInCollection(Arrays.asList("2017Q1", "2018Q1", "2019Q1")));
        filteredRecords1.show();
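        // Column.isin(...) is an equivalent varargs alternative to isInCollection
        // (a sketch, not in the original post):
        // sqlDF.filter(sqlDF.col("QUARTER").isin("2017Q1", "2018Q1", "2019Q1")).show();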

        // 2nd way to filter records: a SQL WHERE clause
        sqlDF = spark.sql("SELECT CountryName AS COUNTRY, ReportingPurpose AS RPT_PRD, "
                + "Quarter AS QUARTER, Grain AS GRAIN, PP0 AS PRE_PROJECTED_1ST_MONTH, PP1 AS PRE_PROJECTED_2ND_MONTH "
                + "FROM Model_Output WHERE Quarter NOT IN ('2016Q1')");
        sqlDF.show();

        // Aggregation: sum the two projection columns per country, reporting purpose, quarter and grain
        Dataset<Row> finalAggregatedResults = spark.sql("SELECT CountryName AS COUNTRY, ReportingPurpose AS RPT_PRD, "
                + "Quarter AS QUARTER, Grain AS GRAIN, sum(CAST(PP0 AS DOUBLE)) AS PP0_SUM, sum(CAST(PP1 AS DOUBLE)) AS PP1_SUM "
                + "FROM Model_Output GROUP BY CountryName, ReportingPurpose, Quarter, Grain");
        finalAggregatedResults.show();

        sc.close();
    }
}
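For comparison, the same aggregation can be written with the DataFrame API instead of SQL. This is a minimal sketch, not part of the original listing; it assumes the renamed row dataset from above, and uses the standard groupBy/agg calls and the functions.sum/col helpers from org.apache.spark.sql.functions:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

// Group by the same four keys and sum the projections, casting the string columns to double
Dataset<Row> aggregated = row.groupBy("CountryName", "ReportingPurpose", "Quarter", "Grain")
        .agg(sum(col("PP0").cast("double")).alias("PP0_SUM"),
             sum(col("PP1").cast("double")).alias("PP1_SUM"));
aggregated.show();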

Refer to the GitHub repository for the full code: https://github.com/dpq1422/ModelDataAnalysis

 
