Analysing stock data and getting the maximum closing price per stock symbol from a stock dataset using a MapReduce job
MaxClosingPricingMapReduceApp computes the maximum closing price for each stock symbol over the last 20 years.
I have provided a small sample dataset; the same program was also run against 10 GB of data on a cluster with 10 mappers, and it took around 35 seconds to process the data.
We have added a Partitioner just to show how the partitioner splits the map output and how a reducer is assigned to process a particular partition.
We have used a MapReduce job with HDFS storage to produce the stats above. Later I am planning to compare it with Spark, and we will see how Spark speeds up the process, mainly by reducing intermediate I/O.
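As a preview of that comparison, below is a minimal Spark sketch of the same computation in Java. This is my own illustration, not code from this project; it assumes the same CSV layout and Spark's Java API:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

// Illustrative Spark version of the max-close-price job (not part of this project yet)
public class MaxClosePriceSpark {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("MaxClosePrice").getOrCreate();
        JavaPairRDD<String, Float> maxClose = spark.read()
                .textFile(args[0])       // e.g. /hdp/retail/process/Stocks.csv
                .javaRDD()
                .mapToPair(line -> {
                    String[] f = line.split(",");
                    // field 1 = stock symbol, field 6 = close price, same layout the mapper below uses
                    return new Tuple2<>(f[1], Float.parseFloat(f[6]));
                })
                .reduceByKey(Math::max); // max close price per symbol, no intermediate HDFS writes between stages
        maxClose.saveAsTextFile(args[1]); // e.g. /temp/stockOutputSpark
        spark.stop();
    }
}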
Sample data is also present in the inputFile directory in the same project, so you can see what the data looks like.
Sample Data (fields: exchange, stock symbol, date, followed by daily prices and volume; the mapper reads the symbol from index 1 and the close price from index 6):
ABCSE,B7J,2009-08-14,7.93,7.94,7.70,7.85,64600,7.68
ABCSE,B6J,2009-08-14,7.93,7.94,7.70,4.55,64600,6.68
ABCSE,B8J,2009-08-14,7.93,7.94,7.70,6.85,64600,4.68
ABCSE,B9J,2009-08-14,7.93,7.94,7.70,8.85,64600,73.68
ABCSE,A7J,2009-08-14,7.93,7.94,7.70,9.85,64600,7.68
ABCSE,S7J,2009-08-14,7.93,7.94,7.70,1.85,64600,2.68
ABCSE,D7J,2009-08-14,7.93,7.94,7.70,2.85,64600,7.68
Driver Program:
package com.citi.retail.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.citi.retail.mapreduce.mapper.MaxClosePriceMapper;
import com.citi.retail.mapreduce.partitioner.MaxClosePricePartitioner;
import com.citi.retail.mapreduce.reducer.MaxClosePriceReducer;

/**
 * MaxClosePriceDriver.java
 *
 * Driver program that wires up and submits the MapReduce job.
 */
public class MaxClosePriceDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration c = new Configuration();
        Job j = Job.getInstance(c, "MaxClosePrice");
        j.setJarByClass(MaxClosePriceDriver.class);
        j.setMapperClass(MaxClosePriceMapper.class);
        j.setReducerClass(MaxClosePriceReducer.class);
        j.setPartitionerClass(MaxClosePricePartitioner.class);
        j.setNumReduceTasks(3); // one reduce task per partition returned by the partitioner
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(j, new Path(args[0]));   // input, e.g. /hdp/retail/process/Stocks.csv
        FileOutputFormat.setOutputPath(j, new Path(args[1])); // output, e.g. /temp/stockOutput1
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
Mapper:
package com.citi.retail.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * MaxClosePriceMapper.java
 *
 * Mapper that emits a (stock symbol, close price) pair for each record in the stock dataset.
 */
public class MaxClosePriceMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] items = line.split(",");
        String stock = items[1];                        // stock symbol is the second field
        float closePrice = Float.parseFloat(items[6]);  // close price is the seventh field
        context.write(new Text(stock), new FloatWritable(closePrice));
    }
}
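One caveat: the sample file is clean, but in a 10 GB real-world input a header row or truncated record would make Float.parseFloat throw and fail the task. A defensive variant of the map body could skip such lines (this guard is my addition, not part of the original mapper):

// Defensive variant of the map body: skip malformed records instead of failing the task
String[] items = value.toString().split(",");
if (items.length < 7) {
    return; // header row or truncated record
}
try {
    context.write(new Text(items[1]), new FloatWritable(Float.parseFloat(items[6])));
} catch (NumberFormatException e) {
    // close price is not a valid number; skip the record
}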
Partitioner:
package com.citi.retail.mapreduce.partitioner;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * MaxClosePricePartitioner.java
 *
 * Routes symbols starting with 'B' to partition 0, 'A' to partition 1, and everything else to partition 2.
 */
public class MaxClosePricePartitioner extends Partitioner<Text, FloatWritable> {

    @Override
    public int getPartition(Text key, FloatWritable value, int numOfReduceTasks) {
        String partitionKey = key.toString();
        System.out.println("partitionkey:" + partitionKey); // debug output to the task logs
        if (numOfReduceTasks == 0) {
            return 0; // no reducers configured; everything goes to the same partition
        }
        if (partitionKey.startsWith("B")) {
            return 0;
        } else if (partitionKey.startsWith("A")) {
            return 1;
        } else {
            return 2;
        }
    }
}
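To make the routing concrete, here is how symbols from the sample data are assigned when the job runs with three reduce tasks:

// Quick check of partition assignment for sample symbols, with 3 reduce tasks
MaxClosePricePartitioner p = new MaxClosePricePartitioner();
p.getPartition(new Text("B7J"), new FloatWritable(7.85f), 3); // returns 0 -> part-r-00000
p.getPartition(new Text("A7J"), new FloatWritable(9.85f), 3); // returns 1 -> part-r-00001
p.getPartition(new Text("S7J"), new FloatWritable(1.85f), 3); // returns 2 -> part-r-00002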
Reducer:
package com.citi.retail.mapreduce.reducer;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxClosePriceReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
        float maxClosePrice = Float.MIN_VALUE;

        // Iterate over all close prices for this stock symbol and keep the maximum
        for (FloatWritable value : values) {
            maxClosePrice = Math.max(maxClosePrice, value.get());
        }

        // Write output: (stock symbol, max close price)
        context.write(key, new FloatWritable(maxClosePrice));
    }
}
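Since max is associative and commutative, the same reducer class can also be registered as a combiner; the maximum is then pre-computed on the map side, which shrinks the data shuffled across the network and further reduces I/O. This would be a one-line addition to the driver (my suggestion, not in the original code):

j.setCombinerClass(MaxClosePriceReducer.class);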
When triggering the MapReduce job, 'MaxClosePriceDriver' is supplied as the driver class and the command below is run.
First we push the input data to HDFS at /hdp/retail/process/.
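For example (assuming Stocks.csv is in the local working directory):

hdfs dfs -put Stocks.csv /hdp/retail/process/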
hdfs dfs -ls /hdp/retail/process/
/hdp/retail/process/Stocks.csv
hadoop jar MaxClosingPriceByMapreduce-0.0.1-SNAPSHOT.jar com.citi.retail.mapreduce.MaxClosePriceDriver /hdp/retail/process/Stocks.csv /temp/stockOutput1
We have also provided a partitioner, so all stocks whose symbol starts with 'B' go to one partition, those starting with 'A' to a second, and all remaining stocks to a third.
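With three reduce tasks, the job therefore writes three output files, part-r-00000 through part-r-00002, one per partition. The results can be inspected with:

hdfs dfs -cat /temp/stockOutput1/part-r-*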