Country Risk Data Analysis by Spark with Dataset

Here we will analyse country risk data and apply some manipulation to it: we will apply FX rate data to this dataset, and to achieve performance benefits we will use a broadcast variable.

Sample Data:

AU,,2017Q1,Account,100.1020,2000.1040
KR,,2017Q1,Account,100.1020,2000.1040
US,,2017Q1,Account,100.1020,2000.1040
AU,,2018Q1,Account,100.1020,2000.1040
US,,2018Q1,Account,100.1020,2000.1040
AU,,2019Q1,Account,100.1020,2000.1040
KR,,2019Q1,Account,100.1020,2000.1040
AU,,2016Q1,Account,100.1020,2000.1040
KR,,2016Q1,Account,100.1020,2000.1040
AU,,2017Q1,Segment,100.1020,2000.1040
AU,,2017Q1,Segment,100.1020,2000.1040
US,,2017Q1,Account,100.1020,2000.1040

package com.dpq.country.data.driver;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class CountryDataAnalysis {

static Map<String,BigDecimal> referenceData =new HashMap<>();

static{

referenceData.put(“US”,new BigDecimal(“1”));

referenceData.put(“AU”,new BigDecimal(“1.25”));

referenceData.put(“KR”,new BigDecimal(“1.75”));

}

public static void main(String[] args) throws InterruptedException {

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName(“Spark Count”).setMaster(“local”));

       

  // JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName(“RDD_EXAMPLES”).setMaster(“local”)); 

    Broadcast<Map<String, BigDecimal>> broadcastedMap = sc.broadcast(referenceData);

    //Country,Reporting_purpuse,Quarter,Grain,pp0,pp1

    JavaRDD<String> fileRdd = sc.textFile(“/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv”,4);

JavaRDD<ModelOutput> modelOutputRdd = fileRdd.map(data -> new ModelOutput(data.split(“,”)[0] ,data.split(“,”)[1] ,data.split(“,”)[2] 

,data.split(“,”)[3] ,new BigDecimal(data.split(“,”)[4]),new BigDecimal(data.split(“,”)[5]) ) );

//System.out.println(“\n\n\n\n\n\n\n\n\nmodelOutputRdd:”+modelOutputRdd.take(1));

JavaRDD<ModelOutput> modelOutputRddWithFxRate = modelOutputRdd.map(data -> getModelOutputWithFxRate(data,broadcastedMap));

//System.out.println(“\n\n\n\n\n\n\n\n\nmodelOutputRdd:”+modelOutputRddWithFxRate.take(1));

JavaRDD<ModelOutput> modelOutputFilteredRDD = modelOutputRddWithFxRate.filter(data -> !data.getQuarter().contains(“2019”));

//System.out.println(“\n\n\n\n\n\n\n\n\nmodelOutputRdd:”+modelOutputFilteredRDD.count());

JavaPairRDD<ModelOutputKey, ModelOutput> modeloutputPairRDD = modelOutputFilteredRDD.flatMapToPair(new PairFlatMapFunction<ModelOutput, ModelOutputKey, ModelOutput>(){

List<Tuple2<ModelOutputKey, ModelOutput>> tuplePairs = new LinkedList<>();

@Override

public Iterator<Tuple2<ModelOutputKey, ModelOutput>> call(ModelOutput input) throws Exception {

tuplePairs.add(new Tuple2<>(new ModelOutputKey(input.getCountry() , input.getQuarter() , input.getGrain()) , input));

return tuplePairs.iterator();

}

});

//System.out.println(“\n\n\n\n\n\n\nmodeloutputPairRDD:   “+modeloutputPairRDD.collect());

JavaPairRDD<ModelOutputKey, ModelOutput> modeloutputreducebyRDD = modeloutputPairRDD.reduceByKey(new Function2<ModelOutput, ModelOutput, ModelOutput>() {

@Override

public ModelOutput call(ModelOutput arg0, ModelOutput arg1) throws Exception {

arg0.setPp0(arg0.getPp0().add(arg1.getPp0()));

arg0.setPp1(arg0.getPp1().add(arg1.getPp1()));

returnarg0;

}

});

System.out.println(“modeloutputreducebyRDD######################     :    +modeloutputreducebyRDD.collect());

sc.close();

}

public static ModelOutput getModelOutputWithFxRate(ModelOutput modelOutputRdd,Broadcast<Map<String, BigDecimal>>  referenceData) {

modelOutputRdd.setPp0(modelOutputRdd.getPp0().multiply(referenceData.getValue().get(modelOutputRdd.getCountry())));

modelOutputRdd.setPp1(modelOutputRdd.getPp1().multiply(referenceData.getValue().get(modelOutputRdd.getCountry())));

return modelOutputRdd;

}

}

class ModelOutputKey implements Serializable{

privateString country;

privateString quarter;

privateString grain;

public ModelOutputKey(String country, String quarter, String grain) {

super();

this.country = country;

this.quarter = quarter;

this.grain = grain;

}

public String getCountry() {

return country;

}

publicvoidsetCountry(String country) {

this.country = country;

}

public String getQuarter() {

return quarter;

}

publicvoidsetQuarter(String quarter) {

this.quarter = quarter;

}

public String getGrain() {

returngrain;

}

publicvoidsetGrain(String grain) {

this.grain = grain;

}

@Override

publicinthashCode() {

finalintprime = 31;

int result = 1;

result= prime * result + ((country == null) ? 0 : country.hashCode());

result= prime * result + ((grain == null) ? 0 : grain.hashCode());

result= prime * result + ((quarter == null) ? 0 : quarter.hashCode());

returnresult;

}

@Override

publicbooleanequals(Object obj) {

if(this == obj)

returntrue;

if(obj == null)

returnfalse;

if (getClass() != obj.getClass())

returnfalse;

ModelOutputKeyother = (ModelOutputKey) obj;

if (country == null) {

if (other.country != null)

returnfalse;

} else if (!country.equals(other.country))

returnfalse;

if(grain == null) {

if (other.grain != null)

returnfalse;

} else if (!grain.equals(other.grain))

returnfalse;

if (quarter == null) {

if (other.quarter != null)

returnfalse;

} else if (!quarter.equals(other.quarter))

returnfalse;

returntrue;

}

@Override

public String toString() {

return “ModelOutput [country=” + country + “, quarter=” + quarter + “, grain=” + grain + “]”;

}

}

/**
 * One row of model output: the country, reporting purpose, quarter and grain it belongs to,
 * plus the two amounts {@code pp0} and {@code pp1}.
 *
 * <p>Plain mutable bean; the setters replace the stored value without validation.
 */
class ModelOutput implements Serializable {

    private String country;

    private String reportingPurpose;

    private String quarter;

    private String grain;

    private BigDecimal pp0;

    private BigDecimal pp1;

    /**
     * @param country country code
     * @param reportingPurpose reporting purpose (may be an empty string)
     * @param quarter reporting quarter
     * @param grain grain of the row
     * @param pp0 first amount
     * @param pp1 second amount
     */
    public ModelOutput(String country, String reportingPurpose, String quarter, String grain,
            BigDecimal pp0, BigDecimal pp1) {
        super();
        this.country = country;
        this.reportingPurpose = reportingPurpose;
        this.quarter = quarter;
        this.grain = grain;
        this.pp0 = pp0;
        this.pp1 = pp1;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getReportingPurpose() {
        return reportingPurpose;
    }

    public void setReportingPurpose(String reportingPurpose) {
        this.reportingPurpose = reportingPurpose;
    }

    public String getQuarter() {
        return quarter;
    }

    public void setQuarter(String quarter) {
        this.quarter = quarter;
    }

    public String getGrain() {
        return grain;
    }

    public void setGrain(String grain) {
        this.grain = grain;
    }

    public BigDecimal getPp0() {
        return pp0;
    }

    public void setPp0(BigDecimal pp0) {
        this.pp0 = pp0;
    }

    public BigDecimal getPp1() {
        return pp1;
    }

    public void setPp1(BigDecimal pp1) {
        this.pp1 = pp1;
    }

    @Override
    public String toString() {
        return "ModelOutput [country=" + country + ", reportingPurpose=" + reportingPurpose
                + ", quarter=" + quarter + ", grain=" + grain + ", pp0=" + pp0 + ", pp1=" + pp1 + "]";
    }

}

Git Repo:

https://github.com/dpq1422/CountryRiskDataAnalsis

Output:

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
21/10/01 02:14:25 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
21/10/01 02:14:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/dpq/.m2/repository/org/apache/spark/spark-unsafe_2.12/3.1.2/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/10/01 02:14:25 INFO SparkContext: Running Spark version 3.1.2
21/10/01 02:14:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
21/10/01 02:14:26 INFO ResourceUtils: ==============================================================
21/10/01 02:14:26 INFO ResourceUtils: No custom resources configured for spark.driver.
21/10/01 02:14:26 INFO ResourceUtils: ==============================================================
21/10/01 02:14:26 INFO SparkContext: Submitted application: Spark Count
21/10/01 02:14:26 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/10/01 02:14:26 INFO ResourceProfile: Limiting resource is cpu
21/10/01 02:14:26 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/10/01 02:14:26 INFO SecurityManager: Changing view acls to: dpq
21/10/01 02:14:26 INFO SecurityManager: Changing modify acls to: dpq
21/10/01 02:14:26 INFO SecurityManager: Changing view acls groups to:
21/10/01 02:14:26 INFO SecurityManager: Changing modify acls groups to:
21/10/01 02:14:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dpq); groups with view permissions: Set(); users with modify permissions: Set(dpq); groups with modify permissions: Set()
21/10/01 02:14:26 INFO Utils: Successfully started service ‘sparkDriver’ on port 50048.
21/10/01 02:14:26 INFO SparkEnv: Registering MapOutputTracker
21/10/01 02:14:26 INFO SparkEnv: Registering BlockManagerMaster
21/10/01 02:14:26 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/10/01 02:14:26 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/10/01 02:14:26 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/10/01 02:14:26 INFO DiskBlockManager: Created local directory at /private/var/folders/r_/b4vyn5dj269_2h9rqh1gcdrr0000gn/T/blockmgr-cbe72eb2-b741-41ea-9433-65194f2a4058
21/10/01 02:14:26 INFO MemoryStore: MemoryStore started with capacity 1048.8 MiB
21/10/01 02:14:26 INFO SparkEnv: Registering OutputCommitCoordinator
21/10/01 02:14:26 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.
21/10/01 02:14:26 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.4:4040
21/10/01 02:14:26 INFO Executor: Starting executor ID driver on host 192.168.1.4
21/10/01 02:14:26 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 50049.
21/10/01 02:14:26 INFO NettyBlockTransferService: Server created on 192.168.1.4:50049
21/10/01 02:14:26 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/10/01 02:14:26 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.4, 50049, None)
21/10/01 02:14:26 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.4:50049 with 1048.8 MiB RAM, BlockManagerId(driver, 192.168.1.4, 50049, None)
21/10/01 02:14:26 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.4, 50049, None)
21/10/01 02:14:26 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.4, 50049, None)
21/10/01 02:14:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 512.0 B, free 1048.8 MiB)
21/10/01 02:14:27 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 446.0 B, free 1048.8 MiB)
21/10/01 02:14:27 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.4:50049 (size: 446.0 B, free: 1048.8 MiB)
21/10/01 02:14:27 INFO SparkContext: Created broadcast 0 from broadcast at CountryDataAnalysis.java:34
21/10/01 02:14:27 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 175.9 KiB, free 1048.6 MiB)
21/10/01 02:14:27 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 27.1 KiB, free 1048.6 MiB)
21/10/01 02:14:27 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.4:50049 (size: 27.1 KiB, free: 1048.8 MiB)
21/10/01 02:14:27 INFO SparkContext: Created broadcast 1 from textFile at CountryDataAnalysis.java:36
21/10/01 02:14:27 INFO FileInputFormat: Total input files to process : 1
21/10/01 02:14:27 INFO SparkContext: Starting job: collect at CountryDataAnalysis.java:68
21/10/01 02:14:27 INFO DAGScheduler: Registering RDD 5 (flatMapToPair at CountryDataAnalysis.java:47) as input to shuffle 0
21/10/01 02:14:27 INFO DAGScheduler: Got job 0 (collect at CountryDataAnalysis.java:68) with 4 output partitions
21/10/01 02:14:27 INFO DAGScheduler: Final stage: ResultStage 1 (collect at CountryDataAnalysis.java:68)
21/10/01 02:14:27 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
21/10/01 02:14:27 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
21/10/01 02:14:27 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at flatMapToPair at CountryDataAnalysis.java:47), which has no missing parents
21/10/01 02:14:28 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 8.6 KiB, free 1048.6 MiB)
21/10/01 02:14:28 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.4 KiB, free 1048.6 MiB)
21/10/01 02:14:28 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.4:50049 (size: 4.4 KiB, free: 1048.8 MiB)
21/10/01 02:14:28 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1388
21/10/01 02:14:28 INFO DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at flatMapToPair at CountryDataAnalysis.java:47) (first 15 tasks are for partitions Vector(0, 1, 2, 3))
21/10/01 02:14:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks resource profile 0
21/10/01 02:14:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.1.4, executor driver, partition 0, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/10/01 02:14:28 INFO HadoopRDD: Input split: file:/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv:0+114
21/10/01 02:14:28 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1292 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.1.4, executor driver, partition 1, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/10/01 02:14:28 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 182 ms on 192.168.1.4 (executor driver) (1/4)
21/10/01 02:14:28 INFO HadoopRDD: Input split: file:/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv:114+114
21/10/01 02:14:28 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1249 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.1.4, executor driver, partition 2, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/10/01 02:14:28 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 27 ms on 192.168.1.4 (executor driver) (2/4)
21/10/01 02:14:28 INFO HadoopRDD: Input split: file:/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv:228+114
21/10/01 02:14:28 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1249 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.1.4, executor driver, partition 3, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/10/01 02:14:28 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 19 ms on 192.168.1.4 (executor driver) (3/4)
21/10/01 02:14:28 INFO HadoopRDD: Input split: file:/Users/dpq/springbootWrokspace/CountryDataAnalysis/resources/modeloutput.csv:342+114
21/10/01 02:14:28 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 1249 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 21 ms on 192.168.1.4 (executor driver) (4/4)
21/10/01 02:14:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/10/01 02:14:28 INFO DAGScheduler: ShuffleMapStage 0 (flatMapToPair at CountryDataAnalysis.java:47) finished in 0.321 s
21/10/01 02:14:28 INFO DAGScheduler: looking for newly runnable stages
21/10/01 02:14:28 INFO DAGScheduler: running: Set()
21/10/01 02:14:28 INFO DAGScheduler: waiting: Set(ResultStage 1)
21/10/01 02:14:28 INFO DAGScheduler: failed: Set()
21/10/01 02:14:28 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[6] at reduceByKey at CountryDataAnalysis.java:58), which has no missing parents
21/10/01 02:14:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 4.1 KiB, free 1048.6 MiB)
21/10/01 02:14:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.3 KiB, free 1048.6 MiB)
21/10/01 02:14:28 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.1.4:50049 (size: 2.3 KiB, free: 1048.8 MiB)
21/10/01 02:14:28 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1388
21/10/01 02:14:28 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 1 (ShuffledRDD[6] at reduceByKey at CountryDataAnalysis.java:58) (first 15 tasks are for partitions Vector(0, 1, 2, 3))
21/10/01 02:14:28 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks resource profile 0
21/10/01 02:14:28 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4) (192.168.1.4, executor driver, partition 0, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Getting 3 (1897.0 B) non-empty blocks including 3 (1897.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 9 ms
21/10/01 02:14:28 INFO Executor: Finished task 0.0 in stage 1.0 (TID 4). 2329 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5) (192.168.1.4, executor driver, partition 1, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 1.0 in stage 1.0 (TID 5)
21/10/01 02:14:28 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 4) in 64 ms on 192.168.1.4 (executor driver) (1/4)
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Getting 3 (1779.0 B) non-empty blocks including 3 (1779.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/10/01 02:14:28 INFO Executor: Finished task 1.0 in stage 1.0 (TID 5). 2329 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6) (192.168.1.4, executor driver, partition 2, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 2.0 in stage 1.0 (TID 6)
21/10/01 02:14:28 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 5) in 12 ms on 192.168.1.4 (executor driver) (2/4)
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Getting 1 (593.0 B) non-empty blocks including 1 (593.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
21/10/01 02:14:28 INFO Executor: Finished task 2.0 in stage 1.0 (TID 6). 2042 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7) (192.168.1.4, executor driver, partition 3, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
21/10/01 02:14:28 INFO Executor: Running task 3.0 in stage 1.0 (TID 7)
21/10/01 02:14:28 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 6) in 21 ms on 192.168.1.4 (executor driver) (3/4)
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Getting 1 (593.0 B) non-empty blocks including 1 (593.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/10/01 02:14:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/10/01 02:14:28 INFO Executor: Finished task 3.0 in stage 1.0 (TID 7). 1998 bytes result sent to driver
21/10/01 02:14:28 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 7) in 14 ms on 192.168.1.4 (executor driver) (4/4)
21/10/01 02:14:28 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
21/10/01 02:14:28 INFO DAGScheduler: ResultStage 1 (collect at CountryDataAnalysis.java:68) finished in 0.119 s
21/10/01 02:14:28 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/10/01 02:14:28 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
21/10/01 02:14:28 INFO DAGScheduler: Job 0 finished: collect at CountryDataAnalysis.java:68, took 0.519144 s
21/10/01 02:14:28 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.4:50049 in memory (size: 4.4 KiB, free: 1048.8 MiB)
modeloutputreducebyRDD###################### : [(ModelOutput [country=KR, quarter=2016Q1, grain=Account],ModelOutput [country=KR, reportingPurpose=, quarter=2016Q1, grain=Account, pp0=350.357000, pp1=7000.364000]), (ModelOutput [country=AU, quarter=2017Q1, grain=Segment],ModelOutput [country=AU, reportingPurpose=, quarter=2017Q1, grain=Segment, pp0=375.382500, pp1=7500.390000]), (ModelOutput [country=US, quarter=2017Q1, grain=Account],ModelOutput [country=US, reportingPurpose=, quarter=2017Q1, grain=Account, pp0=300.3060, pp1=6000.3120]), (ModelOutput [country=US, quarter=2018Q1, grain=Account],ModelOutput [country=US, reportingPurpose=, quarter=2018Q1, grain=Account, pp0=100.1020, pp1=2000.1040]), (ModelOutput [country=KR, quarter=2017Q1, grain=Account],ModelOutput [country=KR, reportingPurpose=, quarter=2017Q1, grain=Account, pp0=700.714000, pp1=14000.728000]), (ModelOutput [country=AU, quarter=2016Q1, grain=Account],ModelOutput [country=AU, reportingPurpose=, quarter=2016Q1, grain=Account, pp0=500.510000, pp1=10000.520000]), (ModelOutput [country=AU, quarter=2017Q1, grain=Account],ModelOutput [country=AU, reportingPurpose=, quarter=2017Q1, grain=Account, pp0=1001.020000, pp1=20001.040000]), (ModelOutput [country=AU, quarter=2018Q1, grain=Account],ModelOutput [country=AU, reportingPurpose=, quarter=2018Q1, grain=Account, pp0=125.127500, pp1=2500.130000])]
21/10/01 02:14:28 INFO SparkUI: Stopped Spark web UI at http://192.168.1.4:4040
21/10/01 02:14:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/10/01 02:14:28 INFO MemoryStore: MemoryStore cleared
21/10/01 02:14:28 INFO BlockManager: BlockManager stopped
21/10/01 02:14:28 INFO BlockManagerMaster: BlockManagerMaster stopped
21/10/01 02:14:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/10/01 02:14:28 INFO SparkContext: Successfully stopped SparkContext
21/10/01 02:14:28 INFO ShutdownHookManager: Shutdown hook called
21/10/01 02:14:28 INFO ShutdownHookManager: Deleting directory /private/var/folders/r_/b4vyn5dj269_2h9rqh1gcdrr0000gn/T/spark-0e61e4ac-2fbb-4e0e-af02-50ad22cee32b

Popular posts from this blog

Window function in PySpark with Joins example using 2 Dataframes (inner join)

Complex SQL: fetch the users who logged in consecutively 3 or more times (lead perfect example)

Credit Card Data Analysis using PySpark (how to use auto broadcast join after disabling it)