Word count program using a MapReduce job

This is a simple MapReduce job that processes any text file and outputs each word together with its number of occurrences.
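For example, an input line such as "hadoop is fast and hadoop is simple" (an illustrative line, not taken from the actual input file) would produce output along these lines, since the mapper upper-cases and trims every word before counting:

AND	1
FAST	1
HADOOP	2
IS	2
SIMPLE	1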

Program:

package com.dpq.retail;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration c = new Configuration();
        // First remaining argument is the input path, second is the output path
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);

        Job job = Job.getInstance(c, "wordcount");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(MapForWordCount.class);
        // The reduce class doubles as a combiner to pre-aggregate counts on the map side
        job.setCombinerClass(ReduceForWordCount.class);
        job.setReducerClass(ReduceForWordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MapForWordCount extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            String line = value.toString();
            // Split each line on whitespace and emit (WORD, 1) for every word
            String[] words = line.split("\\s+");
            for (String word : words) {
                if (word.isEmpty()) {
                    continue;
                }
                Text outputKey = new Text(word.toUpperCase().trim());
                IntWritable outputValue = new IntWritable(1);
                con.write(outputKey, outputValue);
            }
        }
    }

    public static class ReduceForWordCount extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text word, Iterable<IntWritable> values, Context con)
                throws IOException, InterruptedException {
            // Sum the 1s emitted for this word to get its total occurrence count
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            con.write(word, new IntWritable(sum));
        }
    }
}

When triggering the MapReduce job, 'WordCountDriver' is supplied as the driver class and the command below is used.
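The jar used below is assumed to be built with Maven (the -0.0.1-SNAPSHOT suffix suggests a standard Maven project). In that case it can be packaged with something like:

mvn clean package

which places WordCountMRJob-0.0.1-SNAPSHOT.jar under the target/ directory.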

First we push the input data to HDFS under /hdp/retail/process/ and verify that it is there:
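Assuming news.txt is available in the current local directory, it can be copied to HDFS with a command like:

hdfs dfs -put news.txt /hdp/retail/process/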

hdfs dfs -ls /hdp/retail/process/

/hdp/retail/process/news.txt

hadoop jar WordCountMRJob-0.0.1-SNAPSHOT.jar com.dpq.retail.WordCountDriver /hdp/retail/process/news.txt /temp/newsOutput1
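Once the job completes, the word counts can be inspected from the output directory, for example:

hdfs dfs -cat /temp/newsOutput1/part-r-00000

(the part file name may vary depending on the number of reducers configured for the job).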

 
