Covid Data Analysis with Bed availability and other details

Covid Data Analysis:

I have provided small sample dataset and run same progam with 10 GB data on cluster with 10 mappers and it took around 25 secs to process data

We have added Partioner just to understand how partition is partiioning data and mapper is being assigned to process that particular partition

Implemented cache for performance booster

  • Country wise total cases
  • Country wise new cases
  • Country wise other details like available beds, booster details etc

for more details please follow below git details:

https://github.com/Deepak-Bhardwaj-Architect/CovidDataAnalysis

Implementation:

package com.citi.covid.spark.driver;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CovidDataAnalysis {
public static void main(String[] args) throws InterruptedException {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName(“Covid Analysis”).setMaster(“local”));
SparkSession spark = SparkSession.builder().appName(“spark-covid-analysis”).getOrCreate();
Dataset<Row> covidSql = spark.read().option(“header”, true).csv(“/Users/dpq/git/CovidDataAnalysis/inputData/covid.csv”);
covidSql = covidSql.cache();
covidSql.show(5);

System.out.println(“COUNT::::::: “+covidSql.count() );
System.out.println(“Execution plan details before execution::::::: “+covidSql.queryExecution());
System.out.println(“Countrywise data”);
covidSql.groupBy(“iso_code”).count().orderBy(“count”).show();

covidSql.registerTempTable(“COVID_DATA”);

covidSql.sqlContext().sql(“Select distinct iso_code from COVID_DATA”).show();

covidSql.sqlContext().sql(“Select iso_code, count(*) from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(total_cases) as total_cases from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select * from COVID_DATA where iso_code=’NIU'”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(new_cases) as total_new_cases from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(total_deaths) as total_deaths from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(new_deaths) as new_deaths from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(total_cases_per_million) as total_cases_per_million from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(new_cases_per_million) as new_cases_per_million from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(total_deaths_per_million) as total_deaths_per_million from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(new_deaths_per_million) as new_deaths_per_million from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(total_tests) as total_tests from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code, sum(total_tests_per_thousand) as total_tests_per_thousand from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code as Country_Code, sum(total_tests_per_thousand) as total_tests_per_thousand from COVID_DATA group by iso_code”).show();

covidSql.sqlContext().sql(“Select iso_code as Country_Code,”
+ ” sum(total_cases) as total_cases,”
+ ” sum(new_cases) as new_cases,”
+ ” sum(total_deaths) as total_deaths,”
+ ” sum(new_deaths) as new_deaths,”
+ ” sum(new_cases_per_million) as new_cases_per_million,”
+ ” sum(total_deaths_per_million) as total_deaths_per_million,”
+ ” sum(new_cases_smoothed_per_million) as new_cases_smoothed_per_million,”
+ ” sum(icu_patients) as icu_patients,”
+ ” sum(total_tests) as total_tests,”
+ ” sum(new_tests) as new_tests,”
+ ” sum(people_fully_vaccinated_per_hundred) as people_fully_vaccinated_per_hundred,”
+ ” sum(weekly_hosp_admissions) as weekly_hosp_admissions,”
+ ” sum(people_vaccinated) as people_vaccinated,”
+ ” sum(total_boosters) as total_boosters”
+ ” from COVID_DATA group by iso_code”).show();

sc.close();
}
}

Popular posts from this blog

Window function in PySpark with Joins example using 2 Dataframes (inner join)

Complex SQL: fetch the users who logged in consecutively 3 or more times (lead perfect example)

Credit Card Data Analysis using PySpark (how to use auto broadcast join after disabling it)