Analysing stock data and getting the maximum closing price from a stock dataset using a MapReduce job

MaxClosingPricingMapReduceApp: here we compute the maximum closing price for each stock symbol over the last 20 years.

I have provided a small sample dataset; I also ran the same program against 10 GB of data on a cluster with 10 mappers, and it took around 35 seconds to process.

We have added a Partitioner just to understand how the partitioner routes keys to partitions and how a reducer is assigned to process a particular partition.

We have used a MapReduce job and HDFS storage to get the above stats. Later I am planning to compare this with Spark, to see how Spark speeds up the process and, in particular, how it reduces I/O.
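
As a preview of that comparison (this is not part of the original project), here is a minimal sketch of what the same per-symbol max-close aggregation could look like with Spark's Java DataFrame API, assuming Spark is available on the classpath and the CSV layout shown in the sample data below:

// Not part of the original project: a rough Spark (Java API) sketch of the same aggregation.
// Column _c1 is assumed to be the stock symbol and _c6 the closing price, as in the MapReduce code.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

public class MaxClosePriceSpark {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Stocks").getOrCreate();

        // Read the raw CSV; Spark names the columns _c0, _c1, ...
        Dataset<Row> stocks = spark.read().csv(args[0]);

        // Group by stock symbol and take the maximum close price
        stocks.groupBy(col("_c1"))
              .agg(max(col("_c6").cast("float")).alias("maxClosePrice"))
              .write().csv(args[1]);

        spark.stop();
    }
}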

The sample data is also present in the inputFile directory of the same project, so you can see what the data looks like.

Sample data (each line is comma-separated; field index 1 is the stock symbol and field index 6 is the closing price used by the job):

ABCSE,B7J,2009-08-14,7.93,7.94,7.70,7.85,64600,7.68
ABCSE,B6J,2009-08-14,7.93,7.94,7.70,4.55,64600,6.68
ABCSE,B8J,2009-08-14,7.93,7.94,7.70,6.85,64600,4.68
ABCSE,B9J,2009-08-14,7.93,7.94,7.70,8.85,64600,73.68
ABCSE,A7J,2009-08-14,7.93,7.94,7.70,9.85,64600,7.68
ABCSE,S7J,2009-08-14,7.93,7.94,7.70,1.85,64600,2.68
ABCSE,D7J,2009-08-14,7.93,7.94,7.70,2.85,64600,7.68

Driver Program:

package com.citi.retail.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.citi.retail.mapreduce.mapper.MaxClosePriceMapper;
import com.citi.retail.mapreduce.partitioner.MaxClosePricePartitioner;
import com.citi.retail.mapreduce.reducer.MaxClosePriceReducer;

/**
 * MaxClosePriceDriver.java
 *
 * Driver program to calculate the max close price from the stock dataset using MapReduce.
 */
public class MaxClosePriceDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration c = new Configuration();
        // Hint for the number of map tasks (deprecated property name); the actual count is driven by the input splits.
        c.set("mapred.map.tasks", "6");
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);
        Job j = Job.getInstance(c, "Stocks");

        j.setJarByClass(MaxClosePriceDriver.class);
        j.setMapperClass(MaxClosePriceMapper.class);
        j.setPartitionerClass(MaxClosePricePartitioner.class);
        j.setReducerClass(MaxClosePriceReducer.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(FloatWritable.class);
        // Three reducers, so that partitions 0, 1 and 2 returned by the custom partitioner are all used;
        // with a single reducer Hadoop bypasses the custom partitioner entirely.
        j.setNumReduceTasks(3);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}

Mapper:

package com.citi.retail.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * MaxClosePriceMapper.java
 *
 * Mapper program to calculate the max close price from the stock dataset using MapReduce.
 */
public class MaxClosePriceMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        String[] items = line.split(",");

        String stock = items[1];                       // stock symbol
        float closePrice = Float.parseFloat(items[6]); // closing price

        context.write(new Text(stock), new FloatWritable(closePrice));
    }
}
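
For example, given the first sample line above, items[1] is B7J and items[6] is 7.85, so the mapper emits the key-value pair (B7J, 7.85).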

Partitioner:

package com.citi.retail.mapreduce.partitioner;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MaxClosePricePartitioner extends Partitioner<Text, FloatWritable> {

    @Override
    public int getPartition(Text key, FloatWritable value, int numOfReduceTasks) {

        String partitionKey = key.toString();
        System.out.println("partitionKey:" + partitionKey);

        if (numOfReduceTasks == 0) {
            return 0;
        }

        // Route keys by the first character of the stock symbol
        if (partitionKey.startsWith("B")) {
            return 0;
        } else if (partitionKey.startsWith("A")) {
            return 1;
        } else {
            return 2;
        }
    }
}
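
Note that getPartition can return 2, so the job must run with at least three reduce tasks; this is why the driver sets setNumReduceTasks(3). With the sample data above, B7J, B6J, B8J and B9J land in partition 0, A7J in partition 1, and S7J and D7J in partition 2.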

Reducer:

package com.citi.retail.mapreduce.reducer;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxClosePriceReducer
        extends Reducer<Text, FloatWritable, Text, FloatWritable> {

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {

        float maxClosePrice = Float.MIN_VALUE;

        // Iterate over all close prices for a stock symbol and keep the maximum
        for (FloatWritable value : values) {
            maxClosePrice = Math.max(maxClosePrice, value.get());
        }

        // Write output
        context.write(key, new FloatWritable(maxClosePrice));
    }
}
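
Because taking a maximum is associative and commutative, the same reducer class could in principle also be registered as a combiner in the driver (j.setCombinerClass(MaxClosePriceReducer.class); this line is not in the driver above), so that map output is pre-aggregated locally and less data is shuffled to the reducers.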

While triggering the MapReduce job, 'MaxClosePriceDriver' is supplied as the driver class and the commands below are run.

We first push the input data to HDFS at /hdp/retail/process/ and verify it is there:

hdfs dfs -ls /hdp/retail/process/

/hdp/retail/process/Stocks.csv

hadoop jar MaxClosingPriceByMapreduce-0.0.1-SNAPSHOT.jar com.citi.retail.mapreduce.MaxClosePriceDriver /hdp/retail/process/Stocks.csv /temp/stockOutput1
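
Once the job finishes, the result can be inspected with standard HDFS commands (with three reducers there will be one part file per reducer):

hdfs dfs -cat /temp/stockOutput1/part-r-*

For the small sample above, every symbol appears only once, so the expected output is simply each symbol paired with its close price, for example:

A7J	9.85
B7J	7.85
S7J	1.85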

Also, we have provided a partitioner so that all stocks whose symbol starts with 'A' go to one partition, those starting with 'B' go to another, and the remaining stocks go to a third partition.
